This is an automated email from the ASF dual-hosted git repository.

havret pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-nms-amqp.git


The following commit(s) were added to refs/heads/main by this push:
     new cccd4ee  AMQNET-831 allow acktype modified_failed (#86)
cccd4ee is described below

commit cccd4ee2efb7fa669f04b43cfb3c386a40a3a380
Author: Jef Willems <[email protected]>
AuthorDate: Wed Mar 22 23:01:41 2023 +0100

    AMQNET-831 allow acktype modified_failed (#86)
    
    * AMQNET-831 allow more acktypes
    
    * AMQNET-831 : add tests & rejected outcome
    
    Signed-off-by: Jefwillems <[email protected]>
    
    * AMQNET-831 : refactor tests to resolve issues
    
    Signed-off-by: Jefwillems <[email protected]>
    
    * AMQNET-831 : remove unused code
    
    Signed-off-by: Jefwillems <[email protected]>
    
    ---------
    
    Signed-off-by: Jefwillems <[email protected]>
---
 src/NMS.AMQP/Provider/Amqp/AmqpConsumer.cs         |   8 +
 .../AmqpAcknowledgmentsIntegrationTest.cs          | 190 +++++++++++++++++++++
 2 files changed, 198 insertions(+)

diff --git a/src/NMS.AMQP/Provider/Amqp/AmqpConsumer.cs 
b/src/NMS.AMQP/Provider/Amqp/AmqpConsumer.cs
index 99666c8..28c5d17 100644
--- a/src/NMS.AMQP/Provider/Amqp/AmqpConsumer.cs
+++ b/src/NMS.AMQP/Provider/Amqp/AmqpConsumer.cs
@@ -332,6 +332,14 @@ namespace Apache.NMS.AMQP.Provider.Amqp
                         receiverLink.Modify(message, true, true);
                         RemoveMessage(envelope);
                         break;
+                    case AckType.MODIFIED_FAILED:
+                        receiverLink.Modify(message, true, false);
+                        RemoveMessage(envelope);
+                        break;
+                    case AckType.REJECTED:
+                        receiverLink.Reject(message);
+                        RemoveMessage(envelope);
+                        break;
                     default:
                         Tracer.ErrorFormat("Unsupported Ack Type for message: 
{0}", envelope);
                         throw new ArgumentException($"Unsupported Ack Type for 
message: {envelope}");
diff --git 
a/test/Apache-NMS-AMQP-Test/Integration/AmqpAcknowledgmentsIntegrationTest.cs 
b/test/Apache-NMS-AMQP-Test/Integration/AmqpAcknowledgmentsIntegrationTest.cs
index 2609400..505e133 100644
--- 
a/test/Apache-NMS-AMQP-Test/Integration/AmqpAcknowledgmentsIntegrationTest.cs
+++ 
b/test/Apache-NMS-AMQP-Test/Integration/AmqpAcknowledgmentsIntegrationTest.cs
@@ -21,6 +21,7 @@ using System.Threading;
 using System.Threading.Tasks;
 using Amqp.Framing;
 using Apache.NMS;
+using Apache.NMS.AMQP.Message;
 using Apache.NMS.AMQP.Util;
 using NMS.AMQP.Test.TestAmqp;
 using NUnit.Framework;
@@ -273,6 +274,195 @@ namespace NMS.AMQP.Test.Integration
                 testPeer.WaitForAllMatchersToComplete(3000);
             }
         }
+        
+        /// <summary>
+        /// Receives a batch of unsettled messages through an asynchronous listener, then acknowledges
+        /// them one at a time, in random order, with <c>AckType.MODIFIED_FAILED_UNDELIVERABLE</c>.
+        /// For every acknowledgement the test peer must observe a settled disposition covering exactly
+        /// that delivery, whose state descriptor code equals that of
+        /// <c>MessageSupport.MODIFIED_FAILED_INSTANCE</c>.
+        /// </summary>
+        [Test, Timeout(20_000)]
+        public void TestModifyUndeliverableIndividualMessagesAsync()
+        {
+            using (TestAmqpPeer testPeer = new TestAmqpPeer())
+            {
+                int msgCount = 6;
+
+                IConnection connection = EstablishConnection(testPeer);
+                connection.Start();
+
+                testPeer.ExpectBegin();
+                ISession session = connection.CreateSession(AcknowledgementMode.IndividualAcknowledge);
+                IQueue queue = session.GetQueue("myQueue");
+
+                testPeer.ExpectReceiverAttach();
+                testPeer.ExpectLinkFlowRespondWithTransfer(
+                    message: CreateMessageWithNullContent(),
+                    count: msgCount,
+                    drain: false,
+                    nextIncomingId: 1,
+                    addMessageNumberProperty: true,
+                    sendDrainFlowResponse: false,
+                    sendSettled: false,
+                    creditMatcher: credit => Assert.Greater(credit, msgCount));
+
+                IMessageConsumer consumer = session.CreateConsumer(queue);
+
+                // The listener collects every delivered message; the latch lets the test wait until
+                // all of them have arrived before it starts acknowledging.
+                CountdownEvent latch = new CountdownEvent(msgCount);
+                List<ITextMessage> messages = new List<ITextMessage>();
+                consumer.Listener += message =>
+                {
+                    messages.Add((ITextMessage) message);
+                    latch.Signal();
+                };
+
+                Assert.True(latch.Wait(TimeSpan.FromMilliseconds(1000)),
+                    $"Should receive: {msgCount}, but received: {messages.Count}");
+
+                // Only the descriptor code of the delivery state is compared. NUnit expects the
+                // expected value first and the actual value second.
+                Action<DeliveryState> dispositionMatcherFailed = state =>
+                {
+                    Assert.AreEqual(MessageSupport.MODIFIED_FAILED_INSTANCE.Descriptor.Code, state.Descriptor.Code);
+                };
+
+                // Acknowledge the messages in a random order and verify the individual dispositions
+                // have the expected delivery state.
+                Random random = new Random();
+                for (int i = 0; i < msgCount; i++)
+                {
+                    // Pick by index and remove by index, so no equality-based search is needed.
+                    int index = random.Next(messages.Count);
+                    ITextMessage message = messages[index];
+                    messages.RemoveAt(index);
+
+                    // NOTE(review): the +1 presumably maps the message number property onto the
+                    // peer's delivery ids, which start at nextIncomingId: 1 - confirm against TestAmqpPeer.
+                    uint deliveryNumber = (uint) message.Properties.GetInt(TestAmqpPeer.MESSAGE_NUMBER) + 1;
+
+                    testPeer.ExpectDisposition(
+                        settled: true,
+                        stateMatcher: dispositionMatcherFailed,
+                        firstDeliveryId: deliveryNumber,
+                        lastDeliveryId: deliveryNumber);
+                    ((NmsMessage) message).NmsAcknowledgeCallback.AcknowledgementType = AckType.MODIFIED_FAILED_UNDELIVERABLE;
+
+                    message.Acknowledge();
+
+                    testPeer.WaitForAllMatchersToComplete(3000);
+                }
+
+                testPeer.ExpectClose();
+                connection.Close();
+
+                testPeer.WaitForAllMatchersToComplete(3000);
+            }
+        }
+        
+        /// <summary>
+        /// Receives a batch of unsettled messages through an asynchronous listener, then acknowledges
+        /// them one at a time, in random order, with <c>AckType.REJECTED</c>. For every
+        /// acknowledgement the test peer must observe a settled disposition covering exactly that
+        /// delivery, whose state descriptor code equals that of <c>MessageSupport.REJECTED_INSTANCE</c>.
+        /// </summary>
+        [Test, Timeout(20_000)]
+        public void TestRejectIndividualMessagesAsync()
+        {
+            using (TestAmqpPeer testPeer = new TestAmqpPeer())
+            {
+                int msgCount = 6;
+
+                IConnection connection = EstablishConnection(testPeer);
+                connection.Start();
+
+                testPeer.ExpectBegin();
+                ISession session = connection.CreateSession(AcknowledgementMode.IndividualAcknowledge);
+                IQueue queue = session.GetQueue("myQueue");
+
+                testPeer.ExpectReceiverAttach();
+                testPeer.ExpectLinkFlowRespondWithTransfer(
+                    message: CreateMessageWithNullContent(),
+                    count: msgCount,
+                    drain: false,
+                    nextIncomingId: 1,
+                    addMessageNumberProperty: true,
+                    sendDrainFlowResponse: false,
+                    sendSettled: false,
+                    creditMatcher: credit => Assert.Greater(credit, msgCount));
+
+                IMessageConsumer consumer = session.CreateConsumer(queue);
+
+                // The listener collects every delivered message; the latch lets the test wait until
+                // all of them have arrived before it starts acknowledging.
+                CountdownEvent latch = new CountdownEvent(msgCount);
+                List<ITextMessage> messages = new List<ITextMessage>();
+                consumer.Listener += message =>
+                {
+                    messages.Add((ITextMessage) message);
+                    latch.Signal();
+                };
+
+                Assert.True(latch.Wait(TimeSpan.FromMilliseconds(1000)),
+                    $"Should receive: {msgCount}, but received: {messages.Count}");
+
+                // Only the descriptor code of the delivery state is compared. NUnit expects the
+                // expected value first and the actual value second.
+                Action<DeliveryState> dispositionMatcherRejected = state =>
+                {
+                    Assert.AreEqual(MessageSupport.REJECTED_INSTANCE.Descriptor.Code, state.Descriptor.Code);
+                };
+
+                // Acknowledge the messages in a random order and verify the individual dispositions
+                // have the expected delivery state.
+                Random random = new Random();
+                for (int i = 0; i < msgCount; i++)
+                {
+                    // Pick by index and remove by index, so no equality-based search is needed.
+                    int index = random.Next(messages.Count);
+                    ITextMessage message = messages[index];
+                    messages.RemoveAt(index);
+
+                    // NOTE(review): the +1 presumably maps the message number property onto the
+                    // peer's delivery ids, which start at nextIncomingId: 1 - confirm against TestAmqpPeer.
+                    uint deliveryNumber = (uint) message.Properties.GetInt(TestAmqpPeer.MESSAGE_NUMBER) + 1;
+
+                    testPeer.ExpectDisposition(
+                        settled: true,
+                        stateMatcher: dispositionMatcherRejected,
+                        firstDeliveryId: deliveryNumber,
+                        lastDeliveryId: deliveryNumber);
+                    ((NmsMessage) message).NmsAcknowledgeCallback.AcknowledgementType = AckType.REJECTED;
+
+                    message.Acknowledge();
+
+                    testPeer.WaitForAllMatchersToComplete(3000);
+                }
+
+                testPeer.ExpectClose();
+                connection.Close();
+
+                testPeer.WaitForAllMatchersToComplete(3000);
+            }
+        }
+        
+        /// <summary>
+        /// Receives a batch of unsettled messages through an asynchronous listener, then acknowledges
+        /// them one at a time, in random order, with <c>AckType.MODIFIED_FAILED</c>. For every
+        /// acknowledgement the test peer must observe a settled disposition covering exactly that
+        /// delivery, whose state descriptor code equals that of <c>MessageSupport.MODIFIED_INSTANCE</c>.
+        /// </summary>
+        [Test, Timeout(20_000)]
+        public void TestModifyIndividualMessagesAsync()
+        {
+            using (TestAmqpPeer testPeer = new TestAmqpPeer())
+            {
+                int msgCount = 6;
+
+                IConnection connection = EstablishConnection(testPeer);
+                connection.Start();
+
+                testPeer.ExpectBegin();
+                ISession session = connection.CreateSession(AcknowledgementMode.IndividualAcknowledge);
+                IQueue queue = session.GetQueue("myQueue");
+
+                testPeer.ExpectReceiverAttach();
+                testPeer.ExpectLinkFlowRespondWithTransfer(
+                    message: CreateMessageWithNullContent(),
+                    count: msgCount,
+                    drain: false,
+                    nextIncomingId: 1,
+                    addMessageNumberProperty: true,
+                    sendDrainFlowResponse: false,
+                    sendSettled: false,
+                    creditMatcher: credit => Assert.Greater(credit, msgCount));
+
+                IMessageConsumer consumer = session.CreateConsumer(queue);
+
+                // The listener collects every delivered message; the latch lets the test wait until
+                // all of them have arrived before it starts acknowledging.
+                CountdownEvent latch = new CountdownEvent(msgCount);
+                List<ITextMessage> messages = new List<ITextMessage>();
+                consumer.Listener += message =>
+                {
+                    messages.Add((ITextMessage) message);
+                    latch.Signal();
+                };
+
+                Assert.True(latch.Wait(TimeSpan.FromMilliseconds(1000)),
+                    $"Should receive: {msgCount}, but received: {messages.Count}");
+
+                // Only the descriptor code of the delivery state is compared. NUnit expects the
+                // expected value first and the actual value second.
+                Action<DeliveryState> dispositionMatcherModified = state =>
+                {
+                    Assert.AreEqual(MessageSupport.MODIFIED_INSTANCE.Descriptor.Code, state.Descriptor.Code);
+                };
+
+                // Acknowledge the messages in a random order and verify the individual dispositions
+                // have the expected delivery state.
+                Random random = new Random();
+                for (int i = 0; i < msgCount; i++)
+                {
+                    // Pick by index and remove by index, so no equality-based search is needed.
+                    int index = random.Next(messages.Count);
+                    ITextMessage message = messages[index];
+                    messages.RemoveAt(index);
+
+                    // NOTE(review): the +1 presumably maps the message number property onto the
+                    // peer's delivery ids, which start at nextIncomingId: 1 - confirm against TestAmqpPeer.
+                    uint deliveryNumber = (uint) message.Properties.GetInt(TestAmqpPeer.MESSAGE_NUMBER) + 1;
+
+                    testPeer.ExpectDisposition(
+                        settled: true,
+                        stateMatcher: dispositionMatcherModified,
+                        firstDeliveryId: deliveryNumber,
+                        lastDeliveryId: deliveryNumber);
+                    ((NmsMessage) message).NmsAcknowledgeCallback.AcknowledgementType = AckType.MODIFIED_FAILED;
+
+                    message.Acknowledge();
+
+                    testPeer.WaitForAllMatchersToComplete(3000);
+                }
+
+                testPeer.ExpectClose();
+                connection.Close();
+
+                testPeer.WaitForAllMatchersToComplete(3000);
+            }
+        }
 
         [Test, Timeout(20_000)]
         public void TestAutoAcknowledgeMessages()

Reply via email to