This is an automated email from the ASF dual-hosted git repository.

blankensteiner pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-dotpulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 5e665f7  Fix issue with cancelling Receive method call (#267)
5e665f7 is described below

commit 5e665f7e43bdcfffddf01907c6d6cda7468f32e0
Author: Shaun Becker <smbec...@gmail.com>
AuthorDate: Wed Jun 4 03:54:53 2025 -0400

    Fix issue with cancelling Receive method call (#267)
    
    * Fix issue with cancelling Receive method call
    
    Fixes #266
    
    * Code review feedback
    
    * Code review feedback
---
 src/DotPulsar/Internal/ConsumerChannel.cs       | 84 ++++++++++++-----------
 tests/DotPulsar.Tests/IntegrationFixture.cs     | 13 +++-
 tests/DotPulsar.Tests/Internal/ConsumerTests.cs | 91 ++++++++++++++++++++++++-
 3 files changed, 146 insertions(+), 42 deletions(-)

diff --git a/src/DotPulsar/Internal/ConsumerChannel.cs 
b/src/DotPulsar/Internal/ConsumerChannel.cs
index ddc5733..180195d 100644
--- a/src/DotPulsar/Internal/ConsumerChannel.cs
+++ b/src/DotPulsar/Internal/ConsumerChannel.cs
@@ -80,58 +80,66 @@ public sealed class ConsumerChannel<TMessage> : 
IConsumerChannel<TMessage>
                     await SendFlow(cancellationToken).ConfigureAwait(false);
 
                 _sendWhenZero--;
-
-                var message = _batchHandler.GetNext();
-
-                if (message is not null)
-                    return message;
-
-                var messagePackage = await 
_queue.Dequeue(cancellationToken).ConfigureAwait(false);
-
-                if (!messagePackage.ValidateMagicNumberAndChecksum())
+                try
                 {
-                    await RejectPackage(messagePackage, 
CommandAck.ValidationErrorType.ChecksumMismatch, 
cancellationToken).ConfigureAwait(false);
-                    continue;
-                }
+                    var message = _batchHandler.GetNext();
 
-                var metadataSize = messagePackage.GetMetadataSize();
-                var metadata = messagePackage.ExtractMetadata(metadataSize);
-                var data = messagePackage.ExtractData(metadataSize);
+                    if (message is not null)
+                        return message;
 
-                if (metadata.Compression != CompressionType.None)
-                {
-                    var decompressor = _decompressors[(int) 
metadata.Compression];
-                    if (decompressor is null)
-                        throw new CompressionException($"Support for 
{metadata.Compression} compression was not found");
+                    var messagePackage = await 
_queue.Dequeue(cancellationToken).ConfigureAwait(false);
 
-                    try
-                    {
-                        data = decompressor.Decompress(data, (int) 
metadata.UncompressedSize);
-                    }
-                    catch
+                    if (!messagePackage.ValidateMagicNumberAndChecksum())
                     {
-                        await RejectPackage(messagePackage, 
CommandAck.ValidationErrorType.DecompressionError, 
cancellationToken).ConfigureAwait(false);
+                        await RejectPackage(messagePackage, 
CommandAck.ValidationErrorType.ChecksumMismatch, 
cancellationToken).ConfigureAwait(false);
                         continue;
                     }
-                }
 
-                var messageId = messagePackage.MessageId;
-                var redeliveryCount = messagePackage.RedeliveryCount;
+                    var metadataSize = messagePackage.GetMetadataSize();
+                    var metadata = 
messagePackage.ExtractMetadata(metadataSize);
+                    var data = messagePackage.ExtractData(metadataSize);
 
-                if (metadata.ShouldSerializeNumMessagesInBatch())
-                {
-                    try
+                    if (metadata.Compression != CompressionType.None)
                     {
-                        return _batchHandler.Add(messageId, redeliveryCount, 
metadata, data);
+                        var decompressor = _decompressors[(int) 
metadata.Compression];
+                        if (decompressor is null)
+                            throw new CompressionException($"Support for 
{metadata.Compression} compression was not found");
+
+                        try
+                        {
+                            data = decompressor.Decompress(data, (int) 
metadata.UncompressedSize);
+                        }
+                        catch
+                        {
+                            await RejectPackage(messagePackage, 
CommandAck.ValidationErrorType.DecompressionError, 
cancellationToken).ConfigureAwait(false);
+                            continue;
+                        }
                     }
-                    catch
+
+                    var messageId = messagePackage.MessageId;
+                    var redeliveryCount = messagePackage.RedeliveryCount;
+
+                    if (metadata.ShouldSerializeNumMessagesInBatch())
                     {
-                        await RejectPackage(messagePackage, 
CommandAck.ValidationErrorType.BatchDeSerializeError, 
cancellationToken).ConfigureAwait(false);
-                        continue;
+                        try
+                        {
+                            return _batchHandler.Add(messageId, 
redeliveryCount, metadata, data);
+                        }
+                        catch
+                        {
+                            await RejectPackage(messagePackage, 
CommandAck.ValidationErrorType.BatchDeSerializeError, 
cancellationToken).ConfigureAwait(false);
+                            continue;
+                        }
                     }
-                }
 
-                return _messageFactory.Create(messageId.ToMessageId(_topic), 
redeliveryCount, data, metadata);
+                    return 
_messageFactory.Create(messageId.ToMessageId(_topic), redeliveryCount, data, 
metadata);
+                }
+                catch (Exception e) when (e is not CompressionException)
+                {
+                    // Undo decrementing since we didn't actually receive 
anything
+                    _sendWhenZero++;
+                    throw;
+                }
             }
         }
     }
