This is an automated email from the ASF dual-hosted git repository. blankensteiner pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar-dotpulsar.git
The following commit(s) were added to refs/heads/master by this push: new 5e665f7 Fix issue with cancelling Receive method call (#267) 5e665f7 is described below commit 5e665f7e43bdcfffddf01907c6d6cda7468f32e0 Author: Shaun Becker <smbec...@gmail.com> AuthorDate: Wed Jun 4 03:54:53 2025 -0400 Fix issue with cancelling Receive method call (#267) * Fix issue with cancelling Receive method call Fixes #266 * Code review feedback * Code review feedback --- src/DotPulsar/Internal/ConsumerChannel.cs | 84 ++++++++++++----------- tests/DotPulsar.Tests/IntegrationFixture.cs | 13 +++- tests/DotPulsar.Tests/Internal/ConsumerTests.cs | 91 ++++++++++++++++++++++++- 3 files changed, 146 insertions(+), 42 deletions(-) diff --git a/src/DotPulsar/Internal/ConsumerChannel.cs b/src/DotPulsar/Internal/ConsumerChannel.cs index ddc5733..180195d 100644 --- a/src/DotPulsar/Internal/ConsumerChannel.cs +++ b/src/DotPulsar/Internal/ConsumerChannel.cs @@ -80,58 +80,66 @@ public sealed class ConsumerChannel<TMessage> : IConsumerChannel<TMessage> await SendFlow(cancellationToken).ConfigureAwait(false); _sendWhenZero--; - - var message = _batchHandler.GetNext(); - - if (message is not null) - return message; - - var messagePackage = await _queue.Dequeue(cancellationToken).ConfigureAwait(false); - - if (!messagePackage.ValidateMagicNumberAndChecksum()) + try { - await RejectPackage(messagePackage, CommandAck.ValidationErrorType.ChecksumMismatch, cancellationToken).ConfigureAwait(false); - continue; - } + var message = _batchHandler.GetNext(); - var metadataSize = messagePackage.GetMetadataSize(); - var metadata = messagePackage.ExtractMetadata(metadataSize); - var data = messagePackage.ExtractData(metadataSize); + if (message is not null) + return message; - if (metadata.Compression != CompressionType.None) - { - var decompressor = _decompressors[(int) metadata.Compression]; - if (decompressor is null) - throw new CompressionException($"Support for {metadata.Compression} compression was not found"); + var messagePackage = await _queue.Dequeue(cancellationToken).ConfigureAwait(false); - try - { - data = decompressor.Decompress(data, (int) metadata.UncompressedSize); - } - catch + if (!messagePackage.ValidateMagicNumberAndChecksum()) { - await RejectPackage(messagePackage, CommandAck.ValidationErrorType.DecompressionError, cancellationToken).ConfigureAwait(false); + await RejectPackage(messagePackage, CommandAck.ValidationErrorType.ChecksumMismatch, cancellationToken).ConfigureAwait(false); continue; } - } - var messageId = messagePackage.MessageId; - var redeliveryCount = messagePackage.RedeliveryCount; + var metadataSize = messagePackage.GetMetadataSize(); + var metadata = messagePackage.ExtractMetadata(metadataSize); + var data = messagePackage.ExtractData(metadataSize); - if (metadata.ShouldSerializeNumMessagesInBatch()) - { - try + if (metadata.Compression != CompressionType.None) { - return _batchHandler.Add(messageId, redeliveryCount, metadata, data); + var decompressor = _decompressors[(int) metadata.Compression]; + if (decompressor is null) + throw new CompressionException($"Support for {metadata.Compression} compression was not found"); + + try + { + data = decompressor.Decompress(data, (int) metadata.UncompressedSize); + } + catch + { + await RejectPackage(messagePackage, CommandAck.ValidationErrorType.DecompressionError, cancellationToken).ConfigureAwait(false); + continue; + } } - catch + + var messageId = messagePackage.MessageId; + var redeliveryCount = messagePackage.RedeliveryCount; + + if (metadata.ShouldSerializeNumMessagesInBatch()) { - await RejectPackage(messagePackage, CommandAck.ValidationErrorType.BatchDeSerializeError, cancellationToken).ConfigureAwait(false); - continue; + try + { + return _batchHandler.Add(messageId, redeliveryCount, metadata, data); + } + catch + { + await RejectPackage(messagePackage, CommandAck.ValidationErrorType.BatchDeSerializeError, cancellationToken).ConfigureAwait(false); + continue; + } } - } - return _messageFactory.Create(messageId.ToMessageId(_topic), redeliveryCount, data, metadata); + return _messageFactory.Create(messageId.ToMessageId(_topic), redeliveryCount, data, metadata); + } + catch (Exception e) when (e is not CompressionException) + { + // Undo decrementing since we didn't actually receive anything + _sendWhenZero++; + throw; + } } } } diff --git a/tests/DotPulsar.Tests/IntegrationFixture.cs b/tests/DotPulsar.Tests/IntegrationFixture.cs index 784bb96..aafd4e0 100644 --- a/tests/DotPulsar.Tests/IntegrationFixture.cs +++ b/tests/DotPulsar.Tests/IntegrationFixture.cs @@ -18,6 +18,7 @@ using DotNet.Testcontainers.Builders; using DotNet.Testcontainers.Containers; using DotNet.Testcontainers.Networks; using DotPulsar.Abstractions; +using System.Net.Http.Headers; using System.Text; using System.Text.Json; using Testcontainers.Pulsar; @@ -64,13 +65,16 @@ public class IntegrationFixture : IAsyncLifetime .WithHostname("pulsar") .Build(); - ServiceUrl = new Uri($"pulsar://{_pulsarCluster.Hostname}:{PulsarPort}"); _toxiProxyConnection = new Connection(); _toxiProxyClient = _toxiProxyConnection.Client(); _toxiProxyPulsarProxy = new Proxy(); } - public Uri ServiceUrl { get; private set; } + public Uri ServiceUrl { get; private set; } = null!; // This will be set in `InitializeAsync` + + public Uri AdminUrl { get; private set; } = null!; // This will be set in `InitializeAsync` + + public AuthenticationHeaderValue AuthorizationHeader => new("Bearer", _token); public IAuthentication Authentication => AuthenticationFactory.Token(ct => ValueTask.FromResult(_token!)); @@ -93,6 +97,9 @@ public class IntegrationFixture : IAsyncLifetime await _pulsarCluster.StartAsync(_cts.Token); _messageSink.OnMessage(new DiagnosticMessage("The containers has initiated. Next, we'll configure Toxiproxy mappings.")); + ServiceUrl = new Uri(_pulsarCluster.GetBrokerAddress()); + AdminUrl = new Uri(_pulsarCluster.GetServiceAddress()); + _toxiProxyConnection = new Connection(_toxiProxy.Hostname, _toxiProxy.GetMappedPublicPort(ToxiProxyControlPort)); _toxiProxyClient = _toxiProxyConnection.Client(); _toxiProxyPulsarProxy = new Proxy @@ -126,7 +133,7 @@ public class IntegrationFixture : IAsyncLifetime public async Task<string> CreateToken(TimeSpan expiryTime, CancellationToken cancellationToken) { - return await _pulsarCluster.CreateAuthenticationTokenAsync(expiryTime, cancellationToken); + return (await _pulsarCluster.CreateAuthenticationTokenAsync(expiryTime, cancellationToken)).TrimEnd(); } private static string CreateTopicName() => $"persistent://public/default/{Guid.NewGuid():N}"; diff --git a/tests/DotPulsar.Tests/Internal/ConsumerTests.cs b/tests/DotPulsar.Tests/Internal/ConsumerTests.cs index 3f60b04..425a07b 100644 --- a/tests/DotPulsar.Tests/Internal/ConsumerTests.cs +++ b/tests/DotPulsar.Tests/Internal/ConsumerTests.cs @@ -20,6 +20,7 @@ using DotPulsar.Exceptions; using DotPulsar.Extensions; using DotPulsar.Tests.Schemas.TestSamples.AvroModels; using System.Text; +using System.Text.Json; using System.Text.RegularExpressions; using Xunit.Abstractions; @@ -467,6 +468,40 @@ public sealed class ConsumerTests : IDisposable exception.ShouldBeNull(); } + [Fact] + public async Task TryReceive_WhenBufferIsEmpty_ShouldNotIncreasePermits() + { + //Arrange + var topicName = await _fixture.CreateTopic(_cts.Token); + var subscription = CreateSubscriptionName(); + var maxPrefetch = 2; + + using var httpClient = CreateAdminClient(); + await using var pulsarClient = CreateClient(); + await using var consumer = CreateConsumer(pulsarClient, topicName, subscription, Schema.ByteSequence, (uint)maxPrefetch); + await using var producer = CreateProducer(pulsarClient, topicName, Schema.ByteSequence); + + await consumer.StateChangedTo(ConsumerState.Active, _cts.Token); + + // Wait until we get our first message + await producer.Send([1], _cts.Token); + var message = await consumer.Receive(_cts.Token); + await consumer.Acknowledge(message, _cts.Token); + + //Act + var maxPermits = 0L; + for (int i = 0; i < maxPrefetch * 5; i++) + { + consumer.TryReceive(out _).ShouldBe(false); + await Task.Delay(50, _cts.Token); + var permits = await GetPermits(httpClient, topicName, subscription, _cts.Token); + maxPermits = Math.Max(maxPermits, permits); + } + + //Assert + Assert.True(maxPermits <= maxPrefetch, $"availablePermits increased above the threshold of {maxPrefetch} to {maxPermits}"); + } + private static async Task<IEnumerable<MessageId>> ProduceMessages(IProducer<string> producer, int numberOfMessages, string content, CancellationToken ct) { var messageIds = new MessageId[numberOfMessages]; @@ -513,7 +548,6 @@ public sealed class ConsumerTests : IDisposable IPulsarClient pulsarClient, string topicName) => CreateProducer(pulsarClient, topicName, Schema.String); - private IConsumer<string> CreateConsumer(IPulsarClient pulsarClient, string topicName) => CreateConsumer(pulsarClient, topicName, Schema.String); @@ -541,6 +575,15 @@ public sealed class ConsumerTests : IDisposable .StateChangedHandler(_testOutputHelper.Log) .Create(); + private IConsumer<T> CreateConsumer<T>(IPulsarClient pulsarClient, string topicName, string subscription, ISchema<T> schema, uint maxPrefetch) + => pulsarClient.NewConsumer(schema) + .InitialPosition(SubscriptionInitialPosition.Earliest) + .SubscriptionName(subscription) + .Topic(topicName) + .StateChangedHandler(_testOutputHelper.Log) + .MessagePrefetchCount(maxPrefetch) + .Create(); + private IPulsarClient CreateClient() => PulsarClient .Builder() @@ -549,5 +592,51 @@ public sealed class ConsumerTests : IDisposable .ServiceUrl(_fixture.ServiceUrl) .Build(); + private HttpClient CreateAdminClient() => new() + { + BaseAddress = _fixture.AdminUrl, + DefaultRequestHeaders = + { + Authorization = _fixture.AuthorizationHeader + } + }; + + private static async ValueTask<long> GetPermits(HttpClient httpClient, string topic, string subscription, CancellationToken cancellationToken) + { + topic = topic.Replace("persistent://", string.Empty); + using var response = await httpClient.GetAsync($"/admin/v2/persistent/{topic}/stats", cancellationToken).ConfigureAwait(false); + if (response.IsSuccessStatusCode) + { + await using var stream = await response.Content.ReadAsStreamAsync(cancellationToken).ConfigureAwait(false); + var json = await JsonDocument.ParseAsync(stream, cancellationToken: cancellationToken).ConfigureAwait(false); + if (!json.RootElement.TryGetProperty("subscriptions", out var subscriptionsProperty)) + { + return 0; + } + + if (!subscriptionsProperty.TryGetProperty(subscription, out var subscriptionProperty)) + { + return 0; + } + + if (subscriptionProperty.TryGetProperty("consumers", out var consumersProperty)) + { + foreach (var consumer in consumersProperty.EnumerateArray()) + { + if (consumer.TryGetProperty("availablePermits", out var permitsProperty)) + { + var permits = permitsProperty.GetInt64(); + if (permits > 0) + { + return permits; + } + } + } + } + } + + return 0; + } + public void Dispose() => _cts.Dispose(); }