This is an automated email from the ASF dual-hosted git repository.
blankensteiner pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-dotpulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 4632218 Consumer and Reader do not throw FaultedException as
expected. (#176)
4632218 is described below
commit 4632218db733a19117c89884bd0445027d7f5589
Author: entvex <[email protected]>
AuthorDate: Wed Sep 13 15:00:12 2023 +0200
Consumer and Reader do not throw FaultedException as expected. (#176)
The Consumer and Reader didn't throw a FaultedException in two scenarios
where they should.
Consumer
1:
After the initial setup of the DotPulsar client, when it can't connect to
the cluster, creating a consumer and calling Receive didn't produce a
ConsumerFaultedException as it should.
2:
After the initial setup of the DotPulsar client, when it can't connect to
the cluster and is in a faulted state, creating a consumer and calling
Receive didn't produce a ConsumerFaultedException as it should.
Reader
1:
After the initial setup of the DotPulsar client, when it can't connect to
the cluster, creating a reader and calling Receive didn't produce a
ReaderFaultedException as it should.
2:
After the initial setup of the DotPulsar client, when it can't connect to
the cluster and is in a faulted state, creating a reader and calling
Receive didn't produce a ReaderFaultedException as it should.
All the scenarios are fixed, and tests are written in ConsumerTests.cs and
ReaderTests.cs confirming the fix.
Co-authored-by: David Jensen <[email protected]>
---
CHANGELOG.md | 4 ++
src/DotPulsar/Internal/Consumer.cs | 8 +++-
src/DotPulsar/Internal/Reader.cs | 8 +++-
src/DotPulsar/Internal/SubConsumer.cs | 2 +-
src/DotPulsar/Internal/SubReader.cs | 2 +-
tests/DotPulsar.Tests/ConsumerTests.cs | 72 ++++++++++++++++++++++++++++++++++
tests/DotPulsar.Tests/ReaderTests.cs | 72 ++++++++++++++++++++++++++++++++++
7 files changed, 162 insertions(+), 6 deletions(-)
diff --git a/CHANGELOG.md b/CHANGELOG.md
index f77e0f8..2dd7a71 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -11,6 +11,10 @@ The format is based on [Keep a
Changelog](https://keepachangelog.com/en/1.1.0/)
- When calling GetLastMessageId(s) on a Reader or Consumer, it returns a
MessageId without the topic field if
MessageId.Earliest is found.
+### Fixed
+
+- Fixed issue with DotPulsar client not handling connection faults for
consumers and readers.
+
## [3.0.0] - 2023-08-30
### Added
diff --git a/src/DotPulsar/Internal/Consumer.cs
b/src/DotPulsar/Internal/Consumer.cs
index e0736ff..8211a5d 100644
--- a/src/DotPulsar/Internal/Consumer.cs
+++ b/src/DotPulsar/Internal/Consumer.cs
@@ -15,6 +15,7 @@
namespace DotPulsar.Internal;
using DotPulsar.Abstractions;
+using DotPulsar.Exceptions;
using DotPulsar.Internal.Abstractions;
using DotPulsar.Internal.Compression;
using DotPulsar.Internal.PulsarApi;
@@ -82,6 +83,7 @@ public sealed class Consumer<TMessage> : IConsumer<TMessage>
{
try
{
+ await _semaphoreSlim.WaitAsync(_cts.Token).ConfigureAwait(false);
await _executor.Execute(Monitor, _cts.Token).ConfigureAwait(false);
}
catch (Exception exception)
@@ -91,13 +93,12 @@ public sealed class Consumer<TMessage> : IConsumer<TMessage>
_faultException = exception;
_state.SetState(ConsumerState.Faulted);
+ _semaphoreSlim.Release();
}
}
private async Task Monitor()
{
- await _semaphoreSlim.WaitAsync(_cts.Token).ConfigureAwait(false);
-
_numberOfPartitions = Convert.ToInt32(await
_connectionPool.GetNumberOfPartitions(Topic, _cts.Token).ConfigureAwait(false));
_isPartitionedTopic = _numberOfPartitions != 0;
var numberOfSubConsumers = _isPartitionedTopic ? _numberOfPartitions :
1;
@@ -444,5 +445,8 @@ public sealed class Consumer<TMessage> : IConsumer<TMessage>
await
_semaphoreSlim.WaitAsync(cancellationToken).ConfigureAwait(false);
_semaphoreSlim.Release();
}
+
+ if (_faultException is not null)
+ throw new ConsumerFaultedException(_faultException);
}
}
diff --git a/src/DotPulsar/Internal/Reader.cs b/src/DotPulsar/Internal/Reader.cs
index 9bba145..ebc62ec 100644
--- a/src/DotPulsar/Internal/Reader.cs
+++ b/src/DotPulsar/Internal/Reader.cs
@@ -15,6 +15,7 @@
namespace DotPulsar.Internal;
using DotPulsar.Abstractions;
+using DotPulsar.Exceptions;
using DotPulsar.Internal.Abstractions;
using DotPulsar.Internal.Compression;
using DotPulsar.Internal.PulsarApi;
@@ -77,6 +78,7 @@ public sealed class Reader<TMessage> : IReader<TMessage>
{
try
{
+ await _semaphoreSlim.WaitAsync(_cts.Token).ConfigureAwait(false);
await _executor.Execute(Monitor, _cts.Token).ConfigureAwait(false);
}
catch (Exception exception)
@@ -86,13 +88,12 @@ public sealed class Reader<TMessage> : IReader<TMessage>
_faultException = exception;
_state.SetState(ReaderState.Faulted);
+ _semaphoreSlim.Release();
}
}
private async Task Monitor()
{
- await _semaphoreSlim.WaitAsync(_cts.Token).ConfigureAwait(false);
-
_numberOfPartitions = Convert.ToInt32(await
_connectionPool.GetNumberOfPartitions(Topic, _cts.Token).ConfigureAwait(false));
_isPartitionedTopic = _numberOfPartitions != 0;
var numberOfSubReaders = _isPartitionedTopic ? _numberOfPartitions : 1;
@@ -325,5 +326,8 @@ public sealed class Reader<TMessage> : IReader<TMessage>
await
_semaphoreSlim.WaitAsync(cancellationToken).ConfigureAwait(false);
_semaphoreSlim.Release();
}
+
+ if (_faultException is not null)
+ throw new ReaderFaultedException(_faultException);
}
}
diff --git a/src/DotPulsar/Internal/SubConsumer.cs
b/src/DotPulsar/Internal/SubConsumer.cs
index 9b0de93..1c1d6dd 100644
--- a/src/DotPulsar/Internal/SubConsumer.cs
+++ b/src/DotPulsar/Internal/SubConsumer.cs
@@ -145,7 +145,7 @@ public sealed class SubConsumer<TMessage> :
IConsumer<TMessage>, IContainsChanne
throw new ConsumerDisposedException(GetType().FullName!);
if (_faultException is not null)
- throw new ConsumerFaultedException(_faultException);
+ throw _faultException;
}
public async Task EstablishNewChannel(CancellationToken cancellationToken)
diff --git a/src/DotPulsar/Internal/SubReader.cs
b/src/DotPulsar/Internal/SubReader.cs
index b64facb..f0bfa31 100644
--- a/src/DotPulsar/Internal/SubReader.cs
+++ b/src/DotPulsar/Internal/SubReader.cs
@@ -148,7 +148,7 @@ public sealed class SubReader<TMessage> : IContainsChannel,
IReader<TMessage>
throw new ReaderDisposedException(GetType().FullName!);
if (_faultException is not null)
- throw new ReaderFaultedException(_faultException);
+ throw _faultException;
}
public async ValueTask ChannelFaulted(Exception exception)
diff --git a/tests/DotPulsar.Tests/ConsumerTests.cs
b/tests/DotPulsar.Tests/ConsumerTests.cs
index 45f8a1e..4c63a51 100644
--- a/tests/DotPulsar.Tests/ConsumerTests.cs
+++ b/tests/DotPulsar.Tests/ConsumerTests.cs
@@ -15,6 +15,7 @@
namespace DotPulsar.Tests;
using DotPulsar.Abstractions;
+using DotPulsar.Exceptions;
using DotPulsar.Extensions;
using FluentAssertions;
using System;
@@ -202,6 +203,77 @@ public class ConsumerTests
consumed.Should().BeEquivalentTo(produced);
}
+ [Fact]
+ public async Task
Receive_WhenFaultedAfterInvokingReceive_ShouldThrowConsumerFaultedException()
+ {
+ //Arrange
+ var semaphoreSlim = new SemaphoreSlim(1);
+ await using var
+ client = PulsarClient.Builder().ExceptionHandler(context =>
+ {
+ semaphoreSlim.WaitAsync();
+ context.Result = FaultAction.Rethrow;
+ context.ExceptionHandled = true;
+ })
+ .ServiceUrl(new Uri("pulsar://localhost:9512")) //point to a
cluster that does not exists.
+ .Build();
+
+ await using var consumer = client.NewConsumer(Schema.String)
+ .StateChangedHandler(changed =>
+ {
+ var topic = changed.Consumer.Topic;
+ var state = changed.ConsumerState;
+ _testOutputHelper.WriteLine($"The consumer for topic '{topic}'
changed state to '{state}'");
+ })
+ .SubscriptionName("MySubscription")
+ .Topic("persistent://public/default/mytopic")
+ .Create();
+
+ var receiveTask = consumer.Receive().AsTask();
+ semaphoreSlim.Release();
+
+ //Act
+ var exception = await Record.ExceptionAsync(() => receiveTask);
+
+ //Assert
+ exception.Should().BeOfType<ConsumerFaultedException>();
+ }
+
+ [Fact]
+ public async Task
Receive_WhenFaultedBeforeInvokingReceive_ShouldThrowConsumerFaultedException()
+ {
+ //Arrange
+ var cts = new CancellationTokenSource();
+
+ await using var
+ client = PulsarClient.Builder().ExceptionHandler(context =>
+ {
+ context.Result = FaultAction.Rethrow;
+ context.ExceptionHandled = true;
+ })
+ .ServiceUrl(new Uri("pulsar://localhost:9512")) //point to a
cluster that does not exists.
+ .Build();
+
+ await using var consumer = client.NewConsumer(Schema.String)
+ .StateChangedHandler(changed =>
+ {
+ var topic = changed.Consumer.Topic;
+ var state = changed.ConsumerState;
+ _testOutputHelper.WriteLine($"The consumer for topic '{topic}'
changed state to '{state}'");
+ })
+ .SubscriptionName("MySubscription")
+ .Topic("persistent://public/default/mytopic")
+ .Create();
+
+ await consumer.OnStateChangeTo(ConsumerState.Faulted, cts.Token);
+
+ //Act
+ var exception = await Record.ExceptionAsync(() =>
consumer.Receive().AsTask());
+
+ //Assert
+ exception.Should().BeOfType<ConsumerFaultedException>();
+ }
+
private static async Task<IEnumerable<MessageId>>
ProduceMessages(IProducer<string> producer, int numberOfMessages,
CancellationToken ct, string content)
{
var messageIds = new MessageId[numberOfMessages];
diff --git a/tests/DotPulsar.Tests/ReaderTests.cs
b/tests/DotPulsar.Tests/ReaderTests.cs
index 1b2eb9d..6a595d6 100644
--- a/tests/DotPulsar.Tests/ReaderTests.cs
+++ b/tests/DotPulsar.Tests/ReaderTests.cs
@@ -15,6 +15,7 @@
namespace DotPulsar.Tests;
using DotPulsar.Abstractions;
+using DotPulsar.Exceptions;
using DotPulsar.Extensions;
using FluentAssertions;
using System;
@@ -204,6 +205,77 @@ public class ReaderTests
actual.Should().BeEquivalentTo(expected);
}
+ [Fact]
+ public async Task
Receive_WhenFaultedAfterInvokingReceive_ShouldThrowConsumerFaultedException()
+ {
+ //Arrange
+ var semaphoreSlim = new SemaphoreSlim(1);
+ await using var
+ client = PulsarClient.Builder().ExceptionHandler(context =>
+ {
+ semaphoreSlim.WaitAsync();
+ context.Result = FaultAction.Rethrow;
+ context.ExceptionHandled = true;
+ })
+ .ServiceUrl(new Uri("pulsar://localhost:9512")) //point to a
cluster that does not exists.
+ .Build();
+
+ await using var reader = client.NewReader(Schema.String)
+ .StartMessageId(MessageId.Earliest)
+ .StateChangedHandler(changed =>
+ {
+ var topic = changed.Reader.Topic;
+ var state = changed.ReaderState;
+ _testOutputHelper.WriteLine($"The consumer for topic '{topic}'
changed state to '{state}'");
+ })
+ .Topic("persistent://public/default/mytopic")
+ .Create();
+
+ var receiveTask = reader.Receive().AsTask();
+ semaphoreSlim.Release();
+
+ //Act
+ var exception = await Record.ExceptionAsync(() => receiveTask);
+
+ //Assert
+ exception.Should().BeOfType<ReaderFaultedException>();
+ }
+
+ [Fact]
+ public async Task
Receive_WhenFaultedBeforeInvokingReceive_ShouldThrowConsumerFaultedException()
+ {
+ //Arrange
+ var cts = new CancellationTokenSource();
+
+ await using var
+ client = PulsarClient.Builder().ExceptionHandler(context =>
+ {
+ context.Result = FaultAction.Rethrow;
+ context.ExceptionHandled = true;
+ })
+ .ServiceUrl(new Uri("pulsar://localhost:9512")) //point to a
cluster that does not exists.
+ .Build();
+
+ await using var reader = client.NewReader(Schema.String)
+ .StartMessageId(MessageId.Earliest)
+ .StateChangedHandler(changed =>
+ {
+ var topic = changed.Reader.Topic;
+ var state = changed.ReaderState;
+ _testOutputHelper.WriteLine($"The reader for topic '{topic}'
changed state to '{state}'");
+ })
+ .Topic("persistent://public/default/mytopic")
+ .Create();
+
+ await reader.OnStateChangeTo(ReaderState.Faulted, cts.Token);
+
+ //Act
+ var exception = await Record.ExceptionAsync(() =>
reader.Receive().AsTask());
+
+ //Assert
+ exception.Should().BeOfType<ReaderFaultedException>();
+ }
+
private IProducer<String> CreateProducer(IPulsarClient pulsarClient,
string topicName) => pulsarClient.NewProducer(Schema.String)
.Topic(topicName)
.Create();