This is an automated email from the ASF dual-hosted git repository.

blankensteiner pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-dotpulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new c93a0de  Producer access mode fixes (#149)
c93a0de is described below

commit c93a0dee3a35d698376ce975b376a6dbf755f128
Author: Kristian Andersen <[email protected]>
AuthorDate: Mon May 1 08:51:11 2023 +0200

    Producer access mode fixes (#149)
    
    * Track topicEpoch when connection occurs after having waited to be 
exclusive
    
    * Fix not faulting if being fenced after having become exclusive
    
    * Fix issue with disconnect while waiting for exclusive
    
    * Fix missing cancellation on send
    
    * Add Fenced producer state
    
    * Unit tests adapted to pass with new fenced status
    
    * Review
---
 CHANGELOG.md                                       |  2 ++
 src/DotPulsar/Internal/Abstractions/IChannel.cs    |  1 +
 .../Abstractions/IContainsProducerChannel.cs       |  2 +-
 .../Internal/Abstractions/IProducerChannel.cs      |  1 -
 src/DotPulsar/Internal/Abstractions/Process.cs     |  5 +++
 src/DotPulsar/Internal/Channel.cs                  |  3 ++
 src/DotPulsar/Internal/ChannelManager.cs           | 15 ++++++---
 .../ProducerChannelConnected.cs}                   | 18 +++++-----
 src/DotPulsar/Internal/NotReadyChannel.cs          |  2 --
 src/DotPulsar/Internal/Producer.cs                 | 36 +++++++++++++-------
 src/DotPulsar/Internal/ProducerChannel.cs          |  6 +---
 src/DotPulsar/Internal/ProducerChannelFactory.cs   | 12 +++++--
 src/DotPulsar/Internal/ProducerProcess.cs          |  8 +++--
 src/DotPulsar/Internal/ProducerResponse.cs         |  4 +--
 src/DotPulsar/Internal/SubProducer.cs              | 39 ++++++++++++++++++----
 src/DotPulsar/ProducerState.cs                     | 17 ++++++----
 tests/DotPulsar.Tests/ProducerTests.cs             |  8 ++---
 17 files changed, 121 insertions(+), 58 deletions(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index b6e6976..cfd3fab 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -9,10 +9,12 @@ The format is based on [Keep a 
Changelog](https://keepachangelog.com/en/1.0.0/)
 ### Added
 
 - Support `ProducerAccessMode` to prevent multiple producers on a single topic.
+- Added `Fenced` state for producers which is a final state. 
 
 ### Fixed
 
 - Fixed issue with `Send` extension methods that do include `MessageMetadata` 
in the parameter list. The issue prevents more than two messages from being 
published on namespaces where deduplication is enabled.
+- Calling `await send(...)` on a Producer did not correctly terminate with an 
exception when a send operation failed, e.g. because the producer faulted.
 
 ## [2.11.0] - 2023-03-13
 
diff --git a/src/DotPulsar/Internal/Abstractions/IChannel.cs 
b/src/DotPulsar/Internal/Abstractions/IChannel.cs
index e468a03..8269677 100644
--- a/src/DotPulsar/Internal/Abstractions/IChannel.cs
+++ b/src/DotPulsar/Internal/Abstractions/IChannel.cs
@@ -23,6 +23,7 @@ public interface IChannel
     void ClosedByServer();
     void WaitingForExclusive();
     void Connected();
+    void ProducerConnected(ulong topicEpoch);
     void Deactivated();
     void Disconnected();
     void ReachedEndOfTopic();
diff --git a/src/DotPulsar/Internal/Abstractions/IContainsProducerChannel.cs 
b/src/DotPulsar/Internal/Abstractions/IContainsProducerChannel.cs
index 550be53..0558216 100644
--- a/src/DotPulsar/Internal/Abstractions/IContainsProducerChannel.cs
+++ b/src/DotPulsar/Internal/Abstractions/IContainsProducerChannel.cs
@@ -19,5 +19,5 @@ using System.Threading.Tasks;
 
 public interface IContainsProducerChannel : IContainsChannel
 {
-    Task ActivateChannel(CancellationToken cancellationToken);
+    Task ActivateChannel(ulong? topicEpoch, CancellationToken 
cancellationToken);
 }
diff --git a/src/DotPulsar/Internal/Abstractions/IProducerChannel.cs 
b/src/DotPulsar/Internal/Abstractions/IProducerChannel.cs
index 79e7e37..4bdbdeb 100644
--- a/src/DotPulsar/Internal/Abstractions/IProducerChannel.cs
+++ b/src/DotPulsar/Internal/Abstractions/IProducerChannel.cs
@@ -22,7 +22,6 @@ using System.Threading.Tasks;
 
 public interface IProducerChannel : IAsyncDisposable
 {
-    ulong? TopicEpoch { get; }
     Task Send(MessageMetadata metadata, ReadOnlySequence<byte> payload, 
TaskCompletionSource<BaseCommand> responseTcs, CancellationToken 
cancellationToken);
     ValueTask ClosedByClient(CancellationToken cancellationToken);
 }
diff --git a/src/DotPulsar/Internal/Abstractions/Process.cs 
b/src/DotPulsar/Internal/Abstractions/Process.cs
index f6d42c5..d509bfd 100644
--- a/src/DotPulsar/Internal/Abstractions/Process.cs
+++ b/src/DotPulsar/Internal/Abstractions/Process.cs
@@ -35,6 +35,7 @@ public abstract class Process : IProcess
     }
 
     public Guid CorrelationId { get; }
+    protected ulong? TopicEpoch { get; private set; }
 
     public void Start()
         => CalculateState();
@@ -58,6 +59,10 @@ public abstract class Process : IProcess
             case ChannelConnected _:
                 ChannelState = ChannelState.Connected;
                 break;
+            case ProducerChannelConnected producerChannelConnected:
+                TopicEpoch = producerChannelConnected.TopicEpoch;
+                ChannelState = ChannelState.Connected;
+                break;
             case ChannelDeactivated _:
                 ChannelState = ChannelState.Inactive;
                 break;
diff --git a/src/DotPulsar/Internal/Channel.cs 
b/src/DotPulsar/Internal/Channel.cs
index 7e339f9..857c00b 100644
--- a/src/DotPulsar/Internal/Channel.cs
+++ b/src/DotPulsar/Internal/Channel.cs
@@ -61,6 +61,9 @@ public sealed class Channel : IChannel
     public void Connected()
         => _eventRegister.Register(new ChannelConnected(_correlationId));
 
+    public void ProducerConnected(ulong topicEpoch)
+        => _eventRegister.Register(new 
ProducerChannelConnected(_correlationId, topicEpoch));
+
     public void Deactivated()
         => _eventRegister.Register(new ChannelDeactivated(_correlationId));
 
diff --git a/src/DotPulsar/Internal/ChannelManager.cs 
b/src/DotPulsar/Internal/ChannelManager.cs
index 7427578..87a1fa3 100644
--- a/src/DotPulsar/Internal/ChannelManager.cs
+++ b/src/DotPulsar/Internal/ChannelManager.cs
@@ -59,15 +59,15 @@ public sealed class ChannelManager : IDisposable
 
             if (response.Result.ProducerSuccess.ProducerReady)
             {
-                channel.Connected();
+                
channel.ProducerConnected(response.Result.ProducerSuccess.TopicEpoch);
             }
             else
             {
                 channel.WaitingForExclusive();
-                HandleAdditionalProducerSuccess(command, channel.Connected);
+                HandleAdditionalProducerSuccess(command, 
channel.ProducerConnected);
             }
 
-            return new ProducerResponse(producerId, 
response.Result.ProducerSuccess.ProducerName, 
response.Result.ProducerSuccess.TopicEpoch);
+            return new ProducerResponse(producerId, 
response.Result.ProducerSuccess.ProducerName);
         }, TaskContinuationOptions.OnlyOnRanToCompletion);
     }
 
