This is an automated email from the ASF dual-hosted git repository.

blankensteiner pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-dotpulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 4632218  Consumer and Reader do not throw FaultedException as 
excepted. (#176)
4632218 is described below

commit 4632218db733a19117c89884bd0445027d7f5589
Author: entvex <[email protected]>
AuthorDate: Wed Sep 13 15:00:12 2023 +0200

    Consumer and Reader do not throw FaultedException as excepted. (#176)
    
    The Consumer and Reader didn't throw FaultedException in two scenarios 
where it should.
    
    Consumer
    1:
    After the initial setup of the DotPulsar client where it can't connect to 
the cluster. Creating a consumer and using Receive didn't produce a 
ConsumerFaultedException as it should.
    
    2:
    After the initial setup of the DotPulsar client where it can't connect to 
the cluster and is in a faulted state. Creating a consumer and using Receive 
didn't produce a ConsumerFaultedException as it should.
    
    Reader
    1:
    After the initial setup of the DotPulsar client where it can't connect to 
the cluster. Creating a reader and using Receive didn't produce a 
ReaderFaultedException as it should.
    
    2:
    After the initial setup of the DotPulsar client where it can't connect to 
the cluster and is in a faulted state. Creating a reader and using Receive 
didn't produce a ReaderFaultedException as it should.
    
    All the scenarios are fixed, and tests are written in ConsumerTests.cs 
ReaderTests.cs confirming the fix.
    
    Co-authored-by: David Jensen <[email protected]>
---
 CHANGELOG.md                           |  4 ++
 src/DotPulsar/Internal/Consumer.cs     |  8 +++-
 src/DotPulsar/Internal/Reader.cs       |  8 +++-
 src/DotPulsar/Internal/SubConsumer.cs  |  2 +-
 src/DotPulsar/Internal/SubReader.cs    |  2 +-
 tests/DotPulsar.Tests/ConsumerTests.cs | 72 ++++++++++++++++++++++++++++++++++
 tests/DotPulsar.Tests/ReaderTests.cs   | 72 ++++++++++++++++++++++++++++++++++
 7 files changed, 162 insertions(+), 6 deletions(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index f77e0f8..2dd7a71 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -11,6 +11,10 @@ The format is based on [Keep a 
Changelog](https://keepachangelog.com/en/1.1.0/)
 - When calling GetLastMessageId(s) on a Reader or Consumer, it returns a 
MessageId without the topic field if
   MessageId.Earliest is found.
 
+### Fixed
+
+- Fixed issue with DotPulsar client not handling connection faults for 
consumers and readers.
+
 ## [3.0.0] - 2023-08-30
 
 ### Added
diff --git a/src/DotPulsar/Internal/Consumer.cs 
b/src/DotPulsar/Internal/Consumer.cs
index e0736ff..8211a5d 100644
--- a/src/DotPulsar/Internal/Consumer.cs
+++ b/src/DotPulsar/Internal/Consumer.cs
@@ -15,6 +15,7 @@
 namespace DotPulsar.Internal;
 
 using DotPulsar.Abstractions;
+using DotPulsar.Exceptions;
 using DotPulsar.Internal.Abstractions;
 using DotPulsar.Internal.Compression;
 using DotPulsar.Internal.PulsarApi;
@@ -82,6 +83,7 @@ public sealed class Consumer<TMessage> : IConsumer<TMessage>
     {
         try
         {
+            await _semaphoreSlim.WaitAsync(_cts.Token).ConfigureAwait(false);
             await _executor.Execute(Monitor, _cts.Token).ConfigureAwait(false);
         }
         catch (Exception exception)
@@ -91,13 +93,12 @@ public sealed class Consumer<TMessage> : IConsumer<TMessage>
 
             _faultException = exception;
             _state.SetState(ConsumerState.Faulted);
+            _semaphoreSlim.Release();
         }
     }
 
     private async Task Monitor()
     {
-        await _semaphoreSlim.WaitAsync(_cts.Token).ConfigureAwait(false);
-
         _numberOfPartitions = Convert.ToInt32(await 
_connectionPool.GetNumberOfPartitions(Topic, _cts.Token).ConfigureAwait(false));
         _isPartitionedTopic = _numberOfPartitions != 0;
         var numberOfSubConsumers = _isPartitionedTopic ? _numberOfPartitions : 
1;
@@ -444,5 +445,8 @@ public sealed class Consumer<TMessage> : IConsumer<TMessage>
             await 
_semaphoreSlim.WaitAsync(cancellationToken).ConfigureAwait(false);
             _semaphoreSlim.Release();
         }
+
+        if (_faultException is not null)
+            throw new ConsumerFaultedException(_faultException);
     }
 }
diff --git a/src/DotPulsar/Internal/Reader.cs b/src/DotPulsar/Internal/Reader.cs
index 9bba145..ebc62ec 100644
--- a/src/DotPulsar/Internal/Reader.cs
+++ b/src/DotPulsar/Internal/Reader.cs
@@ -15,6 +15,7 @@
 namespace DotPulsar.Internal;
 
 using DotPulsar.Abstractions;
+using DotPulsar.Exceptions;
 using DotPulsar.Internal.Abstractions;
 using DotPulsar.Internal.Compression;
 using DotPulsar.Internal.PulsarApi;
@@ -77,6 +78,7 @@ public sealed class Reader<TMessage> : IReader<TMessage>
     {
         try
         {
+            await _semaphoreSlim.WaitAsync(_cts.Token).ConfigureAwait(false);
             await _executor.Execute(Monitor, _cts.Token).ConfigureAwait(false);
         }
         catch (Exception exception)
@@ -86,13 +88,12 @@ public sealed class Reader<TMessage> : IReader<TMessage>
 
             _faultException = exception;
             _state.SetState(ReaderState.Faulted);
+            _semaphoreSlim.Release();
         }
     }
 
     private async Task Monitor()
     {
-        await _semaphoreSlim.WaitAsync(_cts.Token).ConfigureAwait(false);
-
         _numberOfPartitions = Convert.ToInt32(await 
_connectionPool.GetNumberOfPartitions(Topic, _cts.Token).ConfigureAwait(false));
         _isPartitionedTopic = _numberOfPartitions != 0;
         var numberOfSubReaders = _isPartitionedTopic ? _numberOfPartitions : 1;
@@ -325,5 +326,8 @@ public sealed class Reader<TMessage> : IReader<TMessage>
             await 
_semaphoreSlim.WaitAsync(cancellationToken).ConfigureAwait(false);
             _semaphoreSlim.Release();
         }
