This is an automated email from the ASF dual-hosted git repository. havret pushed a commit to branch AMQNET-846 in repository https://gitbox.apache.org/repos/asf/activemq-nms-amqp.git
commit ed321c3b3557e5759239574bad59b7ebf797afa2 Author: Havret <h4v...@gmail.com> AuthorDate: Sat Jun 7 00:05:39 2025 +0200 AMQNET-846 Add option to customize acknowledgment behavior for expired messages via IRedeliveryPolicy --- src/NMS.AMQP/NmsMessageConsumer.cs | 31 ++- ...eliveryPolicy.cs => DefaultRedeliveryPolicy.cs} | 13 +- .../Integration/ConsumerIntegrationTest.cs | 272 +++++++++++++++++++++ test/Apache-NMS-AMQP-Test/TestAmqp/TestAmqpPeer.cs | 7 + 4 files changed, 314 insertions(+), 9 deletions(-) diff --git a/src/NMS.AMQP/NmsMessageConsumer.cs b/src/NMS.AMQP/NmsMessageConsumer.cs index d37b55a..0e8d223 100644 --- a/src/NMS.AMQP/NmsMessageConsumer.cs +++ b/src/NMS.AMQP/NmsMessageConsumer.cs @@ -544,7 +544,6 @@ namespace Apache.NMS.AMQP Tracer.Debug($"{Info.Id} filtered message with excessive redelivery count: {envelope.RedeliveryCount.ToString()}"); } - // TODO: Apply redelivery policy await DoAckExpiredAsync(envelope).Await(); } else @@ -602,7 +601,35 @@ namespace Apache.NMS.AMQP private Task DoAckExpiredAsync(InboundMessageDispatch envelope) { - return Session.AcknowledgeAsync(AckType.MODIFIED_FAILED_UNDELIVERABLE, envelope); + if (Session.Connection.RedeliveryPolicy != null) + { + var dispositionType = Session.Connection.RedeliveryPolicy.GetOutcome(envelope.Message.NMSDestination); + var ackType = LookupAckTypeForDisposition(dispositionType); + return Session.AcknowledgeAsync(ackType, envelope); + } + else + { + return Session.AcknowledgeAsync(AckType.MODIFIED_FAILED_UNDELIVERABLE, envelope); + } + } + + private AckType LookupAckTypeForDisposition(int dispositionType) + { + switch (dispositionType) + { + case 0: // ACCEPTED + return AckType.ACCEPTED; + case 1: // REJECTED + return AckType.REJECTED; + case 2: // RELEASED + return AckType.RELEASED; + case 3: // MODIFIED_FAILED + return AckType.MODIFIED_FAILED; + case 4: // MODIFIED_FAILED_UNDELIVERABLE + return AckType.MODIFIED_FAILED_UNDELIVERABLE; + default: + throw new ArgumentOutOfRangeException(nameof(dispositionType), "Unknown disposition type"); + } } private void SetAcknowledgeCallback(InboundMessageDispatch envelope) diff --git a/src/NMS.AMQP/Policies/RedeliveryPolicy.cs b/src/NMS.AMQP/Policies/DefaultRedeliveryPolicy.cs similarity index 79% rename from src/NMS.AMQP/Policies/RedeliveryPolicy.cs rename to src/NMS.AMQP/Policies/DefaultRedeliveryPolicy.cs index 28a5355..614ec8f 100644 --- a/src/NMS.AMQP/Policies/RedeliveryPolicy.cs +++ b/src/NMS.AMQP/Policies/DefaultRedeliveryPolicy.cs @@ -14,15 +14,14 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -using System; -using System.Collections.Generic; -using System.Linq; -using System.Text; -using System.Threading.Tasks; namespace Apache.NMS.AMQP.Policies { - class RedeliveryPolicy + public class DefaultRedeliveryPolicy : Apache.NMS.Policies.RedeliveryPolicy { + public override int GetOutcome(IDestination destination) + { + return 4; // MODIFIED_FAILED_UNDELIVERABLE + } } -} +} \ No newline at end of file diff --git a/test/Apache-NMS-AMQP-Test/Integration/ConsumerIntegrationTest.cs b/test/Apache-NMS-AMQP-Test/Integration/ConsumerIntegrationTest.cs index 6bfb1f5..36c9752 100644 --- a/test/Apache-NMS-AMQP-Test/Integration/ConsumerIntegrationTest.cs +++ b/test/Apache-NMS-AMQP-Test/Integration/ConsumerIntegrationTest.cs @@ -1097,5 +1097,277 @@ namespace NMS.AMQP.Test.Integration testPeer.WaitForAllMatchersToComplete(2000); } } + + [Test, Timeout(20_000)] + public void TestMessageNackedWhenRedeliveryCountExceeded() + { + using (var testPeer = new TestAmqpPeer()) + { + var connection = EstablishConnection(testPeer); + connection.RedeliveryPolicy = new DefaultRedeliveryPolicy { MaximumRedeliveries = 1 }; + connection.Start(); + + testPeer.ExpectBegin(); + + var session = connection.CreateSession(AcknowledgementMode.IndividualAcknowledge); + var queue = session.GetQueue("myQueue"); + + testPeer.ExpectReceiverAttach(); + testPeer.ExpectLinkFlowRespondWithTransfer(message: new Amqp.Message { BodySection = new AmqpValue { Value = "hello" } }); + testPeer.ExpectDispositionThatIsModifiedFailedAndSettled(); + + IMessageConsumer consumer = session.CreateConsumer(queue); + + IMessage m = consumer.Receive(TimeSpan.FromMilliseconds(3000)); + Assert.NotNull(m, "Message should have been received"); + Assert.IsInstanceOf<ITextMessage>(m); + session.Recover(); + + m = consumer.Receive(TimeSpan.FromMilliseconds(3000)); + + Assert.NotNull(m, "Message should have been received"); + Assert.IsInstanceOf<ITextMessage>(m); + session.Recover(); + + // Verify the message is no longer there. Will drain to be sure there are no messages. + Assert.IsNull(consumer.Receive(TimeSpan.Zero), "Message should not have been received"); + + testPeer.ExpectClose(); + connection.Close(); + + testPeer.WaitForAllMatchersToComplete(3000); + } + } + + [Test, Timeout(20_000)] + public void TestMessageAcceptedWhenRedeliveryCountExceeded() + { + using (var testPeer = new TestAmqpPeer()) + { + var connection = EstablishConnection(testPeer); + connection.RedeliveryPolicy = new CustomRedeliveryPolicy { MaximumRedeliveries = 1, Outcome = 0 }; + connection.Start(); + + testPeer.ExpectBegin(); + + var session = connection.CreateSession(AcknowledgementMode.IndividualAcknowledge); + var queue = session.GetQueue("myQueue"); + + testPeer.ExpectReceiverAttach(); + testPeer.ExpectLinkFlowRespondWithTransfer(message: new Amqp.Message { BodySection = new AmqpValue { Value = "hello" } }); + testPeer.ExpectDisposition(settled: true, state => + { + Assert.AreEqual(state.Descriptor.Code, MessageSupport.ACCEPTED_INSTANCE.Descriptor.Code); + }); + + var consumer = session.CreateConsumer(queue); + + var m = consumer.Receive(TimeSpan.FromMilliseconds(3000)); + Assert.NotNull(m, "Message should have been received"); + Assert.IsInstanceOf<ITextMessage>(m); + session.Recover(); + + m = consumer.Receive(TimeSpan.FromMilliseconds(3000)); + + Assert.NotNull(m, "Message should have been received"); + Assert.IsInstanceOf<ITextMessage>(m); + session.Recover(); + + // Verify the message is no longer there. Will drain to be sure there are no messages. + Assert.IsNull(consumer.Receive(TimeSpan.Zero), "Message should not have been received"); + + testPeer.ExpectClose(); + connection.Close(); + + testPeer.WaitForAllMatchersToComplete(3000); + } + } + + [Test, Timeout(20_000)] + public void TestMessageRejectedWhenRedeliveryCountExceeded() + { + using (var testPeer = new TestAmqpPeer()) + { + var connection = EstablishConnection(testPeer); + connection.RedeliveryPolicy = new CustomRedeliveryPolicy { MaximumRedeliveries = 1, Outcome = 1 }; + connection.Start(); + + testPeer.ExpectBegin(); + + var session = connection.CreateSession(AcknowledgementMode.IndividualAcknowledge); + var queue = session.GetQueue("myQueue"); + + testPeer.ExpectReceiverAttach(); + testPeer.ExpectLinkFlowRespondWithTransfer(message: new Amqp.Message { BodySection = new AmqpValue { Value = "hello" } }); + testPeer.ExpectDisposition(settled: true, state => + { + Assert.AreEqual(state.Descriptor.Code, MessageSupport.REJECTED_INSTANCE.Descriptor.Code); + }); + + var consumer = session.CreateConsumer(queue); + + var m = consumer.Receive(TimeSpan.FromMilliseconds(3000)); + Assert.NotNull(m, "Message should have been received"); + Assert.IsInstanceOf<ITextMessage>(m); + session.Recover(); + + m = consumer.Receive(TimeSpan.FromMilliseconds(3000)); + + Assert.NotNull(m, "Message should have been received"); + Assert.IsInstanceOf<ITextMessage>(m); + session.Recover(); + + // Verify the message is no longer there. Will drain to be sure there are no messages. + Assert.IsNull(consumer.Receive(TimeSpan.Zero), "Message should not have been received"); + + testPeer.ExpectClose(); + connection.Close(); + + testPeer.WaitForAllMatchersToComplete(3000); + } + } + + [Test, Timeout(20_000)] + public void TestMessageReleasedWhenRedeliveryCountExceeded() + { + using (var testPeer = new TestAmqpPeer()) + { + var connection = EstablishConnection(testPeer); + connection.RedeliveryPolicy = new CustomRedeliveryPolicy { MaximumRedeliveries = 1, Outcome = 2 }; + connection.Start(); + + testPeer.ExpectBegin(); + + var session = connection.CreateSession(AcknowledgementMode.IndividualAcknowledge); + var queue = session.GetQueue("myQueue"); + + testPeer.ExpectReceiverAttach(); + testPeer.ExpectLinkFlowRespondWithTransfer(message: new Amqp.Message { BodySection = new AmqpValue { Value = "hello" } }); + testPeer.ExpectDisposition(settled: true, state => + { + Assert.AreEqual(state.Descriptor.Code, MessageSupport.RELEASED_INSTANCE.Descriptor.Code); + }); + + var consumer = session.CreateConsumer(queue); + + var m = consumer.Receive(TimeSpan.FromMilliseconds(3000)); + Assert.NotNull(m, "Message should have been received"); + Assert.IsInstanceOf<ITextMessage>(m); + session.Recover(); + + m = consumer.Receive(TimeSpan.FromMilliseconds(3000)); + + Assert.NotNull(m, "Message should have been received"); + Assert.IsInstanceOf<ITextMessage>(m); + session.Recover(); + + // Verify the message is no longer there. Will drain to be sure there are no messages. + Assert.IsNull(consumer.Receive(TimeSpan.Zero), "Message should not have been received"); + + testPeer.ExpectClose(); + connection.Close(); + + testPeer.WaitForAllMatchersToComplete(3000); + } + } + + [Test, Timeout(20_000)] + public void TestMessageModifiedFailedWhenRedeliveryCountExceeded() + { + using (var testPeer = new TestAmqpPeer()) + { + var connection = EstablishConnection(testPeer); + connection.RedeliveryPolicy = new CustomRedeliveryPolicy { MaximumRedeliveries = 1, Outcome = 3 }; + connection.Start(); + + testPeer.ExpectBegin(); + + var session = connection.CreateSession(AcknowledgementMode.IndividualAcknowledge); + var queue = session.GetQueue("myQueue"); + + testPeer.ExpectReceiverAttach(); + testPeer.ExpectLinkFlowRespondWithTransfer(message: new Amqp.Message { BodySection = new AmqpValue { Value = "hello" } }); + testPeer.ExpectDisposition(settled: true, state => + { + Assert.AreEqual(state.Descriptor.Code, MessageSupport.MODIFIED_INSTANCE.Descriptor.Code); + }); + + var consumer = session.CreateConsumer(queue); + + var m = consumer.Receive(TimeSpan.FromMilliseconds(3000)); + Assert.NotNull(m, "Message should have been received"); + Assert.IsInstanceOf<ITextMessage>(m); + session.Recover(); + + m = consumer.Receive(TimeSpan.FromMilliseconds(3000)); + + Assert.NotNull(m, "Message should have been received"); + Assert.IsInstanceOf<ITextMessage>(m); + session.Recover(); + + // Verify the message is no longer there. Will drain to be sure there are no messages. + Assert.IsNull(consumer.Receive(TimeSpan.Zero), "Message should not have been received"); + + testPeer.ExpectClose(); + connection.Close(); + + testPeer.WaitForAllMatchersToComplete(3000); + } + } + + [Test, Timeout(20_000)] + public void TestMessageModifiedFailedUndeliverableWhenRedeliveryCountExceeded() + { + using (var testPeer = new TestAmqpPeer()) + { + var connection = EstablishConnection(testPeer); + connection.RedeliveryPolicy = new CustomRedeliveryPolicy { MaximumRedeliveries = 1, Outcome = 4 }; + connection.Start(); + + testPeer.ExpectBegin(); + + var session = connection.CreateSession(AcknowledgementMode.IndividualAcknowledge); + var queue = session.GetQueue("myQueue"); + + testPeer.ExpectReceiverAttach(); + testPeer.ExpectLinkFlowRespondWithTransfer(message: new Amqp.Message { BodySection = new AmqpValue { Value = "hello" } }); + testPeer.ExpectDisposition(settled: true, state => + { + Assert.AreEqual(state.Descriptor.Code, MessageSupport.MODIFIED_INSTANCE.Descriptor.Code); + Assert.IsTrue(state is Modified modified && modified.DeliveryFailed && modified.UndeliverableHere); + }); + + var consumer = session.CreateConsumer(queue); + + var m = consumer.Receive(TimeSpan.FromMilliseconds(3000)); + Assert.NotNull(m, "Message should have been received"); + Assert.IsInstanceOf<ITextMessage>(m); + session.Recover(); + + m = consumer.Receive(TimeSpan.FromMilliseconds(3000)); + + Assert.NotNull(m, "Message should have been received"); + Assert.IsInstanceOf<ITextMessage>(m); + session.Recover(); + + // Verify the message is no longer there. Will drain to be sure there are no messages. + Assert.IsNull(consumer.Receive(TimeSpan.Zero), "Message should not have been received"); + + testPeer.ExpectClose(); + connection.Close(); + + testPeer.WaitForAllMatchersToComplete(3000); + } + } + + private class CustomRedeliveryPolicy : Apache.NMS.Policies.RedeliveryPolicy + { + public int Outcome { get; set; } + + public override int GetOutcome(IDestination destination) + { + return Outcome; + } + } } } \ No newline at end of file diff --git a/test/Apache-NMS-AMQP-Test/TestAmqp/TestAmqpPeer.cs b/test/Apache-NMS-AMQP-Test/TestAmqp/TestAmqpPeer.cs index e61cc88..d59329e 100644 --- a/test/Apache-NMS-AMQP-Test/TestAmqp/TestAmqpPeer.cs +++ b/test/Apache-NMS-AMQP-Test/TestAmqp/TestAmqpPeer.cs @@ -726,6 +726,13 @@ namespace NMS.AMQP.Test.TestAmqp ExpectDisposition(settled: true, stateMatcher: stateMatcher); } + + public void ExpectDispositionThatIsModifiedFailedAndSettled() + { + Action<DeliveryState> stateMatcher = state => { Assert.AreEqual(state.Descriptor.Code, MessageSupport.MODIFIED_FAILED_INSTANCE.Descriptor.Code); }; + + ExpectDisposition(settled: true, stateMatcher: stateMatcher); + } public void ExpectDisposition(bool settled, Action<DeliveryState> stateMatcher, uint? firstDeliveryId = null, uint? lastDeliveryId = null) { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@activemq.apache.org For additional commands, e-mail: commits-h...@activemq.apache.org For further information, visit: https://activemq.apache.org/contact