@@ -258,16 +258,21 @@ public sealed class ChannelManager : IDisposable
         return channel.SenderLock();
     }
 
-    private void HandleAdditionalProducerSuccess(CommandProducer command, 
Action successAction)
+    private void HandleAdditionalProducerSuccess(CommandProducer command, 
Action<ulong> successAction)
     {
         _ = 
_requestResponseHandler.ExpectAdditionalResponse(command).ContinueWith(response 
=>
         {
+            if (response.IsCanceled || response.IsFaulted || 
response.Result.CommandType == BaseCommand.Type.Error)
+            {
+                _producerChannels[command.ProducerId]?.Disconnected();
+                return;
+            }
             if (!response.Result.ProducerSuccess.ProducerReady)
             {
                 HandleAdditionalProducerSuccess(command, successAction);
                 return;
             }
-            successAction.Invoke();
+            successAction.Invoke(response.Result.ProducerSuccess.TopicEpoch);
         });
     }
 }
diff --git a/src/DotPulsar/Internal/ProducerResponse.cs 
b/src/DotPulsar/Internal/Events/ProducerChannelConnected.cs
similarity index 67%
copy from src/DotPulsar/Internal/ProducerResponse.cs
copy to src/DotPulsar/Internal/Events/ProducerChannelConnected.cs
index 635926d..ccc555a 100644
--- a/src/DotPulsar/Internal/ProducerResponse.cs
+++ b/src/DotPulsar/Internal/Events/ProducerChannelConnected.cs
@@ -1,4 +1,4 @@
-/*
+/*
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * You may obtain a copy of the License at
@@ -12,18 +12,20 @@
  * limitations under the License.
  */
 
