This is an automated email from the ASF dual-hosted git repository.

blankensteiner pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-dotpulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new f8070a3  Invoking a method on a consumer or reader that is faulted or faults during the operation should result in a Consumer/ReaderFaultedException being thrown. This needs testing
f8070a3 is described below

commit f8070a3077c324e510ed31621497a157813e8bf7
Author: Daniel Blankensteiner <[email protected]>
AuthorDate: Sat Jan 28 00:19:44 2023 +0100

    Invoking a method on a consumer or reader that is faulted or faults during the operation should result in a Consumer/ReaderFaultedException being thrown. This needs testing
---
 .../ConsumerFaultedException.cs}                   |  16 +--
 .../FaultedException.cs}                           |  16 +--
 .../ProducerFaultedException.cs}                   |  16 +--
 .../ReaderFaultedException.cs}                     |  16 +--
 .../Internal/Abstractions/IContainsChannel.cs      |   4 +-
 src/DotPulsar/Internal/Abstractions/Process.cs     |   6 +-
 src/DotPulsar/Internal/Consumer.cs                 | 143 ++++++++++++---------
 src/DotPulsar/Internal/ConsumerProcess.cs          |   6 +-
 src/DotPulsar/Internal/Events/ExecutorFaulted.cs   |  10 +-
 src/DotPulsar/Internal/Executor.cs                 |   4 +-
 src/DotPulsar/Internal/Producer.cs                 |  29 ++---
 src/DotPulsar/Internal/Reader.cs                   |  60 +++++----
 src/DotPulsar/Internal/ReaderProcess.cs            |   5 +-
 src/DotPulsar/Internal/SubProducer.cs              |  33 +++--
 14 files changed, 204 insertions(+), 160 deletions(-)

diff --git a/src/DotPulsar/Internal/Abstractions/IContainsChannel.cs 
b/src/DotPulsar/Exceptions/ConsumerFaultedException.cs
similarity index 60%
copy from src/DotPulsar/Internal/Abstractions/IContainsChannel.cs
copy to src/DotPulsar/Exceptions/ConsumerFaultedException.cs
index d856108..c331bc4 100644
--- a/src/DotPulsar/Internal/Abstractions/IContainsChannel.cs
+++ b/src/DotPulsar/Exceptions/ConsumerFaultedException.cs
@@ -1,4 +1,4 @@
-/*
+/*
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * You may obtain a copy of the License at
@@ -12,15 +12,15 @@
  * limitations under the License.
  */
 
-namespace DotPulsar.Internal.Abstractions;
+namespace DotPulsar.Exceptions;
 
 using System;
-using System.Threading;
-using System.Threading.Tasks;
 
-public interface IContainsChannel : IAsyncDisposable
+/// <summary>
+/// Thrown when a consumer has faulted. See innerException
+/// </summary>
+public sealed class ConsumerFaultedException : FaultedException
 {
-    Task EstablishNewChannel(CancellationToken cancellationToken);
-
-    ValueTask CloseChannel(CancellationToken cancellationToken);
+    public ConsumerFaultedException() : base("Consumer has faulted") { }
+    public ConsumerFaultedException(Exception innerException) : base("Consumer has faulted", innerException) { }
 }
diff --git a/src/DotPulsar/Internal/Abstractions/IContainsChannel.cs 
b/src/DotPulsar/Exceptions/FaultedException.cs
similarity index 59%
copy from src/DotPulsar/Internal/Abstractions/IContainsChannel.cs
copy to src/DotPulsar/Exceptions/FaultedException.cs
index d856108..65b54de 100644
--- a/src/DotPulsar/Internal/Abstractions/IContainsChannel.cs
+++ b/src/DotPulsar/Exceptions/FaultedException.cs
@@ -1,4 +1,4 @@
-/*
+/*
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * You may obtain a copy of the License at
@@ -12,15 +12,15 @@
  * limitations under the License.
  */
 
-namespace DotPulsar.Internal.Abstractions;
+namespace DotPulsar.Exceptions;
 
 using System;
-using System.Threading;
-using System.Threading.Tasks;
 
-public interface IContainsChannel : IAsyncDisposable
+/// <summary>
+/// Base exception for ConsumerFaultedException, ProducerFaultedException, and ReaderFaultedException
+/// </summary>
+public abstract class FaultedException : DotPulsarException
 {
-    Task EstablishNewChannel(CancellationToken cancellationToken);
-
-    ValueTask CloseChannel(CancellationToken cancellationToken);
+    public FaultedException(string message) : base(message) { }
+    public FaultedException(string message, Exception innerException) : base(message, innerException) { }
 }
