entvex commented on code in PR #267: URL: https://github.com/apache/pulsar-dotpulsar/pull/267#discussion_r2115361972
########## tests/DotPulsar.Tests/Internal/FlowTests.cs: ########## @@ -0,0 +1,140 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +namespace DotPulsar.Tests.Internal; + +using DotPulsar.Abstractions; +using DotPulsar.Extensions; +using System.Buffers; +using System.Text.Json; +using Xunit.Abstractions; + +[Collection("Integration"), Trait("Category", "Integration")] +public sealed class FlowTests +{ + private readonly IntegrationFixture _fixture; + private readonly ITestOutputHelper _testOutputHelper; + + public FlowTests(IntegrationFixture fixture, ITestOutputHelper testOutputHelper) + { + _fixture = fixture; + _testOutputHelper = testOutputHelper; + } + + [Fact] + public async Task Should_Not_Increase_Permits_On_TryReceive() + { + //Arrange + var topicName = await _fixture.CreateTopic(CancellationToken.None); + var subscription = CreateSubscriptionName(); + var maxPrefetch = 2; + + using var cts = new CancellationTokenSource(TimeSpan.FromMinutes(2)); + using var httpClient = CreateAdminClient(); + await using var pulsarClient = CreateClient(); + await using var consumer = CreateConsumer(pulsarClient, topicName, subscription, (uint)maxPrefetch); + await using var producer = CreateProducer(pulsarClient, topicName); + + await consumer.StateChangedTo(ConsumerState.Active, cts.Token); + + // Wait until we get our first message + await producer.Send([1], cts.Token); + var message = await consumer.Receive(cts.Token); + await consumer.Acknowledge(message, cts.Token); + + //Act + var maxPermits = 0L; + for (int i = 0; i < maxPrefetch * 5; i++) + { + consumer.TryReceive(out _).ShouldBe(false); + await Task.Delay(50, cts.Token); + var permits = await GetPermits(httpClient, topicName, subscription, cts.Token); + maxPermits = Math.Max(maxPermits, permits); + } + + //Assert + Assert.True(maxPermits <= maxPrefetch, $"availablePermits increased above the threshold of {maxPrefetch} to {maxPermits}"); + } + + private static async ValueTask<long> GetPermits(HttpClient httpClient, string topic, string subscription, CancellationToken cancellationToken) Review Comment: Hi Shaun, Could you move this method into the IntegrationFixture.cs and the use _fixture to reach the method ? ########## tests/DotPulsar.Tests/IntegrationFixture.cs: ########## @@ -126,7 +133,7 @@ private void SubscribeToContainerEvents(IContainer container, string containerNa public async Task<string> CreateToken(TimeSpan expiryTime, CancellationToken cancellationToken) { - return await _pulsarCluster.CreateAuthenticationTokenAsync(expiryTime, cancellationToken); + return (await _pulsarCluster.CreateAuthenticationTokenAsync(expiryTime, cancellationToken)).TrimEnd(); Review Comment: What issues did you face since you added a TrimEnd() here ? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org