-namespace DotPulsar.Internal;
+namespace DotPulsar.Internal.Events;
 
-public sealed class ProducerResponse
+using DotPulsar.Internal.Abstractions;
+using System;
+
+public sealed class ProducerChannelConnected : IEvent
 {
-    public ProducerResponse(ulong producerId, string producerName, ulong 
topicEpoch)
+    public ProducerChannelConnected(Guid correlationId, ulong topicEpoch)
     {
-        ProducerId = producerId;
-        ProducerName = producerName;
+        CorrelationId = correlationId;
         TopicEpoch = topicEpoch;
     }
 
-    public ulong ProducerId { get; }
-    public string ProducerName { get; }
+    public Guid CorrelationId { get; }
+
     public ulong TopicEpoch { get; }
 }
diff --git a/src/DotPulsar/Internal/NotReadyChannel.cs 
b/src/DotPulsar/Internal/NotReadyChannel.cs
index ca602ad..b1877b7 100644
--- a/src/DotPulsar/Internal/NotReadyChannel.cs
+++ b/src/DotPulsar/Internal/NotReadyChannel.cs
@@ -34,8 +34,6 @@ public sealed class NotReadyChannel<TMessage> : 
IConsumerChannel<TMessage>, IPro
     public ValueTask<IMessage<TMessage>> Receive(CancellationToken 
cancellationToken = default)
         => throw GetException();
 
-    public ulong? TopicEpoch { get => null; }
-
     public Task Send(MessageMetadata metadata, ReadOnlySequence<byte> payload, 
TaskCompletionSource<BaseCommand> responseTcs, CancellationToken 
cancellationToken)
         => throw GetException();
 
