This is an automated email from the ASF dual-hosted git repository.
blankensteiner pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-dotpulsar.git
The following commit(s) were added to refs/heads/master by this push:
new fcab518 Fix object pooling issue (#130)
fcab518 is described below
commit fcab518100ef333cf655705c4ee18503dcc906fa
Author: Kristian Andersen <[email protected]>
AuthorDate: Fri Jan 13 12:37:58 2023 +0100
Fix object pooling issue (#130)
---
src/DotPulsar/Extensions/SendChannelExtensions.cs | 12 +++++++-----
tests/DotPulsar.Tests/ProducerTests.cs | 15 ++++++++++++---
2 files changed, 19 insertions(+), 8 deletions(-)
diff --git a/src/DotPulsar/Extensions/SendChannelExtensions.cs
b/src/DotPulsar/Extensions/SendChannelExtensions.cs
index e4d4571..2149851 100644
--- a/src/DotPulsar/Extensions/SendChannelExtensions.cs
+++ b/src/DotPulsar/Extensions/SendChannelExtensions.cs
@@ -66,14 +66,16 @@ public static class SendChannelExtensions
{
var metadata = _messageMetadataPool.Get();
- try
- {
- await sender.Send(metadata, message, onMessageSent,
cancellationToken).ConfigureAwait(false);
- }
- finally
+ async ValueTask ReleaseMetadataAndCallCallback(MessageId id)
{
metadata.Metadata.Properties.Clear();
_messageMetadataPool.Return(metadata);
+ if (onMessageSent != null)
+ {
+ await onMessageSent(id);
+ }
}
+
+ await sender.Send(metadata, message, ReleaseMetadataAndCallCallback,
cancellationToken).ConfigureAwait(false);
}
}
diff --git a/tests/DotPulsar.Tests/ProducerTests.cs
b/tests/DotPulsar.Tests/ProducerTests.cs
index 03656ba..8db8726 100644
--- a/tests/DotPulsar.Tests/ProducerTests.cs
+++ b/tests/DotPulsar.Tests/ProducerTests.cs
@@ -69,6 +69,7 @@ public class ProducerTests
await using var client = CreateClient();
string topicName = $"simple-produce-consume{Guid.NewGuid():N}";
const string content = "test-message";
+ const int msgCount = 3;
//Act
await using var producer = client.NewProducer(Schema.String)
@@ -81,14 +82,22 @@ public class ProducerTests
.InitialPosition(SubscriptionInitialPosition.Earliest)
.Create();
- await producer.SendChannel.Send(content);
- _testOutputHelper.WriteLine($"Sent a message: {content}");
+ for (var i = 0; i < msgCount; i++)
+ {
+ await producer.SendChannel.Send(content);
+ _testOutputHelper.WriteLine($"Sent a message: {content}");
+ }
producer.SendChannel.Complete();
await producer.SendChannel.Completion();
//Assert
- (await consumer.Receive()).Value().Should().Be(content);
+ for (ulong i = 0; i < msgCount; i++)
+ {
+ var received = await consumer.Receive();
+ received.SequenceId.Should().Be(i);
+ received.Value().Should().Be(content);
+ }
}
[Fact]