diff --git a/tests/DotPulsar.Tests/IntegrationFixture.cs 
b/tests/DotPulsar.Tests/IntegrationFixture.cs
index 784bb96..aafd4e0 100644
--- a/tests/DotPulsar.Tests/IntegrationFixture.cs
+++ b/tests/DotPulsar.Tests/IntegrationFixture.cs
@@ -18,6 +18,7 @@ using DotNet.Testcontainers.Builders;
 using DotNet.Testcontainers.Containers;
 using DotNet.Testcontainers.Networks;
 using DotPulsar.Abstractions;
+using System.Net.Http.Headers;
 using System.Text;
 using System.Text.Json;
 using Testcontainers.Pulsar;
@@ -64,13 +65,16 @@ public class IntegrationFixture : IAsyncLifetime
             .WithHostname("pulsar")
             .Build();
 
-        ServiceUrl = new 
Uri($"pulsar://{_pulsarCluster.Hostname}:{PulsarPort}");
         _toxiProxyConnection = new Connection();
         _toxiProxyClient = _toxiProxyConnection.Client();
         _toxiProxyPulsarProxy = new Proxy();
     }
 
-    public Uri ServiceUrl { get; private set; }
+    public Uri ServiceUrl { get; private set; } = null!; // This will be set 
in `InitializeAsync`
+
+    public Uri AdminUrl { get; private set; } = null!; // This will be set in 
`InitializeAsync`
+
+    public AuthenticationHeaderValue AuthorizationHeader => new("Bearer", 
_token);
 
     public IAuthentication Authentication => AuthenticationFactory.Token(ct => 
ValueTask.FromResult(_token!));
 
