This is an automated email from the ASF dual-hosted git repository.

havret pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-nms-amqp.git


The following commit(s) were added to refs/heads/main by this push:
     new 5e4e137  AMQNET-735 Add AsyncMessageListener to Consumer API
5e4e137 is described below

commit 5e4e137b071101463b8083f8a8a1cddada5b6f8a
Author: Havret <[email protected]>
AuthorDate: Sun Sep 10 18:43:40 2023 +0200

    AMQNET-735 Add AsyncMessageListener to Consumer API
---
 src/NMS.AMQP/Apache-NMS-AMQP.csproj                |  2 +-
 src/NMS.AMQP/NmsConsumer.cs                        |  6 +++
 src/NMS.AMQP/NmsMessageConsumer.cs                 | 47 ++++++++++++++-----
 src/NMS.AMQP/SessionDispatcher.cs                  |  2 +-
 .../Async/ConsumerIntegrationTestAsync.cs          | 50 ++++++++++-----------
 .../Async/NMSConsumerIntegrationTestAsync.cs       | 52 ++++++++++++----------
 6 files changed, 98 insertions(+), 61 deletions(-)

diff --git a/src/NMS.AMQP/Apache-NMS-AMQP.csproj 
b/src/NMS.AMQP/Apache-NMS-AMQP.csproj
index dc14816..523295c 100644
--- a/src/NMS.AMQP/Apache-NMS-AMQP.csproj
+++ b/src/NMS.AMQP/Apache-NMS-AMQP.csproj
@@ -95,7 +95,7 @@ with the License.  You may obtain a copy of the License at
     <ItemGroup>
         <!-- AMQPNetLite.Core is .NET Standard 1.3 package -->
         <PackageReference Include="AMQPNetLite.Core" Version="2.4.3" />
-        <PackageReference Include="Apache.NMS" Version="2.0.0" />
+        <PackageReference Include="Apache.NMS" Version="2.1.0" />
         <PackageReference Include="System.Threading.Tasks.Dataflow" 
Version="4.9.0" />
     </ItemGroup>
 </Project>
diff --git a/src/NMS.AMQP/NmsConsumer.cs b/src/NMS.AMQP/NmsConsumer.cs
index d6d905b..0ee265a 100644
--- a/src/NMS.AMQP/NmsConsumer.cs
+++ b/src/NMS.AMQP/NmsConsumer.cs
@@ -109,5 +109,11 @@ namespace Apache.NMS.AMQP
             add => ((IMessageConsumer)consumer).Listener += value;
             remove => ((IMessageConsumer)consumer).Listener -= value;
         }
+
+        event AsyncMessageListener INMSConsumer.AsyncListener
+        {
+            add => ((IMessageConsumer)consumer).AsyncListener += value;
+            remove => ((IMessageConsumer)consumer).AsyncListener -= value;
+        }
     }
 }
\ No newline at end of file
diff --git a/src/NMS.AMQP/NmsMessageConsumer.cs 
b/src/NMS.AMQP/NmsMessageConsumer.cs
index 05261e6..d37b55a 100644
--- a/src/NMS.AMQP/NmsMessageConsumer.cs
+++ b/src/NMS.AMQP/NmsMessageConsumer.cs
@@ -16,6 +16,7 @@
  */
 
 using System;
+using System.Threading;
 using System.Threading.Tasks;
 using Apache.NMS.AMQP.Message;
 using Apache.NMS.AMQP.Meta;
@@ -110,6 +111,26 @@ namespace Apache.NMS.AMQP
         public ConsumerTransformerDelegate ConsumerTransformer { get; set; }
 
         public string MessageSelector => Info.Selector;
