This is an automated email from the ASF dual-hosted git repository.

blankensteiner pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-dotpulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 6bcd6e8  Fixed issue with producer not sending messages
6bcd6e8 is described below

commit 6bcd6e83ca102c15ec1a2be2340517dbb28bbb01
Author: Daniel Blankensteiner <[email protected]>
AuthorDate: Tue Jan 16 14:05:51 2024 +0100

    Fixed issue with producer not sending messages
---
 src/DotPulsar/Internal/Producer.cs              | 47 ++++++++++------
 src/DotPulsar/Internal/SubProducer.cs           | 73 +++++++++++--------------
 tests/DotPulsar.Tests/DotPulsar.Tests.csproj    |  2 +-
 tests/DotPulsar.Tests/Internal/ConsumerTests.cs | 22 ++++----
 tests/DotPulsar.Tests/Internal/ProducerTests.cs | 14 ++---
 tests/DotPulsar.Tests/Internal/ReaderTests.cs   | 18 +++---
 6 files changed, 91 insertions(+), 85 deletions(-)

diff --git a/src/DotPulsar/Internal/Producer.cs b/src/DotPulsar/Internal/Producer.cs
index 723baff..c5fd667 100644
--- a/src/DotPulsar/Internal/Producer.cs
+++ b/src/DotPulsar/Internal/Producer.cs
@@ -215,7 +215,7 @@ public sealed class Producer<TMessage> : IProducer<TMessage>, IRegisterEvent
     public async ValueTask<MessageId> Send(MessageMetadata metadata, TMessage 
message, CancellationToken cancellationToken)
     {
         var tcs = new TaskCompletionSource<MessageId>();
-        var registration = cancellationToken.Register(() => 
tcs.TrySetCanceled(cancellationToken));
+        using var registration = cancellationToken.Register(() => 
tcs.TrySetCanceled(cancellationToken));
 
         ValueTask OnMessageSent(MessageId messageId)
         {
@@ -227,21 +227,25 @@ public sealed class Producer<TMessage> : IProducer<TMessage>, IRegisterEvent
 #endif
         }
 
-        try
-        {
-            await InternalSend(metadata, message, true, OnMessageSent, x => 
tcs.TrySetException(x), cancellationToken).ConfigureAwait(false);
-            return await tcs.Task.ConfigureAwait(false);
-        }
-        finally
-        {
-            registration.Dispose();
-        }
+        await InternalSend(metadata, message, true, tcs, OnMessageSent, x => 
tcs.TrySetException(x), cancellationToken).ConfigureAwait(false);
+        return await tcs.Task.ConfigureAwait(false);
     }
 
