This is an automated email from the ASF dual-hosted git repository.

havret pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-nms-amqp.git


The following commit(s) were added to refs/heads/main by this push:
     new fdc0de2  AMQNET-847 Add support for client-side redelivery delay based 
on redelivery attempt count
fdc0de2 is described below

commit fdc0de27b01f40dd7272a6b8db5169daca992113
Author: Steve Martin <[email protected]>
AuthorDate: Wed Jul 30 22:47:55 2025 +0100

    AMQNET-847 Add support for client-side redelivery delay based on redelivery 
attempt count
    
    * Implement redelivery policy.
    Reject poison messages.
    
    * Updates following code review.
    
    * AMQNET-847 Add support for client-side redelivery delay based on 
redelivery attempt count
    
    * AMQNET-847 Fix build
    
    * AMQNET-847 Fix build
    
    ---------
    
    Co-authored-by: Havret <[email protected]>
---
 src/NMS.AMQP/NmsConnection.cs                      |   1 +
 src/NMS.AMQP/NmsMessageConsumer.cs                 |  65 +++++++++---
 .../MessageRedeliveryPolicyIntegrationTest.cs      | 116 +++++++++++++++++++++
 test/Apache-NMS-AMQP-Test/TestAmqp/TestAmqpPeer.cs |   5 +
 4 files changed, 172 insertions(+), 15 deletions(-)

diff --git a/src/NMS.AMQP/NmsConnection.cs b/src/NMS.AMQP/NmsConnection.cs
index 5afae35..420871b 100644
--- a/src/NMS.AMQP/NmsConnection.cs
+++ b/src/NMS.AMQP/NmsConnection.cs
@@ -23,6 +23,7 @@ using System.Threading;
 using System.Threading.Tasks;
 using Apache.NMS.AMQP.Message;
 using Apache.NMS.AMQP.Meta;
+using Apache.NMS.AMQP.Policies;
 using Apache.NMS.AMQP.Provider;
 using Apache.NMS.AMQP.Util;
 using Apache.NMS.AMQP.Util.Synchronization;
diff --git a/src/NMS.AMQP/NmsMessageConsumer.cs 
b/src/NMS.AMQP/NmsMessageConsumer.cs
index 73206cd..dc20c0b 100644
--- a/src/NMS.AMQP/NmsMessageConsumer.cs
+++ b/src/NMS.AMQP/NmsMessageConsumer.cs
@@ -384,8 +384,7 @@ namespace Apache.NMS.AMQP
                                     Tracer.Debug($"{Info.Id} filtered message 
with excessive redelivery count: {envelope.RedeliveryCount.ToString()}");
                                 }
 
-                                // TODO: Apply redelivery policy
-                                await DoAckExpiredAsync(envelope).Await();
+                                await 
ApplyRedeliveryPolicyOutcome(envelope).Await();
                             }
                             else
                             {
@@ -396,7 +395,9 @@ namespace Apache.NMS.AMQP
                                     await 
DoAckDeliveredAsync(envelope).Await();
                                 else
                                     await 
AckFromReceiveAsync(envelope).Await();
-
+                                
+                                await ApplyRedeliveryPolicy(envelope).Await();
+                                
                                 try
                                 {
                                     Listener?.Invoke(envelope.Message.Copy());
@@ -456,6 +457,17 @@ namespace Apache.NMS.AMQP
 
             return false;
         }
+        
+        private int GetRedeliveryDelay(InboundMessageDispatch envelope)
+        {
+            var redeliveryPolicy = Session.Connection.RedeliveryPolicy;
+            
+            if (redeliveryPolicy == null || envelope.RedeliveryCount <= 0) 
+                return 0;
+            
+            var redeliveryDelay = 
redeliveryPolicy.RedeliveryDelay(envelope.RedeliveryCount);
+            return redeliveryDelay;
+        }
 
         private Task DoAckReleasedAsync(InboundMessageDispatch envelope)
         {
@@ -544,7 +556,7 @@ namespace Apache.NMS.AMQP
                             Tracer.Debug($"{Info.Id} filtered message with 
excessive redelivery count: {envelope.RedeliveryCount.ToString()}");
                         }
 
-                        await DoAckExpiredAsync(envelope).Await();
+                        await ApplyRedeliveryPolicyOutcome(envelope).Await();
                     }
                     else
                     {
@@ -553,6 +565,8 @@ namespace Apache.NMS.AMQP
                             Tracer.Debug($"{Info.Id} received message 
{envelope.Message.NMSMessageId}.");
                         }
 
+                        await ApplyRedeliveryPolicy(envelope).Await();
+
                         return await func.Invoke(envelope);
                     }
                 }
