This is an automated email from the ASF dual-hosted git repository. blankensteiner pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar-dotpulsar.git
The following commit(s) were added to refs/heads/master by this push: new b9d8fa9 Better testing of producer access modes and states b9d8fa9 is described below commit b9d8fa9f854c66aa964470af42edcd261cf64541 Author: Daniel Blankensteiner <d...@vmail.dk> AuthorDate: Wed Oct 11 14:38:29 2023 +0200 Better testing of producer access modes and states --- src/DotPulsar/DotPulsar.csproj | 2 +- tests/DotPulsar.Tests/IntegrationFixture.cs | 26 ++++ tests/DotPulsar.Tests/Internal/ProducerTests.cs | 162 +++++++++------------ .../DotPulsar.Tests/Internal/StateManagerTests.cs | 83 ++++++----- 4 files changed, 140 insertions(+), 133 deletions(-) diff --git a/src/DotPulsar/DotPulsar.csproj b/src/DotPulsar/DotPulsar.csproj index cfbde8c..a0e81e9 100644 --- a/src/DotPulsar/DotPulsar.csproj +++ b/src/DotPulsar/DotPulsar.csproj @@ -23,7 +23,7 @@ <ItemGroup> <PackageReference Include="HashDepot" Version="2.0.3" /> - <PackageReference Include="Microsoft.Extensions.ObjectPool" Version="7.0.11" /> + <PackageReference Include="Microsoft.Extensions.ObjectPool" Version="7.0.12" /> <PackageReference Include="Microsoft.SourceLink.GitHub" Version="1.1.1" PrivateAssets="All" /> <PackageReference Include="protobuf-net" Version="3.2.26" /> <PackageReference Include="System.IO.Pipelines" Version="7.0.0" /> diff --git a/tests/DotPulsar.Tests/IntegrationFixture.cs b/tests/DotPulsar.Tests/IntegrationFixture.cs index e17c26b..b77a399 100644 --- a/tests/DotPulsar.Tests/IntegrationFixture.cs +++ b/tests/DotPulsar.Tests/IntegrationFixture.cs @@ -108,6 +108,32 @@ public class IntegrationFixture : IAsyncLifetime return result.Data[0]; } + private static string CreateTopicName() => $"persistent://public/default/{Guid.NewGuid():N}"; + + public string CreateTopic() + { + var topic = CreateTopicName(); + CreateTopic(topic); + return topic; + } + + public void CreateTopic(string topic) + { + var arguments = $"bin/pulsar-admin topics create {topic}"; + + var result = _cluster.Execute(arguments); + + if (!result.Success) + throw new Exception($"Could not create the topic: {result.Error}"); + } + + public string CreatePartitionedTopic(int numberOfPartitions) + { + var topic = CreateTopicName(); + CreatePartitionedTopic(topic, numberOfPartitions); + return topic; + } + public void CreatePartitionedTopic(string topic, int numberOfPartitions) { var arguments = $"bin/pulsar-admin topics create-partitioned-topic {topic} -p {numberOfPartitions}"; diff --git a/tests/DotPulsar.Tests/Internal/ProducerTests.cs b/tests/DotPulsar.Tests/Internal/ProducerTests.cs index b238409..cc33a72 100644 --- a/tests/DotPulsar.Tests/Internal/ProducerTests.cs +++ b/tests/DotPulsar.Tests/Internal/ProducerTests.cs @@ -38,130 +38,106 @@ public class ProducerTests } [Fact] - public async Task SimpleProduceConsume_WhenSendingMessagesToProducer_ThenReceiveMessagesFromConsumer() + public async Task Send_GivenMessageWasSent_ShouldBeConsumable() { //Arrange const string content = "test-message"; - var topicName = CreateTopicName(); + var topicName = _fixture.CreateTopic(); await using var client = CreateClient(); await using var producer = CreateProducer(client, topicName); await using var consumer = CreateConsumer(client, topicName); //Act - await producer.Send(content); + var messageId = await producer.Send(content); var message = await consumer.Receive(); //Assert + message.MessageId.Should().Be(messageId); message.Value().Should().Be(content); } [Fact] - public async Task SimpleProduceConsume_WhenSendingWithChannel_ThenReceiveMessagesFromConsumer() + public async Task SendChannel_GivenMessageWasSent_ShouldBeConsumable() { //Arrange const string content = "test-message"; - var topicName = CreateTopicName(); + var topicName = _fixture.CreateTopic(); await using var client = CreateClient(); await using var producer = CreateProducer(client, topicName); await using var consumer = CreateConsumer(client, topicName); - const int msgCount = 3; + var messageId = MessageId.Earliest; - //Act - for (var i = 0; i < msgCount; i++) + ValueTask SetMessageId(MessageId id) { - await producer.SendChannel.Send(content); - _testOutputHelper.WriteLine($"Sent a message: {content}"); + messageId = id; + return ValueTask.CompletedTask; } + //Act + await producer.SendChannel.Send(content, SetMessageId); producer.SendChannel.Complete(); await producer.SendChannel.Completion(); + var message = await consumer.Receive(); //Assert - for (ulong i = 0; i < msgCount; i++) - { - var received = await consumer.Receive(); - received.SequenceId.Should().Be(i); - received.Value().Should().Be(content); - } + message.MessageId.Should().Be(messageId); + message.Value().Should().Be(content); } [Theory] - [InlineData(ProducerAccessMode.Shared, ProducerState.Connected)] - [InlineData(ProducerAccessMode.Exclusive, ProducerState.Fenced)] - [InlineData(ProducerAccessMode.WaitForExclusive, ProducerState.WaitingForExclusive)] - public async Task TwoProducers_WhenConnectingSecond_ThenGoToExpectedState(ProducerAccessMode accessMode, ProducerState expectedState) - { - //Arrange - var topicName = $"producer-access-mode{Guid.NewGuid():N}"; - var cts = new CancellationTokenSource(TestTimeout); - await using var client = CreateClient(); - - //Act - await using var producer1 = CreateProducer(client, topicName, accessMode); - _ = await producer1.OnStateChangeTo(ProducerState.Connected, cts.Token); - - await using var producer2 = CreateProducer(client, topicName, accessMode); - var actualState = await producer2.OnStateChangeTo(expectedState, cts.Token); - - //Assert - actualState.Should().Be(expectedState); - } - - [Fact] - public async Task TwoProducers_WhenUsingExclusiveWithFencing_ThenExcludeExisting() + [InlineData(ProducerAccessMode.Exclusive, ProducerAccessMode.Exclusive, ProducerState.Connected, ProducerState.Fenced)] + [InlineData(ProducerAccessMode.Exclusive, ProducerAccessMode.ExclusiveWithFencing, ProducerState.Fenced, ProducerState.Connected)] + [InlineData(ProducerAccessMode.Exclusive, ProducerAccessMode.WaitForExclusive, ProducerState.Connected, ProducerState.WaitingForExclusive)] + [InlineData(ProducerAccessMode.Exclusive, ProducerAccessMode.Shared, ProducerState.Connected, ProducerState.Disconnected)] // Rethrow on ProducerBusy to Fault instead of just Disconnect + [InlineData(ProducerAccessMode.ExclusiveWithFencing, ProducerAccessMode.Exclusive, ProducerState.Connected, ProducerState.Fenced)] + [InlineData(ProducerAccessMode.ExclusiveWithFencing, ProducerAccessMode.ExclusiveWithFencing, ProducerState.Fenced, ProducerState.Connected)] + [InlineData(ProducerAccessMode.ExclusiveWithFencing, ProducerAccessMode.WaitForExclusive, ProducerState.Connected, ProducerState.WaitingForExclusive)] + [InlineData(ProducerAccessMode.ExclusiveWithFencing, ProducerAccessMode.Shared, ProducerState.Connected, ProducerState.Disconnected)] + [InlineData(ProducerAccessMode.Shared, ProducerAccessMode.Exclusive, ProducerState.Connected, ProducerState.Fenced)] + [InlineData(ProducerAccessMode.Shared, ProducerAccessMode.ExclusiveWithFencing, ProducerState.Disconnected, ProducerState.Connected)] // Rethrow on ProducerBusy to Fault instead of just Disconnect + [InlineData(ProducerAccessMode.Shared, ProducerAccessMode.Shared, ProducerState.Connected, ProducerState.Connected)] + [InlineData(ProducerAccessMode.Shared, ProducerAccessMode.WaitForExclusive, ProducerState.Connected, ProducerState.WaitingForExclusive)] + [InlineData(ProducerAccessMode.WaitForExclusive, ProducerAccessMode.Exclusive, ProducerState.Connected, ProducerState.Fenced)] + [InlineData(ProducerAccessMode.WaitForExclusive, ProducerAccessMode.ExclusiveWithFencing, ProducerState.Fenced, ProducerState.Connected)] + [InlineData(ProducerAccessMode.WaitForExclusive, ProducerAccessMode.WaitForExclusive, ProducerState.Connected, ProducerState.WaitingForExclusive)] + [InlineData(ProducerAccessMode.WaitForExclusive, ProducerAccessMode.Shared, ProducerState.Connected, ProducerState.Disconnected)] // Rethrow on ProducerBusy to Fault instead of just Disconnect + public async Task State_GivenMultipleProducersWithDifferentAccessModes_ThenGoToTheExpectedStates( + ProducerAccessMode accessModeForProducer1, + ProducerAccessMode accessModeForProducer2, + ProducerState expectedStateForProducer1, + ProducerState expectedStateForProducer2) { //Arrange + var topicName = _fixture.CreateTopic(); + using var cts = new CancellationTokenSource(TestTimeout); await using var client = CreateClient(); - var topicName = CreateTopicName(); - var cts = new CancellationTokenSource(TestTimeout); - - await using var producer1 = CreateProducer(client, topicName, ProducerAccessMode.ExclusiveWithFencing); + await using var producer1 = CreateProducer(client, topicName, accessModeForProducer1); await producer1.OnStateChangeTo(ProducerState.Connected, cts.Token); //Act - await using var producer2 = CreateProducer(client, topicName, ProducerAccessMode.ExclusiveWithFencing); - await producer2.OnStateChangeTo(ProducerState.Connected, cts.Token); + await using var producer2 = CreateProducer(client, topicName, accessModeForProducer2); - try + if (accessModeForProducer2 == ProducerAccessMode.ExclusiveWithFencing) // We need to send a message to trigger the state change { - // We need to send a message to trigger the disconnect - await producer1.Send(topicName, cts.Token); - } - catch - { - //Ignore - } + await producer2.OnStateChangeTo(ProducerState.Connected, cts.Token); - var result = await producer1.OnStateChangeTo(ProducerState.Fenced, cts.Token); - - //Assert - result.Should().Be(ProducerState.Fenced); - } - - [Theory] - [InlineData(ProducerAccessMode.Exclusive, ProducerAccessMode.Shared, ProducerState.Connected, ProducerState.Disconnected)] - [InlineData(ProducerAccessMode.Shared, ProducerAccessMode.Exclusive, ProducerState.Connected, ProducerState.Fenced)] - [InlineData(ProducerAccessMode.Shared, ProducerAccessMode.WaitForExclusive, ProducerState.Connected, ProducerState.WaitingForExclusive)] - [InlineData(ProducerAccessMode.Exclusive, ProducerAccessMode.WaitForExclusive, ProducerState.Connected, ProducerState.WaitingForExclusive)] - public async Task TwoProducers_WhenUsingDifferentAccessModes_ThenGoToExpectedStates(ProducerAccessMode accessMode1, ProducerAccessMode accessMode2, ProducerState producerState1, ProducerState producerState2) - { - //Arrange - var topicName = CreateTopicName(); - var cts = new CancellationTokenSource(TestTimeout); - await using var client = CreateClient(); - - await using var producer1 = CreateProducer(client, topicName, accessMode1); - await producer1.OnStateChangeTo(ProducerState.Connected, cts.Token); + try + { + await producer1.Send("test", cts.Token); + } + catch + { + //Ignore - //Act - await using var producer2 = CreateProducer(client, topicName, accessMode2); + } + } - var result1 = await producer1.OnStateChangeTo(producerState1, cts.Token); - var result2 = await producer2.OnStateChangeTo(producerState2, cts.Token); + var actualStateForProducer1 = await producer1.OnStateChangeTo(expectedStateForProducer1, cts.Token); + var actualStateForProducer2 = await producer2.OnStateChangeTo(expectedStateForProducer2, cts.Token); //Assert - result1.Should().Be(producerState1); - result2.Should().Be(producerState2); + actualStateForProducer1.Should().Be(expectedStateForProducer1); + actualStateForProducer2.Should().Be(expectedStateForProducer2); } [Fact] @@ -171,8 +147,7 @@ public class ProducerTests const string content = "test-message"; const int partitions = 3; const int msgCount = 3; - var topicName = CreateTopicName(); - _fixture.CreatePartitionedTopic(topicName, partitions); + var topicName = _fixture.CreatePartitionedTopic(partitions); await using var client = CreateClient(); //Act @@ -214,14 +189,11 @@ public class ProducerTests public async Task RoundRobinPartition_WhenSendMessages_ThenGetMessagesFromPartitionsInOrder() { //Arrange - await using var client = CreateClient(); - - var topicName = CreateTopicName(); const string content = "test-message"; const int partitions = 3; var consumers = new List<IConsumer<string>>(); - - _fixture.CreatePartitionedTopic(topicName, partitions); + await using var client = CreateClient(); + var topicName = _fixture.CreatePartitionedTopic(partitions); //Act await using var producer = CreateProducer(client, topicName); @@ -245,7 +217,7 @@ public class ProducerTests { //Arrange const int numberOfMessages = 10; - var topicName = CreateTopicName(); + var topicName = _fixture.CreateTopic(); await using var client = CreateClient(); await using var consumer = CreateConsumer(client, topicName); @@ -279,10 +251,7 @@ public class ProducerTests //Arrange const int numberOfMessages = 10; const int partitions = 4; - var topicName = CreateTopicName(); - - _fixture.CreatePartitionedTopic(topicName, partitions); - + var topicName = _fixture.CreatePartitionedTopic(partitions); await using var client = CreateClient(); await using var consumer = CreateConsumer(client, topicName); await using var producer = CreateProducer(client, topicName); @@ -346,26 +315,25 @@ public class ProducerTests private void LogState(ProducerStateChanged stateChange) => _testOutputHelper.WriteLine($"The producer for topic '{stateChange.Producer.Topic}' changed state to '{stateChange.ProducerState}'"); - private static string CreateTopicName() => $"persistent://public/default/{Guid.NewGuid():N}"; private static string CreateConsumerName() => $"consumer-{Guid.NewGuid():N}"; private static string CreateSubscriptionName() => $"subscription-{Guid.NewGuid():N}"; private IProducer<string> CreateProducer( IPulsarClient pulsarClient, - string? topicName = null, + string topicName, ProducerAccessMode producerAccessMode = ProducerAccessMode.Shared) - => pulsarClient.NewProducer(Schema.String) - .Topic(topicName is null ? CreateTopicName() : topicName) + => pulsarClient.NewProducer(Schema.String) + .Topic(topicName) .ProducerAccessMode(producerAccessMode) .StateChangedHandler(LogState) .Create(); - private IConsumer<string> CreateConsumer(IPulsarClient pulsarClient, string? topicName = null) + private IConsumer<string> CreateConsumer(IPulsarClient pulsarClient, string topicName) => pulsarClient.NewConsumer(Schema.String) .ConsumerName(CreateConsumerName()) .InitialPosition(SubscriptionInitialPosition.Earliest) .SubscriptionName(CreateSubscriptionName()) - .Topic(topicName is null ? CreateTopicName() : topicName) + .Topic(topicName) .StateChangedHandler(LogState) .Create(); diff --git a/tests/DotPulsar.Tests/Internal/StateManagerTests.cs b/tests/DotPulsar.Tests/Internal/StateManagerTests.cs index b1559c6..93bf887 100644 --- a/tests/DotPulsar.Tests/Internal/StateManagerTests.cs +++ b/tests/DotPulsar.Tests/Internal/StateManagerTests.cs @@ -36,10 +36,10 @@ public class StateManagerTests public void SetState_GivenNewState_ShouldReturnFormerState(ProducerState initialState, ProducerState newState, ProducerState expected) { //Arrange - var sut = new StateManager<ProducerState>(initialState, ProducerState.Closed); + var uut = new StateManager<ProducerState>(initialState, ProducerState.Closed); //Act - var actual = sut.SetState(newState); + var actual = uut.SetState(newState); //Assert actual.Should().Be(expected); @@ -52,13 +52,13 @@ public class StateManagerTests public void SetState_GivenStateIsFinal_ShouldNotChangeState(ProducerState newState) { //Arrange - var sut = new StateManager<ProducerState>(ProducerState.Closed, ProducerState.Closed); + var uut = new StateManager<ProducerState>(ProducerState.Closed, ProducerState.Closed); //Act - _ = sut.SetState(newState); + _ = uut.SetState(newState); //Assert - sut.CurrentState.Should().Be(ProducerState.Closed); + uut.CurrentState.Should().Be(ProducerState.Closed); } [Theory] @@ -69,11 +69,11 @@ public class StateManagerTests public void SetState_GivenStateIsChangedToWanted_ShouldCompleteTask(ProducerState initialState, ProducerState newState) { //Arrange - var sut = new StateManager<ProducerState>(initialState, ProducerState.Closed); - var task = sut.StateChangedTo(newState, default); + var uut = new StateManager<ProducerState>(initialState, ProducerState.Closed); + var task = uut.StateChangedTo(newState, default); //Act - _ = sut.SetState(newState); + _ = uut.SetState(newState); //Assert task.IsCompleted.Should().BeTrue(); @@ -87,11 +87,11 @@ public class StateManagerTests public void SetState_GivenStateIsChangedFromWanted_ShouldCompleteTask(ProducerState initialState, ProducerState newState) { //Arrange - var sut = new StateManager<ProducerState>(initialState, ProducerState.Closed); - var task = sut.StateChangedFrom(initialState, default); + var uut = new StateManager<ProducerState>(initialState, ProducerState.Closed); + var task = uut.StateChangedFrom(initialState, default); //Act - _ = sut.SetState(newState); + _ = uut.SetState(newState); //Assert task.IsCompleted.Should().BeTrue(); @@ -104,10 +104,10 @@ public class StateManagerTests public void StateChangedTo_GivenStateIsAlreadyWanted_ShouldCompleteTask(ProducerState state) { //Arrange - var sut = new StateManager<ProducerState>(state, ProducerState.Closed); + var uut = new StateManager<ProducerState>(state, ProducerState.Closed); //Act - var task = sut.StateChangedTo(state, default); + var task = uut.StateChangedTo(state, default); //Assert Assert.True(task.IsCompleted); @@ -121,10 +121,10 @@ public class StateManagerTests public void StateChangedTo_GivenStateIsNotWanted_ShouldNotCompleteTask(ProducerState initialState, ProducerState wantedState) { //Arrange - var sut = new StateManager<ProducerState>(initialState, ProducerState.Closed); + var uut = new StateManager<ProducerState>(initialState, ProducerState.Closed); //Act - var task = sut.StateChangedTo(wantedState, default); + var task = uut.StateChangedTo(wantedState, default); //Assert task.IsCompleted.Should().BeFalse(); @@ -136,10 +136,10 @@ public class StateManagerTests public void StateChangedTo_GivenStateIsFinal_ShouldCompleteTask(ProducerState state) { //Arrange - var sut = new StateManager<ProducerState>(ProducerState.Closed, ProducerState.Closed); + var uut = new StateManager<ProducerState>(ProducerState.Closed, ProducerState.Closed); //Act - var task = sut.StateChangedTo(state, default); + var task = uut.StateChangedTo(state, default); //Assert task.IsCompleted.Should().BeTrue(); @@ -151,10 +151,10 @@ public class StateManagerTests public void StateChangedFrom_GivenStateHasNotChanged_ShouldNotCompleteTask(ProducerState state) { //Arrange - var sut = new StateManager<ProducerState>(state, ProducerState.Closed); + var uut = new StateManager<ProducerState>(state, ProducerState.Closed); //Act - var task = sut.StateChangedFrom(state, default); + var task = uut.StateChangedFrom(state, default); //Assert task.IsCompleted.Should().BeFalse(); @@ -168,10 +168,10 @@ public class StateManagerTests public void StateChangedFrom_GivenStateHasChanged_ShouldCompleteTask(ProducerState initialState, ProducerState fromState) { //Arrange - var sut = new StateManager<ProducerState>(initialState, ProducerState.Closed); + var uut = new StateManager<ProducerState>(initialState, ProducerState.Closed); //Act - var task = sut.StateChangedFrom(fromState, default); + var task = uut.StateChangedFrom(fromState, default); //Assert task.IsCompleted.Should().BeTrue(); @@ -184,10 +184,10 @@ public class StateManagerTests public void StateChangedFrom_GivenStateIsFinal_ShouldCompleteTask(ProducerState state) { //Arrange - var sut = new StateManager<ProducerState>(ProducerState.Closed, ProducerState.Closed); + var uut = new StateManager<ProducerState>(ProducerState.Closed, ProducerState.Closed); //Act - var task = sut.StateChangedFrom(state, default); + var task = uut.StateChangedFrom(state, default); //Assert task.IsCompleted.Should().BeTrue(); @@ -199,11 +199,11 @@ public class StateManagerTests public void SetState_GivenStateIsChangeToFinalState_ShouldCompleteTask(ProducerState initialState, ProducerState wantedState) { //Arrange - var sut = new StateManager<ProducerState>(initialState, ProducerState.Closed); + var uut = new StateManager<ProducerState>(initialState, ProducerState.Closed); //Act - var task = sut.StateChangedTo(wantedState, default); - _ = sut.SetState(ProducerState.Closed); + var task = uut.StateChangedTo(wantedState, default); + _ = uut.SetState(ProducerState.Closed); //Assert task.IsCompleted.Should().BeTrue(); @@ -215,32 +215,45 @@ public class StateManagerTests public void SetState_GivenStateIsChangedToNotWanted_ShouldNotCompleteTask(ProducerState initialState, ProducerState newState, ProducerState wantedState) { //Arrange - var sut = new StateManager<ProducerState>(initialState, ProducerState.Closed); + var uut = new StateManager<ProducerState>(initialState, ProducerState.Closed); //Act - var task = sut.StateChangedTo(wantedState, default); - _ = sut.SetState(newState); + var task = uut.StateChangedTo(wantedState, default); + _ = uut.SetState(newState); //Assert task.IsCompleted.Should().BeFalse(); } [Fact] - public async Task CancelToken_GivenTaskWasStillWaiting_ShouldCancelTask() + public async Task Cancellation_WhenTokenIsCanceledWhileWaiting_ShouldThrowException() { //Arrange - var sut = new StateManager<ProducerState>(ProducerState.Connected, ProducerState.Closed); - var cts = new CancellationTokenSource(); - var task = sut.StateChangedFrom(ProducerState.Connected, cts.Token); + var uut = new StateManager<ProducerState>(ProducerState.Connected, ProducerState.Closed); + using var cts = new CancellationTokenSource(); //Act + var task = uut.StateChangedFrom(ProducerState.Connected, cts.Token); cts.Cancel(); var exception = await Record.ExceptionAsync(() => task.AsTask()); // xUnit can't record ValueTask yet //Assert exception.Should().BeOfType<TaskCanceledException>(); + } + + [Fact] + public async Task Cancellation_GivenCanceledToken_ShouldThrowException() + { + //Arrange + var uut = new StateManager<ProducerState>(ProducerState.Connected, ProducerState.Closed); + using var cts = new CancellationTokenSource(); - //Annihilate - cts.Dispose(); + //Act + cts.Cancel(); + var task = uut.StateChangedFrom(ProducerState.Connected, cts.Token); + var exception = await Record.ExceptionAsync(() => task.AsTask()); // xUnit can't record ValueTask yet + + //Assert + exception.Should().BeOfType<TaskCanceledException>(); } }