+
+        if (_faultException is not null)
+            throw new ReaderFaultedException(_faultException);
     }
 }
diff --git a/src/DotPulsar/Internal/SubConsumer.cs 
b/src/DotPulsar/Internal/SubConsumer.cs
index 9b0de93..1c1d6dd 100644
--- a/src/DotPulsar/Internal/SubConsumer.cs
+++ b/src/DotPulsar/Internal/SubConsumer.cs
@@ -145,7 +145,7 @@ public sealed class SubConsumer<TMessage> : 
IConsumer<TMessage>, IContainsChanne
             throw new ConsumerDisposedException(GetType().FullName!);
 
         if (_faultException is not null)
-            throw new ConsumerFaultedException(_faultException);
+            throw _faultException;
     }
 
     public async Task EstablishNewChannel(CancellationToken cancellationToken)
diff --git a/src/DotPulsar/Internal/SubReader.cs 
b/src/DotPulsar/Internal/SubReader.cs
index b64facb..f0bfa31 100644
--- a/src/DotPulsar/Internal/SubReader.cs
+++ b/src/DotPulsar/Internal/SubReader.cs
@@ -148,7 +148,7 @@ public sealed class SubReader<TMessage> : IContainsChannel, 
IReader<TMessage>
             throw new ReaderDisposedException(GetType().FullName!);
 
         if (_faultException is not null)
-            throw new ReaderFaultedException(_faultException);
+            throw _faultException;
     }
 
     public async ValueTask ChannelFaulted(Exception exception)
diff --git a/tests/DotPulsar.Tests/ConsumerTests.cs 
b/tests/DotPulsar.Tests/ConsumerTests.cs
index 45f8a1e..4c63a51 100644
--- a/tests/DotPulsar.Tests/ConsumerTests.cs
+++ b/tests/DotPulsar.Tests/ConsumerTests.cs
@@ -15,6 +15,7 @@
 namespace DotPulsar.Tests;
 
 using DotPulsar.Abstractions;
+using DotPulsar.Exceptions;
 using DotPulsar.Extensions;
 using FluentAssertions;
 using System;
@@ -202,6 +203,77 @@ public class ConsumerTests
         consumed.Should().BeEquivalentTo(produced);
     }
 