@@ -93,6 +97,9 @@ public class IntegrationFixture : IAsyncLifetime
         await _pulsarCluster.StartAsync(_cts.Token);
         _messageSink.OnMessage(new DiagnosticMessage("The containers has 
initiated. Next, we'll configure Toxiproxy mappings."));
 
+        ServiceUrl = new Uri(_pulsarCluster.GetBrokerAddress());
+        AdminUrl = new Uri(_pulsarCluster.GetServiceAddress());
+
         _toxiProxyConnection = new Connection(_toxiProxy.Hostname, 
_toxiProxy.GetMappedPublicPort(ToxiProxyControlPort));
         _toxiProxyClient = _toxiProxyConnection.Client();
         _toxiProxyPulsarProxy = new Proxy
@@ -126,7 +133,7 @@ public class IntegrationFixture : IAsyncLifetime
 
     public async Task<string> CreateToken(TimeSpan expiryTime, 
CancellationToken cancellationToken)
     {
-        return await _pulsarCluster.CreateAuthenticationTokenAsync(expiryTime, 
cancellationToken);
+        return (await 
_pulsarCluster.CreateAuthenticationTokenAsync(expiryTime, 
cancellationToken)).TrimEnd();
     }
 
     private static string CreateTopicName() => 
$"persistent://public/default/{Guid.NewGuid():N}";
diff --git a/tests/DotPulsar.Tests/Internal/ConsumerTests.cs 
b/tests/DotPulsar.Tests/Internal/ConsumerTests.cs
index 3f60b04..425a07b 100644
--- a/tests/DotPulsar.Tests/Internal/ConsumerTests.cs
+++ b/tests/DotPulsar.Tests/Internal/ConsumerTests.cs
@@ -20,6 +20,7 @@ using DotPulsar.Exceptions;
 using DotPulsar.Extensions;
 using DotPulsar.Tests.Schemas.TestSamples.AvroModels;
 using System.Text;
+using System.Text.Json;
 using System.Text.RegularExpressions;
 using Xunit.Abstractions;
 
@@ -467,6 +468,40 @@ public sealed class ConsumerTests : IDisposable
         exception.ShouldBeNull();
     }
 
+    [Fact]
+    public async Task TryReceive_WhenBufferIsEmpty_ShouldNotIncreasePermits()
+    {
+        //Arrange
+        var topicName = await _fixture.CreateTopic(_cts.Token);
+        var subscription = CreateSubscriptionName();
+        var maxPrefetch = 2;
+
+        using var httpClient = CreateAdminClient();
+        await using var pulsarClient = CreateClient();
+        await using var consumer = CreateConsumer(pulsarClient, topicName, 
subscription, Schema.ByteSequence, (uint)maxPrefetch);
+        await using var producer = CreateProducer(pulsarClient, topicName, 
Schema.ByteSequence);
+
+        await consumer.StateChangedTo(ConsumerState.Active, _cts.Token);
+
+        // Wait until we get our first message
+        await producer.Send([1], _cts.Token);
+        var message = await consumer.Receive(_cts.Token);
+        await consumer.Acknowledge(message, _cts.Token);
+
+        //Act
+        var maxPermits = 0L;
+        for (int i = 0; i < maxPrefetch * 5; i++)
+        {
+            consumer.TryReceive(out _).ShouldBe(false);
+            await Task.Delay(50, _cts.Token);
+            var permits = await GetPermits(httpClient, topicName, 
subscription, _cts.Token);
+            maxPermits = Math.Max(maxPermits, permits);
+        }
+
+        //Assert
+        Assert.True(maxPermits <= maxPrefetch, $"availablePermits increased 
above the threshold of {maxPrefetch} to {maxPermits}");
+    }
+
     private static async Task<IEnumerable<MessageId>> 
ProduceMessages(IProducer<string> producer, int numberOfMessages, string 
content, CancellationToken ct)
     {
         var messageIds = new MessageId[numberOfMessages];
@@ -513,7 +548,6 @@ public sealed class ConsumerTests : IDisposable
         IPulsarClient pulsarClient,
         string topicName) => CreateProducer(pulsarClient, topicName, 
Schema.String);
 
-
     private IConsumer<string> CreateConsumer(IPulsarClient pulsarClient, 
string topicName)
         => CreateConsumer(pulsarClient, topicName, Schema.String);
 
@@ -541,6 +575,15 @@ public sealed class ConsumerTests : IDisposable
         .StateChangedHandler(_testOutputHelper.Log)
         .Create();
 
+    private IConsumer<T> CreateConsumer<T>(IPulsarClient pulsarClient, string 
topicName, string subscription, ISchema<T> schema, uint maxPrefetch)
+        => pulsarClient.NewConsumer(schema)
+       .InitialPosition(SubscriptionInitialPosition.Earliest)
+       .SubscriptionName(subscription)
+       .Topic(topicName)
+       .StateChangedHandler(_testOutputHelper.Log)
+       .MessagePrefetchCount(maxPrefetch)
+       .Create();
+
     private IPulsarClient CreateClient()
         => PulsarClient
         .Builder()
@@ -549,5 +592,51 @@ public sealed class ConsumerTests : IDisposable
         .ServiceUrl(_fixture.ServiceUrl)
         .Build();
 
+    private HttpClient CreateAdminClient() => new()
+    {
+        BaseAddress = _fixture.AdminUrl,
+        DefaultRequestHeaders =
+        {
+            Authorization = _fixture.AuthorizationHeader
+        }
+    };
+
+    private static async ValueTask<long> GetPermits(HttpClient httpClient, 
string topic, string subscription, CancellationToken cancellationToken)
+    {
+        topic = topic.Replace("persistent://", string.Empty);
+        using var response = await 
httpClient.GetAsync($"/admin/v2/persistent/{topic}/stats", 
cancellationToken).ConfigureAwait(false);
+        if (response.IsSuccessStatusCode)
+        {
+            await using var stream = await 
response.Content.ReadAsStreamAsync(cancellationToken).ConfigureAwait(false);
+            var json = await JsonDocument.ParseAsync(stream, 
cancellationToken: cancellationToken).ConfigureAwait(false);
+            if (!json.RootElement.TryGetProperty("subscriptions", out var 
subscriptionsProperty))
+            {
+                return 0;
+            }
+
+            if (!subscriptionsProperty.TryGetProperty(subscription, out var 
subscriptionProperty))
+            {
+                return 0;
+            }
+
+            if (subscriptionProperty.TryGetProperty("consumers", out var 
consumersProperty))
+            {
+                foreach (var consumer in consumersProperty.EnumerateArray())
+                {
+                    if (consumer.TryGetProperty("availablePermits", out var 
permitsProperty))
+                    {
+                        var permits = permitsProperty.GetInt64();
+                        if (permits > 0)
+                        {
+                            return permits;
+                        }
+                    }
+                }
+            }
+        }
+
+        return 0;
+    }
+
     public void Dispose() => _cts.Dispose();
 }

Reply via email to