This is an automated email from the ASF dual-hosted git repository. havret pushed a commit to branch add_async_consumer in repository https://gitbox.apache.org/repos/asf/activemq-nms-amqp.git
commit be594b0d4fc889acec0e8f97c8e6f826733a8169 Author: Havret <[email protected]> AuthorDate: Sun Sep 10 18:43:40 2023 +0200 AMQNET-735 Add AsyncMessageListener to Consumer API --- src/NMS.AMQP/Apache-NMS-AMQP.csproj | 2 +- src/NMS.AMQP/NmsConsumer.cs | 6 +++ src/NMS.AMQP/NmsMessageConsumer.cs | 47 ++++++++++++++----- src/NMS.AMQP/SessionDispatcher.cs | 2 +- .../Async/ConsumerIntegrationTestAsync.cs | 50 ++++++++++----------- .../Async/NMSConsumerIntegrationTestAsync.cs | 52 ++++++++++++---------- 6 files changed, 98 insertions(+), 61 deletions(-) diff --git a/src/NMS.AMQP/Apache-NMS-AMQP.csproj b/src/NMS.AMQP/Apache-NMS-AMQP.csproj index dc14816..523295c 100644 --- a/src/NMS.AMQP/Apache-NMS-AMQP.csproj +++ b/src/NMS.AMQP/Apache-NMS-AMQP.csproj @@ -95,7 +95,7 @@ with the License. You may obtain a copy of the License at <ItemGroup> <!-- AMQPNetLite.Core is .NET Standard 1.3 package --> <PackageReference Include="AMQPNetLite.Core" Version="2.4.3" /> - <PackageReference Include="Apache.NMS" Version="2.0.0" /> + <PackageReference Include="Apache.NMS" Version="2.1.0" /> <PackageReference Include="System.Threading.Tasks.Dataflow" Version="4.9.0" /> </ItemGroup> </Project> diff --git a/src/NMS.AMQP/NmsConsumer.cs b/src/NMS.AMQP/NmsConsumer.cs index d6d905b..0ee265a 100644 --- a/src/NMS.AMQP/NmsConsumer.cs +++ b/src/NMS.AMQP/NmsConsumer.cs @@ -109,5 +109,11 @@ namespace Apache.NMS.AMQP add => ((IMessageConsumer)consumer).Listener += value; remove => ((IMessageConsumer)consumer).Listener -= value; } + + event AsyncMessageListener INMSConsumer.AsyncListener + { + add => ((IMessageConsumer)consumer).AsyncListener += value; + remove => ((IMessageConsumer)consumer).AsyncListener -= value; + } } } \ No newline at end of file diff --git a/src/NMS.AMQP/NmsMessageConsumer.cs b/src/NMS.AMQP/NmsMessageConsumer.cs index 05261e6..d37b55a 100644 --- a/src/NMS.AMQP/NmsMessageConsumer.cs +++ b/src/NMS.AMQP/NmsMessageConsumer.cs @@ -16,6 +16,7 @@ */ using System; +using System.Threading; using System.Threading.Tasks; using Apache.NMS.AMQP.Message; using Apache.NMS.AMQP.Meta; @@ -110,6 +111,26 @@ namespace Apache.NMS.AMQP public ConsumerTransformerDelegate ConsumerTransformer { get; set; } public string MessageSelector => Info.Selector; + + event AsyncMessageListener IMessageConsumer.AsyncListener + { + add + { + CheckClosed(); + using(syncRoot.Lock()) + { + AsyncListener += value; + DrainMessageQueueToListener(); + } + } + remove + { + using (syncRoot.LockAsync()) + { + AsyncListener -= value; + } + } + } event MessageListener IMessageConsumer.Listener { @@ -284,6 +305,8 @@ namespace Apache.NMS.AMQP private event MessageListener Listener; + private event AsyncMessageListener AsyncListener; + public async Task Init() { await Session.Connection.CreateResource(Info).Await(); @@ -310,7 +333,7 @@ namespace Apache.NMS.AMQP else messageQueue.Enqueue(envelope); - if (Session.IsStarted && Listener != null) + if (Session.IsStarted && HasMessageListener()) { using (syncRoot.Exclude()) // Exclude lock for a time of dispatching, so it does not pass along to actionblock { @@ -319,27 +342,27 @@ namespace Apache.NMS.AMQP } } - private async Task DeliverNextPendingAsync() + private async Task DeliverNextPendingAsync(CancellationToken cancellationToken) { if (Tracer.IsDebugEnabled) { Tracer.Debug($"{Info.Id} is about to deliver next pending message."); } - if (Session.IsStarted && started && Listener != null) + if (Session.IsStarted && this.started && HasMessageListener()) { using(await syncRoot.LockAsync().Await()) { try { - if (started && Listener != null) + if (this.started && HasMessageListener()) { var envelope = messageQueue.DequeueNoWait(); if (envelope == null) { if (Tracer.IsDebugEnabled) { - Tracer.Debug($"No message available for delivery."); + Tracer.Debug("No message available for delivery."); } return; @@ -376,7 +399,11 @@ namespace Apache.NMS.AMQP try { - Listener.Invoke(envelope.Message.Copy()); + Listener?.Invoke(envelope.Message.Copy()); + if (AsyncListener != null) + { + await AsyncListener.Invoke(envelope.Message.Copy(), cancellationToken).Await(); + } } catch (Exception) { @@ -592,7 +619,7 @@ namespace Apache.NMS.AMQP public bool HasMessageListener() { - return Listener != null; + return Listener != null || AsyncListener != null; } public void Shutdown(Exception exception) @@ -628,7 +655,7 @@ namespace Apache.NMS.AMQP private void DrainMessageQueueToListener() { - if (Listener != null && Session.IsStarted) + if (HasMessageListener() && Session.IsStarted) { int size = messageQueue.Count; for (int i = 0; i < size; i++) @@ -715,9 +742,9 @@ namespace Apache.NMS.AMQP this.consumer = consumer; } - public Task DeliverNextPending() + public Task DeliverNextPending(CancellationToken cancellationToken) { - return consumer.DeliverNextPendingAsync(); + return consumer.DeliverNextPendingAsync(cancellationToken); } } } diff --git a/src/NMS.AMQP/SessionDispatcher.cs b/src/NMS.AMQP/SessionDispatcher.cs index 53a5da9..5f04891 100644 --- a/src/NMS.AMQP/SessionDispatcher.cs +++ b/src/NMS.AMQP/SessionDispatcher.cs @@ -49,7 +49,7 @@ namespace Apache.NMS.AMQP try { isOnDispatcherFlow.Value = true; - await messageDeliveryTask.DeliverNextPending().Await(); + await messageDeliveryTask.DeliverNextPending(cts.Token).Await(); } finally { diff --git a/test/Apache-NMS-AMQP-Test/Integration/Async/ConsumerIntegrationTestAsync.cs b/test/Apache-NMS-AMQP-Test/Integration/Async/ConsumerIntegrationTestAsync.cs index 5445d5e..4ef6078 100644 --- a/test/Apache-NMS-AMQP-Test/Integration/Async/ConsumerIntegrationTestAsync.cs +++ b/test/Apache-NMS-AMQP-Test/Integration/Async/ConsumerIntegrationTestAsync.cs @@ -254,7 +254,7 @@ namespace NMS.AMQP.Test.Integration.Async IQueue queue = await session.GetQueueAsync("myQueue"); IMessageConsumer consumer = await session.CreateConsumerAsync(queue); - consumer.Listener += message => { }; + consumer.AsyncListener += (message, ct) => Task.CompletedTask; // Verify the consumer gets marked closed testPeer.WaitForAllMatchersToComplete(1000); @@ -313,9 +313,9 @@ namespace NMS.AMQP.Test.Integration.Async testPeer.ExpectReceiverAttach(); testPeer.ExpectLinkFlowRespondWithTransfer(message: new Amqp.Message() { BodySection = new AmqpValue() { Value = null } }, count: 1); testPeer.ExpectDispositionThatIsReleasedAndSettled(); - + IMessageConsumer consumer = await session.CreateConsumerAsync(queue); - consumer.Listener += message => throw new Exception(); + consumer.AsyncListener += (message, ct) => throw new Exception(); testPeer.WaitForAllMatchersToComplete(2000); @@ -606,14 +606,14 @@ namespace NMS.AMQP.Test.Integration.Async testPeer.ExpectDetach(expectClosed: true, sendResponse: true, replyClosed: true); testPeer.ExpectDispositionThatIsAcceptedAndSettled(); - + IMessageConsumer consumer = await session.CreateConsumerAsync(destination); - consumer.Listener += message => + consumer.AsyncListener += async (message, ct) => { - IMessageProducer producer = session.CreateProducer(outbound); - producer.Send(message); - producer.Close(); + IMessageProducer producer = await session.CreateProducerAsync(outbound); + await producer.SendAsync(message); + await producer.CloseAsync(); }; testPeer.WaitForAllMatchersToComplete(10_000); @@ -648,11 +648,11 @@ namespace NMS.AMQP.Test.Integration.Async ManualResetEvent latch = new ManualResetEvent(false); Exception exception = null; - consumer.Listener += message => + consumer.AsyncListener += async (message, ct) => { try { - connection.Close(); + await connection.CloseAsync(); } catch (Exception e) { @@ -701,11 +701,11 @@ namespace NMS.AMQP.Test.Integration.Async ManualResetEvent latch = new ManualResetEvent(false); Exception exception = null; - consumer.Listener += message => + consumer.AsyncListener += async (message, ct) => { try { - connection.Stop(); + await connection.StopAsync(); } catch (Exception e) { @@ -754,11 +754,11 @@ namespace NMS.AMQP.Test.Integration.Async ManualResetEvent latch = new ManualResetEvent(false); Exception exception = null; - consumer.Listener += message => + consumer.AsyncListener += async (message, ct) => { try { - session.Close(); + await session.CloseAsync(); } catch (Exception e) { @@ -813,11 +813,11 @@ namespace NMS.AMQP.Test.Integration.Async testPeer.ExpectDetach(expectClosed: true, sendResponse: true, replyClosed: true); Exception exception = null; - consumer.Listener += message => + consumer.AsyncListener += async (message, ct) => { try { - consumer.Close(); + await consumer.CloseAsync(); latch.Set(); } catch (Exception e) @@ -878,7 +878,7 @@ namespace NMS.AMQP.Test.Integration.Async bool complete = false; int messageSeen = 0; int expectedIndex = 0; - consumer.Listener += message => + consumer.AsyncListener += async(message, ct) => { if (complete) { @@ -893,7 +893,7 @@ namespace NMS.AMQP.Test.Integration.Async // don't ack the message until we receive it X times if (messageSeen < recoverCount) { - session.Recover(); + await session.RecoverAsync(); messageSeen++; } else @@ -906,7 +906,7 @@ namespace NMS.AMQP.Test.Integration.Async stateMatcher: state => Assert.AreEqual(state.Descriptor.Code, MessageSupport.ACCEPTED_INSTANCE.Descriptor.Code )); - message.Acknowledge(); + await message.AcknowledgeAsync(); if (expectedIndex == messageCount) { @@ -958,10 +958,10 @@ namespace NMS.AMQP.Test.Integration.Async testPeer.ExpectDispositionThatIsAcceptedAndSettled(); - consumer.Listener += _ => + consumer.AsyncListener += async (msg, ct) => { latch.Set(); - Task.Delay(TimeSpan.FromMilliseconds(100)).GetAwaiter().GetResult(); + await Task.Delay(TimeSpan.FromMilliseconds(100), ct); }; Assert.True(latch.WaitOne(TimeSpan.FromMilliseconds(3000)), "Messages not received within given timeout."); @@ -1001,10 +1001,10 @@ namespace NMS.AMQP.Test.Integration.Async testPeer.ExpectDispositionThatIsAcceptedAndSettled(); - consumer.Listener += _ => + consumer.AsyncListener += async (msg, _) => { latch.Set(); - Task.Delay(TimeSpan.FromMilliseconds(100)).GetAwaiter().GetResult(); + await Task.Delay(TimeSpan.FromMilliseconds(100), CancellationToken.None); }; Assert.True(latch.WaitOne(TimeSpan.FromMilliseconds(3000)), "Messages not received within given timeout."); @@ -1044,10 +1044,10 @@ namespace NMS.AMQP.Test.Integration.Async testPeer.ExpectDispositionThatIsAcceptedAndSettled(); - consumer.Listener += _ => + consumer.AsyncListener += async (msg, _) => { latch.Set(); - Task.Delay(TimeSpan.FromMilliseconds(100)).GetAwaiter().GetResult(); + await Task.Delay(TimeSpan.FromMilliseconds(100), CancellationToken.None); }; Assert.True(latch.WaitOne(TimeSpan.FromMilliseconds(3000)), "Messages not received within given timeout."); diff --git a/test/Apache-NMS-AMQP-Test/Integration/Async/NMSConsumerIntegrationTestAsync.cs b/test/Apache-NMS-AMQP-Test/Integration/Async/NMSConsumerIntegrationTestAsync.cs index 87659da..27ae8e1 100644 --- a/test/Apache-NMS-AMQP-Test/Integration/Async/NMSConsumerIntegrationTestAsync.cs +++ b/test/Apache-NMS-AMQP-Test/Integration/Async/NMSConsumerIntegrationTestAsync.cs @@ -318,7 +318,7 @@ namespace NMS.AMQP.Test.Integration.Async testPeer.ExpectDispositionThatIsReleasedAndSettled(); var consumer = await context.CreateConsumerAsync(queue); - consumer.Listener += message => throw new Exception(); + consumer.AsyncListener += (message, ct) => throw new Exception(); testPeer.WaitForAllMatchersToComplete(2000); @@ -483,7 +483,11 @@ namespace NMS.AMQP.Test.Integration.Async testPeer.ExpectDispositionThatIsAcceptedAndSettled(); } - consumer.Listener += message => latch.Signal(); + consumer.AsyncListener += (message, ct) => + { + latch.Signal(); + return Task.CompletedTask; + }; Assert.True(latch.Wait(4000), "Messages not received within given timeout. Count remaining: " + latch.CurrentCount); @@ -517,7 +521,7 @@ namespace NMS.AMQP.Test.Integration.Async var consumer = await context.CreateConsumerAsync(destination); - consumer.Listener += message => { }; + consumer.AsyncListener += (message, ct) => Task.CompletedTask; Assert.CatchAsync<NMSException>(async () => await consumer.ReceiveAsync(), "Should have thrown an exception."); Assert.CatchAsync<NMSException>(async () => await consumer.ReceiveAsync(TimeSpan.FromMilliseconds(1000)), "Should have thrown an exception."); @@ -555,11 +559,11 @@ namespace NMS.AMQP.Test.Integration.Async var consumer = await context.CreateConsumerAsync(destination); - consumer.Listener += message => + consumer.AsyncListener += async (message, ct) => { - var producer = context.CreateProducer(); - producer.Send(outbound, message); - producer.Close(); + var producer = await context.CreateProducerAsync(); + await producer.SendAsync(outbound, message); + await producer.CloseAsync(); }; testPeer.WaitForAllMatchersToComplete(10_000); @@ -594,11 +598,11 @@ namespace NMS.AMQP.Test.Integration.Async ManualResetEvent latch = new ManualResetEvent(false); Exception exception = null; - consumer.Listener += message => + consumer.AsyncListener += async (message, ct) => { try { - context.Close(); + await context.CloseAsync(); } catch (Exception e) { @@ -647,11 +651,11 @@ namespace NMS.AMQP.Test.Integration.Async ManualResetEvent latch = new ManualResetEvent(false); Exception exception = null; - consumer.Listener += message => + consumer.AsyncListener += async (message, ct) => { try { - context.Stop(); + await context.StopAsync(); } catch (Exception e) { @@ -700,11 +704,11 @@ namespace NMS.AMQP.Test.Integration.Async ManualResetEvent latch = new ManualResetEvent(false); Exception exception = null; - consumer.Listener += message => + consumer.AsyncListener += async (message, ct) => { try { - context.Close(); + await context.CloseAsync(); } catch (Exception e) { @@ -759,11 +763,11 @@ namespace NMS.AMQP.Test.Integration.Async testPeer.ExpectDetach(expectClosed: true, sendResponse: true, replyClosed: true); Exception exception = null; - consumer.Listener += message => + consumer.AsyncListener += async (message, ct) => { try { - consumer.Close(); + await consumer.CloseAsync(); latch.Set(); } catch (Exception e) @@ -824,7 +828,7 @@ namespace NMS.AMQP.Test.Integration.Async bool complete = false; int messageSeen = 0; int expectedIndex = 0; - consumer.Listener += message => + consumer.AsyncListener += async (message, ct) => { if (complete) { @@ -839,7 +843,7 @@ namespace NMS.AMQP.Test.Integration.Async // don't ack the message until we receive it X times if (messageSeen < recoverCount) { - context.Recover(); + await context.RecoverAsync(); messageSeen++; } else @@ -852,7 +856,7 @@ namespace NMS.AMQP.Test.Integration.Async stateMatcher: state => Assert.AreEqual(state.Descriptor.Code, MessageSupport.ACCEPTED_INSTANCE.Descriptor.Code )); - message.Acknowledge(); + await message.AcknowledgeAsync(); if (expectedIndex == messageCount) { @@ -904,10 +908,10 @@ namespace NMS.AMQP.Test.Integration.Async testPeer.ExpectDispositionThatIsAcceptedAndSettled(); - consumer.Listener += _ => + consumer.AsyncListener += async (msg, ct) => { latch.Set(); - Task.Delay(TimeSpan.FromMilliseconds(100)).GetAwaiter().GetResult(); + await Task.Delay(TimeSpan.FromMilliseconds(100), CancellationToken.None); }; Assert.True(latch.WaitOne(TimeSpan.FromMilliseconds(3000)), "Messages not received within given timeout."); @@ -947,10 +951,10 @@ namespace NMS.AMQP.Test.Integration.Async testPeer.ExpectDispositionThatIsAcceptedAndSettled(); - consumer.Listener += _ => + consumer.AsyncListener += async (msg, ct) => { latch.Set(); - Task.Delay(TimeSpan.FromMilliseconds(100)).GetAwaiter().GetResult(); + await Task.Delay(TimeSpan.FromMilliseconds(100), CancellationToken.None); }; Assert.True(latch.WaitOne(TimeSpan.FromMilliseconds(3000)), "Messages not received within given timeout."); @@ -988,10 +992,10 @@ namespace NMS.AMQP.Test.Integration.Async testPeer.ExpectDispositionThatIsAcceptedAndSettled(); - consumer.Listener += _ => + consumer.AsyncListener += async (msg, ct) => { latch.Set(); - Task.Delay(TimeSpan.FromMilliseconds(100)).GetAwaiter().GetResult(); + await Task.Delay(TimeSpan.FromMilliseconds(100), CancellationToken.None); }; Assert.True(latch.WaitOne(TimeSpan.FromMilliseconds(3000)), "Messages not received within given timeout.");