+        
+        event AsyncMessageListener IMessageConsumer.AsyncListener 
+        {
+            add
+            {
+                CheckClosed();
+                using(syncRoot.Lock())
+                {
+                    AsyncListener += value;
+                    DrainMessageQueueToListener();
+                }
+            }
+            remove
+            {
+                using (syncRoot.LockAsync())
+                {
+                    AsyncListener -= value;
+                }
+            }
+        }
 
         event MessageListener IMessageConsumer.Listener
         {
@@ -284,6 +305,8 @@ namespace Apache.NMS.AMQP
 
         private event MessageListener Listener;
 
+        private event AsyncMessageListener AsyncListener;
+
         public async Task Init()
         {
             await Session.Connection.CreateResource(Info).Await();
@@ -310,7 +333,7 @@ namespace Apache.NMS.AMQP
             else
                 messageQueue.Enqueue(envelope);
 
-            if (Session.IsStarted && Listener != null)
+            if (Session.IsStarted && HasMessageListener())
             {
                 using (syncRoot.Exclude()) // Exclude lock for a time of 
dispatching, so it does not pass along to actionblock
                 {
@@ -319,27 +342,27 @@ namespace Apache.NMS.AMQP
             }
         }
 
-        private async Task DeliverNextPendingAsync()
+        private async Task DeliverNextPendingAsync(CancellationToken 
cancellationToken)
         {
             if (Tracer.IsDebugEnabled)
             {
                 Tracer.Debug($"{Info.Id} is about to deliver next pending 
message.");
             }
             
-            if (Session.IsStarted && started && Listener != null)
+            if (Session.IsStarted && this.started && HasMessageListener())
             {
                 using(await syncRoot.LockAsync().Await())
                 {
                     try
                     {
-                        if (started && Listener != null)
+                        if (this.started && HasMessageListener())
                         {
                             var envelope = messageQueue.DequeueNoWait();
                             if (envelope == null)
                             {
                                 if (Tracer.IsDebugEnabled)
                                 {
-                                    Tracer.Debug($"No message available for 
delivery.");
+                                    Tracer.Debug("No message available for 
delivery.");
                                 }
 
                                 return;
@@ -376,7 +399,11 @@ namespace Apache.NMS.AMQP
 
                                 try
                                 {
-                                    Listener.Invoke(envelope.Message.Copy());
+                                    Listener?.Invoke(envelope.Message.Copy());
+                                    if (AsyncListener != null)
+                                    {
+                                        await 
AsyncListener.Invoke(envelope.Message.Copy(), cancellationToken).Await();
+                                    }
                                 }
                                 catch (Exception)
                                 {
@@ -592,7 +619,7 @@ namespace Apache.NMS.AMQP
 
         public bool HasMessageListener()
         {
-            return Listener != null;
+            return Listener != null || AsyncListener != null;
         }
 
         public void Shutdown(Exception exception)
@@ -628,7 +655,7 @@ namespace Apache.NMS.AMQP
 
         private void DrainMessageQueueToListener()
         {
-            if (Listener != null && Session.IsStarted)
+            if (HasMessageListener() && Session.IsStarted)
             {
                 int size = messageQueue.Count;
                 for (int i = 0; i < size; i++)
@@ -715,9 +742,9 @@ namespace Apache.NMS.AMQP
                 this.consumer = consumer;
             }
 
-            public Task DeliverNextPending()
+            public Task DeliverNextPending(CancellationToken cancellationToken)
             {
-                return consumer.DeliverNextPendingAsync();
+                return consumer.DeliverNextPendingAsync(cancellationToken);
             }
         }
     }
diff --git a/src/NMS.AMQP/SessionDispatcher.cs 
b/src/NMS.AMQP/SessionDispatcher.cs
index 53a5da9..5f04891 100644
--- a/src/NMS.AMQP/SessionDispatcher.cs
+++ b/src/NMS.AMQP/SessionDispatcher.cs
@@ -49,7 +49,7 @@ namespace Apache.NMS.AMQP
             try
             {
                 isOnDispatcherFlow.Value = true;
-                await messageDeliveryTask.DeliverNextPending().Await();
+                await 
messageDeliveryTask.DeliverNextPending(cts.Token).Await();
             }
             finally
             {
diff --git 
a/test/Apache-NMS-AMQP-Test/Integration/Async/ConsumerIntegrationTestAsync.cs 
b/test/Apache-NMS-AMQP-Test/Integration/Async/ConsumerIntegrationTestAsync.cs
index 5445d5e..4ef6078 100644
--- 
a/test/Apache-NMS-AMQP-Test/Integration/Async/ConsumerIntegrationTestAsync.cs
+++ 
b/test/Apache-NMS-AMQP-Test/Integration/Async/ConsumerIntegrationTestAsync.cs
@@ -254,7 +254,7 @@ namespace NMS.AMQP.Test.Integration.Async
                 IQueue queue = await session.GetQueueAsync("myQueue");
                 IMessageConsumer consumer = await 
session.CreateConsumerAsync(queue);
 
-                consumer.Listener += message => { };
+                consumer.AsyncListener += (message, ct) => Task.CompletedTask;
 
                 // Verify the consumer gets marked closed
                 testPeer.WaitForAllMatchersToComplete(1000);
@@ -313,9 +313,9 @@ namespace NMS.AMQP.Test.Integration.Async
                 testPeer.ExpectReceiverAttach();
                 testPeer.ExpectLinkFlowRespondWithTransfer(message: new 
Amqp.Message() { BodySection = new AmqpValue() { Value = null } }, count: 1);
                 testPeer.ExpectDispositionThatIsReleasedAndSettled();
-
+                
                 IMessageConsumer consumer = await 
session.CreateConsumerAsync(queue);
-                consumer.Listener += message => throw new Exception();
+                consumer.AsyncListener += (message, ct) => throw new 
Exception();
 
                 testPeer.WaitForAllMatchersToComplete(2000);
 
@@ -606,14 +606,14 @@ namespace NMS.AMQP.Test.Integration.Async
                 testPeer.ExpectDetach(expectClosed: true, sendResponse: true, 
replyClosed: true);
 
                 testPeer.ExpectDispositionThatIsAcceptedAndSettled();
-
+                
                 IMessageConsumer consumer = await 
session.CreateConsumerAsync(destination);
 
-                consumer.Listener += message =>
+                consumer.AsyncListener += async (message, ct) =>
                 {
-                    IMessageProducer producer = 
session.CreateProducer(outbound);
-                    producer.Send(message);
-                    producer.Close();
+                    IMessageProducer producer = await 
session.CreateProducerAsync(outbound);
+                    await producer.SendAsync(message);
+                    await producer.CloseAsync();
                 };
 
                 testPeer.WaitForAllMatchersToComplete(10_000);
@@ -648,11 +648,11 @@ namespace NMS.AMQP.Test.Integration.Async
 
                 ManualResetEvent latch = new ManualResetEvent(false);
                 Exception exception = null;
-                consumer.Listener += message =>
+                consumer.AsyncListener += async (message, ct) =>
                 {
                     try
                     {
-                        connection.Close();
+                        await connection.CloseAsync();
                     }
                     catch (Exception e)
                     {
@@ -701,11 +701,11 @@ namespace NMS.AMQP.Test.Integration.Async
 
                 ManualResetEvent latch = new ManualResetEvent(false);
                 Exception exception = null;
-                consumer.Listener += message =>
+                consumer.AsyncListener += async (message, ct) =>
                 {
                     try
                     {
-                        connection.Stop();
+                        await connection.StopAsync();
                     }
                     catch (Exception e)
                     {
@@ -754,11 +754,11 @@ namespace NMS.AMQP.Test.Integration.Async
 
                 ManualResetEvent latch = new ManualResetEvent(false);
                 Exception exception = null;
-                consumer.Listener += message =>
+                consumer.AsyncListener += async (message, ct) =>
                 {
                     try
                     {
-                        session.Close();
+                        await session.CloseAsync();
                     }
                     catch (Exception e)
                     {
@@ -813,11 +813,11 @@ namespace NMS.AMQP.Test.Integration.Async
                 testPeer.ExpectDetach(expectClosed: true, sendResponse: true, 
replyClosed: true);
 
                 Exception exception = null;
-                consumer.Listener += message =>
+                consumer.AsyncListener += async (message, ct) =>
                 {
                     try
                     {
-                        consumer.Close();
+                        await consumer.CloseAsync();
                         latch.Set();
                     }
                     catch (Exception e)
@@ -878,7 +878,7 @@ namespace NMS.AMQP.Test.Integration.Async
                 bool complete = false;
                 int messageSeen = 0;
                 int expectedIndex = 0;
-                consumer.Listener += message =>
+                consumer.AsyncListener += async(message, ct) =>
                 {
                     if (complete)
                     {
@@ -893,7 +893,7 @@ namespace NMS.AMQP.Test.Integration.Async
                         // don't ack the message until we receive it X times
                         if (messageSeen < recoverCount)
                         {
-                            session.Recover();
+                            await session.RecoverAsync();
                             messageSeen++;
                         }
                         else
@@ -906,7 +906,7 @@ namespace NMS.AMQP.Test.Integration.Async
                                 stateMatcher: state => 
Assert.AreEqual(state.Descriptor.Code, 
MessageSupport.ACCEPTED_INSTANCE.Descriptor.Code
                                 ));
 
-                            message.Acknowledge();
+                            await message.AcknowledgeAsync();
 
                             if (expectedIndex == messageCount)
                             {
@@ -958,10 +958,10 @@ namespace NMS.AMQP.Test.Integration.Async
 
                 testPeer.ExpectDispositionThatIsAcceptedAndSettled();
 
-                consumer.Listener += _ =>
+                consumer.AsyncListener += async (msg, ct) =>
                 {
                     latch.Set();
-                    
Task.Delay(TimeSpan.FromMilliseconds(100)).GetAwaiter().GetResult();
+                    await Task.Delay(TimeSpan.FromMilliseconds(100), ct);
                 };
 
                 Assert.True(latch.WaitOne(TimeSpan.FromMilliseconds(3000)), 
"Messages not received within given timeout.");
@@ -1001,10 +1001,10 @@ namespace NMS.AMQP.Test.Integration.Async
 
                 testPeer.ExpectDispositionThatIsAcceptedAndSettled();
 
-                consumer.Listener += _ =>
+                consumer.AsyncListener += async (msg, _) =>
                 {
                     latch.Set();
-                    
Task.Delay(TimeSpan.FromMilliseconds(100)).GetAwaiter().GetResult();
+                    await Task.Delay(TimeSpan.FromMilliseconds(100), 
CancellationToken.None);
                 };
 
                 Assert.True(latch.WaitOne(TimeSpan.FromMilliseconds(3000)), 
"Messages not received within given timeout.");
@@ -1044,10 +1044,10 @@ namespace NMS.AMQP.Test.Integration.Async
 
                 testPeer.ExpectDispositionThatIsAcceptedAndSettled();
 
-                consumer.Listener += _ =>
+                consumer.AsyncListener += async (msg, _) =>
                 {
                     latch.Set();
-                    
Task.Delay(TimeSpan.FromMilliseconds(100)).GetAwaiter().GetResult();
+                    await Task.Delay(TimeSpan.FromMilliseconds(100), 
CancellationToken.None);
                 };
 
                 Assert.True(latch.WaitOne(TimeSpan.FromMilliseconds(3000)), 
"Messages not received within given timeout.");
diff --git 
a/test/Apache-NMS-AMQP-Test/Integration/Async/NMSConsumerIntegrationTestAsync.cs
 
b/test/Apache-NMS-AMQP-Test/Integration/Async/NMSConsumerIntegrationTestAsync.cs
index 87659da..27ae8e1 100644
--- 
a/test/Apache-NMS-AMQP-Test/Integration/Async/NMSConsumerIntegrationTestAsync.cs
+++ 
b/test/Apache-NMS-AMQP-Test/Integration/Async/NMSConsumerIntegrationTestAsync.cs
@@ -318,7 +318,7 @@ namespace NMS.AMQP.Test.Integration.Async
                 testPeer.ExpectDispositionThatIsReleasedAndSettled();
 
                 var consumer = await context.CreateConsumerAsync(queue);
-                consumer.Listener += message => throw new Exception();
+                consumer.AsyncListener += (message, ct) => throw new 
Exception();
 
                 testPeer.WaitForAllMatchersToComplete(2000);
 
@@ -483,7 +483,11 @@ namespace NMS.AMQP.Test.Integration.Async
                     testPeer.ExpectDispositionThatIsAcceptedAndSettled();
                 }
 
-                consumer.Listener += message => latch.Signal();
+                consumer.AsyncListener += (message, ct) =>
+                {
+                    latch.Signal();
+                    return Task.CompletedTask;
+                };
 
                 Assert.True(latch.Wait(4000), "Messages not received within 
given timeout. Count remaining: " + latch.CurrentCount);
 
@@ -517,7 +521,7 @@ namespace NMS.AMQP.Test.Integration.Async
 
                 var consumer = await context.CreateConsumerAsync(destination);
 
-                consumer.Listener += message => { };
+                consumer.AsyncListener += (message, ct) => Task.CompletedTask;
 
                 Assert.CatchAsync<NMSException>(async () => await 
consumer.ReceiveAsync(), "Should have thrown an exception.");
                 Assert.CatchAsync<NMSException>(async () => await 
consumer.ReceiveAsync(TimeSpan.FromMilliseconds(1000)), "Should have thrown an 
exception.");
@@ -555,11 +559,11 @@ namespace NMS.AMQP.Test.Integration.Async
 
                 var consumer = await context.CreateConsumerAsync(destination);
 
-                consumer.Listener += message =>
+                consumer.AsyncListener += async (message, ct) =>
                 {
-                    var producer = context.CreateProducer();
-                    producer.Send(outbound, message);
-                    producer.Close();
+                    var producer = await context.CreateProducerAsync();
+                    await producer.SendAsync(outbound, message);
+                    await producer.CloseAsync();
                 };
 
                 testPeer.WaitForAllMatchersToComplete(10_000);
@@ -594,11 +598,11 @@ namespace NMS.AMQP.Test.Integration.Async
 
                 ManualResetEvent latch = new ManualResetEvent(false);
                 Exception exception = null;
-                consumer.Listener += message =>
+                consumer.AsyncListener += async (message, ct) =>
                 {
                     try
                     {
-                        context.Close();
+                        await context.CloseAsync();
                     }
                     catch (Exception e)
                     {
@@ -647,11 +651,11 @@ namespace NMS.AMQP.Test.Integration.Async
 
                 ManualResetEvent latch = new ManualResetEvent(false);
                 Exception exception = null;
-                consumer.Listener += message =>
+                consumer.AsyncListener += async (message, ct) =>
                 {
                     try
                     {
-                        context.Stop();
+                        await context.StopAsync();
                     }
                     catch (Exception e)
                     {
@@ -700,11 +704,11 @@ namespace NMS.AMQP.Test.Integration.Async
 
                 ManualResetEvent latch = new ManualResetEvent(false);
                 Exception exception = null;
-                consumer.Listener += message =>
+                consumer.AsyncListener += async (message, ct) =>
                 {
                     try
                     {
-                        context.Close();
+                        await context.CloseAsync();
                     }
                     catch (Exception e)
                     {
@@ -759,11 +763,11 @@ namespace NMS.AMQP.Test.Integration.Async
                 testPeer.ExpectDetach(expectClosed: true, sendResponse: true, 
replyClosed: true);
 
                 Exception exception = null;
-                consumer.Listener += message =>
+                consumer.AsyncListener += async (message, ct) =>
                 {
                     try
                     {
-                        consumer.Close();
+                        await consumer.CloseAsync();
                         latch.Set();
                     }
                     catch (Exception e)
@@ -824,7 +828,7 @@ namespace NMS.AMQP.Test.Integration.Async
                 bool complete = false;
                 int messageSeen = 0;
                 int expectedIndex = 0;
-                consumer.Listener += message =>
+                consumer.AsyncListener += async (message, ct) =>
                 {
                     if (complete)
                     {
@@ -839,7 +843,7 @@ namespace NMS.AMQP.Test.Integration.Async
                         // don't ack the message until we receive it X times
                         if (messageSeen < recoverCount)
                         {
-                            context.Recover();
+                            await context.RecoverAsync();
                             messageSeen++;
                         }
                         else
@@ -852,7 +856,7 @@ namespace NMS.AMQP.Test.Integration.Async
                                 stateMatcher: state => 
Assert.AreEqual(state.Descriptor.Code, 
MessageSupport.ACCEPTED_INSTANCE.Descriptor.Code
                                 ));
 
-                            message.Acknowledge();
+                            await message.AcknowledgeAsync();
 
                             if (expectedIndex == messageCount)
                             {
@@ -904,10 +908,10 @@ namespace NMS.AMQP.Test.Integration.Async
 
                 testPeer.ExpectDispositionThatIsAcceptedAndSettled();
 
-                consumer.Listener += _ =>
+                consumer.AsyncListener += async (msg, ct) =>
                 {
                     latch.Set();
-                    
Task.Delay(TimeSpan.FromMilliseconds(100)).GetAwaiter().GetResult();
+                    await Task.Delay(TimeSpan.FromMilliseconds(100), 
CancellationToken.None);
                 };
 
                 Assert.True(latch.WaitOne(TimeSpan.FromMilliseconds(3000)), 
"Messages not received within given timeout.");
@@ -947,10 +951,10 @@ namespace NMS.AMQP.Test.Integration.Async
 
                 testPeer.ExpectDispositionThatIsAcceptedAndSettled();
 
-                consumer.Listener += _ =>
+                consumer.AsyncListener += async (msg, ct) =>
                 {
                     latch.Set();
-                    
Task.Delay(TimeSpan.FromMilliseconds(100)).GetAwaiter().GetResult();
+                    await Task.Delay(TimeSpan.FromMilliseconds(100), 
CancellationToken.None);
                 };
 
                 Assert.True(latch.WaitOne(TimeSpan.FromMilliseconds(3000)), 
"Messages not received within given timeout.");
@@ -988,10 +992,10 @@ namespace NMS.AMQP.Test.Integration.Async
 
                 testPeer.ExpectDispositionThatIsAcceptedAndSettled();
 
-                consumer.Listener += _ =>
+                consumer.AsyncListener += async (msg, ct) =>
                 {
                     latch.Set();
-                    
Task.Delay(TimeSpan.FromMilliseconds(100)).GetAwaiter().GetResult();
+                    await Task.Delay(TimeSpan.FromMilliseconds(100), 
CancellationToken.None);
                 };
 
                 Assert.True(latch.WaitOne(TimeSpan.FromMilliseconds(3000)), 
"Messages not received within given timeout.");

Reply via email to