@@ -566,7 +580,17 @@ namespace Apache.NMS.AMQP
                 throw ExceptionSupport.Wrap(ex, "Receive failed");
             }
         }
-        
+
+        private async Task ApplyRedeliveryPolicy(InboundMessageDispatch 
envelope)
+        {
+            int redeliveryDelay = GetRedeliveryDelay(envelope);
+
+            if (redeliveryDelay > 0)
+            {
+                Tracer.DebugFormat("Envelope has been redelivered, apply 
redelivery policy wait {0} milliseconds", redeliveryDelay);
+                await 
Task.Delay(TimeSpan.FromMilliseconds(redeliveryDelay)).Await();
+            }
+        }
 
         private static long GetDeadline(int timeout)
         {
@@ -601,16 +625,7 @@ namespace Apache.NMS.AMQP
 
         private Task DoAckExpiredAsync(InboundMessageDispatch envelope)
         {
-            if (Session.Connection.RedeliveryPolicy != null)
-            {
-                var dispositionType = 
Session.Connection.RedeliveryPolicy.GetOutcome(envelope.Message.NMSDestination);
-                var ackType = LookupAckTypeForDisposition(dispositionType);
-                return Session.AcknowledgeAsync(ackType, envelope);
-            }
-            else
-            {
-                return 
Session.AcknowledgeAsync(AckType.MODIFIED_FAILED_UNDELIVERABLE, envelope);
-            }
+            return 
Session.AcknowledgeAsync(AckType.MODIFIED_FAILED_UNDELIVERABLE, envelope);
         }
 
         private static AckType LookupAckTypeForDisposition(int dispositionType)
@@ -630,6 +645,26 @@ namespace Apache.NMS.AMQP
             }
         }
 