diff --git a/src/DotPulsar/Internal/Abstractions/IContainsChannel.cs 
b/src/DotPulsar/Exceptions/ProducerFaultedException.cs
similarity index 60%
copy from src/DotPulsar/Internal/Abstractions/IContainsChannel.cs
copy to src/DotPulsar/Exceptions/ProducerFaultedException.cs
index d856108..5a58e43 100644
--- a/src/DotPulsar/Internal/Abstractions/IContainsChannel.cs
+++ b/src/DotPulsar/Exceptions/ProducerFaultedException.cs
@@ -1,4 +1,4 @@
-/*
+/*
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * You may obtain a copy of the License at
@@ -12,15 +12,15 @@
  * limitations under the License.
  */
 
-namespace DotPulsar.Internal.Abstractions;
+namespace DotPulsar.Exceptions;
 
 using System;
-using System.Threading;
-using System.Threading.Tasks;
 
-public interface IContainsChannel : IAsyncDisposable
+/// <summary>
+/// Thrown when a producer has faulted. See innerException
+/// </summary>
+public sealed class ProducerFaultedException : FaultedException
 {
-    Task EstablishNewChannel(CancellationToken cancellationToken);
-
-    ValueTask CloseChannel(CancellationToken cancellationToken);
+    public ProducerFaultedException() : base("Producer has faulted") { }
+    public ProducerFaultedException(Exception innerException) : base("Producer has faulted", innerException) { }
 }
diff --git a/src/DotPulsar/Internal/Abstractions/IContainsChannel.cs 
b/src/DotPulsar/Exceptions/ReaderFaultedException.cs
similarity index 61%
copy from src/DotPulsar/Internal/Abstractions/IContainsChannel.cs
copy to src/DotPulsar/Exceptions/ReaderFaultedException.cs
index d856108..18d15ac 100644
--- a/src/DotPulsar/Internal/Abstractions/IContainsChannel.cs
+++ b/src/DotPulsar/Exceptions/ReaderFaultedException.cs
@@ -1,4 +1,4 @@
-/*
+/*
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * You may obtain a copy of the License at
@@ -12,15 +12,15 @@
  * limitations under the License.
  */
 
-namespace DotPulsar.Internal.Abstractions;
+namespace DotPulsar.Exceptions;
 
 using System;
-using System.Threading;
-using System.Threading.Tasks;
 
-public interface IContainsChannel : IAsyncDisposable
+/// <summary>
+/// Thrown when a reader has faulted. See innerException
+/// </summary>
+public sealed class ReaderFaultedException : FaultedException
 {
-    Task EstablishNewChannel(CancellationToken cancellationToken);
-
-    ValueTask CloseChannel(CancellationToken cancellationToken);
+    public ReaderFaultedException() : base("Reader has faulted") { }
+    public ReaderFaultedException(Exception innerException) : base("Reader has faulted", innerException) { }
 }
