This is an automated email from the ASF dual-hosted git repository.

havret pushed a commit to branch AMQNET-846
in repository https://gitbox.apache.org/repos/asf/activemq-nms-amqp.git

commit ed321c3b3557e5759239574bad59b7ebf797afa2
Author: Havret <h4v...@gmail.com>
AuthorDate: Sat Jun 7 00:05:39 2025 +0200

    AMQNET-846 Add option to customize acknowledgment behavior for expired messages via IRedeliveryPolicy
---
 src/NMS.AMQP/NmsMessageConsumer.cs                 |  31 ++-
 ...eliveryPolicy.cs => DefaultRedeliveryPolicy.cs} |  13 +-
 .../Integration/ConsumerIntegrationTest.cs         | 272 +++++++++++++++++++++
 test/Apache-NMS-AMQP-Test/TestAmqp/TestAmqpPeer.cs |   7 +
 4 files changed, 314 insertions(+), 9 deletions(-)

diff --git a/src/NMS.AMQP/NmsMessageConsumer.cs b/src/NMS.AMQP/NmsMessageConsumer.cs
index d37b55a..0e8d223 100644
--- a/src/NMS.AMQP/NmsMessageConsumer.cs
+++ b/src/NMS.AMQP/NmsMessageConsumer.cs
@@ -544,7 +544,6 @@ namespace Apache.NMS.AMQP
                             Tracer.Debug($"{Info.Id} filtered message with 
excessive redelivery count: {envelope.RedeliveryCount.ToString()}");
                         }
 
-                        // TODO: Apply redelivery policy
                         await DoAckExpiredAsync(envelope).Await();
                     }
                     else
@@ -602,7 +601,35 @@ namespace Apache.NMS.AMQP
 
         private Task DoAckExpiredAsync(InboundMessageDispatch envelope)
         {
-            return 
Session.AcknowledgeAsync(AckType.MODIFIED_FAILED_UNDELIVERABLE, envelope);
+            if (Session.Connection.RedeliveryPolicy != null)
+            {
+                var dispositionType = 
Session.Connection.RedeliveryPolicy.GetOutcome(envelope.Message.NMSDestination);
+                var ackType = LookupAckTypeForDisposition(dispositionType);
+                return Session.AcknowledgeAsync(ackType, envelope);
+            }
+            else
+            {
+                return 
Session.AcknowledgeAsync(AckType.MODIFIED_FAILED_UNDELIVERABLE, envelope);
+            }
+        }
+
+        private AckType LookupAckTypeForDisposition(int dispositionType)
+        {
+            switch (dispositionType)
+            {
+                case 0: // ACCEPTED
+                    return AckType.ACCEPTED;
+                case 1: // REJECTED
+                    return AckType.REJECTED;
+                case 2: // RELEASED
+                    return AckType.RELEASED;
+                case 3: // MODIFIED_FAILED
+                    return AckType.MODIFIED_FAILED;
+                case 4: // MODIFIED_FAILED_UNDELIVERABLE
+                    return AckType.MODIFIED_FAILED_UNDELIVERABLE;
+                default:
+                    throw new 
ArgumentOutOfRangeException(nameof(dispositionType), "Unknown disposition 
type");
+            }
         }
 
         private void SetAcknowledgeCallback(InboundMessageDispatch envelope)
diff --git a/src/NMS.AMQP/Policies/RedeliveryPolicy.cs b/src/NMS.AMQP/Policies/DefaultRedeliveryPolicy.cs
similarity index 79%
rename from src/NMS.AMQP/Policies/RedeliveryPolicy.cs
rename to src/NMS.AMQP/Policies/DefaultRedeliveryPolicy.cs
index 28a5355..614ec8f 100644
--- a/src/NMS.AMQP/Policies/RedeliveryPolicy.cs
+++ b/src/NMS.AMQP/Policies/DefaultRedeliveryPolicy.cs
@@ -14,15 +14,14 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-using System;
-using System.Collections.Generic;
-using System.Linq;
-using System.Text;
-using System.Threading.Tasks;
 
 namespace Apache.NMS.AMQP.Policies
 {
-    class RedeliveryPolicy
+    public class DefaultRedeliveryPolicy : Apache.NMS.Policies.RedeliveryPolicy
     {
+        public override int GetOutcome(IDestination destination)
+        {
+            return 4; // MODIFIED_FAILED_UNDELIVERABLE
+        }
     }
-}
+}
\ No newline at end of file
diff --git a/test/Apache-NMS-AMQP-Test/Integration/ConsumerIntegrationTest.cs b/test/Apache-NMS-AMQP-Test/Integration/ConsumerIntegrationTest.cs
index 6bfb1f5..36c9752 100644
--- a/test/Apache-NMS-AMQP-Test/Integration/ConsumerIntegrationTest.cs
+++ b/test/Apache-NMS-AMQP-Test/Integration/ConsumerIntegrationTest.cs
@@ -1097,5 +1097,277 @@ namespace NMS.AMQP.Test.Integration
                 testPeer.WaitForAllMatchersToComplete(2000);
             }
         }