+        private Task ApplyRedeliveryPolicyOutcome(InboundMessageDispatch 
envelope)
+        {
+            if (Tracer.IsDebugEnabled)
+            {
+                Tracer.DebugFormat("{0} filtered message with excessive 
redelivery count: {1}", Info.Id, envelope.RedeliveryCount);
+            }
+
+            AckType ackType;
+            if (Session.Connection.RedeliveryPolicy is { } redeliveryPolicy)
+            {
+                var dispositionType = 
redeliveryPolicy.GetOutcome(envelope.Message.NMSDestination);
+                ackType = LookupAckTypeForDisposition(dispositionType);
+            }
+            else
+            {
+                ackType = AckType.MODIFIED_FAILED_UNDELIVERABLE;
+            }
+            return Session.AcknowledgeAsync(ackType, envelope);
+        }
+
         private void SetAcknowledgeCallback(InboundMessageDispatch envelope)
         {
             if (acknowledgementMode == AcknowledgementMode.ClientAcknowledge)
diff --git 
a/test/Apache-NMS-AMQP-Test/Integration/MessageRedeliveryPolicyIntegrationTest.cs
 
b/test/Apache-NMS-AMQP-Test/Integration/MessageRedeliveryPolicyIntegrationTest.cs
new file mode 100644
index 0000000..9569623
--- /dev/null
+++ 
b/test/Apache-NMS-AMQP-Test/Integration/MessageRedeliveryPolicyIntegrationTest.cs
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using System.Threading;
+using Amqp.Framing;
+using Apache.NMS;
+using Apache.NMS.AMQP.Policies;
+using Apache.NMS.Policies;
+using NMS.AMQP.Test.TestAmqp;
+using NUnit.Framework;
+
+namespace NMS.AMQP.Test.Integration
+{
+    [TestFixture]
+    public class MessageRedeliveryPolicyIntegrationTest : 
IntegrationTestFixture
+    {
+        [Test, Timeout(20_000)]
+        public void TestIncomingDeliveryCountExceededMessageGetsRejected()
+        {
+            using (TestAmqpPeer testPeer = new TestAmqpPeer())
+            {
+                IConnection connection = EstablishConnection(testPeer);
+                int initialRedeliveryDelay = 1000;
+                int clockResolution = 15;
+                connection.RedeliveryPolicy = new DefaultRedeliveryPolicy { 
MaximumRedeliveries = 1, InitialRedeliveryDelay = initialRedeliveryDelay};
+                connection.Start();
+
+                testPeer.ExpectBegin();
+
+                ISession session = 
connection.CreateSession(AcknowledgementMode.IndividualAcknowledge);
+                IQueue queue = session.GetQueue("myQueue");
+
+                testPeer.ExpectReceiverAttach();
+                testPeer.ExpectLinkFlowRespondWithTransfer(message: new 
Amqp.Message() { BodySection = new AmqpValue() { Value = "hello" } });
+                testPeer.ExpectDispositionThatIsModifiedFailedAndSettled();
+
+                IMessageConsumer consumer = session.CreateConsumer(queue);
+
+                IMessage m = consumer.Receive(TimeSpan.FromMilliseconds(3000));
+                Assert.NotNull(m, "Message should have been received");
+                Assert.IsInstanceOf<ITextMessage>(m);
+                session.Recover();
+
+                DateTime startTimer = DateTime.UtcNow;
+                m = consumer.Receive(TimeSpan.FromMilliseconds(3000));
+                
Assert.That(DateTime.UtcNow.Subtract(startTimer).TotalMilliseconds, 
Is.GreaterThanOrEqualTo(initialRedeliveryDelay - clockResolution));
+
+                Assert.NotNull(m, "Message should have been received");
+                Assert.IsInstanceOf<ITextMessage>(m);
+                session.Recover();
+                
+                // Verify the message is no longer there. Will drain to be 
sure there are no messages.
+                Assert.IsNull(consumer.Receive(TimeSpan.FromMilliseconds(10)), 
"Message should not have been received");
+
+                testPeer.ExpectClose();
+                connection.Close();
+
+                testPeer.WaitForAllMatchersToComplete(3000);
+            }
+        }
+        
+        [Test, Timeout(20_000)]
+        public void TestIncomingDeliveryCountExceededMessageGetsRejectedAsync()
+        {
+            using (TestAmqpPeer testPeer = new TestAmqpPeer())
+            {
+                IConnection connection = EstablishConnection(testPeer);
+                int initialRedeliveryDelay = 1000;
+                connection.RedeliveryPolicy = new DefaultRedeliveryPolicy { 
MaximumRedeliveries = 1, InitialRedeliveryDelay = initialRedeliveryDelay};
+                connection.Start();
+
+                testPeer.ExpectBegin();
+
+                ISession session = 
connection.CreateSession(AcknowledgementMode.IndividualAcknowledge);
+                IQueue queue = session.GetQueue("myQueue");
+                
+                
+                testPeer.ExpectReceiverAttach();
+                testPeer.ExpectLinkFlowRespondWithTransfer(message: new 
Amqp.Message() { BodySection = new AmqpValue() { Value = "hello" } });
+                testPeer.ExpectDispositionThatIsModifiedFailedAndSettled();
+
+                IMessageConsumer consumer = session.CreateConsumer(queue);
+
+                CountdownEvent success = new CountdownEvent(2);
+
+                consumer.Listener += m =>
+                {
+                        session.Recover();
+                        success.Signal();
+                };
+                
+                Assert.IsTrue(success.Wait(TimeSpan.FromSeconds(3)), "Didn't 
get expected messages");
+                
+                testPeer.ExpectClose();
+                connection.Close();
+                
+                testPeer.WaitForAllMatchersToComplete(3000);
+            }
+        }
+    }
+}
\ No newline at end of file
diff --git a/test/Apache-NMS-AMQP-Test/TestAmqp/TestAmqpPeer.cs 
b/test/Apache-NMS-AMQP-Test/TestAmqp/TestAmqpPeer.cs
index d59329e..2fad923 100644
--- a/test/Apache-NMS-AMQP-Test/TestAmqp/TestAmqpPeer.cs
+++ b/test/Apache-NMS-AMQP-Test/TestAmqp/TestAmqpPeer.cs
@@ -727,6 +727,11 @@ namespace NMS.AMQP.Test.TestAmqp
             ExpectDisposition(settled: true, stateMatcher: stateMatcher);
         }
         
+        public void ExpectDispositionThatIsRejectedAndSettled()
+        {
+            ExpectDisposition(settled: true, stateMatcher: 
Assert.IsInstanceOf<Rejected>);
+        }
+        
         public void ExpectDispositionThatIsModifiedFailedAndSettled()
         {
             Action<DeliveryState> stateMatcher = state => { 
Assert.AreEqual(state.Descriptor.Code, 
MessageSupport.MODIFIED_FAILED_INSTANCE.Descriptor.Code); };


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
For further information, visit: https://activemq.apache.org/contact


Reply via email to