diff --git a/src/DotPulsar/Internal/Abstractions/IContainsChannel.cs 
b/src/DotPulsar/Internal/Abstractions/IContainsChannel.cs
index d856108..c28cacd 100644
--- a/src/DotPulsar/Internal/Abstractions/IContainsChannel.cs
+++ b/src/DotPulsar/Internal/Abstractions/IContainsChannel.cs
@@ -1,4 +1,4 @@
-/*
+/*
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * You may obtain a copy of the License at
@@ -23,4 +23,6 @@ public interface IContainsChannel : IAsyncDisposable
     Task EstablishNewChannel(CancellationToken cancellationToken);
 
     ValueTask CloseChannel(CancellationToken cancellationToken);
+
+    ValueTask ChannelFaulted(Exception exception);
 }
diff --git a/src/DotPulsar/Internal/Abstractions/Process.cs 
b/src/DotPulsar/Internal/Abstractions/Process.cs
index 9ef6bd4..0704b8d 100644
--- a/src/DotPulsar/Internal/Abstractions/Process.cs
+++ b/src/DotPulsar/Internal/Abstractions/Process.cs
@@ -1,4 +1,4 @@
-/*
+/*
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * You may obtain a copy of the License at
@@ -24,6 +24,7 @@ public abstract class Process : IProcess
     protected readonly CancellationTokenSource CancellationTokenSource;
     protected ChannelState ChannelState;
     protected ExecutorState ExecutorState;
+    protected Exception? Exception;
 
     protected Process(Guid correlationId)
     {
@@ -44,8 +45,9 @@ public abstract class Process : IProcess
     {
         switch (e)
         {
-            case ExecutorFaulted _:
+            case ExecutorFaulted executorFaulted:
                 ExecutorState = ExecutorState.Faulted;
+                Exception = executorFaulted.Exception;
                 break;
             case ChannelActivated _:
                 ChannelState = ChannelState.Active;
diff --git a/src/DotPulsar/Internal/Consumer.cs 
b/src/DotPulsar/Internal/Consumer.cs
index 5740ccf..9cb3327 100644
--- a/src/DotPulsar/Internal/Consumer.cs
+++ b/src/DotPulsar/Internal/Consumer.cs
@@ -1,4 +1,4 @@
-/*
+/*
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * You may obtain a copy of the License at
@@ -37,6 +37,7 @@ public sealed class Consumer<TMessage> : IContainsChannel, 
IConsumer<TMessage>
     private readonly IStateChanged<ConsumerState> _state;
     private readonly IConsumerChannelFactory<TMessage> _factory;
     private int _isDisposed;
+    private Exception? _faultException;
 
     public Uri ServiceUrl { get; }
     public string SubscriptionName { get; }
@@ -86,33 +87,29 @@ public sealed class Consumer<TMessage> : IContainsChannel, 
IConsumer<TMessage>
             return;
 
         _eventRegister.Register(new ConsumerDisposed(_correlationId));
-        await 
_channel.ClosedByClient(CancellationToken.None).ConfigureAwait(false);
-        await _channel.DisposeAsync().ConfigureAwait(false);
+        await DisposeChannel().ConfigureAwait(false);
     }
 
-    public async ValueTask<IMessage<TMessage>> Receive(CancellationToken 
cancellationToken)
+    private async ValueTask DisposeChannel()
     {
-        ThrowIfDisposed();
-
-        return await _executor.Execute(() => 
ReceiveMessage(cancellationToken), cancellationToken).ConfigureAwait(false);
+        await 
_channel.ClosedByClient(CancellationToken.None).ConfigureAwait(false);
+        await _channel.DisposeAsync().ConfigureAwait(false);
     }
 
-    private async ValueTask<IMessage<TMessage>> 
ReceiveMessage(CancellationToken cancellationToken)
-        => await _channel.Receive(cancellationToken).ConfigureAwait(false);
+    public async ValueTask<IMessage<TMessage>> Receive(CancellationToken 
cancellationToken)
+        => await _executor.Execute(() => InternalReceive(cancellationToken), 
cancellationToken).ConfigureAwait(false);
 
     public async ValueTask Acknowledge(MessageId messageId, CancellationToken 
cancellationToken)
-        => await Acknowledge(messageId, CommandAck.AckType.Individual, 
cancellationToken).ConfigureAwait(false);
+        => await InternalAcknowledge(messageId, CommandAck.AckType.Individual, 
cancellationToken).ConfigureAwait(false);
 
     public async ValueTask AcknowledgeCumulative(MessageId messageId, 
CancellationToken cancellationToken)
-        => await Acknowledge(messageId, CommandAck.AckType.Cumulative, 
cancellationToken).ConfigureAwait(false);
+        => await InternalAcknowledge(messageId, CommandAck.AckType.Cumulative, 
cancellationToken).ConfigureAwait(false);
 
     public async ValueTask 
RedeliverUnacknowledgedMessages(IEnumerable<MessageId> messageIds, 
CancellationToken cancellationToken)
     {
-        ThrowIfDisposed();
-
         var command = new CommandRedeliverUnacknowledgedMessages();
         command.MessageIds.AddRange(messageIds.Select(messageId => 
messageId.ToMessageIdData()));
-        await _executor.Execute(() => RedeliverUnacknowledgedMessages(command, 
cancellationToken), cancellationToken).ConfigureAwait(false);
+        await _executor.Execute(() => 
InternalRedeliverUnacknowledgedMessages(command, cancellationToken), 
cancellationToken).ConfigureAwait(false);
     }
 
     public async ValueTask RedeliverUnacknowledgedMessages(CancellationToken 
cancellationToken)
@@ -120,76 +117,35 @@ public sealed class Consumer<TMessage> : 
IContainsChannel, IConsumer<TMessage>
 
     public async ValueTask Unsubscribe(CancellationToken cancellationToken)
     {
-        ThrowIfDisposed();
-
         var unsubscribe = new CommandUnsubscribe();
-        await _executor.Execute(() => Unsubscribe(unsubscribe, 
cancellationToken), cancellationToken).ConfigureAwait(false);
+        await _executor.Execute(() => InternalUnsubscribe(unsubscribe, 
cancellationToken), cancellationToken).ConfigureAwait(false);
     }
 
-    private async ValueTask Unsubscribe(CommandUnsubscribe command, 
CancellationToken cancellationToken)
-        => await _channel.Send(command, 
cancellationToken).ConfigureAwait(false);
-
     public async ValueTask Seek(MessageId messageId, CancellationToken 
cancellationToken)
     {
-        ThrowIfDisposed();
-
         var seek = new CommandSeek { MessageId = messageId.ToMessageIdData() };
-        await _executor.Execute(() => Seek(seek, cancellationToken), 
cancellationToken).ConfigureAwait(false);
+        await _executor.Execute(() => InternalSeek(seek, cancellationToken), 
cancellationToken).ConfigureAwait(false);
     }
 
     public async ValueTask Seek(ulong publishTime, CancellationToken 
cancellationToken)
     {
-        ThrowIfDisposed();
-
         var seek = new CommandSeek { MessagePublishTime = publishTime };
-        await _executor.Execute(() => Seek(seek, cancellationToken), 
cancellationToken).ConfigureAwait(false);
+        await _executor.Execute(() => InternalSeek(seek, cancellationToken), 
cancellationToken).ConfigureAwait(false);
     }
 
     public async ValueTask<MessageId> GetLastMessageId(CancellationToken 
cancellationToken)
     {
-        ThrowIfDisposed();
-
         var getLastMessageId = new CommandGetLastMessageId();
-        return await _executor.Execute(() => 
GetLastMessageId(getLastMessageId, cancellationToken), 
cancellationToken).ConfigureAwait(false);
-    }
-
-    private async ValueTask<MessageId> 
GetLastMessageId(CommandGetLastMessageId command, CancellationToken 
cancellationToken)
-        => await _channel.Send(command, 
cancellationToken).ConfigureAwait(false);
-
-    private async Task Seek(CommandSeek command, CancellationToken 
cancellationToken)
-        => await _channel.Send(command, 
cancellationToken).ConfigureAwait(false);
-
-    private async ValueTask Acknowledge(MessageId messageId, 
CommandAck.AckType ackType, CancellationToken cancellationToken)
-    {
-        ThrowIfDisposed();
-
-        var commandAck = _commandAckPool.Get();
-        commandAck.Type = ackType;
-        if (commandAck.MessageIds.Count == 0)
-            commandAck.MessageIds.Add(messageId.ToMessageIdData());
-        else
-            commandAck.MessageIds[0].MapFrom(messageId);
-
-        try
-        {
-            await _executor.Execute(() => Acknowledge(commandAck, 
cancellationToken), cancellationToken).ConfigureAwait(false);
-        }
-        finally
-        {
-            _commandAckPool.Return(commandAck);
-        }
+        return await _executor.Execute(() => 
InternalGetLastMessageId(getLastMessageId, cancellationToken), 
cancellationToken).ConfigureAwait(false);
     }
 
-    private async ValueTask Acknowledge(CommandAck command, CancellationToken 
cancellationToken)
-        => await _channel.Send(command, 
cancellationToken).ConfigureAwait(false);
-
-    private async ValueTask 
RedeliverUnacknowledgedMessages(CommandRedeliverUnacknowledgedMessages command, 
CancellationToken cancellationToken)
-        => await _channel.Send(command, 
cancellationToken).ConfigureAwait(false);
-
-    private void ThrowIfDisposed()
+    private void Guard()
     {
         if (_isDisposed != 0)
             throw new ConsumerDisposedException(GetType().FullName!);
+
+        if (_faultException is not null)
+            throw new ConsumerFaultedException(_faultException);
     }
 
     public async Task EstablishNewChannel(CancellationToken cancellationToken)
@@ -204,7 +160,66 @@ public sealed class Consumer<TMessage> : IContainsChannel, 
IConsumer<TMessage>
     }
 
     public async ValueTask CloseChannel(CancellationToken cancellationToken)
+        => await 
_channel.ClosedByClient(cancellationToken).ConfigureAwait(false);
+
+    public async ValueTask ChannelFaulted(Exception exception)
+    {
+        _faultException = exception;
+        await DisposeChannel().ConfigureAwait(false);
+    }
+
+    private async ValueTask InternalAcknowledge(CommandAck command, 
CancellationToken cancellationToken)
+    {
+        Guard();
+        await _channel.Send(command, cancellationToken).ConfigureAwait(false);
+    }
+
+    private async ValueTask 
InternalRedeliverUnacknowledgedMessages(CommandRedeliverUnacknowledgedMessages 
command, CancellationToken cancellationToken)
+    {
+        Guard();
+        await _channel.Send(command, cancellationToken).ConfigureAwait(false);
+    }
+
+    private async ValueTask<MessageId> 
InternalGetLastMessageId(CommandGetLastMessageId command, CancellationToken 
cancellationToken)
+    {
+        Guard();
+        return await _channel.Send(command, 
cancellationToken).ConfigureAwait(false);
+    }
+
+    private async Task InternalSeek(CommandSeek command, CancellationToken 
cancellationToken)
+    {
+        Guard();
+        await _channel.Send(command, cancellationToken).ConfigureAwait(false);
+    }
+
+    private async ValueTask<IMessage<TMessage>> 
InternalReceive(CancellationToken cancellationToken)
+    {
+        Guard();
+        return await _channel.Receive(cancellationToken).ConfigureAwait(false);
+    }
+
+    private async ValueTask InternalUnsubscribe(CommandUnsubscribe command, 
CancellationToken cancellationToken)
     {
-        await _channel.ClosedByClient(cancellationToken).ConfigureAwait(false);
+        Guard();
+        await _channel.Send(command, cancellationToken).ConfigureAwait(false);
+    }
+
+    private async ValueTask InternalAcknowledge(MessageId messageId, 
CommandAck.AckType ackType, CancellationToken cancellationToken)
+    {
+        var commandAck = _commandAckPool.Get();
+        commandAck.Type = ackType;
+        if (commandAck.MessageIds.Count == 0)
+            commandAck.MessageIds.Add(messageId.ToMessageIdData());
+        else
+            commandAck.MessageIds[0].MapFrom(messageId);
+
+        try
+        {
+            await _executor.Execute(() => InternalAcknowledge(commandAck, 
cancellationToken), cancellationToken).ConfigureAwait(false);
+        }
+        finally
+        {
+            _commandAckPool.Return(commandAck);
+        }
     }
 }
diff --git a/src/DotPulsar/Internal/ConsumerProcess.cs 
b/src/DotPulsar/Internal/ConsumerProcess.cs
index 9d06d41..1246a93 100644
--- a/src/DotPulsar/Internal/ConsumerProcess.cs
+++ b/src/DotPulsar/Internal/ConsumerProcess.cs
@@ -1,4 +1,4 @@
-/*
+/*
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * You may obtain a copy of the License at
@@ -50,7 +50,9 @@ public sealed class ConsumerProcess : Process
 
         if (ExecutorState == ExecutorState.Faulted)
         {
-            _stateManager.SetState(ConsumerState.Faulted);
+            var formerState = _stateManager.SetState(ConsumerState.Faulted);
+            if (formerState != ConsumerState.Faulted)
+                Task.Run(() => _consumer.ChannelFaulted(Exception!));
             return;
         }
 
diff --git a/src/DotPulsar/Internal/Events/ExecutorFaulted.cs 
b/src/DotPulsar/Internal/Events/ExecutorFaulted.cs
index ea56b6c..337c8ab 100644
--- a/src/DotPulsar/Internal/Events/ExecutorFaulted.cs
+++ b/src/DotPulsar/Internal/Events/ExecutorFaulted.cs
@@ -1,4 +1,4 @@
-/*
+/*
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * You may obtain a copy of the License at
@@ -19,8 +19,12 @@ using System;
 
 public sealed class ExecutorFaulted : IEvent
 {
-    public ExecutorFaulted(Guid correlationId)
-        => CorrelationId = correlationId;
+    public ExecutorFaulted(Guid correlationId, Exception exception)
+    {
+        CorrelationId = correlationId;
+        Exception = exception;
+    }
 
     public Guid CorrelationId { get; }
+    public Exception Exception { get; }
 }
diff --git a/src/DotPulsar/Internal/Executor.cs 
b/src/DotPulsar/Internal/Executor.cs
index 27f9faf..f307ff0 100644
--- a/src/DotPulsar/Internal/Executor.cs
+++ b/src/DotPulsar/Internal/Executor.cs
@@ -1,4 +1,4 @@
-/*
+/*
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * You may obtain a copy of the License at
@@ -188,7 +188,7 @@ public sealed class Executor : IExecute
         await _exceptionHandler.OnException(context).ConfigureAwait(false);
 
         if (context.Result != FaultAction.Retry)
-            _eventRegister.Register(new ExecutorFaulted(_correlationId));
+            _eventRegister.Register(new ExecutorFaulted(_correlationId, exception));
 
         return context.Result == FaultAction.ThrowException
             ? throw context.Exception
diff --git a/src/DotPulsar/Internal/Producer.cs 
b/src/DotPulsar/Internal/Producer.cs
index 38fb54b..c9d608a 100644
--- a/src/DotPulsar/Internal/Producer.cs
+++ b/src/DotPulsar/Internal/Producer.cs
@@ -1,4 +1,4 @@
-/*
+/*
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * You may obtain a copy of the License at
@@ -23,7 +23,6 @@ using System.Collections.Concurrent;
 using System.Collections.Generic;
 using System.Diagnostics;
 using System.Linq;
-using System.Runtime.ExceptionServices;
 using System.Threading;
 using System.Threading.Tasks;
 
@@ -46,7 +45,7 @@ public sealed class Producer<TMessage> : IProducer<TMessage>, 
IRegisterEvent
     private readonly IExecute _executor;
     private int _isDisposed;
     private int _producerCount;
-    private Exception? _throw;
+    private Exception? _faultException;
 
     public Uri ServiceUrl { get; }
     public string Topic { get; }
@@ -105,7 +104,7 @@ public sealed class Producer<TMessage> : 
IProducer<TMessage>, IRegisterEvent
             if (_cts.IsCancellationRequested)
                 return;
 
-            _throw = exception;
+            _faultException = exception;
             _state.SetState(ProducerState.Faulted);
         }
     }
@@ -231,8 +230,8 @@ public sealed class Producer<TMessage> : 
IProducer<TMessage>, IRegisterEvent
         if (_producerCount == 0)
         {
             _ = await _state.StateChangedFrom(ProducerState.Disconnected, 
cancellationToken).ConfigureAwait(false);
-            if (_throw is not null)
-                ExceptionDispatchInfo.Capture(_throw).Throw(); //Retain 
original stack trace by throwing like this
+            if (_faultException is not null)
+                throw new ProducerFaultedException(_faultException);
         }
 
         if (_producerCount == 1)
@@ -258,9 +257,7 @@ public sealed class Producer<TMessage> : 
IProducer<TMessage>, IRegisterEvent
     }
 
     public async ValueTask Enqueue(MessageMetadata metadata, TMessage message, 
Func<MessageId, ValueTask>? onMessageSent = default, CancellationToken 
cancellationToken = default)
-    {
-        await InternalSend(metadata, message, false, onMessageSent, 
cancellationToken).ConfigureAwait(false);
-    }
+        => await InternalSend(metadata, message, false, onMessageSent, 
cancellationToken).ConfigureAwait(false);
 
     private async ValueTask InternalSend(MessageMetadata metadata, TMessage 
message, bool sendOpCancelable, Func<MessageId, ValueTask>? onMessageSent = 
default, CancellationToken cancellationToken = default)
     {
@@ -303,6 +300,7 @@ public sealed class Producer<TMessage> : 
IProducer<TMessage>, IRegisterEvent
                 }
 
                 CompleteActivity(task.Result, data.Length, activity);
+
                 try
                 {
                     if (onMessageSent is not null)
@@ -320,18 +318,18 @@ public sealed class Producer<TMessage> : 
IProducer<TMessage>, IRegisterEvent
 
             if (autoAssignSequenceId)
                 metadata.SequenceId = 0;
+
             throw;
         }
     }
 
-    public async ValueTask WaitForSendQueueEmpty(CancellationToken 
cancellationToken)
-    {
-        await Task.WhenAll(_producers.Values.Select(producer => 
producer.WaitForSendQueueEmpty(cancellationToken).AsTask())).ConfigureAwait(false);
-    }
+    internal async ValueTask WaitForSendQueueEmpty(CancellationToken 
cancellationToken)
+        => await Task.WhenAll(_producers.Values.Select(producer => 
producer.WaitForSendQueueEmpty(cancellationToken).AsTask())).ConfigureAwait(false);
 
     private void CompleteActivity(MessageId messageId, long payloadSize, 
Activity? activity)
     {
-        if (activity is null) return;
+        if (activity is null)
+            return;
 
         if (activity.IsAllDataRequested)
         {
@@ -345,7 +343,8 @@ public sealed class Producer<TMessage> : 
IProducer<TMessage>, IRegisterEvent
 
     private void FailActivity(Exception exception, Activity? activity)
     {
-        if (activity is null) return;
+        if (activity is null)
+            return;
 
         if (activity.IsAllDataRequested)
             activity.AddException(exception);
diff --git a/src/DotPulsar/Internal/Reader.cs b/src/DotPulsar/Internal/Reader.cs
index 1778791..868e8b4 100644
--- a/src/DotPulsar/Internal/Reader.cs
+++ b/src/DotPulsar/Internal/Reader.cs
@@ -1,4 +1,4 @@
-/*
+/*
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * You may obtain a copy of the License at
@@ -32,6 +32,7 @@ public sealed class Reader<TMessage> : IContainsChannel, 
IReader<TMessage>
     private readonly IStateChanged<ReaderState> _state;
     private readonly IConsumerChannelFactory<TMessage> _factory;
     private int _isDisposed;
+    private Exception? _faultException;
 
     public Uri ServiceUrl { get; }
     public string Topic { get; }
@@ -73,39 +74,35 @@ public sealed class Reader<TMessage> : IContainsChannel, 
IReader<TMessage>
 
     public async ValueTask<MessageId> GetLastMessageId(CancellationToken 
cancellationToken)
     {
-        ThrowIfDisposed();
-
         var getLastMessageId = new CommandGetLastMessageId();
-        return await _executor.Execute(() => 
GetLastMessageId(getLastMessageId, cancellationToken), 
cancellationToken).ConfigureAwait(false);
+        return await _executor.Execute(() => 
InternalGetLastMessageId(getLastMessageId, cancellationToken), 
cancellationToken).ConfigureAwait(false);
     }
 
-    private async ValueTask<MessageId> 
GetLastMessageId(CommandGetLastMessageId command, CancellationToken 
cancellationToken)
-        => await _channel.Send(command, 
cancellationToken).ConfigureAwait(false);
+    private async ValueTask<MessageId> 
InternalGetLastMessageId(CommandGetLastMessageId command, CancellationToken 
cancellationToken)
+    {
+        Guard();
+        return await _channel.Send(command, 
cancellationToken).ConfigureAwait(false);
+    }
 
     public async ValueTask<IMessage<TMessage>> Receive(CancellationToken 
cancellationToken)
-    {
-        ThrowIfDisposed();
+        => await _executor.Execute(() => InternalReceive(cancellationToken), 
cancellationToken).ConfigureAwait(false);
 
-        return await _executor.Execute(() => 
ReceiveMessage(cancellationToken), cancellationToken).ConfigureAwait(false);
+    private async ValueTask<IMessage<TMessage>> 
InternalReceive(CancellationToken cancellationToken)
+    {
+        Guard();
+        return await _channel.Receive(cancellationToken).ConfigureAwait(false);
     }
 
-    private async ValueTask<IMessage<TMessage>> 
ReceiveMessage(CancellationToken cancellationToken)
-        => await _channel.Receive(cancellationToken).ConfigureAwait(false);
-
     public async ValueTask Seek(MessageId messageId, CancellationToken 
cancellationToken)
     {
-        ThrowIfDisposed();
-
         var seek = new CommandSeek { MessageId = messageId.ToMessageIdData() };
-        await _executor.Execute(() => Seek(seek, cancellationToken), 
cancellationToken).ConfigureAwait(false);
+        await _executor.Execute(() => InternalSeek(seek, cancellationToken), 
cancellationToken).ConfigureAwait(false);
     }
 
     public async ValueTask Seek(ulong publishTime, CancellationToken 
cancellationToken)
     {
-        ThrowIfDisposed();
-
         var seek = new CommandSeek { MessagePublishTime = publishTime };
-        await _executor.Execute(() => Seek(seek, cancellationToken), 
cancellationToken).ConfigureAwait(false);
+        await _executor.Execute(() => InternalSeek(seek, cancellationToken), 
cancellationToken).ConfigureAwait(false);
     }
 
     public async ValueTask DisposeAsync()
@@ -114,12 +111,20 @@ public sealed class Reader<TMessage> : IContainsChannel, 
IReader<TMessage>
             return;
 
         _eventRegister.Register(new ReaderDisposed(_correlationId));
+        await DisposeChannel().ConfigureAwait(false);
+    }
+
+    private async ValueTask DisposeChannel()
+    {
         await 
_channel.ClosedByClient(CancellationToken.None).ConfigureAwait(false);
         await _channel.DisposeAsync().ConfigureAwait(false);
     }
 
-    private async Task Seek(CommandSeek command, CancellationToken 
cancellationToken)
-        => await _channel.Send(command, 
cancellationToken).ConfigureAwait(false);
+    private async Task InternalSeek(CommandSeek command, CancellationToken 
cancellationToken)
+    {
+        Guard();
+        await _channel.Send(command, cancellationToken).ConfigureAwait(false);
+    }
 
     public async Task EstablishNewChannel(CancellationToken cancellationToken)
     {
@@ -133,13 +138,20 @@ public sealed class Reader<TMessage> : IContainsChannel, 
IReader<TMessage>
     }
 
     public async ValueTask CloseChannel(CancellationToken cancellationToken)
-    {
-        await _channel.ClosedByClient(cancellationToken).ConfigureAwait(false);
-    }
+        => await 
_channel.ClosedByClient(cancellationToken).ConfigureAwait(false);
 
-    private void ThrowIfDisposed()
+    private void Guard()
     {
         if (_isDisposed != 0)
             throw new ReaderDisposedException(GetType().FullName!);
+
+        if (_faultException is not null)
+            throw new ReaderFaultedException(_faultException);
+    }
+
+    public async ValueTask ChannelFaulted(Exception exception)
+    {
+        _faultException = exception;
+        await DisposeChannel().ConfigureAwait(false);
     }
 }
diff --git a/src/DotPulsar/Internal/ReaderProcess.cs 
b/src/DotPulsar/Internal/ReaderProcess.cs
index 8452c64..1076d84 100644
--- a/src/DotPulsar/Internal/ReaderProcess.cs
+++ b/src/DotPulsar/Internal/ReaderProcess.cs
@@ -1,4 +1,4 @@
-/*
+/*
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * You may obtain a copy of the License at
@@ -48,6 +48,9 @@ public sealed class ReaderProcess : Process
         if (ExecutorState == ExecutorState.Faulted)
         {
-            _stateManager.SetState(ReaderState.Faulted);
+            // Call SetState exactly once and keep its return value (used here as the previous state), so the first transition to Faulted notifies the reader.
+            var formerState = _stateManager.SetState(ReaderState.Faulted);
+            if (formerState != ReaderState.Faulted)
+                Task.Run(() => _reader.ChannelFaulted(Exception!));
             return;
         }
 
diff --git a/src/DotPulsar/Internal/SubProducer.cs b/src/DotPulsar/Internal/SubProducer.cs
index 1f9588d..8d26335 100644
--- a/src/DotPulsar/Internal/SubProducer.cs
+++ b/src/DotPulsar/Internal/SubProducer.cs
@@ -1,4 +1,4 @@
-/*
+/*
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * You may obtain a copy of the License at
@@ -87,20 +87,17 @@ public sealed class SubProducer : IContainsChannel, IState<ProducerState>
         {
             // Ignored
         }
+
         await _sendQueue.DisposeAsync().ConfigureAwait(false);
         await _channel.ClosedByClient(CancellationToken.None).ConfigureAwait(false);
         await _channel.DisposeAsync().ConfigureAwait(false);
     }
 
     public async ValueTask Send(SendOp sendOp, CancellationToken cancellationToken)
-    {
-        await _sendQueue.Enqueue(sendOp, cancellationToken).ConfigureAwait(false);
-    }
+        => await _sendQueue.Enqueue(sendOp, cancellationToken).ConfigureAwait(false);
 
-    public async ValueTask WaitForSendQueueEmpty(CancellationToken cancellationToken)
-    {
-        await _sendQueue.WaitForEmpty(cancellationToken).ConfigureAwait(false);
-    }
+    internal async ValueTask WaitForSendQueueEmpty(CancellationToken cancellationToken)
+        => await _sendQueue.WaitForEmpty(cancellationToken).ConfigureAwait(false);
 
     private async Task MessageDispatcher(IProducerChannel channel, CancellationToken cancellationToken)
     {
@@ -118,13 +115,14 @@ public sealed class SubProducer : IContainsChannel, IState<ProducerState>
             }
 
             var tcs = new TaskCompletionSource<BaseCommand>();
-            _ = tcs.Task.ContinueWith(task => responseQueue.Enqueue(task),
-                TaskContinuationOptions.NotOnCanceled | TaskContinuationOptions.ExecuteSynchronously);
+            _ = tcs.Task.ContinueWith(task => responseQueue.Enqueue(task), TaskContinuationOptions.NotOnCanceled | TaskContinuationOptions.ExecuteSynchronously);
 
             // Use CancellationToken.None here because otherwise it will throw exceptions on all fault actions even retry.
             var success = await _executor.TryExecuteOnce(() => channel.Send(sendOp.Metadata, sendOp.Data, tcs, cancellationToken), CancellationToken.None).ConfigureAwait(false);
 
-            if (success) continue;
+            if (success)
+                continue;
+
             _eventRegister.Register(new ChannelDisconnected(_correlationId));
             break;
         }
@@ -140,12 +138,16 @@ public sealed class SubProducer : IContainsChannel, IState<ProducerState>
 
             var success = await _executor.TryExecuteOnce(() =>
             {
-                if (responseTask.IsFaulted) throw responseTask.Exception!;
+                if (responseTask.IsFaulted)
+                    throw responseTask.Exception!;
+
                 responseTask.Result.Expect(BaseCommand.Type.SendReceipt);
                 ProcessReceipt(responseTask.Result.SendReceipt);
             }, CancellationToken.None).ConfigureAwait(false); // Use CancellationToken.None here because otherwise it will throw exceptions on all fault actions even retry.
 
-            if (success) continue;
+            if (success)
+                continue;
+
             _eventRegister.Register(new SendReceiptWrongOrdering(_correlationId));
             break;
         }
@@ -205,7 +207,10 @@ public sealed class SubProducer : IContainsChannel, IState<ProducerState>
     }
 
     public async ValueTask CloseChannel(CancellationToken cancellationToken)
+        => await _channel.ClosedByClient(cancellationToken).ConfigureAwait(false);
+
+    public ValueTask ChannelFaulted(Exception exception)
     {
-        await _channel.ClosedByClient(cancellationToken).ConfigureAwait(false);
+        return new ValueTask();
     }
 }


Reply via email to