This is an automated email from the ASF dual-hosted git repository.
blankensteiner pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-dotpulsar.git
The following commit(s) were added to refs/heads/master by this push:
new c93a0de Producer access mode fixes (#149)
c93a0de is described below
commit c93a0dee3a35d698376ce975b376a6dbf755f128
Author: Kristian Andersen <[email protected]>
AuthorDate: Mon May 1 08:51:11 2023 +0200
Producer access mode fixes (#149)
* Track topicEpoch when connection occurs after having waited to be
exclusive
* Fix not faulting if being fenced after having become exclusive
* Fix issue with disconnect while waiting for exlusive
* Fix missing cancellation on send
* Add Fenced producer state
* Unit tests adapted to pass with new fenced status
* Review
---
CHANGELOG.md | 2 ++
src/DotPulsar/Internal/Abstractions/IChannel.cs | 1 +
.../Abstractions/IContainsProducerChannel.cs | 2 +-
.../Internal/Abstractions/IProducerChannel.cs | 1 -
src/DotPulsar/Internal/Abstractions/Process.cs | 5 +++
src/DotPulsar/Internal/Channel.cs | 3 ++
src/DotPulsar/Internal/ChannelManager.cs | 15 ++++++---
.../ProducerChannelConnected.cs} | 18 +++++-----
src/DotPulsar/Internal/NotReadyChannel.cs | 2 --
src/DotPulsar/Internal/Producer.cs | 36 +++++++++++++-------
src/DotPulsar/Internal/ProducerChannel.cs | 6 +---
src/DotPulsar/Internal/ProducerChannelFactory.cs | 12 +++++--
src/DotPulsar/Internal/ProducerProcess.cs | 8 +++--
src/DotPulsar/Internal/ProducerResponse.cs | 4 +--
src/DotPulsar/Internal/SubProducer.cs | 39 ++++++++++++++++++----
src/DotPulsar/ProducerState.cs | 17 ++++++----
tests/DotPulsar.Tests/ProducerTests.cs | 8 ++---
17 files changed, 121 insertions(+), 58 deletions(-)
diff --git a/CHANGELOG.md b/CHANGELOG.md
index b6e6976..cfd3fab 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -9,10 +9,12 @@ The format is based on [Keep a
Changelog](https://keepachangelog.com/en/1.0.0/)
### Added
- Support `ProducerAccessMode` to prevent multiple producers on a single topic.
+- Added `Fenced` state for producers which is a final state.
### Fixed
- Fixed issue with `Send` extension methods that do include `MessageMetadata`
in the parameter list. The issue prevents more than two messages from being
published on namespaces where deduplication is enabled.
+- Calling `await send(...)` on a Producer did not correctly terminate with an
exception when a send operation failed, e.g. because the producer faulted.
## [2.11.0] - 2023-03-13
diff --git a/src/DotPulsar/Internal/Abstractions/IChannel.cs
b/src/DotPulsar/Internal/Abstractions/IChannel.cs
index e468a03..8269677 100644
--- a/src/DotPulsar/Internal/Abstractions/IChannel.cs
+++ b/src/DotPulsar/Internal/Abstractions/IChannel.cs
@@ -23,6 +23,7 @@ public interface IChannel
void ClosedByServer();
void WaitingForExclusive();
void Connected();
+ void ProducerConnected(ulong topicEpoch);
void Deactivated();
void Disconnected();
void ReachedEndOfTopic();
diff --git a/src/DotPulsar/Internal/Abstractions/IContainsProducerChannel.cs
b/src/DotPulsar/Internal/Abstractions/IContainsProducerChannel.cs
index 550be53..0558216 100644
--- a/src/DotPulsar/Internal/Abstractions/IContainsProducerChannel.cs
+++ b/src/DotPulsar/Internal/Abstractions/IContainsProducerChannel.cs
@@ -19,5 +19,5 @@ using System.Threading.Tasks;
public interface IContainsProducerChannel : IContainsChannel
{
- Task ActivateChannel(CancellationToken cancellationToken);
+ Task ActivateChannel(ulong? topicEpoch, CancellationToken
cancellationToken);
}
diff --git a/src/DotPulsar/Internal/Abstractions/IProducerChannel.cs
b/src/DotPulsar/Internal/Abstractions/IProducerChannel.cs
index 79e7e37..4bdbdeb 100644
--- a/src/DotPulsar/Internal/Abstractions/IProducerChannel.cs
+++ b/src/DotPulsar/Internal/Abstractions/IProducerChannel.cs
@@ -22,7 +22,6 @@ using System.Threading.Tasks;
public interface IProducerChannel : IAsyncDisposable
{
- ulong? TopicEpoch { get; }
Task Send(MessageMetadata metadata, ReadOnlySequence<byte> payload,
TaskCompletionSource<BaseCommand> responseTcs, CancellationToken
cancellationToken);
ValueTask ClosedByClient(CancellationToken cancellationToken);
}
diff --git a/src/DotPulsar/Internal/Abstractions/Process.cs
b/src/DotPulsar/Internal/Abstractions/Process.cs
index f6d42c5..d509bfd 100644
--- a/src/DotPulsar/Internal/Abstractions/Process.cs
+++ b/src/DotPulsar/Internal/Abstractions/Process.cs
@@ -35,6 +35,7 @@ public abstract class Process : IProcess
}
public Guid CorrelationId { get; }
+ protected ulong? TopicEpoch { get; private set; }
public void Start()
=> CalculateState();
@@ -58,6 +59,10 @@ public abstract class Process : IProcess
case ChannelConnected _:
ChannelState = ChannelState.Connected;
break;
+ case ProducerChannelConnected producerChannelConnected:
+ TopicEpoch = producerChannelConnected.TopicEpoch;
+ ChannelState = ChannelState.Connected;
+ break;
case ChannelDeactivated _:
ChannelState = ChannelState.Inactive;
break;
diff --git a/src/DotPulsar/Internal/Channel.cs
b/src/DotPulsar/Internal/Channel.cs
index 7e339f9..857c00b 100644
--- a/src/DotPulsar/Internal/Channel.cs
+++ b/src/DotPulsar/Internal/Channel.cs
@@ -61,6 +61,9 @@ public sealed class Channel : IChannel
public void Connected()
=> _eventRegister.Register(new ChannelConnected(_correlationId));
+ public void ProducerConnected(ulong topicEpoch)
+ => _eventRegister.Register(new
ProducerChannelConnected(_correlationId, topicEpoch));
+
public void Deactivated()
=> _eventRegister.Register(new ChannelDeactivated(_correlationId));
diff --git a/src/DotPulsar/Internal/ChannelManager.cs
b/src/DotPulsar/Internal/ChannelManager.cs
index 7427578..87a1fa3 100644
--- a/src/DotPulsar/Internal/ChannelManager.cs
+++ b/src/DotPulsar/Internal/ChannelManager.cs
@@ -59,15 +59,15 @@ public sealed class ChannelManager : IDisposable
if (response.Result.ProducerSuccess.ProducerReady)
{
- channel.Connected();
+
channel.ProducerConnected(response.Result.ProducerSuccess.TopicEpoch);
}
else
{
channel.WaitingForExclusive();
- HandleAdditionalProducerSuccess(command, channel.Connected);
+ HandleAdditionalProducerSuccess(command,
channel.ProducerConnected);
}
- return new ProducerResponse(producerId,
response.Result.ProducerSuccess.ProducerName,
response.Result.ProducerSuccess.TopicEpoch);
+ return new ProducerResponse(producerId,
response.Result.ProducerSuccess.ProducerName);
}, TaskContinuationOptions.OnlyOnRanToCompletion);
}
@@ -258,16 +258,21 @@ public sealed class ChannelManager : IDisposable
return channel.SenderLock();
}
- private void HandleAdditionalProducerSuccess(CommandProducer command,
Action successAction)
+ private void HandleAdditionalProducerSuccess(CommandProducer command,
Action<ulong> successAction)
{
_ =
_requestResponseHandler.ExpectAdditionalResponse(command).ContinueWith(response
=>
{
+ if (response.IsCanceled || response.IsFaulted ||
response.Result.CommandType == BaseCommand.Type.Error)
+ {
+ _producerChannels[command.ProducerId]?.Disconnected();
+ return;
+ }
if (!response.Result.ProducerSuccess.ProducerReady)
{
HandleAdditionalProducerSuccess(command, successAction);
return;
}
- successAction.Invoke();
+ successAction.Invoke(response.Result.ProducerSuccess.TopicEpoch);
});
}
}
diff --git a/src/DotPulsar/Internal/ProducerResponse.cs
b/src/DotPulsar/Internal/Events/ProducerChannelConnected.cs
similarity index 67%
copy from src/DotPulsar/Internal/ProducerResponse.cs
copy to src/DotPulsar/Internal/Events/ProducerChannelConnected.cs
index 635926d..ccc555a 100644
--- a/src/DotPulsar/Internal/ProducerResponse.cs
+++ b/src/DotPulsar/Internal/Events/ProducerChannelConnected.cs
@@ -1,4 +1,4 @@
-/*
+/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
@@ -12,18 +12,20 @@
* limitations under the License.
*/
-namespace DotPulsar.Internal;
+namespace DotPulsar.Internal.Events;
-public sealed class ProducerResponse
+using DotPulsar.Internal.Abstractions;
+using System;
+
+public sealed class ProducerChannelConnected : IEvent
{
- public ProducerResponse(ulong producerId, string producerName, ulong
topicEpoch)
+ public ProducerChannelConnected(Guid correlationId, ulong topicEpoch)
{
- ProducerId = producerId;
- ProducerName = producerName;
+ CorrelationId = correlationId;
TopicEpoch = topicEpoch;
}
- public ulong ProducerId { get; }
- public string ProducerName { get; }
+ public Guid CorrelationId { get; }
+
public ulong TopicEpoch { get; }
}
diff --git a/src/DotPulsar/Internal/NotReadyChannel.cs
b/src/DotPulsar/Internal/NotReadyChannel.cs
index ca602ad..b1877b7 100644
--- a/src/DotPulsar/Internal/NotReadyChannel.cs
+++ b/src/DotPulsar/Internal/NotReadyChannel.cs
@@ -34,8 +34,6 @@ public sealed class NotReadyChannel<TMessage> :
IConsumerChannel<TMessage>, IPro
public ValueTask<IMessage<TMessage>> Receive(CancellationToken
cancellationToken = default)
=> throw GetException();
- public ulong? TopicEpoch { get => null; }
-
public Task Send(MessageMetadata metadata, ReadOnlySequence<byte> payload,
TaskCompletionSource<BaseCommand> responseTcs, CancellationToken
cancellationToken)
=> throw GetException();
diff --git a/src/DotPulsar/Internal/Producer.cs
b/src/DotPulsar/Internal/Producer.cs
index e29095a..dedb8f9 100644
--- a/src/DotPulsar/Internal/Producer.cs
+++ b/src/DotPulsar/Internal/Producer.cs
@@ -74,7 +74,7 @@ public sealed class Producer<TMessage> : IProducer<TMessage>,
IRegisterEvent
};
_attachTraceInfoToMessages = options.AttachTraceInfoToMessages;
_sequenceId = new SequenceId(options.InitialSequenceId);
- _state = new StateManager<ProducerState>(ProducerState.Disconnected,
ProducerState.Closed, ProducerState.Faulted);
+ _state = CreateStateManager();
ServiceUrl = serviceUrl;
Topic = options.Topic;
_isDisposed = 0;
@@ -146,27 +146,32 @@ public sealed class Producer<TMessage> :
IProducer<TMessage>, IRegisterEvent
switch (state)
{
case ProducerState.Connected:
- ++connectedProducers;
- waitingForExclusive[i] = false;
+ if (waitingForExclusive[i])
+ waitingForExclusive[i] = false;
+ else
+ ++connectedProducers;
break;
case ProducerState.Disconnected:
--connectedProducers;
+ waitingForExclusive[i] = false;
break;
case ProducerState.WaitingForExclusive:
+ ++connectedProducers;
waitingForExclusive[i] = true;
break;
+ case ProducerState.Fenced:
case ProducerState.Faulted:
- _state.SetState(ProducerState.Faulted);
+ _state.SetState(state);
return;
}
monitoringTasks[i] = _producers[i].OnStateChangeFrom(state,
_cts.Token).AsTask();
}
- if (connectedProducers == monitoringTasks.Length)
- _state.SetState(ProducerState.Connected);
- else if (connectedProducers == 0 && waitingForExclusive.All(x => x
!= true))
+ if (connectedProducers == 0)
_state.SetState(ProducerState.Disconnected);
+ else if (connectedProducers == monitoringTasks.Length &&
waitingForExclusive.All(x => x != true))
+ _state.SetState(ProducerState.Connected);
else if (waitingForExclusive.Any(x => x))
_state.SetState(ProducerState.WaitingForExclusive);
else
@@ -181,7 +186,7 @@ public sealed class Producer<TMessage> :
IProducer<TMessage>, IRegisterEvent
var schema = _options.Schema;
var producerAccessMode =
(DotPulsar.Internal.PulsarApi.ProducerAccessMode) _options.ProducerAccessMode;
var factory = new ProducerChannelFactory(correlationId,
_processManager, _connectionPool, topic, producerName, producerAccessMode,
schema.SchemaInfo, _compressorFactory);
- var stateManager = new
StateManager<ProducerState>(ProducerState.Disconnected, ProducerState.Closed,
ProducerState.Faulted);
+ var stateManager = CreateStateManager();
var initialChannel = new NotReadyChannel<TMessage>();
var executor = new Executor(correlationId, _processManager,
_exceptionHandler);
var producer = new SubProducer(correlationId, _processManager,
initialChannel, executor, stateManager, factory, _options.MaxPendingMessages);
@@ -265,7 +270,7 @@ public sealed class Producer<TMessage> :
IProducer<TMessage>, IRegisterEvent
try
{
- await InternalSend(metadata, message, true, OnMessageSent,
cancellationToken).ConfigureAwait(false);
+ await InternalSend(metadata, message, true, OnMessageSent, x =>
tcs.TrySetException(x), cancellationToken).ConfigureAwait(false);
return await tcs.Task.ConfigureAwait(false);
}
finally
@@ -275,9 +280,9 @@ public sealed class Producer<TMessage> :
IProducer<TMessage>, IRegisterEvent
}
public async ValueTask Enqueue(MessageMetadata metadata, TMessage message,
Func<MessageId, ValueTask>? onMessageSent = default, CancellationToken
cancellationToken = default)
- => await InternalSend(metadata, message, false, onMessageSent,
cancellationToken).ConfigureAwait(false);
+ => await InternalSend(metadata, message, false, onMessageSent,
cancellationToken: cancellationToken).ConfigureAwait(false);
- private async ValueTask InternalSend(MessageMetadata metadata, TMessage
message, bool sendOpCancelable, Func<MessageId, ValueTask>? onMessageSent =
default, CancellationToken cancellationToken = default)
+ private async ValueTask InternalSend(MessageMetadata metadata, TMessage
message, bool sendOpCancelable, Func<MessageId, ValueTask>? onMessageSent =
default, Action<Exception>? onFailed = default, CancellationToken
cancellationToken = default)
{
ThrowIfDisposed();
@@ -311,10 +316,14 @@ public sealed class Producer<TMessage> :
IProducer<TMessage>, IRegisterEvent
if (task.IsFaulted || task.IsCanceled)
{
- FailActivity(task.IsCanceled ? new
OperationCanceledException() : task.Exception!, activity);
+ Exception exception = task.IsCanceled ? new
OperationCanceledException() : task.Exception!;
+ FailActivity(exception, activity);
if (autoAssignSequenceId)
metadata.SequenceId = 0;
+
+ onFailed?.Invoke(exception);
+ return;
}
CompleteActivity(task.Result, data.Length, activity);
@@ -376,5 +385,8 @@ public sealed class Producer<TMessage> :
IProducer<TMessage>, IRegisterEvent
throw new ProducerDisposedException(GetType().FullName!);
}
+ private StateManager<ProducerState> CreateStateManager()
+ => new (ProducerState.Disconnected, ProducerState.Closed,
ProducerState.Faulted, ProducerState.Fenced);
+
public void Register(IEvent @event) { }
}
diff --git a/src/DotPulsar/Internal/ProducerChannel.cs
b/src/DotPulsar/Internal/ProducerChannel.cs
index d884120..ba2da76 100644
--- a/src/DotPulsar/Internal/ProducerChannel.cs
+++ b/src/DotPulsar/Internal/ProducerChannel.cs
@@ -31,15 +31,12 @@ public sealed class ProducerChannel : IProducerChannel
private readonly ICompressorFactory? _compressorFactory;
private readonly byte[]? _schemaVersion;
- public ulong? TopicEpoch { get; }
-
public ProducerChannel(
ulong id,
string name,
IConnection connection,
ICompressorFactory? compressorFactory,
- byte[]? schemaVersion,
- ulong topicEpoch)
+ byte[]? schemaVersion)
{
var sendPackagePolicy = new DefaultPooledObjectPolicy<SendPackage>();
_sendPackagePool = new
DefaultObjectPool<SendPackage>(sendPackagePolicy);
@@ -48,7 +45,6 @@ public sealed class ProducerChannel : IProducerChannel
_connection = connection;
_compressorFactory = compressorFactory;
_schemaVersion = schemaVersion;
- TopicEpoch = topicEpoch;
}
public async ValueTask ClosedByClient(CancellationToken cancellationToken)
diff --git a/src/DotPulsar/Internal/ProducerChannelFactory.cs
b/src/DotPulsar/Internal/ProducerChannelFactory.cs
index 92f4ac5..fcf2b48 100644
--- a/src/DotPulsar/Internal/ProducerChannelFactory.cs
+++ b/src/DotPulsar/Internal/ProducerChannelFactory.cs
@@ -57,12 +57,20 @@ public sealed class ProducerChannelFactory :
IProducerChannelFactory
public async Task<IProducerChannel> Create(ulong? topicEpoch,
CancellationToken cancellationToken)
{
- if (topicEpoch.HasValue) _commandProducer.TopicEpoch =
topicEpoch.Value;
+ if (topicEpoch.HasValue)
+ {
+ if (_commandProducer.ProducerAccessMode !=
ProducerAccessMode.Shared)
+ _commandProducer.ProducerAccessMode =
ProducerAccessMode.Exclusive;
+ _commandProducer.TopicEpoch = topicEpoch.Value;
+ }
+ else
+ _commandProducer.ResetTopicEpoch();
+
var connection = await
_connectionPool.FindConnectionForTopic(_commandProducer.Topic,
cancellationToken).ConfigureAwait(false);
var channel = new Channel(_correlationId, _eventRegister, new
AsyncQueue<MessagePackage>());
var response = await connection.Send(_commandProducer, channel,
cancellationToken).ConfigureAwait(false);
var schemaVersion = await GetSchemaVersion(connection,
cancellationToken).ConfigureAwait(false);
- return new ProducerChannel(response.ProducerId, response.ProducerName,
connection, _compressorFactory, schemaVersion, response.TopicEpoch);
+ return new ProducerChannel(response.ProducerId, response.ProducerName,
connection, _compressorFactory, schemaVersion);
}
private async ValueTask<byte[]?> GetSchemaVersion(IConnection connection,
CancellationToken cancellationToken)
diff --git a/src/DotPulsar/Internal/ProducerProcess.cs
b/src/DotPulsar/Internal/ProducerProcess.cs
index 4ff3671..d5f12d3 100644
--- a/src/DotPulsar/Internal/ProducerProcess.cs
+++ b/src/DotPulsar/Internal/ProducerProcess.cs
@@ -14,6 +14,7 @@
namespace DotPulsar.Internal;
+using DotPulsar.Exceptions;
using DotPulsar.Internal.Abstractions;
using System;
using System.Threading;
@@ -52,7 +53,10 @@ public sealed class ProducerProcess : Process
if (ExecutorState == ExecutorState.Faulted)
{
- _stateManager.SetState(ProducerState.Faulted);
+ ProducerState newState = Exception! is ProducerFencedException ?
ProducerState.Fenced : ProducerState.Faulted;
+ var formerState = _stateManager.SetState(newState);
+ if (formerState != ProducerState.Faulted && formerState !=
ProducerState.Fenced)
+ _actionQueue.Enqueue(async _ => await
_producer.ChannelFaulted(Exception!));
return;
}
@@ -70,7 +74,7 @@ public sealed class ProducerProcess : Process
case ChannelState.Connected:
_actionQueue.Enqueue(async x =>
{
- await _producer.ActivateChannel(x).ConfigureAwait(false);
+ await _producer.ActivateChannel(TopicEpoch,
x).ConfigureAwait(false);
_stateManager.SetState(ProducerState.Connected);
});
return;
diff --git a/src/DotPulsar/Internal/ProducerResponse.cs
b/src/DotPulsar/Internal/ProducerResponse.cs
index 635926d..474dc5f 100644
--- a/src/DotPulsar/Internal/ProducerResponse.cs
+++ b/src/DotPulsar/Internal/ProducerResponse.cs
@@ -16,14 +16,12 @@ namespace DotPulsar.Internal;
public sealed class ProducerResponse
{
- public ProducerResponse(ulong producerId, string producerName, ulong
topicEpoch)
+ public ProducerResponse(ulong producerId, string producerName)
{
ProducerId = producerId;
ProducerName = producerName;
- TopicEpoch = topicEpoch;
}
public ulong ProducerId { get; }
public string ProducerName { get; }
- public ulong TopicEpoch { get; }
}
diff --git a/src/DotPulsar/Internal/SubProducer.cs
b/src/DotPulsar/Internal/SubProducer.cs
index 1439da1..aa3f858 100644
--- a/src/DotPulsar/Internal/SubProducer.cs
+++ b/src/DotPulsar/Internal/SubProducer.cs
@@ -15,6 +15,7 @@
namespace DotPulsar.Internal;
using DotPulsar.Abstractions;
+using DotPulsar.Exceptions;
using DotPulsar.Internal.Abstractions;
using DotPulsar.Internal.Events;
using DotPulsar.Internal.Exceptions;
@@ -36,6 +37,8 @@ public sealed class SubProducer : IContainsProducerChannel,
IState<ProducerState
private readonly IStateChanged<ProducerState> _state;
private readonly IProducerChannelFactory _factory;
private int _isDisposed;
+ private ulong? _topicEpoch;
+ private Exception? _faultException;
public SubProducer(
Guid correlationId,
@@ -76,7 +79,11 @@ public sealed class SubProducer : IContainsProducerChannel,
IState<ProducerState
return;
_eventRegister.Register(new ProducerDisposed(_correlationId));
+ await InternalDispose().ConfigureAwait(false);
+ }
+ private async ValueTask InternalDispose()
+ {
try
{
_dispatcherCts?.Cancel();
@@ -94,10 +101,16 @@ public sealed class SubProducer :
IContainsProducerChannel, IState<ProducerState
}
public async ValueTask Send(SendOp sendOp, CancellationToken
cancellationToken)
- => await _sendQueue.Enqueue(sendOp,
cancellationToken).ConfigureAwait(false);
+ {
+ Guard();
+ await _sendQueue.Enqueue(sendOp,
cancellationToken).ConfigureAwait(false);
+ }
internal async ValueTask WaitForSendQueueEmpty(CancellationToken
cancellationToken)
- => await
_sendQueue.WaitForEmpty(cancellationToken).ConfigureAwait(false);
+ {
+ Guard();
+ await _sendQueue.WaitForEmpty(cancellationToken).ConfigureAwait(false);
+ }
private async Task MessageDispatcher(IProducerChannel channel,
CancellationToken cancellationToken)
{
@@ -203,7 +216,6 @@ public sealed class SubProducer : IContainsProducerChannel,
IState<ProducerState
await _executor.TryExecuteOnce(() => _dispatcherTask ??
Task.CompletedTask, cancellationToken).ConfigureAwait(false);
- ulong? topicEpoch = _channel.TopicEpoch;
try
{
var oldChannel = _channel;
@@ -214,11 +226,12 @@ public sealed class SubProducer :
IContainsProducerChannel, IState<ProducerState
// Ignored
}
- _channel = await _executor.Execute(() => _factory.Create(topicEpoch,
cancellationToken), cancellationToken).ConfigureAwait(false);
+ _channel = await _executor.Execute(() => _factory.Create(_topicEpoch,
cancellationToken), cancellationToken).ConfigureAwait(false);
}
- public async Task ActivateChannel(CancellationToken cancellationToken)
+ public async Task ActivateChannel(ulong? topicEpoch, CancellationToken
cancellationToken)
{
+ _topicEpoch ??= topicEpoch;
_dispatcherCts = new CancellationTokenSource();
await _executor.Execute(() =>
{
@@ -230,8 +243,20 @@ public sealed class SubProducer :
IContainsProducerChannel, IState<ProducerState
public async ValueTask CloseChannel(CancellationToken cancellationToken)
=> await
_channel.ClosedByClient(cancellationToken).ConfigureAwait(false);
- public ValueTask ChannelFaulted(Exception exception)
+ public async ValueTask ChannelFaulted(Exception exception)
{
- return new ValueTask();
+ _faultException = exception;
+ await InternalDispose().ConfigureAwait(false);
+ }
+
+ private void Guard()
+ {
+ if (_isDisposed != 0)
+ throw new ProducerDisposedException(GetType().FullName!);
+
+ if (_faultException is ProducerFencedException)
+ throw _faultException;
+ if (_faultException is not null)
+ throw new ProducerFaultedException(_faultException);
}
}
diff --git a/src/DotPulsar/ProducerState.cs b/src/DotPulsar/ProducerState.cs
index 8eb9527..f3b3190 100644
--- a/src/DotPulsar/ProducerState.cs
+++ b/src/DotPulsar/ProducerState.cs
@@ -29,11 +29,6 @@ public enum ProducerState : byte
/// </summary>
Connected,
- /// <summary>
- /// The producer is connected but waiting for exclusive access.
- /// </summary>
- WaitingForExclusive,
-
/// <summary>
/// The producer is disconnected.
/// </summary>
@@ -47,5 +42,15 @@ public enum ProducerState : byte
/// <summary>
/// Some of the sub-producers are disconnected.
/// </summary>
- PartiallyConnected
+ PartiallyConnected,
+
+ /// <summary>
+ /// The producer is connected but waiting for exclusive access.
+ /// </summary>
+ WaitingForExclusive,
+
+ /// <summary>
+ /// The producer has been fenced by the broker. This is a final state.
+ /// </summary>
+ Fenced
}
diff --git a/tests/DotPulsar.Tests/ProducerTests.cs
b/tests/DotPulsar.Tests/ProducerTests.cs
index ad49286..db2b1ce 100644
--- a/tests/DotPulsar.Tests/ProducerTests.cs
+++ b/tests/DotPulsar.Tests/ProducerTests.cs
@@ -103,7 +103,7 @@ public class ProducerTests
[Theory]
[InlineData(ProducerAccessMode.Shared, ProducerState.Connected)]
- [InlineData(ProducerAccessMode.Exclusive, ProducerState.Faulted)]
+ [InlineData(ProducerAccessMode.Exclusive, ProducerState.Fenced)]
[InlineData(ProducerAccessMode.WaitForExclusive,
ProducerState.WaitingForExclusive)]
public async Task
TwoProducers_WhenConnectingSecond_ThenGoToExpectedState(ProducerAccessMode
accessMode, ProducerState expectedState)
{
@@ -164,15 +164,15 @@ public class ProducerTests
//Ignore
}
- var result = await producer1.OnStateChangeTo(ProducerState.Faulted,
cts.Token);
+ var result = await producer1.OnStateChangeTo(ProducerState.Fenced,
cts.Token);
//Assert
- result.Should().Be(ProducerState.Faulted);
+ result.Should().Be(ProducerState.Fenced);
}
[Theory]
[InlineData(ProducerAccessMode.Exclusive, ProducerAccessMode.Shared,
ProducerState.Connected, ProducerState.Disconnected)]
- [InlineData(ProducerAccessMode.Shared, ProducerAccessMode.Exclusive,
ProducerState.Connected, ProducerState.Faulted)]
+ [InlineData(ProducerAccessMode.Shared, ProducerAccessMode.Exclusive,
ProducerState.Connected, ProducerState.Fenced)]
[InlineData(ProducerAccessMode.Shared,
ProducerAccessMode.WaitForExclusive, ProducerState.Connected,
ProducerState.WaitingForExclusive)]
[InlineData(ProducerAccessMode.Exclusive,
ProducerAccessMode.WaitForExclusive, ProducerState.Connected,
ProducerState.WaitingForExclusive)]