-    public async ValueTask Enqueue(MessageMetadata metadata, TMessage message, 
Func<MessageId, ValueTask>? onMessageSent = default, CancellationToken 
cancellationToken = default)
-        => await InternalSend(metadata, message, false, onMessageSent, 
cancellationToken: cancellationToken).ConfigureAwait(false);
-
-    private async ValueTask InternalSend(MessageMetadata metadata, TMessage 
message, bool sendOpCancelable, Func<MessageId, ValueTask>? onMessageSent = 
default, Action<Exception>? onFailed = default, CancellationToken 
cancellationToken = default)
+    public async ValueTask Enqueue(
+        MessageMetadata metadata,
+        TMessage message,
+        Func<MessageId, ValueTask>? onMessageSent = default,
+        CancellationToken cancellationToken = default)
+        => await InternalSend(metadata, message, false, null, onMessageSent, 
cancellationToken: cancellationToken).ConfigureAwait(false);
+
+    private async ValueTask InternalSend(
+        MessageMetadata metadata,
+        TMessage message,
+        bool sendOpCancelable,
+        TaskCompletionSource<MessageId>? tcs = default,
+        Func<MessageId, ValueTask>? onMessageSent = default,
+        Action<Exception>? onFailed = default,
+        CancellationToken cancellationToken = default)
     {
         ThrowIfDisposed();
 
@@ -265,8 +269,9 @@ public sealed class Producer<TMessage> : IProducer<TMessage>, IRegisterEvent
             var subProducer = _producers[partition];
             var data = _options.Schema.Encode(message);
 
-            var tcs = new TaskCompletionSource<MessageId>();
-            await subProducer.Send(new SendOp(metadata.Metadata, data, tcs, 
sendOpCancelable ? cancellationToken : CancellationToken.None), 
cancellationToken).ConfigureAwait(false);
+            tcs ??= new TaskCompletionSource<MessageId>();
+            var sendOp = new SendOp(metadata.Metadata, data, tcs, 
sendOpCancelable ? cancellationToken : CancellationToken.None);
+            await subProducer.Send(sendOp, 
cancellationToken).ConfigureAwait(false);
 
             _ = tcs.Task.ContinueWith(async task =>
             {
@@ -281,7 +286,15 @@ public sealed class Producer<TMessage> : IProducer<TMessage>, IRegisterEvent
                     if (autoAssignSequenceId)
                         metadata.SequenceId = 0;
 
-                    onFailed?.Invoke(exception);
+                    try
+                    {
+                        onFailed?.Invoke(exception);
+                    }
+                    catch
+                    {
+                        // Ignore
+                    }
+
                     return;
                 }
 
diff --git a/src/DotPulsar/Internal/SubProducer.cs b/src/DotPulsar/Internal/SubProducer.cs
index 188d6b3..6924a4e 100644
--- a/src/DotPulsar/Internal/SubProducer.cs
+++ b/src/DotPulsar/Internal/SubProducer.cs
@@ -118,50 +118,45 @@ public sealed class SubProducer : IContainsProducerChannel, IState<ProducerState
 
     private async Task MessageDispatcher(IProducerChannel channel, 
CancellationToken cancellationToken)
     {
-        var responseQueue = new AsyncQueue<Task<BaseCommand>>();
-        var responseProcessorTask = ResponseProcessor(responseQueue, 
cancellationToken);
+        using var responseQueue = new AsyncQueue<Task<BaseCommand>>();
+        var responseProcessorTask = Task.Run(async () => await 
ResponseProcessor(responseQueue, cancellationToken));
 
-        try
+        _sendQueue.ResetCursor();
+
+        while (!cancellationToken.IsCancellationRequested)
         {
-            while (!cancellationToken.IsCancellationRequested)
+            var sendOp = await 
_sendQueue.NextItem(cancellationToken).ConfigureAwait(false);
+
+            if (sendOp.CancellationToken.IsCancellationRequested)
             {
-                var sendOp = await 
_sendQueue.NextItem(cancellationToken).ConfigureAwait(false);
+                _sendQueue.RemoveCurrentItem();
+                continue;
+            }
 
-                if (sendOp.CancellationToken.IsCancellationRequested)
+            var tcs = new TaskCompletionSource<BaseCommand>();
+            _ = tcs.Task.ContinueWith(task =>
+            {
+                try
                 {
-                    _sendQueue.RemoveCurrentItem();
-                    continue;
+                    responseQueue.Enqueue(task);
                 }
-
-                var tcs = new TaskCompletionSource<BaseCommand>();
-                _ = tcs.Task.ContinueWith(task =>
+                catch
                 {
-                    try
-                    {
-                        responseQueue.Enqueue(task);
-                    }
-                    catch
-                    {
-                        // Ignore
-                    }
-                }, TaskContinuationOptions.NotOnCanceled | 
TaskContinuationOptions.ExecuteSynchronously);
-
-                // Use CancellationToken.None here because otherwise it will 
throw exceptions on all fault actions even retry.
-                var success = await _executor.TryExecuteOnce(() => 
channel.Send(sendOp.Metadata, sendOp.Data, tcs, cancellationToken), 
CancellationToken.None).ConfigureAwait(false);
-
-                if (success)
-                    continue;
+                    // Ignore
+                }
+            }, TaskContinuationOptions.NotOnCanceled | 
TaskContinuationOptions.ExecuteSynchronously);
+
+            // Use CancellationToken.None here because otherwise it will throw 
exceptions on all fault actions even retry.
+            var success = await _executor.TryExecuteOnce(() => 
channel.Send(sendOp.Metadata, sendOp.Data, tcs, cancellationToken), 
CancellationToken.None).ConfigureAwait(false);
 
+            if (!success)
+            {
                 _eventRegister.Register(new 
ChannelDisconnected(_correlationId));
                 break;
             }
-
-            await responseProcessorTask.ConfigureAwait(false);
-        }
-        finally
-        {
-            responseQueue.Dispose();
         }
+
+        await responseProcessorTask.ConfigureAwait(false);
     }
 
     private async ValueTask ResponseProcessor(IDequeue<Task<BaseCommand>> 
responseQueue, CancellationToken cancellationToken)
@@ -210,9 +205,10 @@ public sealed class SubProducer : IContainsProducerChannel, IState<ProducerState
     {
         try
         {
-            if (_dispatcherCts is not null && 
!_dispatcherCts.IsCancellationRequested)
+            if (_dispatcherCts is not null)
             {
-                _dispatcherCts.Cancel();
+                if (!_dispatcherCts.IsCancellationRequested)
+                    _dispatcherCts.Cancel();
                 _dispatcherCts.Dispose();
             }
         }
@@ -236,15 +232,12 @@ public sealed class SubProducer : IContainsProducerChannel, IState<ProducerState
         _channel = await _executor.Execute(() => _factory.Create(_topicEpoch, 
cancellationToken), cancellationToken).ConfigureAwait(false);
     }
 
-    public async Task ActivateChannel(ulong? topicEpoch, CancellationToken 
cancellationToken)
+    public Task ActivateChannel(ulong? topicEpoch, CancellationToken 
cancellationToken)
     {
         _topicEpoch ??= topicEpoch;
         _dispatcherCts = new CancellationTokenSource();
-        await _executor.Execute(() =>
-        {
-            _sendQueue.ResetCursor();
-            _dispatcherTask = MessageDispatcher(_channel, 
_dispatcherCts.Token);
-        }, cancellationToken).ConfigureAwait(false);
+        _dispatcherTask = Task.Run(async () => await 
MessageDispatcher(_channel, _dispatcherCts.Token));
+        return Task.CompletedTask;
     }
 
     public async ValueTask CloseChannel(CancellationToken cancellationToken)
diff --git a/tests/DotPulsar.Tests/DotPulsar.Tests.csproj b/tests/DotPulsar.Tests/DotPulsar.Tests.csproj
index 010d054..ec8627e 100644
--- a/tests/DotPulsar.Tests/DotPulsar.Tests.csproj
+++ b/tests/DotPulsar.Tests/DotPulsar.Tests.csproj
@@ -26,7 +26,7 @@
     <PackageReference Include="Microsoft.NET.Test.Sdk" Version="17.8.0" />
     <PackageReference Include="Testcontainers" Version="3.7.0" />
     <PackageReference Include="ToxiproxyNetCore" Version="1.0.35" />
-    <PackageReference Include="xunit" Version="2.6.5" />
+    <PackageReference Include="xunit" Version="2.6.6" />
     <PackageReference Include="xunit.runner.visualstudio" Version="2.5.6">
       <PrivateAssets>all</PrivateAssets>
       <IncludeAssets>runtime; build; native; contentfiles; analyzers; 
buildtransitive</IncludeAssets>
diff --git a/tests/DotPulsar.Tests/Internal/ConsumerTests.cs b/tests/DotPulsar.Tests/Internal/ConsumerTests.cs
index e5d51d1..911c147 100644
--- a/tests/DotPulsar.Tests/Internal/ConsumerTests.cs
+++ b/tests/DotPulsar.Tests/Internal/ConsumerTests.cs
@@ -96,7 +96,7 @@ public sealed class ConsumerTests : IDisposable
         //Arrange
         await using var client = CreateClient();
         await using var consumer = CreateConsumer(client, await 
_fixture.CreateTopic(_cts.Token));
-        var expected = new List<MessageId>() { MessageId.Earliest };
+        var expected = new List<MessageId> { MessageId.Earliest };
 
         //Act
         var actual = await consumer.GetLastMessageIds(_cts.Token);
@@ -160,11 +160,11 @@ public sealed class ConsumerTests : IDisposable
 
         await using var consumer = CreateConsumer(client, await 
_fixture.CreateTopic(_cts.Token));
 
-        var receiveTask = consumer.Receive(_cts.Token).AsTask();
+        var receiveTask = consumer.Receive(_cts.Token);
         semaphoreSlim.Release();
 
         //Act
-        var exception = await Record.ExceptionAsync(() => receiveTask);
+        var exception = await Record.ExceptionAsync(receiveTask.AsTask);
 
         //Assert
         exception.Should().BeOfType<ConsumerFaultedException>();
@@ -186,14 +186,14 @@ public sealed class ConsumerTests : IDisposable
         await consumer.OnStateChangeTo(ConsumerState.Faulted, _cts.Token);
 
         //Act
-        var exception = await Record.ExceptionAsync(() => 
consumer.Receive(_cts.Token).AsTask());
+        var exception = await 
Record.ExceptionAsync(consumer.Receive(_cts.Token).AsTask);
 
         //Assert
         exception.Should().BeOfType<ConsumerFaultedException>();
     }
 