diff --git a/src/DotPulsar/Internal/Producer.cs 
b/src/DotPulsar/Internal/Producer.cs
index e29095a..dedb8f9 100644
--- a/src/DotPulsar/Internal/Producer.cs
+++ b/src/DotPulsar/Internal/Producer.cs
@@ -74,7 +74,7 @@ public sealed class Producer<TMessage> : IProducer<TMessage>, 
IRegisterEvent
         };
         _attachTraceInfoToMessages = options.AttachTraceInfoToMessages;
         _sequenceId = new SequenceId(options.InitialSequenceId);
-        _state = new StateManager<ProducerState>(ProducerState.Disconnected, 
ProducerState.Closed, ProducerState.Faulted);
+        _state = CreateStateManager();
         ServiceUrl = serviceUrl;
         Topic = options.Topic;
         _isDisposed = 0;
@@ -146,27 +146,32 @@ public sealed class Producer<TMessage> : 
IProducer<TMessage>, IRegisterEvent
                 switch (state)
                 {
                     case ProducerState.Connected:
-                        ++connectedProducers;
-                        waitingForExclusive[i] = false;
+                        if (waitingForExclusive[i])
+                            waitingForExclusive[i] = false;
+                        else
+                            ++connectedProducers;
                         break;
                     case ProducerState.Disconnected:
                         --connectedProducers;
+                        waitingForExclusive[i] = false;
                         break;
                     case ProducerState.WaitingForExclusive:
+                        ++connectedProducers;
                         waitingForExclusive[i] = true;
                         break;
+                    case ProducerState.Fenced:
                     case ProducerState.Faulted:
-                        _state.SetState(ProducerState.Faulted);
+                        _state.SetState(state);
                         return;
                 }
 
                 monitoringTasks[i] = _producers[i].OnStateChangeFrom(state, 
_cts.Token).AsTask();
             }
 
-            if (connectedProducers == monitoringTasks.Length)
-                _state.SetState(ProducerState.Connected);
-            else if (connectedProducers == 0 && waitingForExclusive.All(x => x 
!= true))
+            if (connectedProducers == 0)
                 _state.SetState(ProducerState.Disconnected);
+            else if (connectedProducers == monitoringTasks.Length && 
waitingForExclusive.All(x => x != true))
+                _state.SetState(ProducerState.Connected);
             else if (waitingForExclusive.Any(x => x))
                 _state.SetState(ProducerState.WaitingForExclusive);
             else
@@ -181,7 +186,7 @@ public sealed class Producer<TMessage> : 
IProducer<TMessage>, IRegisterEvent
         var schema = _options.Schema;
         var producerAccessMode = 
(DotPulsar.Internal.PulsarApi.ProducerAccessMode) _options.ProducerAccessMode;
         var factory = new ProducerChannelFactory(correlationId, 
_processManager, _connectionPool, topic, producerName, producerAccessMode, 
schema.SchemaInfo, _compressorFactory);
-        var stateManager = new 
StateManager<ProducerState>(ProducerState.Disconnected, ProducerState.Closed, 
ProducerState.Faulted);
+        var stateManager = CreateStateManager();
         var initialChannel = new NotReadyChannel<TMessage>();
         var executor = new Executor(correlationId, _processManager, 
_exceptionHandler);
         var producer = new SubProducer(correlationId, _processManager, 
initialChannel, executor, stateManager, factory, _options.MaxPendingMessages);
@@ -265,7 +270,7 @@ public sealed class Producer<TMessage> : 
IProducer<TMessage>, IRegisterEvent
 
         try
         {
-            await InternalSend(metadata, message, true, OnMessageSent, 
cancellationToken).ConfigureAwait(false);
+            await InternalSend(metadata, message, true, OnMessageSent, x => 
tcs.TrySetException(x), cancellationToken).ConfigureAwait(false);
             return await tcs.Task.ConfigureAwait(false);
         }
         finally
