This is an automated email from the ASF dual-hosted git repository. Havret pushed a commit to branch Add-test-for-AsyncListener-behavior-after-removal-to-prevent-deadlock in repository https://gitbox.apache.org/repos/asf/activemq-nms-amqp.git
commit 7dc40ebc6fac2e161ba19d17d495dc43024e068a
Author: Havret <[email protected]>
AuthorDate: Sun May 17 21:35:02 2026 +0200

    Add test for AsyncListener behavior after removal to prevent deadlock
---
 .../Async/ConsumerIntegrationTestAsync.cs | 45 ++++++++++++++++++++++
 1 file changed, 45 insertions(+)

diff --git a/test/Apache-NMS-AMQP-Test/Integration/Async/ConsumerIntegrationTestAsync.cs b/test/Apache-NMS-AMQP-Test/Integration/Async/ConsumerIntegrationTestAsync.cs
index 4ef6078..916af5a 100644
--- a/test/Apache-NMS-AMQP-Test/Integration/Async/ConsumerIntegrationTestAsync.cs
+++ b/test/Apache-NMS-AMQP-Test/Integration/Async/ConsumerIntegrationTestAsync.cs
@@ -1094,5 +1094,50 @@ namespace NMS.AMQP.Test.Integration.Async
                 testPeer.WaitForAllMatchersToComplete(2000);
             }
         }
+
+        [Test, Timeout(20_000)]
+        public async Task TestAsyncListenerAddAfterRemoveReceivesMessages()
+        {
+            using (TestAmqpPeer testPeer = new TestAmqpPeer())
+            {
+                IConnection connection = await EstablishConnectionAsync(testPeer);
+                await connection.StartAsync();
+
+                testPeer.ExpectBegin();
+                ISession session = await connection.CreateSessionAsync(AcknowledgementMode.AutoAcknowledge);
+                IQueue destination = await session.GetQueueAsync("myQueue");
+
+                testPeer.ExpectReceiverAttach();
+                testPeer.ExpectLinkFlowRespondWithTransfer(message: CreateMessageWithContent(), count: 1);
+                testPeer.ExpectDispositionThatIsAcceptedAndSettled();
+
+                IMessageConsumer consumer = await session.CreateConsumerAsync(destination);
+
+                // Remove a handler that was never added — this is the sequence that triggered the bug.
+                // With the buggy LockAsync() call, the semaphore was acquired but never released,
+                // causing the subsequent add to deadlock.
+                AsyncMessageListener handler = (_, _) => Task.CompletedTask;
+                consumer.AsyncListener -= handler;
+
+                ManualResetEvent messageReceived = new ManualResetEvent(false);
+                consumer.AsyncListener += async (message, ct) =>
+                {
+                    await Task.CompletedTask;
+                    messageReceived.Set();
+                };
+
+                Assert.True(messageReceived.WaitOne(4000), "Message was not received after adding AsyncListener following a remove.");
+
+                testPeer.WaitForAllMatchersToComplete(2000);
+
+                testPeer.ExpectDetach(expectClosed: true, sendResponse: true, replyClosed: true);
+                await consumer.CloseAsync();
+
+                testPeer.ExpectClose();
+                await connection.CloseAsync();
+
+                testPeer.WaitForAllMatchersToComplete(2000);
+            }
+        }
     }
 }
\ No newline at end of file

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
For further information, visit: https://activemq.apache.org/contact