-    [Fact(Skip = "Skip for now")]
-    public async Task 
Connectivity_WhenInitiallyConnectedWithNoMessagesThenGoDown_ShouldBeAbleToReceiveWhenUpAgain()
+    [Fact]
+    public async Task 
Connectivity_WhenInitiallyConnectedWithNoMessagesThenGoesDown_ShouldBeAbleToReceiveWhenUpAgain()
     {
         //Arrange
         var topicName = await _fixture.CreateTopic(_cts.Token);
@@ -210,7 +210,7 @@ public sealed class ConsumerTests : IDisposable
         await ProduceMessages(producer, 1, "test-message", _cts.Token);
 
         //Act
-        var exception = await Record.ExceptionAsync(async () => await 
receiveTask.AsTask());
+        var exception = await Record.ExceptionAsync(receiveTask.AsTask);
 
         //Assert
         exception.Should().BeNull();
@@ -230,7 +230,7 @@ public sealed class ConsumerTests : IDisposable
         //Act
         await connectionDown.DisposeAsync();
         await consumer.StateChangedTo(ConsumerState.Active, _cts.Token);
-        var exception = await Record.ExceptionAsync(() => 
consumer.Receive(_cts.Token).AsTask());
+        var exception = await 
Record.ExceptionAsync(consumer.Receive(_cts.Token).AsTask);
 
         //Assert
         exception.Should().BeNull();
@@ -245,7 +245,7 @@ public sealed class ConsumerTests : IDisposable
         await using var consumer = CreateConsumer(client, await 
_fixture.CreateTopic(_cts.Token));
 
         //Act
-        var exception = await Record.ExceptionAsync(() => 
consumer.DisposeAsync().AsTask());
+        var exception = await 
Record.ExceptionAsync(consumer.DisposeAsync().AsTask);
 
         //Assert
         exception.Should().BeNull();
@@ -262,7 +262,7 @@ public sealed class ConsumerTests : IDisposable
         //Act
         await using var connectionDown = await 
_fixture.DisableThePulsarConnection();
         await consumer.StateChangedTo(ConsumerState.Disconnected, _cts.Token);
-        var exception = await Record.ExceptionAsync(() => 
consumer.DisposeAsync().AsTask());
+        var exception = await 
Record.ExceptionAsync(consumer.DisposeAsync().AsTask);
 
         //Assert
         exception.Should().BeNull();
@@ -285,7 +285,7 @@ public sealed class ConsumerTests : IDisposable
             await consumer.StateChangedTo(ConsumerState.Disconnected, 
_cts.Token);
         }
         await consumer.OnStateChangeTo(ConsumerState.Active, _cts.Token);