+    [Fact]
+    public async Task 
Receive_WhenFaultedAfterInvokingReceive_ShouldThrowConsumerFaultedException()
+    {
+        //Arrange
+        var semaphoreSlim = new SemaphoreSlim(1);
+        await using var
+            client = PulsarClient.Builder().ExceptionHandler(context =>
+                {
+                    semaphoreSlim.WaitAsync();
+                    context.Result = FaultAction.Rethrow;
+                    context.ExceptionHandled = true;
+                })
+                .ServiceUrl(new Uri("pulsar://localhost:9512")) //point to a 
cluster that does not exists.
+                .Build();
+
+        await using var consumer = client.NewConsumer(Schema.String)
+            .StateChangedHandler(changed =>
+            {
+                var topic = changed.Consumer.Topic;
+                var state = changed.ConsumerState;
+                _testOutputHelper.WriteLine($"The consumer for topic '{topic}' 
changed state to '{state}'");
+            })
+            .SubscriptionName("MySubscription")
+            .Topic("persistent://public/default/mytopic")
+            .Create();
+
+        var receiveTask = consumer.Receive().AsTask();
+        semaphoreSlim.Release();
+
+        //Act
+        var exception = await Record.ExceptionAsync(() => receiveTask);
+
+        //Assert
+        exception.Should().BeOfType<ConsumerFaultedException>();
+    }
+
+    [Fact]
+    public async Task 
Receive_WhenFaultedBeforeInvokingReceive_ShouldThrowConsumerFaultedException()
+    {
+        //Arrange
+        var cts = new CancellationTokenSource();
+
+        await using var
+            client = PulsarClient.Builder().ExceptionHandler(context =>
+                {
+                    context.Result = FaultAction.Rethrow;
+                    context.ExceptionHandled = true;
+                })
+                .ServiceUrl(new Uri("pulsar://localhost:9512")) //point to a 
cluster that does not exists.
+                .Build();
+
+        await using var consumer = client.NewConsumer(Schema.String)
+            .StateChangedHandler(changed =>
+            {
+                var topic = changed.Consumer.Topic;
+                var state = changed.ConsumerState;
+                _testOutputHelper.WriteLine($"The consumer for topic '{topic}' 
changed state to '{state}'");
+            })
+            .SubscriptionName("MySubscription")
+            .Topic("persistent://public/default/mytopic")
+            .Create();
+
+        await consumer.OnStateChangeTo(ConsumerState.Faulted, cts.Token);
+
+        //Act
+        var exception = await Record.ExceptionAsync(() => 
consumer.Receive().AsTask());
+
+        //Assert
+        exception.Should().BeOfType<ConsumerFaultedException>();
+    }
+
     private static async Task<IEnumerable<MessageId>> 
ProduceMessages(IProducer<string> producer, int numberOfMessages, 
CancellationToken ct, string content)
     {
         var messageIds = new MessageId[numberOfMessages];
diff --git a/tests/DotPulsar.Tests/ReaderTests.cs 
b/tests/DotPulsar.Tests/ReaderTests.cs
index 1b2eb9d..6a595d6 100644
--- a/tests/DotPulsar.Tests/ReaderTests.cs
+++ b/tests/DotPulsar.Tests/ReaderTests.cs
@@ -15,6 +15,7 @@
 namespace DotPulsar.Tests;
 
 using DotPulsar.Abstractions;
+using DotPulsar.Exceptions;
 using DotPulsar.Extensions;
 using FluentAssertions;
 using System;
@@ -204,6 +205,77 @@ public class ReaderTests
         actual.Should().BeEquivalentTo(expected);
     }
 
+    [Fact]
+    public async Task 
Receive_WhenFaultedAfterInvokingReceive_ShouldThrowConsumerFaultedException()
+    {
+        //Arrange
+        var semaphoreSlim = new SemaphoreSlim(1);
+        await using var
+            client = PulsarClient.Builder().ExceptionHandler(context =>
+                {
+                    semaphoreSlim.WaitAsync();
+                    context.Result = FaultAction.Rethrow;
+                    context.ExceptionHandled = true;
+                })
+                .ServiceUrl(new Uri("pulsar://localhost:9512")) //point to a 
cluster that does not exists.
+                .Build();
+
+        await using var reader = client.NewReader(Schema.String)
+            .StartMessageId(MessageId.Earliest)
+            .StateChangedHandler(changed =>
+            {
+                var topic = changed.Reader.Topic;
+                var state = changed.ReaderState;
+                _testOutputHelper.WriteLine($"The consumer for topic '{topic}' 
changed state to '{state}'");
+            })
+            .Topic("persistent://public/default/mytopic")
+            .Create();
+
+        var receiveTask = reader.Receive().AsTask();
+        semaphoreSlim.Release();
+
+        //Act
+        var exception = await Record.ExceptionAsync(() => receiveTask);
+
+        //Assert
+        exception.Should().BeOfType<ReaderFaultedException>();
+    }
+
+    [Fact]
+    public async Task 
Receive_WhenFaultedBeforeInvokingReceive_ShouldThrowConsumerFaultedException()
+    {
+        //Arrange
+        var cts = new CancellationTokenSource();
+
+        await using var
+            client = PulsarClient.Builder().ExceptionHandler(context =>
+                {
+                    context.Result = FaultAction.Rethrow;
+                    context.ExceptionHandled = true;
+                })
+                .ServiceUrl(new Uri("pulsar://localhost:9512")) //point to a 
cluster that does not exists.
+                .Build();
+
+        await using var reader = client.NewReader(Schema.String)
+            .StartMessageId(MessageId.Earliest)
+            .StateChangedHandler(changed =>
+            {
+                var topic = changed.Reader.Topic;
+                var state = changed.ReaderState;
+                _testOutputHelper.WriteLine($"The reader for topic '{topic}' 
changed state to '{state}'");
+            })
+            .Topic("persistent://public/default/mytopic")
+            .Create();
+
+        await reader.OnStateChangeTo(ReaderState.Faulted, cts.Token);
+
+        //Act
+        var exception = await Record.ExceptionAsync(() => 
reader.Receive().AsTask());
+
+        //Assert
+        exception.Should().BeOfType<ReaderFaultedException>();
+    }
+
     private IProducer<String> CreateProducer(IPulsarClient pulsarClient, 
string topicName) => pulsarClient.NewProducer(Schema.String)
         .Topic(topicName)
         .Create();

Reply via email to