This is an automated email from the ASF dual-hosted git repository.
blankensteiner pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-dotpulsar.git
The following commit(s) were added to refs/heads/master by this push:
new f8070a3 Invoking a method on a consumer or reader that is faulted or
faults during the operation should result in a Consumer/ReaderFaultedException
being thrown. This needs testing
f8070a3 is described below
commit f8070a3077c324e510ed31621497a157813e8bf7
Author: Daniel Blankensteiner <[email protected]>
AuthorDate: Sat Jan 28 00:19:44 2023 +0100
Invoking a method on a consumer or reader that is faulted or faults during
the operation should result in a Consumer/ReaderFaultedException being thrown.
This needs testing
---
.../ConsumerFaultedException.cs} | 16 +--
.../FaultedException.cs} | 16 +--
.../ProducerFaultedException.cs} | 16 +--
.../ReaderFaultedException.cs} | 16 +--
.../Internal/Abstractions/IContainsChannel.cs | 4 +-
src/DotPulsar/Internal/Abstractions/Process.cs | 6 +-
src/DotPulsar/Internal/Consumer.cs | 143 ++++++++++++---------
src/DotPulsar/Internal/ConsumerProcess.cs | 6 +-
src/DotPulsar/Internal/Events/ExecutorFaulted.cs | 10 +-
src/DotPulsar/Internal/Executor.cs | 4 +-
src/DotPulsar/Internal/Producer.cs | 29 ++---
src/DotPulsar/Internal/Reader.cs | 60 +++++----
src/DotPulsar/Internal/ReaderProcess.cs | 5 +-
src/DotPulsar/Internal/SubProducer.cs | 33 +++--
14 files changed, 204 insertions(+), 160 deletions(-)
diff --git a/src/DotPulsar/Internal/Abstractions/IContainsChannel.cs
b/src/DotPulsar/Exceptions/ConsumerFaultedException.cs
similarity index 60%
copy from src/DotPulsar/Internal/Abstractions/IContainsChannel.cs
copy to src/DotPulsar/Exceptions/ConsumerFaultedException.cs
index d856108..c331bc4 100644
--- a/src/DotPulsar/Internal/Abstractions/IContainsChannel.cs
+++ b/src/DotPulsar/Exceptions/ConsumerFaultedException.cs
@@ -1,4 +1,4 @@
-/*
+/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
@@ -12,15 +12,15 @@
* limitations under the License.
*/
-namespace DotPulsar.Internal.Abstractions;
+namespace DotPulsar.Exceptions;
using System;
-using System.Threading;
-using System.Threading.Tasks;
-public interface IContainsChannel : IAsyncDisposable
+/// <summary>
+/// Thrown when a consumer has faulted. See innerException
+/// </summary>
+public sealed class ConsumerFaultedException : FaultedException
{
- Task EstablishNewChannel(CancellationToken cancellationToken);
-
- ValueTask CloseChannel(CancellationToken cancellationToken);
+ public ConsumerFaultedException() : base("Consumer has faulted") { }
+ public ConsumerFaultedException(Exception innerException) : base("Consumer
has faulted", innerException) { }
}
diff --git a/src/DotPulsar/Internal/Abstractions/IContainsChannel.cs
b/src/DotPulsar/Exceptions/FaultedException.cs
similarity index 59%
copy from src/DotPulsar/Internal/Abstractions/IContainsChannel.cs
copy to src/DotPulsar/Exceptions/FaultedException.cs
index d856108..65b54de 100644
--- a/src/DotPulsar/Internal/Abstractions/IContainsChannel.cs
+++ b/src/DotPulsar/Exceptions/FaultedException.cs
@@ -1,4 +1,4 @@
-/*
+/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
@@ -12,15 +12,15 @@
* limitations under the License.
*/
-namespace DotPulsar.Internal.Abstractions;
+namespace DotPulsar.Exceptions;
using System;
-using System.Threading;
-using System.Threading.Tasks;
-public interface IContainsChannel : IAsyncDisposable
+/// <summary>
+/// Base exception for ConsumerFaultedException, ProducerFaultedException, and
ReaderFaultedException
+/// </summary>
+public abstract class FaultedException : DotPulsarException
{
- Task EstablishNewChannel(CancellationToken cancellationToken);
-
- ValueTask CloseChannel(CancellationToken cancellationToken);
+ public FaultedException(string message) : base(message) { }
+ public FaultedException(string message, Exception innerException) :
base(message, innerException) { }
}
diff --git a/src/DotPulsar/Internal/Abstractions/IContainsChannel.cs
b/src/DotPulsar/Exceptions/ProducerFaultedException.cs
similarity index 60%
copy from src/DotPulsar/Internal/Abstractions/IContainsChannel.cs
copy to src/DotPulsar/Exceptions/ProducerFaultedException.cs
index d856108..5a58e43 100644
--- a/src/DotPulsar/Internal/Abstractions/IContainsChannel.cs
+++ b/src/DotPulsar/Exceptions/ProducerFaultedException.cs
@@ -1,4 +1,4 @@
-/*
+/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
@@ -12,15 +12,15 @@
* limitations under the License.
*/
-namespace DotPulsar.Internal.Abstractions;
+namespace DotPulsar.Exceptions;
using System;
-using System.Threading;
-using System.Threading.Tasks;
-public interface IContainsChannel : IAsyncDisposable
+/// <summary>
+/// Thrown when a producer has faulted. See innerException
+/// </summary>
+public sealed class ProducerFaultedException : FaultedException
{
- Task EstablishNewChannel(CancellationToken cancellationToken);
-
- ValueTask CloseChannel(CancellationToken cancellationToken);
+ public ProducerFaultedException() : base("Producer has faulted") { }
+ public ProducerFaultedException(Exception innerException) : base("Producer
has faulted", innerException) { }
}
diff --git a/src/DotPulsar/Internal/Abstractions/IContainsChannel.cs
b/src/DotPulsar/Exceptions/ReaderFaultedException.cs
similarity index 61%
copy from src/DotPulsar/Internal/Abstractions/IContainsChannel.cs
copy to src/DotPulsar/Exceptions/ReaderFaultedException.cs
index d856108..18d15ac 100644
--- a/src/DotPulsar/Internal/Abstractions/IContainsChannel.cs
+++ b/src/DotPulsar/Exceptions/ReaderFaultedException.cs
@@ -1,4 +1,4 @@
-/*
+/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
@@ -12,15 +12,15 @@
* limitations under the License.
*/
-namespace DotPulsar.Internal.Abstractions;
+namespace DotPulsar.Exceptions;
using System;
-using System.Threading;
-using System.Threading.Tasks;
-public interface IContainsChannel : IAsyncDisposable
+/// <summary>
+/// Thrown when a reader has faulted. See innerException
+/// </summary>
+public sealed class ReaderFaultedException : FaultedException
{
- Task EstablishNewChannel(CancellationToken cancellationToken);
-
- ValueTask CloseChannel(CancellationToken cancellationToken);
+ public ReaderFaultedException() : base("Reader has faulted") { }
+ public ReaderFaultedException(Exception innerException) : base("Reader has
faulted", innerException) { }
}
diff --git a/src/DotPulsar/Internal/Abstractions/IContainsChannel.cs
b/src/DotPulsar/Internal/Abstractions/IContainsChannel.cs
index d856108..c28cacd 100644
--- a/src/DotPulsar/Internal/Abstractions/IContainsChannel.cs
+++ b/src/DotPulsar/Internal/Abstractions/IContainsChannel.cs
@@ -1,4 +1,4 @@
-/*
+/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
@@ -23,4 +23,6 @@ public interface IContainsChannel : IAsyncDisposable
Task EstablishNewChannel(CancellationToken cancellationToken);
ValueTask CloseChannel(CancellationToken cancellationToken);
+
+ ValueTask ChannelFaulted(Exception exception);
}
diff --git a/src/DotPulsar/Internal/Abstractions/Process.cs
b/src/DotPulsar/Internal/Abstractions/Process.cs
index 9ef6bd4..0704b8d 100644
--- a/src/DotPulsar/Internal/Abstractions/Process.cs
+++ b/src/DotPulsar/Internal/Abstractions/Process.cs
@@ -1,4 +1,4 @@
-/*
+/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
@@ -24,6 +24,7 @@ public abstract class Process : IProcess
protected readonly CancellationTokenSource CancellationTokenSource;
protected ChannelState ChannelState;
protected ExecutorState ExecutorState;
+ protected Exception? Exception;
protected Process(Guid correlationId)
{
@@ -44,8 +45,9 @@ public abstract class Process : IProcess
{
switch (e)
{
- case ExecutorFaulted _:
+ case ExecutorFaulted executorFaulted:
ExecutorState = ExecutorState.Faulted;
+ Exception = executorFaulted.Exception;
break;
case ChannelActivated _:
ChannelState = ChannelState.Active;
diff --git a/src/DotPulsar/Internal/Consumer.cs
b/src/DotPulsar/Internal/Consumer.cs
index 5740ccf..9cb3327 100644
--- a/src/DotPulsar/Internal/Consumer.cs
+++ b/src/DotPulsar/Internal/Consumer.cs
@@ -1,4 +1,4 @@
-/*
+/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
@@ -37,6 +37,7 @@ public sealed class Consumer<TMessage> : IContainsChannel,
IConsumer<TMessage>
private readonly IStateChanged<ConsumerState> _state;
private readonly IConsumerChannelFactory<TMessage> _factory;
private int _isDisposed;
+ private Exception? _faultException;
public Uri ServiceUrl { get; }
public string SubscriptionName { get; }
@@ -86,33 +87,29 @@ public sealed class Consumer<TMessage> : IContainsChannel,
IConsumer<TMessage>
return;
_eventRegister.Register(new ConsumerDisposed(_correlationId));
- await
_channel.ClosedByClient(CancellationToken.None).ConfigureAwait(false);
- await _channel.DisposeAsync().ConfigureAwait(false);
+ await DisposeChannel().ConfigureAwait(false);
}
- public async ValueTask<IMessage<TMessage>> Receive(CancellationToken
cancellationToken)
+ private async ValueTask DisposeChannel()
{
- ThrowIfDisposed();
-
- return await _executor.Execute(() =>
ReceiveMessage(cancellationToken), cancellationToken).ConfigureAwait(false);
+ await
_channel.ClosedByClient(CancellationToken.None).ConfigureAwait(false);
+ await _channel.DisposeAsync().ConfigureAwait(false);
}
- private async ValueTask<IMessage<TMessage>>
ReceiveMessage(CancellationToken cancellationToken)
- => await _channel.Receive(cancellationToken).ConfigureAwait(false);
+ public async ValueTask<IMessage<TMessage>> Receive(CancellationToken
cancellationToken)
+ => await _executor.Execute(() => InternalReceive(cancellationToken),
cancellationToken).ConfigureAwait(false);
public async ValueTask Acknowledge(MessageId messageId, CancellationToken
cancellationToken)
- => await Acknowledge(messageId, CommandAck.AckType.Individual,
cancellationToken).ConfigureAwait(false);
+ => await InternalAcknowledge(messageId, CommandAck.AckType.Individual,
cancellationToken).ConfigureAwait(false);
public async ValueTask AcknowledgeCumulative(MessageId messageId,
CancellationToken cancellationToken)
- => await Acknowledge(messageId, CommandAck.AckType.Cumulative,
cancellationToken).ConfigureAwait(false);
+ => await InternalAcknowledge(messageId, CommandAck.AckType.Cumulative,
cancellationToken).ConfigureAwait(false);
public async ValueTask
RedeliverUnacknowledgedMessages(IEnumerable<MessageId> messageIds,
CancellationToken cancellationToken)
{
- ThrowIfDisposed();
-
var command = new CommandRedeliverUnacknowledgedMessages();
command.MessageIds.AddRange(messageIds.Select(messageId =>
messageId.ToMessageIdData()));
- await _executor.Execute(() => RedeliverUnacknowledgedMessages(command,
cancellationToken), cancellationToken).ConfigureAwait(false);
+ await _executor.Execute(() =>
InternalRedeliverUnacknowledgedMessages(command, cancellationToken),
cancellationToken).ConfigureAwait(false);
}
public async ValueTask RedeliverUnacknowledgedMessages(CancellationToken
cancellationToken)
@@ -120,76 +117,35 @@ public sealed class Consumer<TMessage> :
IContainsChannel, IConsumer<TMessage>
public async ValueTask Unsubscribe(CancellationToken cancellationToken)
{
- ThrowIfDisposed();
-
var unsubscribe = new CommandUnsubscribe();
- await _executor.Execute(() => Unsubscribe(unsubscribe,
cancellationToken), cancellationToken).ConfigureAwait(false);
+ await _executor.Execute(() => InternalUnsubscribe(unsubscribe,
cancellationToken), cancellationToken).ConfigureAwait(false);
}
- private async ValueTask Unsubscribe(CommandUnsubscribe command,
CancellationToken cancellationToken)
- => await _channel.Send(command,
cancellationToken).ConfigureAwait(false);
-
public async ValueTask Seek(MessageId messageId, CancellationToken
cancellationToken)
{
- ThrowIfDisposed();
-
var seek = new CommandSeek { MessageId = messageId.ToMessageIdData() };
- await _executor.Execute(() => Seek(seek, cancellationToken),
cancellationToken).ConfigureAwait(false);
+ await _executor.Execute(() => InternalSeek(seek, cancellationToken),
cancellationToken).ConfigureAwait(false);
}
public async ValueTask Seek(ulong publishTime, CancellationToken
cancellationToken)
{
- ThrowIfDisposed();
-
var seek = new CommandSeek { MessagePublishTime = publishTime };
- await _executor.Execute(() => Seek(seek, cancellationToken),
cancellationToken).ConfigureAwait(false);
+ await _executor.Execute(() => InternalSeek(seek, cancellationToken),
cancellationToken).ConfigureAwait(false);
}
public async ValueTask<MessageId> GetLastMessageId(CancellationToken
cancellationToken)
{
- ThrowIfDisposed();
-
var getLastMessageId = new CommandGetLastMessageId();
- return await _executor.Execute(() =>
GetLastMessageId(getLastMessageId, cancellationToken),
cancellationToken).ConfigureAwait(false);
- }
-
- private async ValueTask<MessageId>
GetLastMessageId(CommandGetLastMessageId command, CancellationToken
cancellationToken)
- => await _channel.Send(command,
cancellationToken).ConfigureAwait(false);
-
- private async Task Seek(CommandSeek command, CancellationToken
cancellationToken)
- => await _channel.Send(command,
cancellationToken).ConfigureAwait(false);
-
- private async ValueTask Acknowledge(MessageId messageId,
CommandAck.AckType ackType, CancellationToken cancellationToken)
- {
- ThrowIfDisposed();
-
- var commandAck = _commandAckPool.Get();
- commandAck.Type = ackType;
- if (commandAck.MessageIds.Count == 0)
- commandAck.MessageIds.Add(messageId.ToMessageIdData());
- else
- commandAck.MessageIds[0].MapFrom(messageId);
-
- try
- {
- await _executor.Execute(() => Acknowledge(commandAck,
cancellationToken), cancellationToken).ConfigureAwait(false);
- }
- finally
- {
- _commandAckPool.Return(commandAck);
- }
+ return await _executor.Execute(() =>
InternalGetLastMessageId(getLastMessageId, cancellationToken),
cancellationToken).ConfigureAwait(false);
}
- private async ValueTask Acknowledge(CommandAck command, CancellationToken
cancellationToken)
- => await _channel.Send(command,
cancellationToken).ConfigureAwait(false);
-
- private async ValueTask
RedeliverUnacknowledgedMessages(CommandRedeliverUnacknowledgedMessages command,
CancellationToken cancellationToken)
- => await _channel.Send(command,
cancellationToken).ConfigureAwait(false);
-
- private void ThrowIfDisposed()
+ private void Guard()
{
if (_isDisposed != 0)
throw new ConsumerDisposedException(GetType().FullName!);
+
+ if (_faultException is not null)
+ throw new ConsumerFaultedException(_faultException);
}
public async Task EstablishNewChannel(CancellationToken cancellationToken)
@@ -204,7 +160,66 @@ public sealed class Consumer<TMessage> : IContainsChannel,
IConsumer<TMessage>
}
public async ValueTask CloseChannel(CancellationToken cancellationToken)
+ => await
_channel.ClosedByClient(cancellationToken).ConfigureAwait(false);
+
+ public async ValueTask ChannelFaulted(Exception exception)
+ {
+ _faultException = exception;
+ await DisposeChannel().ConfigureAwait(false);
+ }
+
+ private async ValueTask InternalAcknowledge(CommandAck command,
CancellationToken cancellationToken)
+ {
+ Guard();
+ await _channel.Send(command, cancellationToken).ConfigureAwait(false);
+ }
+
+ private async ValueTask
InternalRedeliverUnacknowledgedMessages(CommandRedeliverUnacknowledgedMessages
command, CancellationToken cancellationToken)
+ {
+ Guard();
+ await _channel.Send(command, cancellationToken).ConfigureAwait(false);
+ }
+
+ private async ValueTask<MessageId>
InternalGetLastMessageId(CommandGetLastMessageId command, CancellationToken
cancellationToken)
+ {
+ Guard();
+ return await _channel.Send(command,
cancellationToken).ConfigureAwait(false);
+ }
+
+ private async Task InternalSeek(CommandSeek command, CancellationToken
cancellationToken)
+ {
+ Guard();
+ await _channel.Send(command, cancellationToken).ConfigureAwait(false);
+ }
+
+ private async ValueTask<IMessage<TMessage>>
InternalReceive(CancellationToken cancellationToken)
+ {
+ Guard();
+ return await _channel.Receive(cancellationToken).ConfigureAwait(false);
+ }
+
+ private async ValueTask InternalUnsubscribe(CommandUnsubscribe command,
CancellationToken cancellationToken)
{
- await _channel.ClosedByClient(cancellationToken).ConfigureAwait(false);
+ Guard();
+ await _channel.Send(command, cancellationToken).ConfigureAwait(false);
+ }
+
+ private async ValueTask InternalAcknowledge(MessageId messageId,
CommandAck.AckType ackType, CancellationToken cancellationToken)
+ {
+ var commandAck = _commandAckPool.Get();
+ commandAck.Type = ackType;
+ if (commandAck.MessageIds.Count == 0)
+ commandAck.MessageIds.Add(messageId.ToMessageIdData());
+ else
+ commandAck.MessageIds[0].MapFrom(messageId);
+
+ try
+ {
+ await _executor.Execute(() => InternalAcknowledge(commandAck,
cancellationToken), cancellationToken).ConfigureAwait(false);
+ }
+ finally
+ {
+ _commandAckPool.Return(commandAck);
+ }
}
}
diff --git a/src/DotPulsar/Internal/ConsumerProcess.cs
b/src/DotPulsar/Internal/ConsumerProcess.cs
index 9d06d41..1246a93 100644
--- a/src/DotPulsar/Internal/ConsumerProcess.cs
+++ b/src/DotPulsar/Internal/ConsumerProcess.cs
@@ -1,4 +1,4 @@
-/*
+/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
@@ -50,7 +50,9 @@ public sealed class ConsumerProcess : Process
if (ExecutorState == ExecutorState.Faulted)
{
- _stateManager.SetState(ConsumerState.Faulted);
+ var formerState = _stateManager.SetState(ConsumerState.Faulted);
+ if (formerState != ConsumerState.Faulted)
+ Task.Run(() => _consumer.ChannelFaulted(Exception!));
return;
}
diff --git a/src/DotPulsar/Internal/Events/ExecutorFaulted.cs
b/src/DotPulsar/Internal/Events/ExecutorFaulted.cs
index ea56b6c..337c8ab 100644
--- a/src/DotPulsar/Internal/Events/ExecutorFaulted.cs
+++ b/src/DotPulsar/Internal/Events/ExecutorFaulted.cs
@@ -1,4 +1,4 @@
-/*
+/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
@@ -19,8 +19,12 @@ using System;
public sealed class ExecutorFaulted : IEvent
{
- public ExecutorFaulted(Guid correlationId)
- => CorrelationId = correlationId;
+ public ExecutorFaulted(Guid correlationId, Exception exception)
+ {
+ CorrelationId = correlationId;
+ Exception = exception;
+ }
public Guid CorrelationId { get; }
+ public Exception Exception { get; }
}
diff --git a/src/DotPulsar/Internal/Executor.cs
b/src/DotPulsar/Internal/Executor.cs
index 27f9faf..f307ff0 100644
--- a/src/DotPulsar/Internal/Executor.cs
+++ b/src/DotPulsar/Internal/Executor.cs
@@ -1,4 +1,4 @@
-/*
+/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
@@ -188,7 +188,7 @@ public sealed class Executor : IExecute
await _exceptionHandler.OnException(context).ConfigureAwait(false);
if (context.Result != FaultAction.Retry)
- _eventRegister.Register(new ExecutorFaulted(_correlationId));
+ _eventRegister.Register(new ExecutorFaulted(_correlationId,
exception));
return context.Result == FaultAction.ThrowException
? throw context.Exception
diff --git a/src/DotPulsar/Internal/Producer.cs
b/src/DotPulsar/Internal/Producer.cs
index 38fb54b..c9d608a 100644
--- a/src/DotPulsar/Internal/Producer.cs
+++ b/src/DotPulsar/Internal/Producer.cs
@@ -1,4 +1,4 @@
-/*
+/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
@@ -23,7 +23,6 @@ using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
-using System.Runtime.ExceptionServices;
using System.Threading;
using System.Threading.Tasks;
@@ -46,7 +45,7 @@ public sealed class Producer<TMessage> : IProducer<TMessage>,
IRegisterEvent
private readonly IExecute _executor;
private int _isDisposed;
private int _producerCount;
- private Exception? _throw;
+ private Exception? _faultException;
public Uri ServiceUrl { get; }
public string Topic { get; }
@@ -105,7 +104,7 @@ public sealed class Producer<TMessage> :
IProducer<TMessage>, IRegisterEvent
if (_cts.IsCancellationRequested)
return;
- _throw = exception;
+ _faultException = exception;
_state.SetState(ProducerState.Faulted);
}
}
@@ -231,8 +230,8 @@ public sealed class Producer<TMessage> :
IProducer<TMessage>, IRegisterEvent
if (_producerCount == 0)
{
_ = await _state.StateChangedFrom(ProducerState.Disconnected,
cancellationToken).ConfigureAwait(false);
- if (_throw is not null)
- ExceptionDispatchInfo.Capture(_throw).Throw(); //Retain
original stack trace by throwing like this
+ if (_faultException is not null)
+ throw new ProducerFaultedException(_faultException);
}
if (_producerCount == 1)
@@ -258,9 +257,7 @@ public sealed class Producer<TMessage> :
IProducer<TMessage>, IRegisterEvent
}
public async ValueTask Enqueue(MessageMetadata metadata, TMessage message,
Func<MessageId, ValueTask>? onMessageSent = default, CancellationToken
cancellationToken = default)
- {
- await InternalSend(metadata, message, false, onMessageSent,
cancellationToken).ConfigureAwait(false);
- }
+ => await InternalSend(metadata, message, false, onMessageSent,
cancellationToken).ConfigureAwait(false);
private async ValueTask InternalSend(MessageMetadata metadata, TMessage
message, bool sendOpCancelable, Func<MessageId, ValueTask>? onMessageSent =
default, CancellationToken cancellationToken = default)
{
@@ -303,6 +300,7 @@ public sealed class Producer<TMessage> :
IProducer<TMessage>, IRegisterEvent
}
CompleteActivity(task.Result, data.Length, activity);
+
try
{
if (onMessageSent is not null)
@@ -320,18 +318,18 @@ public sealed class Producer<TMessage> :
IProducer<TMessage>, IRegisterEvent
if (autoAssignSequenceId)
metadata.SequenceId = 0;
+
throw;
}
}
- public async ValueTask WaitForSendQueueEmpty(CancellationToken
cancellationToken)
- {
- await Task.WhenAll(_producers.Values.Select(producer =>
producer.WaitForSendQueueEmpty(cancellationToken).AsTask())).ConfigureAwait(false);
- }
+ internal async ValueTask WaitForSendQueueEmpty(CancellationToken
cancellationToken)
+ => await Task.WhenAll(_producers.Values.Select(producer =>
producer.WaitForSendQueueEmpty(cancellationToken).AsTask())).ConfigureAwait(false);
private void CompleteActivity(MessageId messageId, long payloadSize,
Activity? activity)
{
- if (activity is null) return;
+ if (activity is null)
+ return;
if (activity.IsAllDataRequested)
{
@@ -345,7 +343,8 @@ public sealed class Producer<TMessage> :
IProducer<TMessage>, IRegisterEvent
private void FailActivity(Exception exception, Activity? activity)
{
- if (activity is null) return;
+ if (activity is null)
+ return;
if (activity.IsAllDataRequested)
activity.AddException(exception);
diff --git a/src/DotPulsar/Internal/Reader.cs b/src/DotPulsar/Internal/Reader.cs
index 1778791..868e8b4 100644
--- a/src/DotPulsar/Internal/Reader.cs
+++ b/src/DotPulsar/Internal/Reader.cs
@@ -1,4 +1,4 @@
-/*
+/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
@@ -32,6 +32,7 @@ public sealed class Reader<TMessage> : IContainsChannel,
IReader<TMessage>
private readonly IStateChanged<ReaderState> _state;
private readonly IConsumerChannelFactory<TMessage> _factory;
private int _isDisposed;
+ private Exception? _faultException;
public Uri ServiceUrl { get; }
public string Topic { get; }
@@ -73,39 +74,35 @@ public sealed class Reader<TMessage> : IContainsChannel,
IReader<TMessage>
public async ValueTask<MessageId> GetLastMessageId(CancellationToken
cancellationToken)
{
- ThrowIfDisposed();
-
var getLastMessageId = new CommandGetLastMessageId();
- return await _executor.Execute(() =>
GetLastMessageId(getLastMessageId, cancellationToken),
cancellationToken).ConfigureAwait(false);
+ return await _executor.Execute(() =>
InternalGetLastMessageId(getLastMessageId, cancellationToken),
cancellationToken).ConfigureAwait(false);
}
- private async ValueTask<MessageId>
GetLastMessageId(CommandGetLastMessageId command, CancellationToken
cancellationToken)
- => await _channel.Send(command,
cancellationToken).ConfigureAwait(false);
+ private async ValueTask<MessageId>
InternalGetLastMessageId(CommandGetLastMessageId command, CancellationToken
cancellationToken)
+ {
+ Guard();
+ return await _channel.Send(command,
cancellationToken).ConfigureAwait(false);
+ }
public async ValueTask<IMessage<TMessage>> Receive(CancellationToken
cancellationToken)
- {
- ThrowIfDisposed();
+ => await _executor.Execute(() => InternalReceive(cancellationToken),
cancellationToken).ConfigureAwait(false);
- return await _executor.Execute(() =>
ReceiveMessage(cancellationToken), cancellationToken).ConfigureAwait(false);
+ private async ValueTask<IMessage<TMessage>>
InternalReceive(CancellationToken cancellationToken)
+ {
+ Guard();
+ return await _channel.Receive(cancellationToken).ConfigureAwait(false);
}
- private async ValueTask<IMessage<TMessage>>
ReceiveMessage(CancellationToken cancellationToken)
- => await _channel.Receive(cancellationToken).ConfigureAwait(false);
-
public async ValueTask Seek(MessageId messageId, CancellationToken
cancellationToken)
{
- ThrowIfDisposed();
-
var seek = new CommandSeek { MessageId = messageId.ToMessageIdData() };
- await _executor.Execute(() => Seek(seek, cancellationToken),
cancellationToken).ConfigureAwait(false);
+ await _executor.Execute(() => InternalSeek(seek, cancellationToken),
cancellationToken).ConfigureAwait(false);
}
public async ValueTask Seek(ulong publishTime, CancellationToken
cancellationToken)
{
- ThrowIfDisposed();
-
var seek = new CommandSeek { MessagePublishTime = publishTime };
- await _executor.Execute(() => Seek(seek, cancellationToken),
cancellationToken).ConfigureAwait(false);
+ await _executor.Execute(() => InternalSeek(seek, cancellationToken),
cancellationToken).ConfigureAwait(false);
}
public async ValueTask DisposeAsync()
@@ -114,12 +111,20 @@ public sealed class Reader<TMessage> : IContainsChannel,
IReader<TMessage>
return;
_eventRegister.Register(new ReaderDisposed(_correlationId));
+ await DisposeChannel().ConfigureAwait(false);
+ }
+
+ private async ValueTask DisposeChannel()
+ {
await
_channel.ClosedByClient(CancellationToken.None).ConfigureAwait(false);
await _channel.DisposeAsync().ConfigureAwait(false);
}
- private async Task Seek(CommandSeek command, CancellationToken
cancellationToken)
- => await _channel.Send(command,
cancellationToken).ConfigureAwait(false);
+ private async Task InternalSeek(CommandSeek command, CancellationToken
cancellationToken)
+ {
+ Guard();
+ await _channel.Send(command, cancellationToken).ConfigureAwait(false);
+ }
public async Task EstablishNewChannel(CancellationToken cancellationToken)
{
@@ -133,13 +138,20 @@ public sealed class Reader<TMessage> : IContainsChannel,
IReader<TMessage>
}
public async ValueTask CloseChannel(CancellationToken cancellationToken)
- {
- await _channel.ClosedByClient(cancellationToken).ConfigureAwait(false);
- }
+ => await
_channel.ClosedByClient(cancellationToken).ConfigureAwait(false);
- private void ThrowIfDisposed()
+ private void Guard()
{
if (_isDisposed != 0)
throw new ReaderDisposedException(GetType().FullName!);
+
+ if (_faultException is not null)
+ throw new ReaderFaultedException(_faultException);
+ }
+
+ public async ValueTask ChannelFaulted(Exception exception)
+ {
+ _faultException = exception;
+ await DisposeChannel().ConfigureAwait(false);
}
}
diff --git a/src/DotPulsar/Internal/ReaderProcess.cs
b/src/DotPulsar/Internal/ReaderProcess.cs
index 8452c64..1076d84 100644
--- a/src/DotPulsar/Internal/ReaderProcess.cs
+++ b/src/DotPulsar/Internal/ReaderProcess.cs
@@ -1,4 +1,4 @@
-/*
+/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
@@ -48,6 +48,9 @@ public sealed class ReaderProcess : Process
if (ExecutorState == ExecutorState.Faulted)
{
_stateManager.SetState(ReaderState.Faulted);
+ var formerState = _stateManager.SetState(ReaderState.Faulted);
+ if (formerState != ReaderState.Faulted)
+ Task.Run(() => _reader.ChannelFaulted(Exception!));
return;
}
diff --git a/src/DotPulsar/Internal/SubProducer.cs
b/src/DotPulsar/Internal/SubProducer.cs
index 1f9588d..8d26335 100644
--- a/src/DotPulsar/Internal/SubProducer.cs
+++ b/src/DotPulsar/Internal/SubProducer.cs
@@ -1,4 +1,4 @@
-/*
+/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
@@ -87,20 +87,17 @@ public sealed class SubProducer : IContainsChannel,
IState<ProducerState>
{
// Ignored
}
+
await _sendQueue.DisposeAsync().ConfigureAwait(false);
await
_channel.ClosedByClient(CancellationToken.None).ConfigureAwait(false);
await _channel.DisposeAsync().ConfigureAwait(false);
}
public async ValueTask Send(SendOp sendOp, CancellationToken
cancellationToken)
- {
- await _sendQueue.Enqueue(sendOp,
cancellationToken).ConfigureAwait(false);
- }
+ => await _sendQueue.Enqueue(sendOp,
cancellationToken).ConfigureAwait(false);
- public async ValueTask WaitForSendQueueEmpty(CancellationToken
cancellationToken)
- {
- await _sendQueue.WaitForEmpty(cancellationToken).ConfigureAwait(false);
- }
+ internal async ValueTask WaitForSendQueueEmpty(CancellationToken
cancellationToken)
+ => await
_sendQueue.WaitForEmpty(cancellationToken).ConfigureAwait(false);
private async Task MessageDispatcher(IProducerChannel channel,
CancellationToken cancellationToken)
{
@@ -118,13 +115,14 @@ public sealed class SubProducer : IContainsChannel,
IState<ProducerState>
}
var tcs = new TaskCompletionSource<BaseCommand>();
- _ = tcs.Task.ContinueWith(task => responseQueue.Enqueue(task),
- TaskContinuationOptions.NotOnCanceled |
TaskContinuationOptions.ExecuteSynchronously);
+ _ = tcs.Task.ContinueWith(task => responseQueue.Enqueue(task),
TaskContinuationOptions.NotOnCanceled |
TaskContinuationOptions.ExecuteSynchronously);
// Use CancellationToken.None here because otherwise it will throw
exceptions on all fault actions even retry.
var success = await _executor.TryExecuteOnce(() =>
channel.Send(sendOp.Metadata, sendOp.Data, tcs, cancellationToken),
CancellationToken.None).ConfigureAwait(false);
- if (success) continue;
+ if (success)
+ continue;
+
_eventRegister.Register(new ChannelDisconnected(_correlationId));
break;
}
@@ -140,12 +138,16 @@ public sealed class SubProducer : IContainsChannel,
IState<ProducerState>
var success = await _executor.TryExecuteOnce(() =>
{
- if (responseTask.IsFaulted) throw responseTask.Exception!;
+ if (responseTask.IsFaulted)
+ throw responseTask.Exception!;
+
responseTask.Result.Expect(BaseCommand.Type.SendReceipt);
ProcessReceipt(responseTask.Result.SendReceipt);
}, CancellationToken.None).ConfigureAwait(false); // Use
CancellationToken.None here because otherwise it will throw exceptions on all
fault actions even retry.
- if (success) continue;
+ if (success)
+ continue;
+
_eventRegister.Register(new
SendReceiptWrongOrdering(_correlationId));
break;
}
@@ -205,7 +207,10 @@ public sealed class SubProducer : IContainsChannel,
IState<ProducerState>
}
public async ValueTask CloseChannel(CancellationToken cancellationToken)
+ => await
_channel.ClosedByClient(cancellationToken).ConfigureAwait(false);
+
+ public ValueTask ChannelFaulted(Exception exception)
{
- await _channel.ClosedByClient(cancellationToken).ConfigureAwait(false);
+ return new ValueTask();
}
}