Havret commented on code in PR #86:
URL: https://github.com/apache/activemq-nms-amqp/pull/86#discussion_r1143875279
##########
test/Apache-NMS-AMQP-Test/Integration/AmqpAcknowledgmentsIntegrationTest.cs:
##########
@@ -273,6 +274,219 @@ public void TestAcknowledgeIndividualMessagesAsync()
testPeer.WaitForAllMatchersToComplete(3000);
}
}
+
+ [Test, Timeout(20_000)]
+ public void TestModifyUndeliverableIndividualMessagesAsync()
+ {
+ using (TestAmqpPeer testPeer = new TestAmqpPeer())
+ {
+ int msgCount = 6;
+
+ IConnection connection = EstablishConnection(testPeer);
+ connection.Start();
+
+ testPeer.ExpectBegin();
+ ISession session =
connection.CreateSession(AcknowledgementMode.IndividualAcknowledge);
+ IQueue queue = session.GetQueue("myQueue");
+
+ testPeer.ExpectReceiverAttach();
+ testPeer.ExpectLinkFlowRespondWithTransfer(
+ message: CreateMessageWithNullContent(),
+ count: msgCount,
+ drain: false,
+ nextIncomingId: 1,
+ addMessageNumberProperty: true,
+ sendDrainFlowResponse: false,
+ sendSettled: false,
+ creditMatcher: credit => Assert.Greater(credit, msgCount));
+
+ IMessageConsumer consumer = session.CreateConsumer(queue);
+
+ CountdownEvent latch = new CountdownEvent(msgCount);
+ List<ITextMessage> messages = new List<ITextMessage>();
+ consumer.Listener += message =>
+ {
+ messages.Add((ITextMessage) message);
+ latch.Signal();
+ };
+
+ Assert.True(latch.Wait(TimeSpan.FromMilliseconds(1000)),
$"Should receive: {msgCount}, but received: {messages.Count}");
+
+ Action<DeliveryState> dispositionMatcher = state => {
Assert.AreEqual(state.Descriptor.Code,
MessageSupport.ACCEPTED_INSTANCE.Descriptor.Code); };
+ Action<DeliveryState> dispositionMatcherFailed = state => {
Assert.AreEqual(state.Descriptor.Code,
MessageSupport.MODIFIED_FAILED_INSTANCE.Descriptor.Code); };
+
+ // Acknowledge the messages in a random order and verify the
individual dispositions have expected delivery state.
+ Random random = new Random();
+ for (int i = 0; i < msgCount; i++)
+ {
+ var message = messages[random.Next(msgCount - i)];
+ messages.Remove(message);
+
+ uint deliveryNumber = (uint)
message.Properties.GetInt(TestAmqpPeer.MESSAGE_NUMBER) + 1;
+
+ if (deliveryNumber == 0)
+ {
+ testPeer.ExpectDisposition(settled: true,
stateMatcher: dispositionMatcherFailed, firstDeliveryId: deliveryNumber,
lastDeliveryId: deliveryNumber);
Review Comment:
This code is unreachable.
##########
test/Apache-NMS-AMQP-Test/Integration/AmqpAcknowledgmentsIntegrationTest.cs:
##########
@@ -273,6 +274,219 @@ public void TestAcknowledgeIndividualMessagesAsync()
testPeer.WaitForAllMatchersToComplete(3000);
}
}
+
+ [Test, Timeout(20_000)]
+ public void TestModifyUndeliverableIndividualMessagesAsync()
+ {
+ using (TestAmqpPeer testPeer = new TestAmqpPeer())
+ {
+ int msgCount = 6;
+
+ IConnection connection = EstablishConnection(testPeer);
+ connection.Start();
+
+ testPeer.ExpectBegin();
+ ISession session =
connection.CreateSession(AcknowledgementMode.IndividualAcknowledge);
+ IQueue queue = session.GetQueue("myQueue");
+
+ testPeer.ExpectReceiverAttach();
+ testPeer.ExpectLinkFlowRespondWithTransfer(
+ message: CreateMessageWithNullContent(),
+ count: msgCount,
+ drain: false,
+ nextIncomingId: 1,
+ addMessageNumberProperty: true,
+ sendDrainFlowResponse: false,
+ sendSettled: false,
+ creditMatcher: credit => Assert.Greater(credit, msgCount));
+
+ IMessageConsumer consumer = session.CreateConsumer(queue);
+
+ CountdownEvent latch = new CountdownEvent(msgCount);
+ List<ITextMessage> messages = new List<ITextMessage>();
+ consumer.Listener += message =>
+ {
+ messages.Add((ITextMessage) message);
+ latch.Signal();
+ };
+
+ Assert.True(latch.Wait(TimeSpan.FromMilliseconds(1000)),
$"Should receive: {msgCount}, but received: {messages.Count}");
+
+ Action<DeliveryState> dispositionMatcher = state => {
Assert.AreEqual(state.Descriptor.Code,
MessageSupport.ACCEPTED_INSTANCE.Descriptor.Code); };
+ Action<DeliveryState> dispositionMatcherFailed = state => {
Assert.AreEqual(state.Descriptor.Code,
MessageSupport.MODIFIED_FAILED_INSTANCE.Descriptor.Code); };
+
+ // Acknowledge the messages in a random order and verify the
individual dispositions have expected delivery state.
+ Random random = new Random();
+ for (int i = 0; i < msgCount; i++)
+ {
+ var message = messages[random.Next(msgCount - i)];
+ messages.Remove(message);
+
+ uint deliveryNumber = (uint)
message.Properties.GetInt(TestAmqpPeer.MESSAGE_NUMBER) + 1;
+
+ if (deliveryNumber == 0)
+ {
+ testPeer.ExpectDisposition(settled: true,
stateMatcher: dispositionMatcherFailed, firstDeliveryId: deliveryNumber,
lastDeliveryId: deliveryNumber);
+ ((NmsMessage)
message).NmsAcknowledgeCallback.AcknowledgementType =
AckType.MODIFIED_FAILED_UNDELIVERABLE;
+ }
+ else
+ {
+ testPeer.ExpectDisposition(settled: true,
stateMatcher: dispositionMatcher, firstDeliveryId: deliveryNumber,
lastDeliveryId: deliveryNumber);
+ }
+
+ message.Acknowledge();
+
+ testPeer.WaitForAllMatchersToComplete(3000);
+ }
+
+ testPeer.ExpectClose();
+ connection.Close();
+
+ testPeer.WaitForAllMatchersToComplete(3000);
+ }
+ }
+
+ [Test, Timeout(20_000)]
+ public void TestRejectIndividualMessagesAsync()
+ {
+ using (TestAmqpPeer testPeer = new TestAmqpPeer())
+ {
+ int msgCount = 6;
+
+ IConnection connection = EstablishConnection(testPeer);
+ connection.Start();
+
+ testPeer.ExpectBegin();
+ ISession session =
connection.CreateSession(AcknowledgementMode.IndividualAcknowledge);
+ IQueue queue = session.GetQueue("myQueue");
+
+ testPeer.ExpectReceiverAttach();
+ testPeer.ExpectLinkFlowRespondWithTransfer(
+ message: CreateMessageWithNullContent(),
+ count: msgCount,
+ drain: false,
+ nextIncomingId: 1,
+ addMessageNumberProperty: true,
+ sendDrainFlowResponse: false,
+ sendSettled: false,
+ creditMatcher: credit => Assert.Greater(credit, msgCount));
+
+ IMessageConsumer consumer = session.CreateConsumer(queue);
+
+ CountdownEvent latch = new CountdownEvent(msgCount);
+ List<ITextMessage> messages = new List<ITextMessage>();
+ consumer.Listener += message =>
+ {
+ messages.Add((ITextMessage) message);
+ latch.Signal();
+ };
+
+ Assert.True(latch.Wait(TimeSpan.FromMilliseconds(1000)),
$"Should receive: {msgCount}, but received: {messages.Count}");
+
+ Action<DeliveryState> dispositionMatcher = state => {
Assert.AreEqual(state.Descriptor.Code,
MessageSupport.ACCEPTED_INSTANCE.Descriptor.Code); };
+ Action<DeliveryState> dispositionMatcherFailed = state => {
Assert.AreEqual(state.Descriptor.Code,
MessageSupport.REJECTED_INSTANCE.Descriptor.Code); };
+
+ // Acknowledge the messages in a random order and verify the
individual dispositions have expected delivery state.
+ Random random = new Random();
+ for (int i = 0; i < msgCount; i++)
+ {
+ var message = messages[random.Next(msgCount - i)];
+ messages.Remove(message);
+
+ uint deliveryNumber = (uint)
message.Properties.GetInt(TestAmqpPeer.MESSAGE_NUMBER) + 1;
+
+ if (deliveryNumber == 0)
+ {
+ testPeer.ExpectDisposition(settled: true,
stateMatcher: dispositionMatcherFailed, firstDeliveryId: deliveryNumber,
lastDeliveryId: deliveryNumber);
Review Comment:
This is also not reachable.
##########
test/Apache-NMS-AMQP-Test/Integration/AmqpAcknowledgmentsIntegrationTest.cs:
##########
@@ -273,6 +274,219 @@ public void TestAcknowledgeIndividualMessagesAsync()
testPeer.WaitForAllMatchersToComplete(3000);
}
}
+
+ [Test, Timeout(20_000)]
+ public void TestModifyUndeliverableIndividualMessagesAsync()
+ {
+ using (TestAmqpPeer testPeer = new TestAmqpPeer())
+ {
+ int msgCount = 6;
+
+ IConnection connection = EstablishConnection(testPeer);
+ connection.Start();
+
+ testPeer.ExpectBegin();
+ ISession session =
connection.CreateSession(AcknowledgementMode.IndividualAcknowledge);
+ IQueue queue = session.GetQueue("myQueue");
+
+ testPeer.ExpectReceiverAttach();
+ testPeer.ExpectLinkFlowRespondWithTransfer(
+ message: CreateMessageWithNullContent(),
+ count: msgCount,
+ drain: false,
+ nextIncomingId: 1,
+ addMessageNumberProperty: true,
+ sendDrainFlowResponse: false,
+ sendSettled: false,
+ creditMatcher: credit => Assert.Greater(credit, msgCount));
+
+ IMessageConsumer consumer = session.CreateConsumer(queue);
+
+ CountdownEvent latch = new CountdownEvent(msgCount);
+ List<ITextMessage> messages = new List<ITextMessage>();
+ consumer.Listener += message =>
+ {
+ messages.Add((ITextMessage) message);
+ latch.Signal();
+ };
+
+ Assert.True(latch.Wait(TimeSpan.FromMilliseconds(1000)),
$"Should receive: {msgCount}, but received: {messages.Count}");
+
+ Action<DeliveryState> dispositionMatcher = state => {
Assert.AreEqual(state.Descriptor.Code,
MessageSupport.ACCEPTED_INSTANCE.Descriptor.Code); };
+ Action<DeliveryState> dispositionMatcherFailed = state => {
Assert.AreEqual(state.Descriptor.Code,
MessageSupport.MODIFIED_FAILED_INSTANCE.Descriptor.Code); };
+
+ // Acknowledge the messages in a random order and verify the
individual dispositions have expected delivery state.
+ Random random = new Random();
+ for (int i = 0; i < msgCount; i++)
+ {
+ var message = messages[random.Next(msgCount - i)];
+ messages.Remove(message);
+
+ uint deliveryNumber = (uint)
message.Properties.GetInt(TestAmqpPeer.MESSAGE_NUMBER) + 1;
+
+ if (deliveryNumber == 0)
+ {
+ testPeer.ExpectDisposition(settled: true,
stateMatcher: dispositionMatcherFailed, firstDeliveryId: deliveryNumber,
lastDeliveryId: deliveryNumber);
+ ((NmsMessage)
message).NmsAcknowledgeCallback.AcknowledgementType =
AckType.MODIFIED_FAILED_UNDELIVERABLE;
+ }
+ else
+ {
+ testPeer.ExpectDisposition(settled: true,
stateMatcher: dispositionMatcher, firstDeliveryId: deliveryNumber,
lastDeliveryId: deliveryNumber);
+ }
+
+ message.Acknowledge();
+
+ testPeer.WaitForAllMatchersToComplete(3000);
+ }
+
+ testPeer.ExpectClose();
+ connection.Close();
+
+ testPeer.WaitForAllMatchersToComplete(3000);
+ }
+ }
+
+ [Test, Timeout(20_000)]
+ public void TestRejectIndividualMessagesAsync()
+ {
+ using (TestAmqpPeer testPeer = new TestAmqpPeer())
+ {
+ int msgCount = 6;
+
+ IConnection connection = EstablishConnection(testPeer);
+ connection.Start();
+
+ testPeer.ExpectBegin();
+ ISession session =
connection.CreateSession(AcknowledgementMode.IndividualAcknowledge);
+ IQueue queue = session.GetQueue("myQueue");
+
+ testPeer.ExpectReceiverAttach();
+ testPeer.ExpectLinkFlowRespondWithTransfer(
+ message: CreateMessageWithNullContent(),
+ count: msgCount,
+ drain: false,
+ nextIncomingId: 1,
+ addMessageNumberProperty: true,
+ sendDrainFlowResponse: false,
+ sendSettled: false,
+ creditMatcher: credit => Assert.Greater(credit, msgCount));
+
+ IMessageConsumer consumer = session.CreateConsumer(queue);
+
+ CountdownEvent latch = new CountdownEvent(msgCount);
+ List<ITextMessage> messages = new List<ITextMessage>();
+ consumer.Listener += message =>
+ {
+ messages.Add((ITextMessage) message);
+ latch.Signal();
+ };
+
+ Assert.True(latch.Wait(TimeSpan.FromMilliseconds(1000)),
$"Should receive: {msgCount}, but received: {messages.Count}");
+
+ Action<DeliveryState> dispositionMatcher = state => {
Assert.AreEqual(state.Descriptor.Code,
MessageSupport.ACCEPTED_INSTANCE.Descriptor.Code); };
+ Action<DeliveryState> dispositionMatcherFailed = state => {
Assert.AreEqual(state.Descriptor.Code,
MessageSupport.REJECTED_INSTANCE.Descriptor.Code); };
+
+ // Acknowledge the messages in a random order and verify the
individual dispositions have expected delivery state.
+ Random random = new Random();
+ for (int i = 0; i < msgCount; i++)
+ {
+ var message = messages[random.Next(msgCount - i)];
+ messages.Remove(message);
+
+ uint deliveryNumber = (uint)
message.Properties.GetInt(TestAmqpPeer.MESSAGE_NUMBER) + 1;
+
+ if (deliveryNumber == 0)
+ {
+ testPeer.ExpectDisposition(settled: true,
stateMatcher: dispositionMatcherFailed, firstDeliveryId: deliveryNumber,
lastDeliveryId: deliveryNumber);
+ ((NmsMessage)
message).NmsAcknowledgeCallback.AcknowledgementType = AckType.REJECTED;
+ }
+ else
+ {
+ testPeer.ExpectDisposition(settled: true,
stateMatcher: dispositionMatcher, firstDeliveryId: deliveryNumber,
lastDeliveryId: deliveryNumber);
+ }
+
+ message.Acknowledge();
+
+ testPeer.WaitForAllMatchersToComplete(3000);
+ }
+
+ testPeer.ExpectClose();
+ connection.Close();
+
+ testPeer.WaitForAllMatchersToComplete(3000);
+ }
+ }
+
+ [Test, Timeout(20_000)]
+ public void TestModifyIndividualMessagesAsync()
+ {
+ using (TestAmqpPeer testPeer = new TestAmqpPeer())
+ {
+ int msgCount = 6;
+
+ IConnection connection = EstablishConnection(testPeer);
+ connection.Start();
+
+ testPeer.ExpectBegin();
+ ISession session =
connection.CreateSession(AcknowledgementMode.IndividualAcknowledge);
+ IQueue queue = session.GetQueue("myQueue");
+
+ testPeer.ExpectReceiverAttach();
+ testPeer.ExpectLinkFlowRespondWithTransfer(
+ message: CreateMessageWithNullContent(),
+ count: msgCount,
+ drain: false,
+ nextIncomingId: 1,
+ addMessageNumberProperty: true,
+ sendDrainFlowResponse: false,
+ sendSettled: false,
+ creditMatcher: credit => Assert.Greater(credit, msgCount));
+
+ IMessageConsumer consumer = session.CreateConsumer(queue);
+
+ CountdownEvent latch = new CountdownEvent(msgCount);
+ List<ITextMessage> messages = new List<ITextMessage>();
+ consumer.Listener += message =>
+ {
+ messages.Add((ITextMessage) message);
+ latch.Signal();
+ };
+
+ Assert.True(latch.Wait(TimeSpan.FromMilliseconds(1000)),
$"Should receive: {msgCount}, but received: {messages.Count}");
+
+ Action<DeliveryState> dispositionMatcher = state => {
Assert.AreEqual(state.Descriptor.Code,
MessageSupport.ACCEPTED_INSTANCE.Descriptor.Code); };
+ Action<DeliveryState> dispositionMatcherFailed = state => {
Assert.AreEqual(state.Descriptor.Code,
MessageSupport.MODIFIED_INSTANCE.Descriptor.Code); };
+
+ // Acknowledge the messages in a random order and verify the
individual dispositions have expected delivery state.
+ Random random = new Random();
+ for (int i = 0; i < msgCount; i++)
+ {
+ var message = messages[random.Next(msgCount - i)];
+ messages.Remove(message);
+
+ uint deliveryNumber = (uint)
message.Properties.GetInt(TestAmqpPeer.MESSAGE_NUMBER) + 1;
+
+ if (deliveryNumber == 0)
+ {
+ testPeer.ExpectDisposition(settled: true,
stateMatcher: dispositionMatcherFailed, firstDeliveryId: deliveryNumber,
lastDeliveryId: deliveryNumber);
Review Comment:
This is also unreachable.
##########
test/Apache-NMS-AMQP-Test/Integration/AmqpAcknowledgmentsIntegrationTest.cs:
##########
@@ -273,6 +274,219 @@ public void TestAcknowledgeIndividualMessagesAsync()
testPeer.WaitForAllMatchersToComplete(3000);
}
}
+
+ [Test, Timeout(20_000)]
+ public void TestModifyUndeliverableIndividualMessagesAsync()
+ {
+ using (TestAmqpPeer testPeer = new TestAmqpPeer())
+ {
+ int msgCount = 6;
+
+ IConnection connection = EstablishConnection(testPeer);
+ connection.Start();
+
+ testPeer.ExpectBegin();
+ ISession session =
connection.CreateSession(AcknowledgementMode.IndividualAcknowledge);
+ IQueue queue = session.GetQueue("myQueue");
+
+ testPeer.ExpectReceiverAttach();
+ testPeer.ExpectLinkFlowRespondWithTransfer(
+ message: CreateMessageWithNullContent(),
+ count: msgCount,
+ drain: false,
+ nextIncomingId: 1,
+ addMessageNumberProperty: true,
+ sendDrainFlowResponse: false,
+ sendSettled: false,
+ creditMatcher: credit => Assert.Greater(credit, msgCount));
+
+ IMessageConsumer consumer = session.CreateConsumer(queue);
+
+ CountdownEvent latch = new CountdownEvent(msgCount);
+ List<ITextMessage> messages = new List<ITextMessage>();
+ consumer.Listener += message =>
+ {
+ messages.Add((ITextMessage) message);
+ latch.Signal();
+ };
+
+ Assert.True(latch.Wait(TimeSpan.FromMilliseconds(1000)),
$"Should receive: {msgCount}, but received: {messages.Count}");
+
+ Action<DeliveryState> dispositionMatcher = state => {
Assert.AreEqual(state.Descriptor.Code,
MessageSupport.ACCEPTED_INSTANCE.Descriptor.Code); };
+ Action<DeliveryState> dispositionMatcherFailed = state => {
Assert.AreEqual(state.Descriptor.Code,
MessageSupport.MODIFIED_FAILED_INSTANCE.Descriptor.Code); };
+
+ // Acknowledge the messages in a random order and verify the
individual dispositions have expected delivery state.
+ Random random = new Random();
+ for (int i = 0; i < msgCount; i++)
+ {
+ var message = messages[random.Next(msgCount - i)];
+ messages.Remove(message);
+
+ uint deliveryNumber = (uint)
message.Properties.GetInt(TestAmqpPeer.MESSAGE_NUMBER) + 1;
+
+ if (deliveryNumber == 0)
+ {
+ testPeer.ExpectDisposition(settled: true,
stateMatcher: dispositionMatcherFailed, firstDeliveryId: deliveryNumber,
lastDeliveryId: deliveryNumber);
+ ((NmsMessage)
message).NmsAcknowledgeCallback.AcknowledgementType =
AckType.MODIFIED_FAILED_UNDELIVERABLE;
Review Comment:
We should update this to match how it's done in qpid-jms, but we can address
it at a later time.
##########
test/Apache-NMS-AMQP-Test/Integration/AmqpAcknowledgmentsIntegrationTest.cs:
##########
Review Comment:
I suggest simplifying these tests to focus on a single, straightforward nack
without involving randomness. The current approach adds unnecessary complexity,
and important details, such as the fact that the nack was never actually being
sent, are being overlooked.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]