@@ -275,9 +280,9 @@ public sealed class Producer<TMessage> : 
IProducer<TMessage>, IRegisterEvent
     }
 
     public async ValueTask Enqueue(MessageMetadata metadata, TMessage message, 
Func<MessageId, ValueTask>? onMessageSent = default, CancellationToken 
cancellationToken = default)
-        => await InternalSend(metadata, message, false, onMessageSent, 
cancellationToken).ConfigureAwait(false);
+        => await InternalSend(metadata, message, false, onMessageSent, 
cancellationToken: cancellationToken).ConfigureAwait(false);
 
-    private async ValueTask InternalSend(MessageMetadata metadata, TMessage 
message, bool sendOpCancelable, Func<MessageId, ValueTask>? onMessageSent = 
default, CancellationToken cancellationToken = default)
+    private async ValueTask InternalSend(MessageMetadata metadata, TMessage 
message, bool sendOpCancelable, Func<MessageId, ValueTask>? onMessageSent = 
default, Action<Exception>? onFailed = default, CancellationToken 
cancellationToken = default)
     {
         ThrowIfDisposed();
 
@@ -311,10 +316,14 @@ public sealed class Producer<TMessage> : 
IProducer<TMessage>, IRegisterEvent
 
                 if (task.IsFaulted || task.IsCanceled)
                 {
-                    FailActivity(task.IsCanceled ? new 
OperationCanceledException() : task.Exception!, activity);
+                    Exception exception = task.IsCanceled ? new 
OperationCanceledException() : task.Exception!;
+                    FailActivity(exception, activity);
 
                     if (autoAssignSequenceId)
                         metadata.SequenceId = 0;
+
+                    onFailed?.Invoke(exception);
+                    return;
                 }
 
                 CompleteActivity(task.Result, data.Length, activity);
@@ -376,5 +385,8 @@ public sealed class Producer<TMessage> : 
IProducer<TMessage>, IRegisterEvent
             throw new ProducerDisposedException(GetType().FullName!);
     }
 
+    private StateManager<ProducerState> CreateStateManager()
+        => new (ProducerState.Disconnected, ProducerState.Closed, 
ProducerState.Faulted, ProducerState.Fenced);
+
     public void Register(IEvent @event) { }
 }
diff --git a/src/DotPulsar/Internal/ProducerChannel.cs 
b/src/DotPulsar/Internal/ProducerChannel.cs
index d884120..ba2da76 100644
--- a/src/DotPulsar/Internal/ProducerChannel.cs
+++ b/src/DotPulsar/Internal/ProducerChannel.cs
@@ -31,15 +31,12 @@ public sealed class ProducerChannel : IProducerChannel
     private readonly ICompressorFactory? _compressorFactory;
     private readonly byte[]? _schemaVersion;
 
-    public ulong? TopicEpoch { get; }
-
     public ProducerChannel(
         ulong id,
         string name,
         IConnection connection,
         ICompressorFactory? compressorFactory,
-        byte[]? schemaVersion,
-        ulong topicEpoch)
+        byte[]? schemaVersion)
     {
         var sendPackagePolicy = new DefaultPooledObjectPolicy<SendPackage>();
         _sendPackagePool = new 
DefaultObjectPool<SendPackage>(sendPackagePolicy);
@@ -48,7 +45,6 @@ public sealed class ProducerChannel : IProducerChannel
         _connection = connection;
         _compressorFactory = compressorFactory;
         _schemaVersion = schemaVersion;
-        TopicEpoch = topicEpoch;
     }
 
     public async ValueTask ClosedByClient(CancellationToken cancellationToken)
