[ 
https://issues.apache.org/jira/browse/AMQNET-831?focusedWorklogId=852089&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-852089
 ]

ASF GitHub Bot logged work on AMQNET-831:
-----------------------------------------

                Author: ASF GitHub Bot
            Created on: 21/Mar/23 19:15
            Start Date: 21/Mar/23 19:15
    Worklog Time Spent: 10m 
      Work Description: Havret commented on code in PR #86:
URL: https://github.com/apache/activemq-nms-amqp/pull/86#discussion_r1143875279


##########
test/Apache-NMS-AMQP-Test/Integration/AmqpAcknowledgmentsIntegrationTest.cs:
##########
@@ -273,6 +274,219 @@ public void TestAcknowledgeIndividualMessagesAsync()
                 testPeer.WaitForAllMatchersToComplete(3000);
             }
         }
+        
+        [Test, Timeout(20_000)]
+        public void TestModifyUndeliverableIndividualMessagesAsync()
+        {
+            using (TestAmqpPeer testPeer = new TestAmqpPeer())
+            {
+                int msgCount = 6;
+
+                IConnection connection = EstablishConnection(testPeer);
+                connection.Start();
+
+                testPeer.ExpectBegin();
+                ISession session = 
connection.CreateSession(AcknowledgementMode.IndividualAcknowledge);
+                IQueue queue = session.GetQueue("myQueue");
+
+                testPeer.ExpectReceiverAttach();
+                testPeer.ExpectLinkFlowRespondWithTransfer(
+                    message: CreateMessageWithNullContent(),
+                    count: msgCount,
+                    drain: false,
+                    nextIncomingId: 1,
+                    addMessageNumberProperty: true,
+                    sendDrainFlowResponse: false,
+                    sendSettled: false,
+                    creditMatcher: credit => Assert.Greater(credit, msgCount));
+
+                IMessageConsumer consumer = session.CreateConsumer(queue);
+                
+                CountdownEvent latch = new CountdownEvent(msgCount);
+                List<ITextMessage> messages = new List<ITextMessage>();
+                consumer.Listener += message =>
+                {
+                    messages.Add((ITextMessage) message);
+                    latch.Signal();
+                };
+                
+                Assert.True(latch.Wait(TimeSpan.FromMilliseconds(1000)), 
$"Should receive: {msgCount}, but received: {messages.Count}");
+                
+                Action<DeliveryState> dispositionMatcher = state => { 
Assert.AreEqual(state.Descriptor.Code, 
MessageSupport.ACCEPTED_INSTANCE.Descriptor.Code); };
+                Action<DeliveryState> dispositionMatcherFailed = state => { 
Assert.AreEqual(state.Descriptor.Code, 
MessageSupport.MODIFIED_FAILED_INSTANCE.Descriptor.Code); };
+                
+                // Acknowledge the messages in a random order and verify the 
individual dispositions have expected delivery state.
+                Random random = new Random();
+                for (int i = 0; i < msgCount; i++)
+                {
+                    var message = messages[random.Next(msgCount - i)];
+                    messages.Remove(message);
+
+                    uint deliveryNumber = (uint) 
message.Properties.GetInt(TestAmqpPeer.MESSAGE_NUMBER) + 1;
+
+                    if (deliveryNumber == 0)
+                    {
+                        testPeer.ExpectDisposition(settled: true, 
stateMatcher: dispositionMatcherFailed, firstDeliveryId: deliveryNumber, 
lastDeliveryId: deliveryNumber);

Review Comment:
   This cod in unreachable. 



##########
test/Apache-NMS-AMQP-Test/Integration/AmqpAcknowledgmentsIntegrationTest.cs:
##########
@@ -273,6 +274,219 @@ public void TestAcknowledgeIndividualMessagesAsync()
                 testPeer.WaitForAllMatchersToComplete(3000);
             }
         }
+        
+        [Test, Timeout(20_000)]
+        public void TestModifyUndeliverableIndividualMessagesAsync()
+        {
+            using (TestAmqpPeer testPeer = new TestAmqpPeer())
+            {
+                int msgCount = 6;
+
+                IConnection connection = EstablishConnection(testPeer);
+                connection.Start();
+
+                testPeer.ExpectBegin();
+                ISession session = 
connection.CreateSession(AcknowledgementMode.IndividualAcknowledge);
+                IQueue queue = session.GetQueue("myQueue");
+
+                testPeer.ExpectReceiverAttach();
+                testPeer.ExpectLinkFlowRespondWithTransfer(
+                    message: CreateMessageWithNullContent(),
+                    count: msgCount,
+                    drain: false,
+                    nextIncomingId: 1,
+                    addMessageNumberProperty: true,
+                    sendDrainFlowResponse: false,
+                    sendSettled: false,
+                    creditMatcher: credit => Assert.Greater(credit, msgCount));
+
+                IMessageConsumer consumer = session.CreateConsumer(queue);
+                
+                CountdownEvent latch = new CountdownEvent(msgCount);
+                List<ITextMessage> messages = new List<ITextMessage>();
+                consumer.Listener += message =>
+                {
+                    messages.Add((ITextMessage) message);
+                    latch.Signal();
+                };
+                
+                Assert.True(latch.Wait(TimeSpan.FromMilliseconds(1000)), 
$"Should receive: {msgCount}, but received: {messages.Count}");
+                
+                Action<DeliveryState> dispositionMatcher = state => { 
Assert.AreEqual(state.Descriptor.Code, 
MessageSupport.ACCEPTED_INSTANCE.Descriptor.Code); };
+                Action<DeliveryState> dispositionMatcherFailed = state => { 
Assert.AreEqual(state.Descriptor.Code, 
MessageSupport.MODIFIED_FAILED_INSTANCE.Descriptor.Code); };
+                
+                // Acknowledge the messages in a random order and verify the 
individual dispositions have expected delivery state.
+                Random random = new Random();
+                for (int i = 0; i < msgCount; i++)
+                {
+                    var message = messages[random.Next(msgCount - i)];
+                    messages.Remove(message);
+
+                    uint deliveryNumber = (uint) 
message.Properties.GetInt(TestAmqpPeer.MESSAGE_NUMBER) + 1;
+
+                    if (deliveryNumber == 0)
+                    {
+                        testPeer.ExpectDisposition(settled: true, 
stateMatcher: dispositionMatcherFailed, firstDeliveryId: deliveryNumber, 
lastDeliveryId: deliveryNumber);
+                        ((NmsMessage) 
message).NmsAcknowledgeCallback.AcknowledgementType = 
AckType.MODIFIED_FAILED_UNDELIVERABLE;
+                    }
+                    else
+                    {
+                        testPeer.ExpectDisposition(settled: true, 
stateMatcher: dispositionMatcher, firstDeliveryId: deliveryNumber, 
lastDeliveryId: deliveryNumber);
+                    }
+                    
+                    message.Acknowledge();
+                    
+                    testPeer.WaitForAllMatchersToComplete(3000);
+                }
+                
+                testPeer.ExpectClose();
+                connection.Close();
+                
+                testPeer.WaitForAllMatchersToComplete(3000);
+            }
+        }
+        
+        [Test, Timeout(20_000)]
+        public void TestRejectIndividualMessagesAsync()
+        {
+            using (TestAmqpPeer testPeer = new TestAmqpPeer())
+            {
+                int msgCount = 6;
+
+                IConnection connection = EstablishConnection(testPeer);
+                connection.Start();
+
+                testPeer.ExpectBegin();
+                ISession session = 
connection.CreateSession(AcknowledgementMode.IndividualAcknowledge);
+                IQueue queue = session.GetQueue("myQueue");
+
+                testPeer.ExpectReceiverAttach();
+                testPeer.ExpectLinkFlowRespondWithTransfer(
+                    message: CreateMessageWithNullContent(),
+                    count: msgCount,
+                    drain: false,
+                    nextIncomingId: 1,
+                    addMessageNumberProperty: true,
+                    sendDrainFlowResponse: false,
+                    sendSettled: false,
+                    creditMatcher: credit => Assert.Greater(credit, msgCount));
+
+                IMessageConsumer consumer = session.CreateConsumer(queue);
+                
+                CountdownEvent latch = new CountdownEvent(msgCount);
+                List<ITextMessage> messages = new List<ITextMessage>();
+                consumer.Listener += message =>
+                {
+                    messages.Add((ITextMessage) message);
+                    latch.Signal();
+                };
+                
+                Assert.True(latch.Wait(TimeSpan.FromMilliseconds(1000)), 
$"Should receive: {msgCount}, but received: {messages.Count}");
+                
+                Action<DeliveryState> dispositionMatcher = state => { 
Assert.AreEqual(state.Descriptor.Code, 
MessageSupport.ACCEPTED_INSTANCE.Descriptor.Code); };
+                Action<DeliveryState> dispositionMatcherFailed = state => { 
Assert.AreEqual(state.Descriptor.Code, 
MessageSupport.REJECTED_INSTANCE.Descriptor.Code); };
+                
+                // Acknowledge the messages in a random order and verify the 
individual dispositions have expected delivery state.
+                Random random = new Random();
+                for (int i = 0; i < msgCount; i++)
+                {
+                    var message = messages[random.Next(msgCount - i)];
+                    messages.Remove(message);
+
+                    uint deliveryNumber = (uint) 
message.Properties.GetInt(TestAmqpPeer.MESSAGE_NUMBER) + 1;
+
+                    if (deliveryNumber == 0)
+                    {
+                        testPeer.ExpectDisposition(settled: true, 
stateMatcher: dispositionMatcherFailed, firstDeliveryId: deliveryNumber, 
lastDeliveryId: deliveryNumber);

Review Comment:
   This is also not reachable. 



##########
test/Apache-NMS-AMQP-Test/Integration/AmqpAcknowledgmentsIntegrationTest.cs:
##########
@@ -273,6 +274,219 @@ public void TestAcknowledgeIndividualMessagesAsync()
                 testPeer.WaitForAllMatchersToComplete(3000);
             }
         }
+        
+        [Test, Timeout(20_000)]
+        public void TestModifyUndeliverableIndividualMessagesAsync()
+        {
+            using (TestAmqpPeer testPeer = new TestAmqpPeer())
+            {
+                int msgCount = 6;
+
+                IConnection connection = EstablishConnection(testPeer);
+                connection.Start();
+
+                testPeer.ExpectBegin();
+                ISession session = 
connection.CreateSession(AcknowledgementMode.IndividualAcknowledge);
+                IQueue queue = session.GetQueue("myQueue");
+
+                testPeer.ExpectReceiverAttach();
+                testPeer.ExpectLinkFlowRespondWithTransfer(
+                    message: CreateMessageWithNullContent(),
+                    count: msgCount,
+                    drain: false,
+                    nextIncomingId: 1,
+                    addMessageNumberProperty: true,
+                    sendDrainFlowResponse: false,
+                    sendSettled: false,
+                    creditMatcher: credit => Assert.Greater(credit, msgCount));
+
+                IMessageConsumer consumer = session.CreateConsumer(queue);
+                
+                CountdownEvent latch = new CountdownEvent(msgCount);
+                List<ITextMessage> messages = new List<ITextMessage>();
+                consumer.Listener += message =>
+                {
+                    messages.Add((ITextMessage) message);
+                    latch.Signal();
+                };
+                
+                Assert.True(latch.Wait(TimeSpan.FromMilliseconds(1000)), 
$"Should receive: {msgCount}, but received: {messages.Count}");
+                
+                Action<DeliveryState> dispositionMatcher = state => { 
Assert.AreEqual(state.Descriptor.Code, 
MessageSupport.ACCEPTED_INSTANCE.Descriptor.Code); };
+                Action<DeliveryState> dispositionMatcherFailed = state => { 
Assert.AreEqual(state.Descriptor.Code, 
MessageSupport.MODIFIED_FAILED_INSTANCE.Descriptor.Code); };
+                
+                // Acknowledge the messages in a random order and verify the 
individual dispositions have expected delivery state.
+                Random random = new Random();
+                for (int i = 0; i < msgCount; i++)
+                {
+                    var message = messages[random.Next(msgCount - i)];
+                    messages.Remove(message);
+
+                    uint deliveryNumber = (uint) 
message.Properties.GetInt(TestAmqpPeer.MESSAGE_NUMBER) + 1;
+
+                    if (deliveryNumber == 0)
+                    {
+                        testPeer.ExpectDisposition(settled: true, 
stateMatcher: dispositionMatcherFailed, firstDeliveryId: deliveryNumber, 
lastDeliveryId: deliveryNumber);
+                        ((NmsMessage) 
message).NmsAcknowledgeCallback.AcknowledgementType = 
AckType.MODIFIED_FAILED_UNDELIVERABLE;
+                    }
+                    else
+                    {
+                        testPeer.ExpectDisposition(settled: true, 
stateMatcher: dispositionMatcher, firstDeliveryId: deliveryNumber, 
lastDeliveryId: deliveryNumber);
+                    }
+                    
+                    message.Acknowledge();
+                    
+                    testPeer.WaitForAllMatchersToComplete(3000);
+                }
+                
+                testPeer.ExpectClose();
+                connection.Close();
+                
+                testPeer.WaitForAllMatchersToComplete(3000);
+            }
+        }
+        
+        [Test, Timeout(20_000)]
+        public void TestRejectIndividualMessagesAsync()
+        {
+            using (TestAmqpPeer testPeer = new TestAmqpPeer())
+            {
+                int msgCount = 6;
+
+                IConnection connection = EstablishConnection(testPeer);
+                connection.Start();
+
+                testPeer.ExpectBegin();
+                ISession session = 
connection.CreateSession(AcknowledgementMode.IndividualAcknowledge);
+                IQueue queue = session.GetQueue("myQueue");
+
+                testPeer.ExpectReceiverAttach();
+                testPeer.ExpectLinkFlowRespondWithTransfer(
+                    message: CreateMessageWithNullContent(),
+                    count: msgCount,
+                    drain: false,
+                    nextIncomingId: 1,
+                    addMessageNumberProperty: true,
+                    sendDrainFlowResponse: false,
+                    sendSettled: false,
+                    creditMatcher: credit => Assert.Greater(credit, msgCount));
+
+                IMessageConsumer consumer = session.CreateConsumer(queue);
+                
+                CountdownEvent latch = new CountdownEvent(msgCount);
+                List<ITextMessage> messages = new List<ITextMessage>();
+                consumer.Listener += message =>
+                {
+                    messages.Add((ITextMessage) message);
+                    latch.Signal();
+                };
+                
+                Assert.True(latch.Wait(TimeSpan.FromMilliseconds(1000)), 
$"Should receive: {msgCount}, but received: {messages.Count}");
+                
+                Action<DeliveryState> dispositionMatcher = state => { 
Assert.AreEqual(state.Descriptor.Code, 
MessageSupport.ACCEPTED_INSTANCE.Descriptor.Code); };
+                Action<DeliveryState> dispositionMatcherFailed = state => { 
Assert.AreEqual(state.Descriptor.Code, 
MessageSupport.REJECTED_INSTANCE.Descriptor.Code); };
+                
+                // Acknowledge the messages in a random order and verify the 
individual dispositions have expected delivery state.
+                Random random = new Random();
+                for (int i = 0; i < msgCount; i++)
+                {
+                    var message = messages[random.Next(msgCount - i)];
+                    messages.Remove(message);
+
+                    uint deliveryNumber = (uint) 
message.Properties.GetInt(TestAmqpPeer.MESSAGE_NUMBER) + 1;
+
+                    if (deliveryNumber == 0)
+                    {
+                        testPeer.ExpectDisposition(settled: true, 
stateMatcher: dispositionMatcherFailed, firstDeliveryId: deliveryNumber, 
lastDeliveryId: deliveryNumber);
+                        ((NmsMessage) 
message).NmsAcknowledgeCallback.AcknowledgementType = AckType.REJECTED;
+                    }
+                    else
+                    {
+                        testPeer.ExpectDisposition(settled: true, 
stateMatcher: dispositionMatcher, firstDeliveryId: deliveryNumber, 
lastDeliveryId: deliveryNumber);
+                    }
+                    
+                    message.Acknowledge();
+                    
+                    testPeer.WaitForAllMatchersToComplete(3000);
+                }
+                
+                testPeer.ExpectClose();
+                connection.Close();
+                
+                testPeer.WaitForAllMatchersToComplete(3000);
+            }
+        }
+        
+        [Test, Timeout(20_000)]
+        public void TestModifyIndividualMessagesAsync()
+        {
+            using (TestAmqpPeer testPeer = new TestAmqpPeer())
+            {
+                int msgCount = 6;
+
+                IConnection connection = EstablishConnection(testPeer);
+                connection.Start();
+
+                testPeer.ExpectBegin();
+                ISession session = 
connection.CreateSession(AcknowledgementMode.IndividualAcknowledge);
+                IQueue queue = session.GetQueue("myQueue");
+
+                testPeer.ExpectReceiverAttach();
+                testPeer.ExpectLinkFlowRespondWithTransfer(
+                    message: CreateMessageWithNullContent(),
+                    count: msgCount,
+                    drain: false,
+                    nextIncomingId: 1,
+                    addMessageNumberProperty: true,
+                    sendDrainFlowResponse: false,
+                    sendSettled: false,
+                    creditMatcher: credit => Assert.Greater(credit, msgCount));
+
+                IMessageConsumer consumer = session.CreateConsumer(queue);
+                
+                CountdownEvent latch = new CountdownEvent(msgCount);
+                List<ITextMessage> messages = new List<ITextMessage>();
+                consumer.Listener += message =>
+                {
+                    messages.Add((ITextMessage) message);
+                    latch.Signal();
+                };
+                
+                Assert.True(latch.Wait(TimeSpan.FromMilliseconds(1000)), 
$"Should receive: {msgCount}, but received: {messages.Count}");
+                
+                Action<DeliveryState> dispositionMatcher = state => { 
Assert.AreEqual(state.Descriptor.Code, 
MessageSupport.ACCEPTED_INSTANCE.Descriptor.Code); };
+                Action<DeliveryState> dispositionMatcherFailed = state => { 
Assert.AreEqual(state.Descriptor.Code, 
MessageSupport.MODIFIED_INSTANCE.Descriptor.Code); };
+                
+                // Acknowledge the messages in a random order and verify the 
individual dispositions have expected delivery state.
+                Random random = new Random();
+                for (int i = 0; i < msgCount; i++)
+                {
+                    var message = messages[random.Next(msgCount - i)];
+                    messages.Remove(message);
+
+                    uint deliveryNumber = (uint) 
message.Properties.GetInt(TestAmqpPeer.MESSAGE_NUMBER) + 1;
+
+                    if (deliveryNumber == 0)
+                    {
+                        testPeer.ExpectDisposition(settled: true, 
stateMatcher: dispositionMatcherFailed, firstDeliveryId: deliveryNumber, 
lastDeliveryId: deliveryNumber);

Review Comment:
   This is also unreachable. 



##########
test/Apache-NMS-AMQP-Test/Integration/AmqpAcknowledgmentsIntegrationTest.cs:
##########
@@ -273,6 +274,219 @@ public void TestAcknowledgeIndividualMessagesAsync()
                 testPeer.WaitForAllMatchersToComplete(3000);
             }
         }
+        
+        [Test, Timeout(20_000)]
+        public void TestModifyUndeliverableIndividualMessagesAsync()
+        {
+            using (TestAmqpPeer testPeer = new TestAmqpPeer())
+            {
+                int msgCount = 6;
+
+                IConnection connection = EstablishConnection(testPeer);
+                connection.Start();
+
+                testPeer.ExpectBegin();
+                ISession session = 
connection.CreateSession(AcknowledgementMode.IndividualAcknowledge);
+                IQueue queue = session.GetQueue("myQueue");
+
+                testPeer.ExpectReceiverAttach();
+                testPeer.ExpectLinkFlowRespondWithTransfer(
+                    message: CreateMessageWithNullContent(),
+                    count: msgCount,
+                    drain: false,
+                    nextIncomingId: 1,
+                    addMessageNumberProperty: true,
+                    sendDrainFlowResponse: false,
+                    sendSettled: false,
+                    creditMatcher: credit => Assert.Greater(credit, msgCount));
+
+                IMessageConsumer consumer = session.CreateConsumer(queue);
+                
+                CountdownEvent latch = new CountdownEvent(msgCount);
+                List<ITextMessage> messages = new List<ITextMessage>();
+                consumer.Listener += message =>
+                {
+                    messages.Add((ITextMessage) message);
+                    latch.Signal();
+                };
+                
+                Assert.True(latch.Wait(TimeSpan.FromMilliseconds(1000)), 
$"Should receive: {msgCount}, but received: {messages.Count}");
+                
+                Action<DeliveryState> dispositionMatcher = state => { 
Assert.AreEqual(state.Descriptor.Code, 
MessageSupport.ACCEPTED_INSTANCE.Descriptor.Code); };
+                Action<DeliveryState> dispositionMatcherFailed = state => { 
Assert.AreEqual(state.Descriptor.Code, 
MessageSupport.MODIFIED_FAILED_INSTANCE.Descriptor.Code); };
+                
+                // Acknowledge the messages in a random order and verify the 
individual dispositions have expected delivery state.
+                Random random = new Random();
+                for (int i = 0; i < msgCount; i++)
+                {
+                    var message = messages[random.Next(msgCount - i)];
+                    messages.Remove(message);
+
+                    uint deliveryNumber = (uint) 
message.Properties.GetInt(TestAmqpPeer.MESSAGE_NUMBER) + 1;
+
+                    if (deliveryNumber == 0)
+                    {
+                        testPeer.ExpectDisposition(settled: true, 
stateMatcher: dispositionMatcherFailed, firstDeliveryId: deliveryNumber, 
lastDeliveryId: deliveryNumber);
+                        ((NmsMessage) 
message).NmsAcknowledgeCallback.AcknowledgementType = 
AckType.MODIFIED_FAILED_UNDELIVERABLE;

Review Comment:
   We should update this to match how it's done in qpid-jms, but we can address 
it at a later time.



##########
test/Apache-NMS-AMQP-Test/Integration/AmqpAcknowledgmentsIntegrationTest.cs:
##########


Review Comment:
   I suggest simplifying these tests to focus on a single, straightforward nack 
without involving randomness. The current approach adds unnecessary complexity 
and important details, such as the fact that the nack was not being made, are 
being overlooked.





Issue Time Tracking
-------------------

            Worklog Id:     (was: 852089)
    Remaining Estimate: 23h 40m  (was: 23h 50m)
            Time Spent: 20m  (was: 10m)

> Allow extra AckTypes when using IndividualAcknowledge
> -----------------------------------------------------
>
>                 Key: AMQNET-831
>                 URL: https://issues.apache.org/jira/browse/AMQNET-831
>             Project: ActiveMQ .Net
>          Issue Type: New Feature
>          Components: ActiveMQ, AMQP, NMS
>    Affects Versions: AMQP-2.0.0
>         Environment: Tested with apache artemis (from the latest image on 
> quay.io/artemiscloud/activemq-artemis-broker)
>            Reporter: Jef Willems
>            Priority: Major
>              Labels: features, pull-request-available
>   Original Estimate: 24h
>          Time Spent: 20m
>  Remaining Estimate: 23h 40m
>
> I wrote a pull request to allow acknowledging messages with different 
> AckType's
> When using individual acknowledgement, the current setup only accepts 
> MODIFIED_FAILED_UNDELIVERABLE as a negative ack, the pull request allows to 
> use MODIFIED_FAILED, REJECTED as ack types. This way the client can 
> automatically use broker redelivery configurations.
> The client library also is more in line with the qpid-jms counterpart when 
> implementing this change: 
> [https://github.com/apache/qpid-jms/blob/main/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java#L323]
>  
> The pull request can be found here: 
> https://github.com/apache/activemq-nms-amqp/pull/86



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to