+
+        [Test, Timeout(20_000)]
+        public void TestMessageNackedWhenRedeliveryCountExceeded()
+        {
+            using (var testPeer = new TestAmqpPeer())
+            {
+                var connection = EstablishConnection(testPeer);
+                connection.RedeliveryPolicy = new DefaultRedeliveryPolicy { 
MaximumRedeliveries = 1 };
+                connection.Start();
+
+                testPeer.ExpectBegin();
+
+                var session = 
connection.CreateSession(AcknowledgementMode.IndividualAcknowledge);
+                var queue = session.GetQueue("myQueue");
+
+                testPeer.ExpectReceiverAttach();
+                testPeer.ExpectLinkFlowRespondWithTransfer(message: new 
Amqp.Message { BodySection = new AmqpValue { Value = "hello" } });
+                testPeer.ExpectDispositionThatIsModifiedFailedAndSettled();
+
+                IMessageConsumer consumer = session.CreateConsumer(queue);
+
+                IMessage m = consumer.Receive(TimeSpan.FromMilliseconds(3000));
+                Assert.NotNull(m, "Message should have been received");
+                Assert.IsInstanceOf<ITextMessage>(m);
+                session.Recover();
+
+                m = consumer.Receive(TimeSpan.FromMilliseconds(3000));
+
+                Assert.NotNull(m, "Message should have been received");
+                Assert.IsInstanceOf<ITextMessage>(m);
+                session.Recover();
+
+                // Verify the message is no longer there. Will drain to be 
sure there are no messages.
+                Assert.IsNull(consumer.Receive(TimeSpan.Zero), "Message should 
not have been received");
+
+                testPeer.ExpectClose();
+                connection.Close();
+
+                testPeer.WaitForAllMatchersToComplete(3000);
+            }
+        }
+
+        [Test, Timeout(20_000)]
+        public void TestMessageAcceptedWhenRedeliveryCountExceeded()
+        {
+            using (var testPeer = new TestAmqpPeer())
+            {
+                var connection = EstablishConnection(testPeer);
+                connection.RedeliveryPolicy = new CustomRedeliveryPolicy { 
MaximumRedeliveries = 1, Outcome = 0 };
+                connection.Start();
+
+                testPeer.ExpectBegin();
+
+                var session = 
connection.CreateSession(AcknowledgementMode.IndividualAcknowledge);
+                var queue = session.GetQueue("myQueue");
+
+                testPeer.ExpectReceiverAttach();
+                testPeer.ExpectLinkFlowRespondWithTransfer(message: new 
Amqp.Message { BodySection = new AmqpValue { Value = "hello" } });
+                testPeer.ExpectDisposition(settled: true, state =>
+                {
+                    Assert.AreEqual(state.Descriptor.Code, 
MessageSupport.ACCEPTED_INSTANCE.Descriptor.Code);
+                });
+
+                var consumer = session.CreateConsumer(queue);
+
+                var m = consumer.Receive(TimeSpan.FromMilliseconds(3000));
+                Assert.NotNull(m, "Message should have been received");
+                Assert.IsInstanceOf<ITextMessage>(m);
+                session.Recover();
+
+                m = consumer.Receive(TimeSpan.FromMilliseconds(3000));
+
+                Assert.NotNull(m, "Message should have been received");
+                Assert.IsInstanceOf<ITextMessage>(m);
+                session.Recover();
+
+                // Verify the message is no longer there. Will drain to be 
sure there are no messages.
+                Assert.IsNull(consumer.Receive(TimeSpan.Zero), "Message should 
not have been received");
+
+                testPeer.ExpectClose();
+                connection.Close();
+
+                testPeer.WaitForAllMatchersToComplete(3000);
+            }
+        }
+        
+        [Test, Timeout(20_000)]
+        public void TestMessageRejectedWhenRedeliveryCountExceeded()
+        {
+            using (var testPeer = new TestAmqpPeer())
+            {
+                var connection = EstablishConnection(testPeer);
+                connection.RedeliveryPolicy = new CustomRedeliveryPolicy { 
MaximumRedeliveries = 1, Outcome = 1 };
+                connection.Start();
+
+                testPeer.ExpectBegin();
+
+                var session = 
connection.CreateSession(AcknowledgementMode.IndividualAcknowledge);
+                var queue = session.GetQueue("myQueue");
+
+                testPeer.ExpectReceiverAttach();
+                testPeer.ExpectLinkFlowRespondWithTransfer(message: new 
Amqp.Message { BodySection = new AmqpValue { Value = "hello" } });
+                testPeer.ExpectDisposition(settled: true, state =>
+                {
+                    Assert.AreEqual(state.Descriptor.Code, 
MessageSupport.REJECTED_INSTANCE.Descriptor.Code);
+                });
+
+                var consumer = session.CreateConsumer(queue);
+
+                var m = consumer.Receive(TimeSpan.FromMilliseconds(3000));
+                Assert.NotNull(m, "Message should have been received");
+                Assert.IsInstanceOf<ITextMessage>(m);
+                session.Recover();
+
+                m = consumer.Receive(TimeSpan.FromMilliseconds(3000));
+
+                Assert.NotNull(m, "Message should have been received");
+                Assert.IsInstanceOf<ITextMessage>(m);
+                session.Recover();
+
+                // Verify the message is no longer there. Will drain to be 
sure there are no messages.
+                Assert.IsNull(consumer.Receive(TimeSpan.Zero), "Message should 
not have been received");
+
+                testPeer.ExpectClose();
+                connection.Close();
+
+                testPeer.WaitForAllMatchersToComplete(3000);
+            }
+        }
+        
+        [Test, Timeout(20_000)]
+        public void TestMessageReleasedWhenRedeliveryCountExceeded()
+        {
+            using (var testPeer = new TestAmqpPeer())
+            {
+                var connection = EstablishConnection(testPeer);
+                connection.RedeliveryPolicy = new CustomRedeliveryPolicy { 
MaximumRedeliveries = 1, Outcome = 2 };
+                connection.Start();
+
+                testPeer.ExpectBegin();
+
+                var session = 
connection.CreateSession(AcknowledgementMode.IndividualAcknowledge);
+                var queue = session.GetQueue("myQueue");
+
+                testPeer.ExpectReceiverAttach();
+                testPeer.ExpectLinkFlowRespondWithTransfer(message: new 
Amqp.Message { BodySection = new AmqpValue { Value = "hello" } });
+                testPeer.ExpectDisposition(settled: true, state =>
+                {
+                    Assert.AreEqual(state.Descriptor.Code, 
MessageSupport.RELEASED_INSTANCE.Descriptor.Code);
+                });
+
+                var consumer = session.CreateConsumer(queue);
+
+                var m = consumer.Receive(TimeSpan.FromMilliseconds(3000));
+                Assert.NotNull(m, "Message should have been received");
+                Assert.IsInstanceOf<ITextMessage>(m);
+                session.Recover();
+
+                m = consumer.Receive(TimeSpan.FromMilliseconds(3000));
+
+                Assert.NotNull(m, "Message should have been received");
+                Assert.IsInstanceOf<ITextMessage>(m);
+                session.Recover();
+
+                // Verify the message is no longer there. Will drain to be 
sure there are no messages.
+                Assert.IsNull(consumer.Receive(TimeSpan.Zero), "Message should 
not have been received");
+
+                testPeer.ExpectClose();
+                connection.Close();
+
+                testPeer.WaitForAllMatchersToComplete(3000);
+            }
+        }
+        
+        [Test, Timeout(20_000)]
+        public void TestMessageModifiedFailedWhenRedeliveryCountExceeded()
+        {
+            using (var testPeer = new TestAmqpPeer())
+            {
+                var connection = EstablishConnection(testPeer);
+                connection.RedeliveryPolicy = new CustomRedeliveryPolicy { 
MaximumRedeliveries = 1, Outcome = 3 };
+                connection.Start();
+
+                testPeer.ExpectBegin();
+
+                var session = 
connection.CreateSession(AcknowledgementMode.IndividualAcknowledge);
+                var queue = session.GetQueue("myQueue");
+
+                testPeer.ExpectReceiverAttach();
+                testPeer.ExpectLinkFlowRespondWithTransfer(message: new 
Amqp.Message { BodySection = new AmqpValue { Value = "hello" } });
+                testPeer.ExpectDisposition(settled: true, state =>
+                {
+                    Assert.AreEqual(state.Descriptor.Code, 
MessageSupport.MODIFIED_INSTANCE.Descriptor.Code);
+                });
+
+                var consumer = session.CreateConsumer(queue);
+
+                var m = consumer.Receive(TimeSpan.FromMilliseconds(3000));
+                Assert.NotNull(m, "Message should have been received");
+                Assert.IsInstanceOf<ITextMessage>(m);
+                session.Recover();
+
+                m = consumer.Receive(TimeSpan.FromMilliseconds(3000));
+
+                Assert.NotNull(m, "Message should have been received");
+                Assert.IsInstanceOf<ITextMessage>(m);
+                session.Recover();
+
+                // Verify the message is no longer there. Will drain to be 
sure there are no messages.
+                Assert.IsNull(consumer.Receive(TimeSpan.Zero), "Message should 
not have been received");
+
+                testPeer.ExpectClose();
+                connection.Close();
+
+                testPeer.WaitForAllMatchersToComplete(3000);
+            }
+        }
+        
+        [Test, Timeout(20_000)]
+        public void 
TestMessageModifiedFailedUndeliverableWhenRedeliveryCountExceeded()
+        {
+            using (var testPeer = new TestAmqpPeer())
+            {
+                var connection = EstablishConnection(testPeer);
+                connection.RedeliveryPolicy = new CustomRedeliveryPolicy { 
MaximumRedeliveries = 1, Outcome = 4 };
+                connection.Start();
+
+                testPeer.ExpectBegin();
+
+                var session = 
connection.CreateSession(AcknowledgementMode.IndividualAcknowledge);
+                var queue = session.GetQueue("myQueue");
+
+                testPeer.ExpectReceiverAttach();
+                testPeer.ExpectLinkFlowRespondWithTransfer(message: new 
Amqp.Message { BodySection = new AmqpValue { Value = "hello" } });
+                testPeer.ExpectDisposition(settled: true, state =>
+                {
+                    Assert.AreEqual(state.Descriptor.Code, 
MessageSupport.MODIFIED_INSTANCE.Descriptor.Code);
+                    Assert.IsTrue(state is Modified modified && 
modified.DeliveryFailed && modified.UndeliverableHere);
+                });
+
+                var consumer = session.CreateConsumer(queue);
+
+                var m = consumer.Receive(TimeSpan.FromMilliseconds(3000));
+                Assert.NotNull(m, "Message should have been received");
+                Assert.IsInstanceOf<ITextMessage>(m);
+                session.Recover();
+
+                m = consumer.Receive(TimeSpan.FromMilliseconds(3000));
+
+                Assert.NotNull(m, "Message should have been received");
+                Assert.IsInstanceOf<ITextMessage>(m);
+                session.Recover();
+
+                // Verify the message is no longer there. Will drain to be 
sure there are no messages.
+                Assert.IsNull(consumer.Receive(TimeSpan.Zero), "Message should 
not have been received");
+
+                testPeer.ExpectClose();
+                connection.Close();
+
+                testPeer.WaitForAllMatchersToComplete(3000);
+            }
+        }
+
+        private class CustomRedeliveryPolicy : 
Apache.NMS.Policies.RedeliveryPolicy
+        {
+            public int Outcome { get; set; }
+            
+            public override int GetOutcome(IDestination destination)
+            {
+                return Outcome;
+            }
+        }
     }
 }
\ No newline at end of file
diff --git a/test/Apache-NMS-AMQP-Test/TestAmqp/TestAmqpPeer.cs b/test/Apache-NMS-AMQP-Test/TestAmqp/TestAmqpPeer.cs
index e61cc88..d59329e 100644
--- a/test/Apache-NMS-AMQP-Test/TestAmqp/TestAmqpPeer.cs
+++ b/test/Apache-NMS-AMQP-Test/TestAmqp/TestAmqpPeer.cs
@@ -726,6 +726,13 @@ namespace NMS.AMQP.Test.TestAmqp
 
             ExpectDisposition(settled: true, stateMatcher: stateMatcher);
         }
+        
+        public void ExpectDispositionThatIsModifiedFailedAndSettled()
+        {
+            Action<DeliveryState> stateMatcher = state => { 
Assert.AreEqual(state.Descriptor.Code, 
MessageSupport.MODIFIED_FAILED_INSTANCE.Descriptor.Code); };
+            
+            ExpectDisposition(settled: true, stateMatcher: stateMatcher);
+        }
 
         public void ExpectDisposition(bool settled, Action<DeliveryState> 
stateMatcher, uint? firstDeliveryId = null, uint? lastDeliveryId = null)
         {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@activemq.apache.org
For additional commands, e-mail: commits-h...@activemq.apache.org
For further information, visit: https://activemq.apache.org/contact


Reply via email to