-        var exception = await Record.ExceptionAsync(() => 
consumer.Receive(_cts.Token).AsTask());
+        var exception = await 
Record.ExceptionAsync(consumer.Receive(_cts.Token).AsTask);
 
         //Assert
         exception.Should().BeNull();
diff --git a/tests/DotPulsar.Tests/Internal/ProducerTests.cs b/tests/DotPulsar.Tests/Internal/ProducerTests.cs
index 7e0f8f4..39a8c89 100644
--- a/tests/DotPulsar.Tests/Internal/ProducerTests.cs
+++ b/tests/DotPulsar.Tests/Internal/ProducerTests.cs
@@ -268,8 +268,8 @@ public sealed class ProducerTests : IDisposable
         foundNonNegativeOne.Should().Be(true);
     }
 
-    [Fact(Skip = "Skip for now")]
-    public async Task 
Connectivity_WhenConnectionIsInitiallyUpAndComesDown_ShouldBeAbleToSendWhileDown()
+    [Fact]
+    public async Task 
Connectivity_WhenConnectionIsInitiallyUpAndGoesDown_ShouldBeAbleToSendWhileDown()
     {
         //Arrange
         var topicName = await _fixture.CreateTopic(_cts.Token);
@@ -286,7 +286,7 @@ public sealed class ProducerTests : IDisposable
         await producer.OnStateChangeTo(ProducerState.Connected, _cts.Token);
 
         //Act
-        var exception = await Record.ExceptionAsync(async () => await 
sendTask.AsTask());
+        var exception = await Record.ExceptionAsync(sendTask.AsTask);
 
         //Assert
         exception.Should().BeNull();
@@ -301,7 +301,7 @@ public sealed class ProducerTests : IDisposable
         var producer = CreateProducer(client, await 
_fixture.CreateTopic(_cts.Token));
 
         //Act
-        var exception = await Record.ExceptionAsync(() => 
producer.DisposeAsync().AsTask());
+        var exception = await 
Record.ExceptionAsync(producer.DisposeAsync().AsTask);
 
         //Assert
         exception.Should().BeNull();
@@ -318,7 +318,7 @@ public sealed class ProducerTests : IDisposable
         //Act
         await connectionDown.DisposeAsync();
         await producer.StateChangedTo(ProducerState.Connected, _cts.Token);
-        var exception = await Record.ExceptionAsync(() => 
producer.Send("test", _cts.Token).AsTask());
+        var exception = await Record.ExceptionAsync(producer.Send("test", 
_cts.Token).AsTask);
 
         //Assert
         exception.Should().BeNull();
@@ -335,7 +335,7 @@ public sealed class ProducerTests : IDisposable
         //Act
         await using var connectionDown = await 
_fixture.DisableThePulsarConnection();
         await producer.StateChangedTo(ProducerState.Disconnected, _cts.Token);
-        var exception = await Record.ExceptionAsync(() => 
producer.DisposeAsync().AsTask());
+        var exception = await 
Record.ExceptionAsync(producer.DisposeAsync().AsTask);
 
         //Assert
         exception.Should().BeNull();
@@ -355,7 +355,7 @@ public sealed class ProducerTests : IDisposable
             await producer.StateChangedTo(ProducerState.Disconnected, 
_cts.Token);
         }
         await producer.OnStateChangeTo(ProducerState.Connected, _cts.Token);
