This is an automated email from the ASF dual-hosted git repository.
havret pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-nms-amqp.git
The following commit(s) were added to refs/heads/main by this push:
new cccd4ee AMQNET-831 allow acktype modified_failed (#86)
cccd4ee is described below
commit cccd4ee2efb7fa669f04b43cfb3c386a40a3a380
Author: Jef Willems <[email protected]>
AuthorDate: Wed Mar 22 23:01:41 2023 +0100
AMQNET-831 allow acktype modified_failed (#86)
* AMQNET-831 allow more acktypes
* AMQNET-831 : add tests & rejected outcome
Signed-off-by: Jefwillems <[email protected]>
* AMQNET-831 : refactor tests to resolve issues
Signed-off-by: Jefwillems <[email protected]>
* AMQNET-831 : remove unused code
Signed-off-by: Jefwillems <[email protected]>
---------
Signed-off-by: Jefwillems <[email protected]>
---
src/NMS.AMQP/Provider/Amqp/AmqpConsumer.cs | 8 +
.../AmqpAcknowledgmentsIntegrationTest.cs | 190 +++++++++++++++++++++
2 files changed, 198 insertions(+)
diff --git a/src/NMS.AMQP/Provider/Amqp/AmqpConsumer.cs b/src/NMS.AMQP/Provider/Amqp/AmqpConsumer.cs
index 99666c8..28c5d17 100644
--- a/src/NMS.AMQP/Provider/Amqp/AmqpConsumer.cs
+++ b/src/NMS.AMQP/Provider/Amqp/AmqpConsumer.cs
@@ -332,6 +332,14 @@ namespace Apache.NMS.AMQP.Provider.Amqp
receiverLink.Modify(message, true, true);
RemoveMessage(envelope);
break;
+ case AckType.MODIFIED_FAILED:
+ receiverLink.Modify(message, true, false);
+ RemoveMessage(envelope);
+ break;
+ case AckType.REJECTED:
+ receiverLink.Reject(message);
+ RemoveMessage(envelope);
+ break;
default:
Tracer.ErrorFormat("Unsupported Ack Type for message: {0}", envelope);
throw new ArgumentException($"Unsupported Ack Type for message: {envelope}");
diff --git a/test/Apache-NMS-AMQP-Test/Integration/AmqpAcknowledgmentsIntegrationTest.cs b/test/Apache-NMS-AMQP-Test/Integration/AmqpAcknowledgmentsIntegrationTest.cs
index 2609400..505e133 100644
--- a/test/Apache-NMS-AMQP-Test/Integration/AmqpAcknowledgmentsIntegrationTest.cs
+++ b/test/Apache-NMS-AMQP-Test/Integration/AmqpAcknowledgmentsIntegrationTest.cs
@@ -21,6 +21,7 @@ using System.Threading;
using System.Threading.Tasks;
using Amqp.Framing;
using Apache.NMS;
+using Apache.NMS.AMQP.Message;
using Apache.NMS.AMQP.Util;
using NMS.AMQP.Test.TestAmqp;
using NUnit.Framework;
@@ -273,6 +274,195 @@ namespace NMS.AMQP.Test.Integration
testPeer.WaitForAllMatchersToComplete(3000);
}
}
+
+ [Test, Timeout(20_000)]
+ public void TestModifyUndeliverableIndividualMessagesAsync()
+ {
+ using (TestAmqpPeer testPeer = new TestAmqpPeer())
+ {
+ int msgCount = 6;
+
+ IConnection connection = EstablishConnection(testPeer);
+ connection.Start();
+
+ testPeer.ExpectBegin();
+ ISession session =
connection.CreateSession(AcknowledgementMode.IndividualAcknowledge);
+ IQueue queue = session.GetQueue("myQueue");
+
+ testPeer.ExpectReceiverAttach();
+ testPeer.ExpectLinkFlowRespondWithTransfer(
+ message: CreateMessageWithNullContent(),
+ count: msgCount,
+ drain: false,
+ nextIncomingId: 1,
+ addMessageNumberProperty: true,
+ sendDrainFlowResponse: false,
+ sendSettled: false,
+ creditMatcher: credit => Assert.Greater(credit, msgCount));
+
+ IMessageConsumer consumer = session.CreateConsumer(queue);
+
+ CountdownEvent latch = new CountdownEvent(msgCount);
+ List<ITextMessage> messages = new List<ITextMessage>();
+ consumer.Listener += message =>
+ {
+ messages.Add((ITextMessage) message);
+ latch.Signal();
+ };
+
+ Assert.True(latch.Wait(TimeSpan.FromMilliseconds(1000)),
$"Should receive: {msgCount}, but received: {messages.Count}");
+
+ Action<DeliveryState> dispositionMatcherFailed = state => {
Assert.AreEqual(state.Descriptor.Code,
MessageSupport.MODIFIED_FAILED_INSTANCE.Descriptor.Code); };
+
+ // Acknowledge the messages in a random order and verify the individual dispositions have expected delivery state.
+ Random random = new Random();
+ for (int i = 0; i < msgCount; i++)
+ {
+ var message = messages[random.Next(msgCount - i)];
+ messages.Remove(message);
+
+ uint deliveryNumber = (uint)
message.Properties.GetInt(TestAmqpPeer.MESSAGE_NUMBER) + 1;
+
+ testPeer.ExpectDisposition(settled: true, stateMatcher:
dispositionMatcherFailed, firstDeliveryId: deliveryNumber, lastDeliveryId:
deliveryNumber);
+ ((NmsMessage)
message).NmsAcknowledgeCallback.AcknowledgementType =
AckType.MODIFIED_FAILED_UNDELIVERABLE;
+
+ message.Acknowledge();
+
+ testPeer.WaitForAllMatchersToComplete(3000);
+ }
+
+ testPeer.ExpectClose();
+ connection.Close();
+
+ testPeer.WaitForAllMatchersToComplete(3000);
+ }
+ }
+
+ [Test, Timeout(20_000)]
+ public void TestRejectIndividualMessagesAsync()
+ {
+ using (TestAmqpPeer testPeer = new TestAmqpPeer())
+ {
+ int msgCount = 6;
+
+ IConnection connection = EstablishConnection(testPeer);
+ connection.Start();
+
+ testPeer.ExpectBegin();
+ ISession session =
connection.CreateSession(AcknowledgementMode.IndividualAcknowledge);
+ IQueue queue = session.GetQueue("myQueue");
+
+ testPeer.ExpectReceiverAttach();
+ testPeer.ExpectLinkFlowRespondWithTransfer(
+ message: CreateMessageWithNullContent(),
+ count: msgCount,
+ drain: false,
+ nextIncomingId: 1,
+ addMessageNumberProperty: true,
+ sendDrainFlowResponse: false,
+ sendSettled: false,
+ creditMatcher: credit => Assert.Greater(credit, msgCount));
+
+ IMessageConsumer consumer = session.CreateConsumer(queue);
+
+ CountdownEvent latch = new CountdownEvent(msgCount);
+ List<ITextMessage> messages = new List<ITextMessage>();
+ consumer.Listener += message =>
+ {
+ messages.Add((ITextMessage) message);
+ latch.Signal();
+ };
+
+ Assert.True(latch.Wait(TimeSpan.FromMilliseconds(1000)),
$"Should receive: {msgCount}, but received: {messages.Count}");
+
+ Action<DeliveryState> dispositionMatcherFailed = state => {
Assert.AreEqual(state.Descriptor.Code,
MessageSupport.REJECTED_INSTANCE.Descriptor.Code); };
+
+ // Acknowledge the messages in a random order and verify the individual dispositions have expected delivery state.
+ Random random = new Random();
+ for (int i = 0; i < msgCount; i++)
+ {
+ var message = messages[random.Next(msgCount - i)];
+ messages.Remove(message);
+
+ uint deliveryNumber = (uint)
message.Properties.GetInt(TestAmqpPeer.MESSAGE_NUMBER) + 1;
+
+ testPeer.ExpectDisposition(settled: true, stateMatcher:
dispositionMatcherFailed, firstDeliveryId: deliveryNumber, lastDeliveryId:
deliveryNumber);
+ ((NmsMessage)
message).NmsAcknowledgeCallback.AcknowledgementType = AckType.REJECTED;
+
+ message.Acknowledge();
+
+ testPeer.WaitForAllMatchersToComplete(3000);
+ }
+
+ testPeer.ExpectClose();
+ connection.Close();
+
+ testPeer.WaitForAllMatchersToComplete(3000);
+ }
+ }
+
+ [Test, Timeout(20_000)]
+ public void TestModifyIndividualMessagesAsync()
+ {
+ using (TestAmqpPeer testPeer = new TestAmqpPeer())
+ {
+ int msgCount = 6;
+
+ IConnection connection = EstablishConnection(testPeer);
+ connection.Start();
+
+ testPeer.ExpectBegin();
+ ISession session =
connection.CreateSession(AcknowledgementMode.IndividualAcknowledge);
+ IQueue queue = session.GetQueue("myQueue");
+
+ testPeer.ExpectReceiverAttach();
+ testPeer.ExpectLinkFlowRespondWithTransfer(
+ message: CreateMessageWithNullContent(),
+ count: msgCount,
+ drain: false,
+ nextIncomingId: 1,
+ addMessageNumberProperty: true,
+ sendDrainFlowResponse: false,
+ sendSettled: false,
+ creditMatcher: credit => Assert.Greater(credit, msgCount));
+
+ IMessageConsumer consumer = session.CreateConsumer(queue);
+
+ CountdownEvent latch = new CountdownEvent(msgCount);
+ List<ITextMessage> messages = new List<ITextMessage>();
+ consumer.Listener += message =>
+ {
+ messages.Add((ITextMessage) message);
+ latch.Signal();
+ };
+
+ Assert.True(latch.Wait(TimeSpan.FromMilliseconds(1000)),
$"Should receive: {msgCount}, but received: {messages.Count}");
+
+ Action<DeliveryState> dispositionMatcherFailed = state => {
Assert.AreEqual(state.Descriptor.Code,
MessageSupport.MODIFIED_INSTANCE.Descriptor.Code); };
+
+ // Acknowledge the messages in a random order and verify the individual dispositions have expected delivery state.
+ Random random = new Random();
+ for (int i = 0; i < msgCount; i++)
+ {
+ var message = messages[random.Next(msgCount - i)];
+ messages.Remove(message);
+
+ uint deliveryNumber = (uint)
message.Properties.GetInt(TestAmqpPeer.MESSAGE_NUMBER) + 1;
+
+ testPeer.ExpectDisposition(settled: true, stateMatcher:
dispositionMatcherFailed, firstDeliveryId: deliveryNumber, lastDeliveryId:
deliveryNumber);
+ ((NmsMessage)
message).NmsAcknowledgeCallback.AcknowledgementType = AckType.MODIFIED_FAILED;
+
+ message.Acknowledge();
+
+ testPeer.WaitForAllMatchersToComplete(3000);
+ }
+
+ testPeer.ExpectClose();
+ connection.Close();
+
+ testPeer.WaitForAllMatchersToComplete(3000);
+ }
+ }
[Test, Timeout(20_000)]
public void TestAutoAcknowledgeMessages()