diff --git a/src/DotPulsar/Internal/ProducerChannelFactory.cs 
b/src/DotPulsar/Internal/ProducerChannelFactory.cs
index 92f4ac5..fcf2b48 100644
--- a/src/DotPulsar/Internal/ProducerChannelFactory.cs
+++ b/src/DotPulsar/Internal/ProducerChannelFactory.cs
@@ -57,12 +57,20 @@ public sealed class ProducerChannelFactory : 
IProducerChannelFactory
 
     public async Task<IProducerChannel> Create(ulong? topicEpoch, 
CancellationToken cancellationToken)
     {
-        if (topicEpoch.HasValue) _commandProducer.TopicEpoch = 
topicEpoch.Value;
+        if (topicEpoch.HasValue)
+        {
+            if (_commandProducer.ProducerAccessMode != 
ProducerAccessMode.Shared)
+                _commandProducer.ProducerAccessMode = 
ProducerAccessMode.Exclusive;
+            _commandProducer.TopicEpoch = topicEpoch.Value;
+        }
+        else
+            _commandProducer.ResetTopicEpoch();
+
         var connection = await 
_connectionPool.FindConnectionForTopic(_commandProducer.Topic, 
cancellationToken).ConfigureAwait(false);
         var channel = new Channel(_correlationId, _eventRegister, new 
AsyncQueue<MessagePackage>());
         var response = await connection.Send(_commandProducer, channel, 
cancellationToken).ConfigureAwait(false);
         var schemaVersion = await GetSchemaVersion(connection, 
cancellationToken).ConfigureAwait(false);
-        return new ProducerChannel(response.ProducerId, response.ProducerName, 
connection, _compressorFactory, schemaVersion, response.TopicEpoch);
+        return new ProducerChannel(response.ProducerId, response.ProducerName, 
connection, _compressorFactory, schemaVersion);
     }
 
     private async ValueTask<byte[]?> GetSchemaVersion(IConnection connection, 
CancellationToken cancellationToken)
diff --git a/src/DotPulsar/Internal/ProducerProcess.cs 
b/src/DotPulsar/Internal/ProducerProcess.cs
index 4ff3671..d5f12d3 100644
--- a/src/DotPulsar/Internal/ProducerProcess.cs
+++ b/src/DotPulsar/Internal/ProducerProcess.cs
@@ -14,6 +14,7 @@
 
 namespace DotPulsar.Internal;
 
+using DotPulsar.Exceptions;
 using DotPulsar.Internal.Abstractions;
 using System;
 using System.Threading;
@@ -52,7 +53,10 @@ public sealed class ProducerProcess : Process
 
         if (ExecutorState == ExecutorState.Faulted)
         {
-            _stateManager.SetState(ProducerState.Faulted);
+            ProducerState newState = Exception! is ProducerFencedException ? 
ProducerState.Fenced : ProducerState.Faulted;
+            var formerState = _stateManager.SetState(newState);
+            if (formerState != ProducerState.Faulted && formerState != 
ProducerState.Fenced)
+                _actionQueue.Enqueue(async _ => await 
_producer.ChannelFaulted(Exception!));
             return;
         }
 
@@ -70,7 +74,7 @@ public sealed class ProducerProcess : Process
             case ChannelState.Connected:
                 _actionQueue.Enqueue(async x =>
                 {
-                    await _producer.ActivateChannel(x).ConfigureAwait(false);
+                    await _producer.ActivateChannel(TopicEpoch, 
x).ConfigureAwait(false);
                     _stateManager.SetState(ProducerState.Connected);
                 });
                 return;
diff --git a/src/DotPulsar/Internal/ProducerResponse.cs 
b/src/DotPulsar/Internal/ProducerResponse.cs
index 635926d..474dc5f 100644
--- a/src/DotPulsar/Internal/ProducerResponse.cs
+++ b/src/DotPulsar/Internal/ProducerResponse.cs
@@ -16,14 +16,12 @@ namespace DotPulsar.Internal;
 
 public sealed class ProducerResponse
 {
-    public ProducerResponse(ulong producerId, string producerName, ulong 
topicEpoch)
+    public ProducerResponse(ulong producerId, string producerName)
     {
         ProducerId = producerId;
         ProducerName = producerName;
-        TopicEpoch = topicEpoch;
     }
 
     public ulong ProducerId { get; }
     public string ProducerName { get; }
-    public ulong TopicEpoch { get; }
 }