-        var exception = await Record.ExceptionAsync(() => 
producer.Send("test", _cts.Token).AsTask());
+        var exception = await Record.ExceptionAsync(producer.Send("test", 
_cts.Token).AsTask);
 
         //Assert
         exception.Should().BeNull();
diff --git a/tests/DotPulsar.Tests/Internal/ReaderTests.cs b/tests/DotPulsar.Tests/Internal/ReaderTests.cs
index 504b679..4b31294 100644
--- a/tests/DotPulsar.Tests/Internal/ReaderTests.cs
+++ b/tests/DotPulsar.Tests/Internal/ReaderTests.cs
@@ -39,7 +39,7 @@ public sealed class ReaderTests : IDisposable
         //Arrange
         await using var client = CreateClient();
         await using var reader = CreateReader(client, MessageId.Earliest, 
await _fixture.CreateTopic(_cts.Token));
-        var expected = new List<MessageId>() { MessageId.Earliest };
+        var expected = new List<MessageId> { MessageId.Earliest };
 
         //Act
         var actual = await reader.GetLastMessageIds(_cts.Token);
@@ -206,14 +206,14 @@ public sealed class ReaderTests : IDisposable
         await reader.OnStateChangeTo(ReaderState.Faulted, _cts.Token);
 
         //Act
