This is an automated email from the ASF dual-hosted git repository.
havret pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-nms-amqp.git
The following commit(s) were added to refs/heads/main by this push:
new 5e4e137 AMQNET-735 Add AsyncMessageListener to Consumer API
5e4e137 is described below
commit 5e4e137b071101463b8083f8a8a1cddada5b6f8a
Author: Havret <[email protected]>
AuthorDate: Sun Sep 10 18:43:40 2023 +0200
AMQNET-735 Add AsyncMessageListener to Consumer API
---
src/NMS.AMQP/Apache-NMS-AMQP.csproj | 2 +-
src/NMS.AMQP/NmsConsumer.cs | 6 +++
src/NMS.AMQP/NmsMessageConsumer.cs | 47 ++++++++++++++-----
src/NMS.AMQP/SessionDispatcher.cs | 2 +-
.../Async/ConsumerIntegrationTestAsync.cs | 50 ++++++++++-----------
.../Async/NMSConsumerIntegrationTestAsync.cs | 52 ++++++++++++----------
6 files changed, 98 insertions(+), 61 deletions(-)
diff --git a/src/NMS.AMQP/Apache-NMS-AMQP.csproj
b/src/NMS.AMQP/Apache-NMS-AMQP.csproj
index dc14816..523295c 100644
--- a/src/NMS.AMQP/Apache-NMS-AMQP.csproj
+++ b/src/NMS.AMQP/Apache-NMS-AMQP.csproj
@@ -95,7 +95,7 @@ with the License. You may obtain a copy of the License at
<ItemGroup>
<!-- AMQPNetLite.Core is .NET Standard 1.3 package -->
<PackageReference Include="AMQPNetLite.Core" Version="2.4.3" />
- <PackageReference Include="Apache.NMS" Version="2.0.0" />
+ <PackageReference Include="Apache.NMS" Version="2.1.0" />
<PackageReference Include="System.Threading.Tasks.Dataflow"
Version="4.9.0" />
</ItemGroup>
</Project>
diff --git a/src/NMS.AMQP/NmsConsumer.cs b/src/NMS.AMQP/NmsConsumer.cs
index d6d905b..0ee265a 100644
--- a/src/NMS.AMQP/NmsConsumer.cs
+++ b/src/NMS.AMQP/NmsConsumer.cs
@@ -109,5 +109,11 @@ namespace Apache.NMS.AMQP
add => ((IMessageConsumer)consumer).Listener += value;
remove => ((IMessageConsumer)consumer).Listener -= value;
}
+
+ event AsyncMessageListener INMSConsumer.AsyncListener
+ {
+ add => ((IMessageConsumer)consumer).AsyncListener += value;
+ remove => ((IMessageConsumer)consumer).AsyncListener -= value;
+ }
}
}
\ No newline at end of file
diff --git a/src/NMS.AMQP/NmsMessageConsumer.cs
b/src/NMS.AMQP/NmsMessageConsumer.cs
index 05261e6..d37b55a 100644
--- a/src/NMS.AMQP/NmsMessageConsumer.cs
+++ b/src/NMS.AMQP/NmsMessageConsumer.cs
@@ -16,6 +16,7 @@
*/
using System;
+using System.Threading;
using System.Threading.Tasks;
using Apache.NMS.AMQP.Message;
using Apache.NMS.AMQP.Meta;
@@ -110,6 +111,26 @@ namespace Apache.NMS.AMQP
public ConsumerTransformerDelegate ConsumerTransformer { get; set; }
public string MessageSelector => Info.Selector;
+
+ event AsyncMessageListener IMessageConsumer.AsyncListener
+ {
+ add
+ {
+ CheckClosed();
+ using(syncRoot.Lock())
+ {
+ AsyncListener += value;
+ DrainMessageQueueToListener();
+ }
+ }
+ remove
+ {
+ using (syncRoot.LockAsync())
+ {
+ AsyncListener -= value;
+ }
+ }
+ }
event MessageListener IMessageConsumer.Listener
{
@@ -284,6 +305,8 @@ namespace Apache.NMS.AMQP
private event MessageListener Listener;
+ private event AsyncMessageListener AsyncListener;
+
public async Task Init()
{
await Session.Connection.CreateResource(Info).Await();
@@ -310,7 +333,7 @@ namespace Apache.NMS.AMQP
else
messageQueue.Enqueue(envelope);
- if (Session.IsStarted && Listener != null)
+ if (Session.IsStarted && HasMessageListener())
{
using (syncRoot.Exclude()) // Exclude lock for a time of
dispatching, so it does not pass along to actionblock
{
@@ -319,27 +342,27 @@ namespace Apache.NMS.AMQP
}
}
- private async Task DeliverNextPendingAsync()
+ private async Task DeliverNextPendingAsync(CancellationToken
cancellationToken)
{
if (Tracer.IsDebugEnabled)
{
Tracer.Debug($"{Info.Id} is about to deliver next pending
message.");
}
- if (Session.IsStarted && started && Listener != null)
+ if (Session.IsStarted && this.started && HasMessageListener())
{
using(await syncRoot.LockAsync().Await())
{
try
{
- if (started && Listener != null)
+ if (this.started && HasMessageListener())
{
var envelope = messageQueue.DequeueNoWait();
if (envelope == null)
{
if (Tracer.IsDebugEnabled)
{
- Tracer.Debug($"No message available for
delivery.");
+ Tracer.Debug("No message available for
delivery.");
}
return;
@@ -376,7 +399,11 @@ namespace Apache.NMS.AMQP
try
{
- Listener.Invoke(envelope.Message.Copy());
+ Listener?.Invoke(envelope.Message.Copy());
+ if (AsyncListener != null)
+ {
+ await
AsyncListener.Invoke(envelope.Message.Copy(), cancellationToken).Await();
+ }
}
catch (Exception)
{
@@ -592,7 +619,7 @@ namespace Apache.NMS.AMQP
public bool HasMessageListener()
{
- return Listener != null;
+ return Listener != null || AsyncListener != null;
}
public void Shutdown(Exception exception)
@@ -628,7 +655,7 @@ namespace Apache.NMS.AMQP
private void DrainMessageQueueToListener()
{
- if (Listener != null && Session.IsStarted)
+ if (HasMessageListener() && Session.IsStarted)
{
int size = messageQueue.Count;
for (int i = 0; i < size; i++)
@@ -715,9 +742,9 @@ namespace Apache.NMS.AMQP
this.consumer = consumer;
}
- public Task DeliverNextPending()
+ public Task DeliverNextPending(CancellationToken cancellationToken)
{
- return consumer.DeliverNextPendingAsync();
+ return consumer.DeliverNextPendingAsync(cancellationToken);
}
}
}
diff --git a/src/NMS.AMQP/SessionDispatcher.cs
b/src/NMS.AMQP/SessionDispatcher.cs
index 53a5da9..5f04891 100644
--- a/src/NMS.AMQP/SessionDispatcher.cs
+++ b/src/NMS.AMQP/SessionDispatcher.cs
@@ -49,7 +49,7 @@ namespace Apache.NMS.AMQP
try
{
isOnDispatcherFlow.Value = true;
- await messageDeliveryTask.DeliverNextPending().Await();
+ await
messageDeliveryTask.DeliverNextPending(cts.Token).Await();
}
finally
{
diff --git
a/test/Apache-NMS-AMQP-Test/Integration/Async/ConsumerIntegrationTestAsync.cs
b/test/Apache-NMS-AMQP-Test/Integration/Async/ConsumerIntegrationTestAsync.cs
index 5445d5e..4ef6078 100644
---
a/test/Apache-NMS-AMQP-Test/Integration/Async/ConsumerIntegrationTestAsync.cs
+++
b/test/Apache-NMS-AMQP-Test/Integration/Async/ConsumerIntegrationTestAsync.cs
@@ -254,7 +254,7 @@ namespace NMS.AMQP.Test.Integration.Async
IQueue queue = await session.GetQueueAsync("myQueue");
IMessageConsumer consumer = await
session.CreateConsumerAsync(queue);
- consumer.Listener += message => { };
+ consumer.AsyncListener += (message, ct) => Task.CompletedTask;
// Verify the consumer gets marked closed
testPeer.WaitForAllMatchersToComplete(1000);
@@ -313,9 +313,9 @@ namespace NMS.AMQP.Test.Integration.Async
testPeer.ExpectReceiverAttach();
testPeer.ExpectLinkFlowRespondWithTransfer(message: new
Amqp.Message() { BodySection = new AmqpValue() { Value = null } }, count: 1);
testPeer.ExpectDispositionThatIsReleasedAndSettled();
-
+
IMessageConsumer consumer = await
session.CreateConsumerAsync(queue);
- consumer.Listener += message => throw new Exception();
+ consumer.AsyncListener += (message, ct) => throw new
Exception();
testPeer.WaitForAllMatchersToComplete(2000);
@@ -606,14 +606,14 @@ namespace NMS.AMQP.Test.Integration.Async
testPeer.ExpectDetach(expectClosed: true, sendResponse: true,
replyClosed: true);
testPeer.ExpectDispositionThatIsAcceptedAndSettled();
-
+
IMessageConsumer consumer = await
session.CreateConsumerAsync(destination);
- consumer.Listener += message =>
+ consumer.AsyncListener += async (message, ct) =>
{
- IMessageProducer producer =
session.CreateProducer(outbound);
- producer.Send(message);
- producer.Close();
+ IMessageProducer producer = await
session.CreateProducerAsync(outbound);
+ await producer.SendAsync(message);
+ await producer.CloseAsync();
};
testPeer.WaitForAllMatchersToComplete(10_000);
@@ -648,11 +648,11 @@ namespace NMS.AMQP.Test.Integration.Async
ManualResetEvent latch = new ManualResetEvent(false);
Exception exception = null;
- consumer.Listener += message =>
+ consumer.AsyncListener += async (message, ct) =>
{
try
{
- connection.Close();
+ await connection.CloseAsync();
}
catch (Exception e)
{
@@ -701,11 +701,11 @@ namespace NMS.AMQP.Test.Integration.Async
ManualResetEvent latch = new ManualResetEvent(false);
Exception exception = null;
- consumer.Listener += message =>
+ consumer.AsyncListener += async (message, ct) =>
{
try
{
- connection.Stop();
+ await connection.StopAsync();
}
catch (Exception e)
{
@@ -754,11 +754,11 @@ namespace NMS.AMQP.Test.Integration.Async
ManualResetEvent latch = new ManualResetEvent(false);
Exception exception = null;
- consumer.Listener += message =>
+ consumer.AsyncListener += async (message, ct) =>
{
try
{
- session.Close();
+ await session.CloseAsync();
}
catch (Exception e)
{
@@ -813,11 +813,11 @@ namespace NMS.AMQP.Test.Integration.Async
testPeer.ExpectDetach(expectClosed: true, sendResponse: true,
replyClosed: true);
Exception exception = null;
- consumer.Listener += message =>
+ consumer.AsyncListener += async (message, ct) =>
{
try
{
- consumer.Close();
+ await consumer.CloseAsync();
latch.Set();
}
catch (Exception e)
@@ -878,7 +878,7 @@ namespace NMS.AMQP.Test.Integration.Async
bool complete = false;
int messageSeen = 0;
int expectedIndex = 0;
- consumer.Listener += message =>
+ consumer.AsyncListener += async(message, ct) =>
{
if (complete)
{
@@ -893,7 +893,7 @@ namespace NMS.AMQP.Test.Integration.Async
// don't ack the message until we receive it X times
if (messageSeen < recoverCount)
{
- session.Recover();
+ await session.RecoverAsync();
messageSeen++;
}
else
@@ -906,7 +906,7 @@ namespace NMS.AMQP.Test.Integration.Async
stateMatcher: state =>
Assert.AreEqual(state.Descriptor.Code,
MessageSupport.ACCEPTED_INSTANCE.Descriptor.Code
));
- message.Acknowledge();
+ await message.AcknowledgeAsync();
if (expectedIndex == messageCount)
{
@@ -958,10 +958,10 @@ namespace NMS.AMQP.Test.Integration.Async
testPeer.ExpectDispositionThatIsAcceptedAndSettled();
- consumer.Listener += _ =>
+ consumer.AsyncListener += async (msg, ct) =>
{
latch.Set();
-
Task.Delay(TimeSpan.FromMilliseconds(100)).GetAwaiter().GetResult();
+ await Task.Delay(TimeSpan.FromMilliseconds(100), ct);
};
Assert.True(latch.WaitOne(TimeSpan.FromMilliseconds(3000)),
"Messages not received within given timeout.");
@@ -1001,10 +1001,10 @@ namespace NMS.AMQP.Test.Integration.Async
testPeer.ExpectDispositionThatIsAcceptedAndSettled();
- consumer.Listener += _ =>
+ consumer.AsyncListener += async (msg, _) =>
{
latch.Set();
-
Task.Delay(TimeSpan.FromMilliseconds(100)).GetAwaiter().GetResult();
+ await Task.Delay(TimeSpan.FromMilliseconds(100),
CancellationToken.None);
};
Assert.True(latch.WaitOne(TimeSpan.FromMilliseconds(3000)),
"Messages not received within given timeout.");
@@ -1044,10 +1044,10 @@ namespace NMS.AMQP.Test.Integration.Async
testPeer.ExpectDispositionThatIsAcceptedAndSettled();
- consumer.Listener += _ =>
+ consumer.AsyncListener += async (msg, _) =>
{
latch.Set();
-
Task.Delay(TimeSpan.FromMilliseconds(100)).GetAwaiter().GetResult();
+ await Task.Delay(TimeSpan.FromMilliseconds(100),
CancellationToken.None);
};
Assert.True(latch.WaitOne(TimeSpan.FromMilliseconds(3000)),
"Messages not received within given timeout.");
diff --git
a/test/Apache-NMS-AMQP-Test/Integration/Async/NMSConsumerIntegrationTestAsync.cs
b/test/Apache-NMS-AMQP-Test/Integration/Async/NMSConsumerIntegrationTestAsync.cs
index 87659da..27ae8e1 100644
---
a/test/Apache-NMS-AMQP-Test/Integration/Async/NMSConsumerIntegrationTestAsync.cs
+++
b/test/Apache-NMS-AMQP-Test/Integration/Async/NMSConsumerIntegrationTestAsync.cs
@@ -318,7 +318,7 @@ namespace NMS.AMQP.Test.Integration.Async
testPeer.ExpectDispositionThatIsReleasedAndSettled();
var consumer = await context.CreateConsumerAsync(queue);
- consumer.Listener += message => throw new Exception();
+ consumer.AsyncListener += (message, ct) => throw new
Exception();
testPeer.WaitForAllMatchersToComplete(2000);
@@ -483,7 +483,11 @@ namespace NMS.AMQP.Test.Integration.Async
testPeer.ExpectDispositionThatIsAcceptedAndSettled();
}
- consumer.Listener += message => latch.Signal();
+ consumer.AsyncListener += (message, ct) =>
+ {
+ latch.Signal();
+ return Task.CompletedTask;
+ };
Assert.True(latch.Wait(4000), "Messages not received within
given timeout. Count remaining: " + latch.CurrentCount);
@@ -517,7 +521,7 @@ namespace NMS.AMQP.Test.Integration.Async
var consumer = await context.CreateConsumerAsync(destination);
- consumer.Listener += message => { };
+ consumer.AsyncListener += (message, ct) => Task.CompletedTask;
Assert.CatchAsync<NMSException>(async () => await
consumer.ReceiveAsync(), "Should have thrown an exception.");
Assert.CatchAsync<NMSException>(async () => await
consumer.ReceiveAsync(TimeSpan.FromMilliseconds(1000)), "Should have thrown an
exception.");
@@ -555,11 +559,11 @@ namespace NMS.AMQP.Test.Integration.Async
var consumer = await context.CreateConsumerAsync(destination);
- consumer.Listener += message =>
+ consumer.AsyncListener += async (message, ct) =>
{
- var producer = context.CreateProducer();
- producer.Send(outbound, message);
- producer.Close();
+ var producer = await context.CreateProducerAsync();
+ await producer.SendAsync(outbound, message);
+ await producer.CloseAsync();
};
testPeer.WaitForAllMatchersToComplete(10_000);
@@ -594,11 +598,11 @@ namespace NMS.AMQP.Test.Integration.Async
ManualResetEvent latch = new ManualResetEvent(false);
Exception exception = null;
- consumer.Listener += message =>
+ consumer.AsyncListener += async (message, ct) =>
{
try
{
- context.Close();
+ await context.CloseAsync();
}
catch (Exception e)
{
@@ -647,11 +651,11 @@ namespace NMS.AMQP.Test.Integration.Async
ManualResetEvent latch = new ManualResetEvent(false);
Exception exception = null;
- consumer.Listener += message =>
+ consumer.AsyncListener += async (message, ct) =>
{
try
{
- context.Stop();
+ await context.StopAsync();
}
catch (Exception e)
{
@@ -700,11 +704,11 @@ namespace NMS.AMQP.Test.Integration.Async
ManualResetEvent latch = new ManualResetEvent(false);
Exception exception = null;
- consumer.Listener += message =>
+ consumer.AsyncListener += async (message, ct) =>
{
try
{
- context.Close();
+ await context.CloseAsync();
}
catch (Exception e)
{
@@ -759,11 +763,11 @@ namespace NMS.AMQP.Test.Integration.Async
testPeer.ExpectDetach(expectClosed: true, sendResponse: true,
replyClosed: true);
Exception exception = null;
- consumer.Listener += message =>
+ consumer.AsyncListener += async (message, ct) =>
{
try
{
- consumer.Close();
+ await consumer.CloseAsync();
latch.Set();
}
catch (Exception e)
@@ -824,7 +828,7 @@ namespace NMS.AMQP.Test.Integration.Async
bool complete = false;
int messageSeen = 0;
int expectedIndex = 0;
- consumer.Listener += message =>
+ consumer.AsyncListener += async (message, ct) =>
{
if (complete)
{
@@ -839,7 +843,7 @@ namespace NMS.AMQP.Test.Integration.Async
// don't ack the message until we receive it X times
if (messageSeen < recoverCount)
{
- context.Recover();
+ await context.RecoverAsync();
messageSeen++;
}
else
@@ -852,7 +856,7 @@ namespace NMS.AMQP.Test.Integration.Async
stateMatcher: state =>
Assert.AreEqual(state.Descriptor.Code,
MessageSupport.ACCEPTED_INSTANCE.Descriptor.Code
));
- message.Acknowledge();
+ await message.AcknowledgeAsync();
if (expectedIndex == messageCount)
{
@@ -904,10 +908,10 @@ namespace NMS.AMQP.Test.Integration.Async
testPeer.ExpectDispositionThatIsAcceptedAndSettled();
- consumer.Listener += _ =>
+ consumer.AsyncListener += async (msg, ct) =>
{
latch.Set();
-
Task.Delay(TimeSpan.FromMilliseconds(100)).GetAwaiter().GetResult();
+ await Task.Delay(TimeSpan.FromMilliseconds(100),
CancellationToken.None);
};
Assert.True(latch.WaitOne(TimeSpan.FromMilliseconds(3000)),
"Messages not received within given timeout.");
@@ -947,10 +951,10 @@ namespace NMS.AMQP.Test.Integration.Async
testPeer.ExpectDispositionThatIsAcceptedAndSettled();
- consumer.Listener += _ =>
+ consumer.AsyncListener += async (msg, ct) =>
{
latch.Set();
-
Task.Delay(TimeSpan.FromMilliseconds(100)).GetAwaiter().GetResult();
+ await Task.Delay(TimeSpan.FromMilliseconds(100),
CancellationToken.None);
};
Assert.True(latch.WaitOne(TimeSpan.FromMilliseconds(3000)),
"Messages not received within given timeout.");
@@ -988,10 +992,10 @@ namespace NMS.AMQP.Test.Integration.Async
testPeer.ExpectDispositionThatIsAcceptedAndSettled();
- consumer.Listener += _ =>
+ consumer.AsyncListener += async (msg, ct) =>
{
latch.Set();
-
Task.Delay(TimeSpan.FromMilliseconds(100)).GetAwaiter().GetResult();
+ await Task.Delay(TimeSpan.FromMilliseconds(100),
CancellationToken.None);
};
Assert.True(latch.WaitOne(TimeSpan.FromMilliseconds(3000)),
"Messages not received within given timeout.");