diff --git a/src/DotPulsar/Internal/SubProducer.cs 
b/src/DotPulsar/Internal/SubProducer.cs
index 1439da1..aa3f858 100644
--- a/src/DotPulsar/Internal/SubProducer.cs
+++ b/src/DotPulsar/Internal/SubProducer.cs
@@ -15,6 +15,7 @@
 namespace DotPulsar.Internal;
 
 using DotPulsar.Abstractions;
+using DotPulsar.Exceptions;
 using DotPulsar.Internal.Abstractions;
 using DotPulsar.Internal.Events;
 using DotPulsar.Internal.Exceptions;
@@ -36,6 +37,8 @@ public sealed class SubProducer : IContainsProducerChannel, 
IState<ProducerState
     private readonly IStateChanged<ProducerState> _state;
     private readonly IProducerChannelFactory _factory;
     private int _isDisposed;
+    private ulong? _topicEpoch;
+    private Exception? _faultException;
 
     public SubProducer(
         Guid correlationId,
@@ -76,7 +79,11 @@ public sealed class SubProducer : IContainsProducerChannel, 
IState<ProducerState
             return;
 
         _eventRegister.Register(new ProducerDisposed(_correlationId));
+        await InternalDispose().ConfigureAwait(false);
+    }
 
+    private async ValueTask InternalDispose()
+    {
         try
         {
             _dispatcherCts?.Cancel();
@@ -94,10 +101,16 @@ public sealed class SubProducer : 
IContainsProducerChannel, IState<ProducerState
     }
 
     public async ValueTask Send(SendOp sendOp, CancellationToken 
cancellationToken)
-        => await _sendQueue.Enqueue(sendOp, 
cancellationToken).ConfigureAwait(false);
+    {
+        Guard();
+        await _sendQueue.Enqueue(sendOp, 
cancellationToken).ConfigureAwait(false);
+    }
 
     internal async ValueTask WaitForSendQueueEmpty(CancellationToken 
cancellationToken)
-        => await 
_sendQueue.WaitForEmpty(cancellationToken).ConfigureAwait(false);
+    {
+        Guard();
+        await _sendQueue.WaitForEmpty(cancellationToken).ConfigureAwait(false);
+    }
 
     private async Task MessageDispatcher(IProducerChannel channel, 
CancellationToken cancellationToken)
     {
@@ -203,7 +216,6 @@ public sealed class SubProducer : IContainsProducerChannel, 
IState<ProducerState
 
         await _executor.TryExecuteOnce(() => _dispatcherTask ?? 
Task.CompletedTask, cancellationToken).ConfigureAwait(false);
 
-        ulong? topicEpoch = _channel.TopicEpoch;
         try
         {
             var oldChannel = _channel;
@@ -214,11 +226,12 @@ public sealed class SubProducer : 
IContainsProducerChannel, IState<ProducerState
             // Ignored
         }
 
-        _channel = await _executor.Execute(() => _factory.Create(topicEpoch, 
cancellationToken), cancellationToken).ConfigureAwait(false);
+        _channel = await _executor.Execute(() => _factory.Create(_topicEpoch, 
cancellationToken), cancellationToken).ConfigureAwait(false);
     }
 
-    public async Task ActivateChannel(CancellationToken cancellationToken)
+    public async Task ActivateChannel(ulong? topicEpoch, CancellationToken 
cancellationToken)
     {
+        _topicEpoch ??= topicEpoch;
         _dispatcherCts = new CancellationTokenSource();
         await _executor.Execute(() =>
         {
@@ -230,8 +243,20 @@ public sealed class SubProducer : 
IContainsProducerChannel, IState<ProducerState
     public async ValueTask CloseChannel(CancellationToken cancellationToken)
         => await 
_channel.ClosedByClient(cancellationToken).ConfigureAwait(false);
 
-    public ValueTask ChannelFaulted(Exception exception)
+    public async ValueTask ChannelFaulted(Exception exception)
     {
-        return new ValueTask();
+        _faultException = exception;
+        await InternalDispose().ConfigureAwait(false);
+    }
+
+    private void Guard()
+    {
+        if (_isDisposed != 0)
+            throw new ProducerDisposedException(GetType().FullName!);
+
+        if (_faultException is ProducerFencedException)
+            throw _faultException;
+        if (_faultException is not null)
+            throw new ProducerFaultedException(_faultException);
     }
 }
diff --git a/src/DotPulsar/ProducerState.cs b/src/DotPulsar/ProducerState.cs
index 8eb9527..f3b3190 100644
--- a/src/DotPulsar/ProducerState.cs
+++ b/src/DotPulsar/ProducerState.cs
@@ -29,11 +29,6 @@ public enum ProducerState : byte
     /// </summary>
     Connected,
 
-    /// <summary>
-    /// The producer is connected but waiting for exclusive access.
-    /// </summary>
-    WaitingForExclusive,
-
     /// <summary>
     /// The producer is disconnected.
     /// </summary>
@@ -47,5 +42,15 @@ public enum ProducerState : byte
     /// <summary>
     /// Some of the sub-producers are disconnected.
     /// </summary>
-    PartiallyConnected
+    PartiallyConnected,
+
+    /// <summary>
+    /// The producer is connected but waiting for exclusive access.
+    /// </summary>
+    WaitingForExclusive,
+
+    /// <summary>
+    /// The producer has been fenced by the broker. This is a final state.
+    /// </summary>
+    Fenced
 }
diff --git a/tests/DotPulsar.Tests/ProducerTests.cs 
b/tests/DotPulsar.Tests/ProducerTests.cs
index ad49286..db2b1ce 100644
--- a/tests/DotPulsar.Tests/ProducerTests.cs
+++ b/tests/DotPulsar.Tests/ProducerTests.cs
@@ -103,7 +103,7 @@ public class ProducerTests
 
     [Theory]
     [InlineData(ProducerAccessMode.Shared, ProducerState.Connected)]
-    [InlineData(ProducerAccessMode.Exclusive, ProducerState.Faulted)]
+    [InlineData(ProducerAccessMode.Exclusive, ProducerState.Fenced)]
     [InlineData(ProducerAccessMode.WaitForExclusive, 
ProducerState.WaitingForExclusive)]
     public async Task 
TwoProducers_WhenConnectingSecond_ThenGoToExpectedState(ProducerAccessMode 
accessMode, ProducerState expectedState)
     {
@@ -164,15 +164,15 @@ public class ProducerTests
             //Ignore
         }
 
-        var result = await producer1.OnStateChangeTo(ProducerState.Faulted, 
cts.Token);
+        var result = await producer1.OnStateChangeTo(ProducerState.Fenced, 
cts.Token);
 
         //Assert
-        result.Should().Be(ProducerState.Faulted);
+        result.Should().Be(ProducerState.Fenced);
     }
 
     [Theory]
     [InlineData(ProducerAccessMode.Exclusive, ProducerAccessMode.Shared, 
ProducerState.Connected, ProducerState.Disconnected)]
-    [InlineData(ProducerAccessMode.Shared, ProducerAccessMode.Exclusive, 
ProducerState.Connected, ProducerState.Faulted)]
+    [InlineData(ProducerAccessMode.Shared, ProducerAccessMode.Exclusive, 
ProducerState.Connected, ProducerState.Fenced)]
     [InlineData(ProducerAccessMode.Shared, 
ProducerAccessMode.WaitForExclusive, ProducerState.Connected, 
ProducerState.WaitingForExclusive)]
     [InlineData(ProducerAccessMode.Exclusive, 
ProducerAccessMode.WaitForExclusive, ProducerState.Connected, 
ProducerState.WaitingForExclusive)]
 

Reply via email to