-        var exception = await Record.ExceptionAsync(() => 
reader.Receive(_cts.Token).AsTask());
+        var exception = await 
Record.ExceptionAsync(reader.Receive(_cts.Token).AsTask);
 
         //Assert
         exception.Should().BeOfType<ReaderFaultedException>();
     }
 
-    [Fact(Skip = "Skip for now")]
-    public async Task 
Connectivity_WhenInitiallyConnectedWithNoMessagesThenGoDown_ShouldBeAbleToReceiveWhenUpAgain()
+    [Fact]
+    public async Task 
Connectivity_WhenInitiallyConnectedWithNoMessagesThenGoesDown_ShouldBeAbleToReceiveWhenUpAgain()
     {
         //Arrange
         var topicName = await _fixture.CreateTopic(_cts.Token);
@@ -230,7 +230,7 @@ public sealed class ReaderTests : IDisposable
         await producer.Send("test-message", _cts.Token);
 
         //Act
-        var exception = await Record.ExceptionAsync(async () => await 
receiveTask.AsTask());
+        var exception = await Record.ExceptionAsync(receiveTask.AsTask);
 
         //Assert
         exception.Should().BeNull();
@@ -250,7 +250,7 @@ public sealed class ReaderTests : IDisposable
         //Act
         await connectionDown.DisposeAsync();
         await reader.StateChangedTo(ReaderState.Connected, _cts.Token);
-        var exception = await Record.ExceptionAsync(() => 
reader.Receive(_cts.Token).AsTask());
+        var exception = await 
Record.ExceptionAsync(reader.Receive(_cts.Token).AsTask);
 
         //Assert
         exception.Should().BeNull();
@@ -265,7 +265,7 @@ public sealed class ReaderTests : IDisposable
         var reader = CreateReader(client, MessageId.Earliest, await 
_fixture.CreateTopic(_cts.Token));
 
         //Act
-        var exception = await Record.ExceptionAsync(() => 
reader.DisposeAsync().AsTask());
+        var exception = await 
Record.ExceptionAsync(reader.DisposeAsync().AsTask);
 
         //Assert
         exception.Should().BeNull();
@@ -282,7 +282,7 @@ public sealed class ReaderTests : IDisposable
         //Act
         await using var connectionDown = await 
_fixture.DisableThePulsarConnection();
         await reader.StateChangedTo(ReaderState.Disconnected, _cts.Token);
-        var exception = await Record.ExceptionAsync(() => 
reader.DisposeAsync().AsTask());
+        var exception = await 
Record.ExceptionAsync(reader.DisposeAsync().AsTask);
 
         //Assert
         exception.Should().BeNull();
@@ -305,7 +305,7 @@ public sealed class ReaderTests : IDisposable
             await reader.StateChangedTo(ReaderState.Disconnected, _cts.Token);
         }
         await reader.OnStateChangeTo(ReaderState.Connected, _cts.Token);
-        var exception = await Record.ExceptionAsync(() => 
reader.Receive(_cts.Token).AsTask());
+        var exception = await 
Record.ExceptionAsync(reader.Receive(_cts.Token).AsTask);
 
         //Assert
         exception.Should().BeNull();

Reply via email to