This is an automated email from the ASF dual-hosted git repository.
blankensteiner pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-dotpulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 6bcd6e8 Fixed issue with producer not sending messages
6bcd6e8 is described below
commit 6bcd6e83ca102c15ec1a2be2340517dbb28bbb01
Author: Daniel Blankensteiner <[email protected]>
AuthorDate: Tue Jan 16 14:05:51 2024 +0100
Fixed issue with producer not sending messages
---
src/DotPulsar/Internal/Producer.cs | 47 ++++++++++------
src/DotPulsar/Internal/SubProducer.cs | 73 +++++++++++--------------
tests/DotPulsar.Tests/DotPulsar.Tests.csproj | 2 +-
tests/DotPulsar.Tests/Internal/ConsumerTests.cs | 22 ++++----
tests/DotPulsar.Tests/Internal/ProducerTests.cs | 14 ++---
tests/DotPulsar.Tests/Internal/ReaderTests.cs | 18 +++---
6 files changed, 91 insertions(+), 85 deletions(-)
diff --git a/src/DotPulsar/Internal/Producer.cs b/src/DotPulsar/Internal/Producer.cs
index 723baff..c5fd667 100644
--- a/src/DotPulsar/Internal/Producer.cs
+++ b/src/DotPulsar/Internal/Producer.cs
@@ -215,7 +215,7 @@ public sealed class Producer<TMessage> :
IProducer<TMessage>, IRegisterEvent
public async ValueTask<MessageId> Send(MessageMetadata metadata, TMessage
message, CancellationToken cancellationToken)
{
var tcs = new TaskCompletionSource<MessageId>();
- var registration = cancellationToken.Register(() =>
tcs.TrySetCanceled(cancellationToken));
+ using var registration = cancellationToken.Register(() =>
tcs.TrySetCanceled(cancellationToken));
ValueTask OnMessageSent(MessageId messageId)
{
@@ -227,21 +227,25 @@ public sealed class Producer<TMessage> :
IProducer<TMessage>, IRegisterEvent
#endif
}
- try
- {
- await InternalSend(metadata, message, true, OnMessageSent, x =>
tcs.TrySetException(x), cancellationToken).ConfigureAwait(false);
- return await tcs.Task.ConfigureAwait(false);
- }
- finally
- {
- registration.Dispose();
- }
+ await InternalSend(metadata, message, true, tcs, OnMessageSent, x =>
tcs.TrySetException(x), cancellationToken).ConfigureAwait(false);
+ return await tcs.Task.ConfigureAwait(false);
}
- public async ValueTask Enqueue(MessageMetadata metadata, TMessage message,
Func<MessageId, ValueTask>? onMessageSent = default, CancellationToken
cancellationToken = default)
- => await InternalSend(metadata, message, false, onMessageSent,
cancellationToken: cancellationToken).ConfigureAwait(false);
-
- private async ValueTask InternalSend(MessageMetadata metadata, TMessage
message, bool sendOpCancelable, Func<MessageId, ValueTask>? onMessageSent =
default, Action<Exception>? onFailed = default, CancellationToken
cancellationToken = default)
+ public async ValueTask Enqueue(
+ MessageMetadata metadata,
+ TMessage message,
+ Func<MessageId, ValueTask>? onMessageSent = default,
+ CancellationToken cancellationToken = default)
+ => await InternalSend(metadata, message, false, null, onMessageSent,
cancellationToken: cancellationToken).ConfigureAwait(false);
+
+ private async ValueTask InternalSend(
+ MessageMetadata metadata,
+ TMessage message,
+ bool sendOpCancelable,
+ TaskCompletionSource<MessageId>? tcs = default,
+ Func<MessageId, ValueTask>? onMessageSent = default,
+ Action<Exception>? onFailed = default,
+ CancellationToken cancellationToken = default)
{
ThrowIfDisposed();
@@ -265,8 +269,9 @@ public sealed class Producer<TMessage> :
IProducer<TMessage>, IRegisterEvent
var subProducer = _producers[partition];
var data = _options.Schema.Encode(message);
- var tcs = new TaskCompletionSource<MessageId>();
- await subProducer.Send(new SendOp(metadata.Metadata, data, tcs,
sendOpCancelable ? cancellationToken : CancellationToken.None),
cancellationToken).ConfigureAwait(false);
+ tcs ??= new TaskCompletionSource<MessageId>();
+ var sendOp = new SendOp(metadata.Metadata, data, tcs,
sendOpCancelable ? cancellationToken : CancellationToken.None);
+ await subProducer.Send(sendOp,
cancellationToken).ConfigureAwait(false);
_ = tcs.Task.ContinueWith(async task =>
{
@@ -281,7 +286,15 @@ public sealed class Producer<TMessage> :
IProducer<TMessage>, IRegisterEvent
if (autoAssignSequenceId)
metadata.SequenceId = 0;
- onFailed?.Invoke(exception);
+ try
+ {
+ onFailed?.Invoke(exception);
+ }
+ catch
+ {
+ // Ignore
+ }
+
return;
}
diff --git a/src/DotPulsar/Internal/SubProducer.cs b/src/DotPulsar/Internal/SubProducer.cs
index 188d6b3..6924a4e 100644
--- a/src/DotPulsar/Internal/SubProducer.cs
+++ b/src/DotPulsar/Internal/SubProducer.cs
@@ -118,50 +118,45 @@ public sealed class SubProducer :
IContainsProducerChannel, IState<ProducerState
private async Task MessageDispatcher(IProducerChannel channel,
CancellationToken cancellationToken)
{
- var responseQueue = new AsyncQueue<Task<BaseCommand>>();
- var responseProcessorTask = ResponseProcessor(responseQueue,
cancellationToken);
+ using var responseQueue = new AsyncQueue<Task<BaseCommand>>();
+ var responseProcessorTask = Task.Run(async () => await
ResponseProcessor(responseQueue, cancellationToken));
- try
+ _sendQueue.ResetCursor();
+
+ while (!cancellationToken.IsCancellationRequested)
{
- while (!cancellationToken.IsCancellationRequested)
+ var sendOp = await
_sendQueue.NextItem(cancellationToken).ConfigureAwait(false);
+
+ if (sendOp.CancellationToken.IsCancellationRequested)
{
- var sendOp = await
_sendQueue.NextItem(cancellationToken).ConfigureAwait(false);
+ _sendQueue.RemoveCurrentItem();
+ continue;
+ }
- if (sendOp.CancellationToken.IsCancellationRequested)
+ var tcs = new TaskCompletionSource<BaseCommand>();
+ _ = tcs.Task.ContinueWith(task =>
+ {
+ try
{
- _sendQueue.RemoveCurrentItem();
- continue;
+ responseQueue.Enqueue(task);
}
-
- var tcs = new TaskCompletionSource<BaseCommand>();
- _ = tcs.Task.ContinueWith(task =>
+ catch
{
- try
- {
- responseQueue.Enqueue(task);
- }
- catch
- {
- // Ignore
- }
- }, TaskContinuationOptions.NotOnCanceled |
TaskContinuationOptions.ExecuteSynchronously);
-
- // Use CancellationToken.None here because otherwise it will
throw exceptions on all fault actions even retry.
- var success = await _executor.TryExecuteOnce(() =>
channel.Send(sendOp.Metadata, sendOp.Data, tcs, cancellationToken),
CancellationToken.None).ConfigureAwait(false);
-
- if (success)
- continue;
+ // Ignore
+ }
+ }, TaskContinuationOptions.NotOnCanceled |
TaskContinuationOptions.ExecuteSynchronously);
+
+ // Use CancellationToken.None here because otherwise it will throw
exceptions on all fault actions even retry.
+ var success = await _executor.TryExecuteOnce(() =>
channel.Send(sendOp.Metadata, sendOp.Data, tcs, cancellationToken),
CancellationToken.None).ConfigureAwait(false);
+ if (!success)
+ {
_eventRegister.Register(new
ChannelDisconnected(_correlationId));
break;
}
-
- await responseProcessorTask.ConfigureAwait(false);
- }
- finally
- {
- responseQueue.Dispose();
}
+
+ await responseProcessorTask.ConfigureAwait(false);
}
private async ValueTask ResponseProcessor(IDequeue<Task<BaseCommand>>
responseQueue, CancellationToken cancellationToken)
@@ -210,9 +205,10 @@ public sealed class SubProducer :
IContainsProducerChannel, IState<ProducerState
{
try
{
- if (_dispatcherCts is not null &&
!_dispatcherCts.IsCancellationRequested)
+ if (_dispatcherCts is not null)
{
- _dispatcherCts.Cancel();
+ if (!_dispatcherCts.IsCancellationRequested)
+ _dispatcherCts.Cancel();
_dispatcherCts.Dispose();
}
}
@@ -236,15 +232,12 @@ public sealed class SubProducer :
IContainsProducerChannel, IState<ProducerState
_channel = await _executor.Execute(() => _factory.Create(_topicEpoch,
cancellationToken), cancellationToken).ConfigureAwait(false);
}
- public async Task ActivateChannel(ulong? topicEpoch, CancellationToken
cancellationToken)
+ public Task ActivateChannel(ulong? topicEpoch, CancellationToken
cancellationToken)
{
_topicEpoch ??= topicEpoch;
_dispatcherCts = new CancellationTokenSource();
- await _executor.Execute(() =>
- {
- _sendQueue.ResetCursor();
- _dispatcherTask = MessageDispatcher(_channel,
_dispatcherCts.Token);
- }, cancellationToken).ConfigureAwait(false);
+ _dispatcherTask = Task.Run(async () => await
MessageDispatcher(_channel, _dispatcherCts.Token));
+ return Task.CompletedTask;
}
public async ValueTask CloseChannel(CancellationToken cancellationToken)
diff --git a/tests/DotPulsar.Tests/DotPulsar.Tests.csproj b/tests/DotPulsar.Tests/DotPulsar.Tests.csproj
index 010d054..ec8627e 100644
--- a/tests/DotPulsar.Tests/DotPulsar.Tests.csproj
+++ b/tests/DotPulsar.Tests/DotPulsar.Tests.csproj
@@ -26,7 +26,7 @@
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="17.8.0" />
<PackageReference Include="Testcontainers" Version="3.7.0" />
<PackageReference Include="ToxiproxyNetCore" Version="1.0.35" />
- <PackageReference Include="xunit" Version="2.6.5" />
+ <PackageReference Include="xunit" Version="2.6.6" />
<PackageReference Include="xunit.runner.visualstudio" Version="2.5.6">
<PrivateAssets>all</PrivateAssets>
<IncludeAssets>runtime; build; native; contentfiles; analyzers;
buildtransitive</IncludeAssets>
diff --git a/tests/DotPulsar.Tests/Internal/ConsumerTests.cs b/tests/DotPulsar.Tests/Internal/ConsumerTests.cs
index e5d51d1..911c147 100644
--- a/tests/DotPulsar.Tests/Internal/ConsumerTests.cs
+++ b/tests/DotPulsar.Tests/Internal/ConsumerTests.cs
@@ -96,7 +96,7 @@ public sealed class ConsumerTests : IDisposable
//Arrange
await using var client = CreateClient();
await using var consumer = CreateConsumer(client, await
_fixture.CreateTopic(_cts.Token));
- var expected = new List<MessageId>() { MessageId.Earliest };
+ var expected = new List<MessageId> { MessageId.Earliest };
//Act
var actual = await consumer.GetLastMessageIds(_cts.Token);
@@ -160,11 +160,11 @@ public sealed class ConsumerTests : IDisposable
await using var consumer = CreateConsumer(client, await
_fixture.CreateTopic(_cts.Token));
- var receiveTask = consumer.Receive(_cts.Token).AsTask();
+ var receiveTask = consumer.Receive(_cts.Token);
semaphoreSlim.Release();
//Act
- var exception = await Record.ExceptionAsync(() => receiveTask);
+ var exception = await Record.ExceptionAsync(receiveTask.AsTask);
//Assert
exception.Should().BeOfType<ConsumerFaultedException>();
@@ -186,14 +186,14 @@ public sealed class ConsumerTests : IDisposable
await consumer.OnStateChangeTo(ConsumerState.Faulted, _cts.Token);
//Act
- var exception = await Record.ExceptionAsync(() =>
consumer.Receive(_cts.Token).AsTask());
+ var exception = await
Record.ExceptionAsync(consumer.Receive(_cts.Token).AsTask);
//Assert
exception.Should().BeOfType<ConsumerFaultedException>();
}
- [Fact(Skip = "Skip for now")]
- public async Task
Connectivity_WhenInitiallyConnectedWithNoMessagesThenGoDown_ShouldBeAbleToReceiveWhenUpAgain()
+ [Fact]
+ public async Task
Connectivity_WhenInitiallyConnectedWithNoMessagesThenGoesDown_ShouldBeAbleToReceiveWhenUpAgain()
{
//Arrange
var topicName = await _fixture.CreateTopic(_cts.Token);
@@ -210,7 +210,7 @@ public sealed class ConsumerTests : IDisposable
await ProduceMessages(producer, 1, "test-message", _cts.Token);
//Act
- var exception = await Record.ExceptionAsync(async () => await
receiveTask.AsTask());
+ var exception = await Record.ExceptionAsync(receiveTask.AsTask);
//Assert
exception.Should().BeNull();
@@ -230,7 +230,7 @@ public sealed class ConsumerTests : IDisposable
//Act
await connectionDown.DisposeAsync();
await consumer.StateChangedTo(ConsumerState.Active, _cts.Token);
- var exception = await Record.ExceptionAsync(() =>
consumer.Receive(_cts.Token).AsTask());
+ var exception = await
Record.ExceptionAsync(consumer.Receive(_cts.Token).AsTask);
//Assert
exception.Should().BeNull();
@@ -245,7 +245,7 @@ public sealed class ConsumerTests : IDisposable
await using var consumer = CreateConsumer(client, await
_fixture.CreateTopic(_cts.Token));
//Act
- var exception = await Record.ExceptionAsync(() =>
consumer.DisposeAsync().AsTask());
+ var exception = await
Record.ExceptionAsync(consumer.DisposeAsync().AsTask);
//Assert
exception.Should().BeNull();
@@ -262,7 +262,7 @@ public sealed class ConsumerTests : IDisposable
//Act
await using var connectionDown = await
_fixture.DisableThePulsarConnection();
await consumer.StateChangedTo(ConsumerState.Disconnected, _cts.Token);
- var exception = await Record.ExceptionAsync(() =>
consumer.DisposeAsync().AsTask());
+ var exception = await
Record.ExceptionAsync(consumer.DisposeAsync().AsTask);
//Assert
exception.Should().BeNull();
@@ -285,7 +285,7 @@ public sealed class ConsumerTests : IDisposable
await consumer.StateChangedTo(ConsumerState.Disconnected,
_cts.Token);
}
await consumer.OnStateChangeTo(ConsumerState.Active, _cts.Token);
- var exception = await Record.ExceptionAsync(() =>
consumer.Receive(_cts.Token).AsTask());
+ var exception = await
Record.ExceptionAsync(consumer.Receive(_cts.Token).AsTask);
//Assert
exception.Should().BeNull();
diff --git a/tests/DotPulsar.Tests/Internal/ProducerTests.cs b/tests/DotPulsar.Tests/Internal/ProducerTests.cs
index 7e0f8f4..39a8c89 100644
--- a/tests/DotPulsar.Tests/Internal/ProducerTests.cs
+++ b/tests/DotPulsar.Tests/Internal/ProducerTests.cs
@@ -268,8 +268,8 @@ public sealed class ProducerTests : IDisposable
foundNonNegativeOne.Should().Be(true);
}
- [Fact(Skip = "Skip for now")]
- public async Task
Connectivity_WhenConnectionIsInitiallyUpAndComesDown_ShouldBeAbleToSendWhileDown()
+ [Fact]
+ public async Task
Connectivity_WhenConnectionIsInitiallyUpAndGoesDown_ShouldBeAbleToSendWhileDown()
{
//Arrange
var topicName = await _fixture.CreateTopic(_cts.Token);
@@ -286,7 +286,7 @@ public sealed class ProducerTests : IDisposable
await producer.OnStateChangeTo(ProducerState.Connected, _cts.Token);
//Act
- var exception = await Record.ExceptionAsync(async () => await
sendTask.AsTask());
+ var exception = await Record.ExceptionAsync(sendTask.AsTask);
//Assert
exception.Should().BeNull();
@@ -301,7 +301,7 @@ public sealed class ProducerTests : IDisposable
var producer = CreateProducer(client, await
_fixture.CreateTopic(_cts.Token));
//Act
- var exception = await Record.ExceptionAsync(() =>
producer.DisposeAsync().AsTask());
+ var exception = await
Record.ExceptionAsync(producer.DisposeAsync().AsTask);
//Assert
exception.Should().BeNull();
@@ -318,7 +318,7 @@ public sealed class ProducerTests : IDisposable
//Act
await connectionDown.DisposeAsync();
await producer.StateChangedTo(ProducerState.Connected, _cts.Token);
- var exception = await Record.ExceptionAsync(() =>
producer.Send("test", _cts.Token).AsTask());
+ var exception = await Record.ExceptionAsync(producer.Send("test",
_cts.Token).AsTask);
//Assert
exception.Should().BeNull();
@@ -335,7 +335,7 @@ public sealed class ProducerTests : IDisposable
//Act
await using var connectionDown = await
_fixture.DisableThePulsarConnection();
await producer.StateChangedTo(ProducerState.Disconnected, _cts.Token);
- var exception = await Record.ExceptionAsync(() =>
producer.DisposeAsync().AsTask());
+ var exception = await
Record.ExceptionAsync(producer.DisposeAsync().AsTask);
//Assert
exception.Should().BeNull();
@@ -355,7 +355,7 @@ public sealed class ProducerTests : IDisposable
await producer.StateChangedTo(ProducerState.Disconnected,
_cts.Token);
}
await producer.OnStateChangeTo(ProducerState.Connected, _cts.Token);
- var exception = await Record.ExceptionAsync(() =>
producer.Send("test", _cts.Token).AsTask());
+ var exception = await Record.ExceptionAsync(producer.Send("test",
_cts.Token).AsTask);
//Assert
exception.Should().BeNull();
diff --git a/tests/DotPulsar.Tests/Internal/ReaderTests.cs b/tests/DotPulsar.Tests/Internal/ReaderTests.cs
index 504b679..4b31294 100644
--- a/tests/DotPulsar.Tests/Internal/ReaderTests.cs
+++ b/tests/DotPulsar.Tests/Internal/ReaderTests.cs
@@ -39,7 +39,7 @@ public sealed class ReaderTests : IDisposable
//Arrange
await using var client = CreateClient();
await using var reader = CreateReader(client, MessageId.Earliest,
await _fixture.CreateTopic(_cts.Token));
- var expected = new List<MessageId>() { MessageId.Earliest };
+ var expected = new List<MessageId> { MessageId.Earliest };
//Act
var actual = await reader.GetLastMessageIds(_cts.Token);
@@ -206,14 +206,14 @@ public sealed class ReaderTests : IDisposable
await reader.OnStateChangeTo(ReaderState.Faulted, _cts.Token);
//Act
- var exception = await Record.ExceptionAsync(() =>
reader.Receive(_cts.Token).AsTask());
+ var exception = await
Record.ExceptionAsync(reader.Receive(_cts.Token).AsTask);
//Assert
exception.Should().BeOfType<ReaderFaultedException>();
}
- [Fact(Skip = "Skip for now")]
- public async Task
Connectivity_WhenInitiallyConnectedWithNoMessagesThenGoDown_ShouldBeAbleToReceiveWhenUpAgain()
+ [Fact]
+ public async Task
Connectivity_WhenInitiallyConnectedWithNoMessagesThenGoesDown_ShouldBeAbleToReceiveWhenUpAgain()
{
//Arrange
var topicName = await _fixture.CreateTopic(_cts.Token);
@@ -230,7 +230,7 @@ public sealed class ReaderTests : IDisposable
await producer.Send("test-message", _cts.Token);
//Act
- var exception = await Record.ExceptionAsync(async () => await
receiveTask.AsTask());
+ var exception = await Record.ExceptionAsync(receiveTask.AsTask);
//Assert
exception.Should().BeNull();
@@ -250,7 +250,7 @@ public sealed class ReaderTests : IDisposable
//Act
await connectionDown.DisposeAsync();
await reader.StateChangedTo(ReaderState.Connected, _cts.Token);
- var exception = await Record.ExceptionAsync(() =>
reader.Receive(_cts.Token).AsTask());
+ var exception = await
Record.ExceptionAsync(reader.Receive(_cts.Token).AsTask);
//Assert
exception.Should().BeNull();
@@ -265,7 +265,7 @@ public sealed class ReaderTests : IDisposable
var reader = CreateReader(client, MessageId.Earliest, await
_fixture.CreateTopic(_cts.Token));
//Act
- var exception = await Record.ExceptionAsync(() =>
reader.DisposeAsync().AsTask());
+ var exception = await
Record.ExceptionAsync(reader.DisposeAsync().AsTask);
//Assert
exception.Should().BeNull();
@@ -282,7 +282,7 @@ public sealed class ReaderTests : IDisposable
//Act
await using var connectionDown = await
_fixture.DisableThePulsarConnection();
await reader.StateChangedTo(ReaderState.Disconnected, _cts.Token);
- var exception = await Record.ExceptionAsync(() =>
reader.DisposeAsync().AsTask());
+ var exception = await
Record.ExceptionAsync(reader.DisposeAsync().AsTask);
//Assert
exception.Should().BeNull();
@@ -305,7 +305,7 @@ public sealed class ReaderTests : IDisposable
await reader.StateChangedTo(ReaderState.Disconnected, _cts.Token);
}
await reader.OnStateChangeTo(ReaderState.Connected, _cts.Token);
- var exception = await Record.ExceptionAsync(() =>
reader.Receive(_cts.Token).AsTask());
+ var exception = await
Record.ExceptionAsync(reader.Receive(_cts.Token).AsTask);
//Assert
exception.Should().BeNull();