This is an automated email from the ASF dual-hosted git repository.

blankensteiner pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-dotpulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new fcab518  Fix object pooling issue (#130)
fcab518 is described below

commit fcab518100ef333cf655705c4ee18503dcc906fa
Author: Kristian Andersen <[email protected]>
AuthorDate: Fri Jan 13 12:37:58 2023 +0100

    Fix object pooling issue (#130)
---
 src/DotPulsar/Extensions/SendChannelExtensions.cs | 12 +++++++-----
 tests/DotPulsar.Tests/ProducerTests.cs            | 15 ++++++++++++---
 2 files changed, 19 insertions(+), 8 deletions(-)

diff --git a/src/DotPulsar/Extensions/SendChannelExtensions.cs 
b/src/DotPulsar/Extensions/SendChannelExtensions.cs
index e4d4571..2149851 100644
--- a/src/DotPulsar/Extensions/SendChannelExtensions.cs
+++ b/src/DotPulsar/Extensions/SendChannelExtensions.cs
@@ -66,14 +66,16 @@ public static class SendChannelExtensions
     {
         var metadata = _messageMetadataPool.Get();
 
-        try
-        {
-            await sender.Send(metadata, message, onMessageSent, 
cancellationToken).ConfigureAwait(false);
-        }
-        finally
+        async ValueTask ReleaseMetadataAndCallCallback(MessageId id)
         {
             metadata.Metadata.Properties.Clear();
             _messageMetadataPool.Return(metadata);
+            if (onMessageSent != null)
+            {
+                await onMessageSent(id);
+            }
         }
+
+        await sender.Send(metadata, message, ReleaseMetadataAndCallCallback, 
cancellationToken).ConfigureAwait(false);
     }
 }
diff --git a/tests/DotPulsar.Tests/ProducerTests.cs 
b/tests/DotPulsar.Tests/ProducerTests.cs
index 03656ba..8db8726 100644
--- a/tests/DotPulsar.Tests/ProducerTests.cs
+++ b/tests/DotPulsar.Tests/ProducerTests.cs
@@ -69,6 +69,7 @@ public class ProducerTests
         await using var client = CreateClient();
         string topicName = $"simple-produce-consume{Guid.NewGuid():N}";
         const string content = "test-message";
+        const int msgCount = 3;
 
         //Act
         await using var producer = client.NewProducer(Schema.String)
@@ -81,14 +82,22 @@ public class ProducerTests
             .InitialPosition(SubscriptionInitialPosition.Earliest)
             .Create();
 
-        await producer.SendChannel.Send(content);
-        _testOutputHelper.WriteLine($"Sent a message: {content}");
+        for (var i = 0; i < msgCount; i++)
+        {
+            await producer.SendChannel.Send(content);
+            _testOutputHelper.WriteLine($"Sent a message: {content}");
+        }
 
         producer.SendChannel.Complete();
         await producer.SendChannel.Completion();
 
         //Assert
-        (await consumer.Receive()).Value().Should().Be(content);
+        for (ulong i = 0; i < msgCount; i++)
+        {
+            var received = await consumer.Receive();
+            received.SequenceId.Should().Be(i);
+            received.Value().Should().Be(content);
+        }
     }
 
     [Fact]

Reply via email to