This is an automated email from the ASF dual-hosted git repository.
blankensteiner pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-dotpulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 7aa8d50 Created IStateHolder and have consumers, producers, and
readers implement that instead of IState
7aa8d50 is described below
commit 7aa8d502af7114d21528d6aee40f60342b26baa4
Author: Daniel Blankensteiner <[email protected]>
AuthorDate: Wed Dec 11 11:44:51 2024 +0100
Created IStateHolder and have consumers, producers, and readers implement
that instead of IState
---
CHANGELOG.md | 6 ++
src/DotPulsar/Abstractions/IConsumer.cs | 2 +-
src/DotPulsar/Abstractions/IProducer.cs | 2 +-
src/DotPulsar/Abstractions/IReader.cs | 2 +-
.../Abstractions/{IProducer.cs => IStateHolder.cs} | 13 +--
src/DotPulsar/Extensions/ConsumerExtensions.cs | 8 +-
src/DotPulsar/Extensions/ProducerExtensions.cs | 8 +-
src/DotPulsar/Extensions/ReaderExtensions.cs | 8 +-
src/DotPulsar/Extensions/StateExtensions.cs | 108 ---------------------
...StateExtensions.cs => StateHolderExtensions.cs} | 88 ++++-------------
src/DotPulsar/Internal/Abstractions/IConnection.cs | 2 +-
.../Internal/Abstractions/IStateChanged.cs | 59 -----------
.../Internal/Abstractions/IStateManager.cs | 6 +-
src/DotPulsar/Internal/ChannelManager.cs | 14 +--
src/DotPulsar/Internal/Connection.cs | 18 +---
src/DotPulsar/Internal/ConnectionPool.cs | 2 +-
src/DotPulsar/Internal/Consumer.cs | 17 +---
src/DotPulsar/Internal/MonitorState.cs | 6 +-
src/DotPulsar/Internal/PingPongHandler.cs | 14 +--
src/DotPulsar/Internal/Producer.cs | 20 +---
src/DotPulsar/Internal/Reader.cs | 17 +---
src/DotPulsar/Internal/StateManager.cs | 4 +-
src/DotPulsar/Internal/SubConsumer.cs | 17 +---
src/DotPulsar/Internal/SubProducer.cs | 19 +---
src/DotPulsar/Internal/SubReader.cs | 17 +---
.../Extensions/StateExtensionsTests.cs | 44 ---------
.../Extensions/StateHolderExtensionsTests.cs | 68 +++++++++++++
tests/DotPulsar.Tests/Internal/ConsumerTests.cs | 8 +-
.../Internal/PingPongHandlerTest.cs | 8 +-
tests/DotPulsar.Tests/Internal/ProducerTests.cs | 18 ++--
tests/DotPulsar.Tests/Internal/ReaderTests.cs | 8 +-
.../DotPulsar.Tests/Internal/StateManagerTests.cs | 36 +++----
tests/DotPulsar.Tests/PulsarClientTests.cs | 8 +-
33 files changed, 198 insertions(+), 477 deletions(-)
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 80d6122..787d041 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -4,6 +4,12 @@ All notable changes to this project will be documented in this
file.
The format is based on [Keep a
Changelog](https://keepachangelog.com/en/1.1.0/) and this project adheres to
[Semantic Versioning](https://semver.org/spec/v2.0.0.html).
+## [Unreleased]
+
+### Changed
+
+- **Breaking**: The consumer, reader, and producer now implements IStateHolder
instead of IState
+
## [3.6.0] - 2024-12-09
### Added
diff --git a/src/DotPulsar/Abstractions/IConsumer.cs
b/src/DotPulsar/Abstractions/IConsumer.cs
index 5f90a38..84ea708 100644
--- a/src/DotPulsar/Abstractions/IConsumer.cs
+++ b/src/DotPulsar/Abstractions/IConsumer.cs
@@ -17,7 +17,7 @@ namespace DotPulsar.Abstractions;
/// <summary>
/// A consumer abstraction.
/// </summary>
-public interface IConsumer : IGetLastMessageIds, ISeek, IState<ConsumerState>,
IAsyncDisposable
+public interface IConsumer : IGetLastMessageIds, ISeek,
IStateHolder<ConsumerState>, IAsyncDisposable
{
/// <summary>
/// Acknowledge the consumption of a single message using the MessageId.
diff --git a/src/DotPulsar/Abstractions/IProducer.cs
b/src/DotPulsar/Abstractions/IProducer.cs
index 7f8b658..0f5d10f 100644
--- a/src/DotPulsar/Abstractions/IProducer.cs
+++ b/src/DotPulsar/Abstractions/IProducer.cs
@@ -17,7 +17,7 @@ namespace DotPulsar.Abstractions;
/// <summary>
/// A producer abstraction.
/// </summary>
-public interface IProducer : IState<ProducerState>, IAsyncDisposable
+public interface IProducer : IStateHolder<ProducerState>, IAsyncDisposable
{
/// <summary>
/// The producer's service url.
diff --git a/src/DotPulsar/Abstractions/IReader.cs
b/src/DotPulsar/Abstractions/IReader.cs
index 812a4f0..931b014 100644
--- a/src/DotPulsar/Abstractions/IReader.cs
+++ b/src/DotPulsar/Abstractions/IReader.cs
@@ -17,7 +17,7 @@ namespace DotPulsar.Abstractions;
/// <summary>
/// A reader abstraction.
/// </summary>
-public interface IReader : IGetLastMessageIds, ISeek, IState<ReaderState>,
IAsyncDisposable
+public interface IReader : IGetLastMessageIds, ISeek,
IStateHolder<ReaderState>, IAsyncDisposable
{
/// <summary>
/// The reader's service url.
diff --git a/src/DotPulsar/Abstractions/IProducer.cs
b/src/DotPulsar/Abstractions/IStateHolder.cs
similarity index 71%
copy from src/DotPulsar/Abstractions/IProducer.cs
copy to src/DotPulsar/Abstractions/IStateHolder.cs
index 7f8b658..34b5e37 100644
--- a/src/DotPulsar/Abstractions/IProducer.cs
+++ b/src/DotPulsar/Abstractions/IStateHolder.cs
@@ -15,17 +15,12 @@
namespace DotPulsar.Abstractions;
/// <summary>
-/// A producer abstraction.
+/// A state holder abstraction.
/// </summary>
-public interface IProducer : IState<ProducerState>, IAsyncDisposable
+public interface IStateHolder<TState> where TState : notnull
{
/// <summary>
- /// The producer's service url.
+ /// The state abstraction of the holder.
/// </summary>
- public Uri ServiceUrl { get; }
-
- /// <summary>
- /// The producer's topic.
- /// </summary>
- string Topic { get; }
+ IState<TState> State { get; }
}
diff --git a/src/DotPulsar/Extensions/ConsumerExtensions.cs
b/src/DotPulsar/Extensions/ConsumerExtensions.cs
index 1d4e200..ee00db2 100644
--- a/src/DotPulsar/Extensions/ConsumerExtensions.cs
+++ b/src/DotPulsar/Extensions/ConsumerExtensions.cs
@@ -67,7 +67,7 @@ public static class ConsumerExtensions
/// </remarks>
public static async ValueTask<ConsumerStateChanged> StateChangedTo(this
IConsumer consumer, ConsumerState state, CancellationToken cancellationToken =
default)
{
- var currentState = await consumer.OnStateChangeTo(state,
cancellationToken).ConfigureAwait(false);
+ var currentState = await consumer.State.OnStateChangeTo(state,
cancellationToken).ConfigureAwait(false);
return new ConsumerStateChanged(consumer, currentState);
}
@@ -82,7 +82,7 @@ public static class ConsumerExtensions
/// </remarks>
public static async ValueTask<ConsumerStateChanged> StateChangedTo(this
IConsumer consumer, ConsumerState state, TimeSpan delay, CancellationToken
cancellationToken = default)
{
- var currentState = await consumer.OnStateChangeTo(state, delay,
cancellationToken).ConfigureAwait(false);
+ var currentState = await consumer.State.OnStateChangeTo(state, delay,
cancellationToken).ConfigureAwait(false);
return new ConsumerStateChanged(consumer, currentState);
}
@@ -97,7 +97,7 @@ public static class ConsumerExtensions
/// </remarks>
public static async ValueTask<ConsumerStateChanged> StateChangedFrom(this
IConsumer consumer, ConsumerState state, CancellationToken cancellationToken =
default)
{
- var currentState = await consumer.OnStateChangeFrom(state,
cancellationToken).ConfigureAwait(false);
+ var currentState = await consumer.State.OnStateChangeFrom(state,
cancellationToken).ConfigureAwait(false);
return new ConsumerStateChanged(consumer, currentState);
}
@@ -112,7 +112,7 @@ public static class ConsumerExtensions
/// </remarks>
public static async ValueTask<ConsumerStateChanged> StateChangedFrom(this
IConsumer consumer, ConsumerState state, TimeSpan delay, CancellationToken
cancellationToken = default)
{
- var currentState = await consumer.OnStateChangeFrom(state, delay,
cancellationToken).ConfigureAwait(false);
+ var currentState = await consumer.State.OnStateChangeFrom(state,
delay, cancellationToken).ConfigureAwait(false);
return new ConsumerStateChanged(consumer, currentState);
}
}
diff --git a/src/DotPulsar/Extensions/ProducerExtensions.cs
b/src/DotPulsar/Extensions/ProducerExtensions.cs
index 885f948..f31e36e 100644
--- a/src/DotPulsar/Extensions/ProducerExtensions.cs
+++ b/src/DotPulsar/Extensions/ProducerExtensions.cs
@@ -39,7 +39,7 @@ public static class ProducerExtensions
/// </remarks>
public static async ValueTask<ProducerStateChanged> StateChangedTo(this
IProducer producer, ProducerState state, CancellationToken cancellationToken =
default)
{
- var currentState = await producer.OnStateChangeTo(state,
cancellationToken).ConfigureAwait(false);
+ var currentState = await producer.State.OnStateChangeTo(state,
cancellationToken).ConfigureAwait(false);
return new ProducerStateChanged(producer, currentState);
}
@@ -54,7 +54,7 @@ public static class ProducerExtensions
/// </remarks>
public static async ValueTask<ProducerStateChanged> StateChangedTo(this
IProducer producer, ProducerState state, TimeSpan delay, CancellationToken
cancellationToken = default)
{
- var currentState = await producer.OnStateChangeTo(state, delay,
cancellationToken).ConfigureAwait(false);
+ var currentState = await producer.State.OnStateChangeTo(state, delay,
cancellationToken).ConfigureAwait(false);
return new ProducerStateChanged(producer, currentState);
}
@@ -69,7 +69,7 @@ public static class ProducerExtensions
/// </remarks>
public static async ValueTask<ProducerStateChanged> StateChangedFrom(this
IProducer producer, ProducerState state, CancellationToken cancellationToken =
default)
{
- var currentState = await producer.OnStateChangeFrom(state,
cancellationToken).ConfigureAwait(false);
+ var currentState = await producer.State.OnStateChangeFrom(state,
cancellationToken).ConfigureAwait(false);
return new ProducerStateChanged(producer, currentState);
}
@@ -84,7 +84,7 @@ public static class ProducerExtensions
/// </remarks>
public static async ValueTask<ProducerStateChanged> StateChangedFrom(this
IProducer producer, ProducerState state, TimeSpan delay, CancellationToken
cancellationToken = default)
{
- var currentState = await producer.OnStateChangeFrom(state, delay,
cancellationToken).ConfigureAwait(false);
+ var currentState = await producer.State.OnStateChangeFrom(state,
delay, cancellationToken).ConfigureAwait(false);
return new ProducerStateChanged(producer, currentState);
}
}
diff --git a/src/DotPulsar/Extensions/ReaderExtensions.cs
b/src/DotPulsar/Extensions/ReaderExtensions.cs
index e8c109a..ee8041a 100644
--- a/src/DotPulsar/Extensions/ReaderExtensions.cs
+++ b/src/DotPulsar/Extensions/ReaderExtensions.cs
@@ -32,7 +32,7 @@ public static class ReaderExtensions
/// </remarks>
public static async ValueTask<ReaderStateChanged> StateChangedTo(this
IReader reader, ReaderState state, CancellationToken cancellationToken =
default)
{
- var currentState = await reader.OnStateChangeTo(state,
cancellationToken).ConfigureAwait(false);
+ var currentState = await reader.State.OnStateChangeTo(state,
cancellationToken).ConfigureAwait(false);
return new ReaderStateChanged(reader, currentState);
}
@@ -47,7 +47,7 @@ public static class ReaderExtensions
/// </remarks>
public static async ValueTask<ReaderStateChanged> StateChangedTo(this
IReader reader, ReaderState state, TimeSpan delay, CancellationToken
cancellationToken = default)
{
- var currentState = await reader.OnStateChangeTo(state, delay,
cancellationToken).ConfigureAwait(false);
+ var currentState = await reader.State.OnStateChangeTo(state, delay,
cancellationToken).ConfigureAwait(false);
return new ReaderStateChanged(reader, currentState);
}
@@ -62,7 +62,7 @@ public static class ReaderExtensions
/// </remarks>
public static async ValueTask<ReaderStateChanged> StateChangedFrom(this
IReader reader, ReaderState state, CancellationToken cancellationToken =
default)
{
- var currentState = await reader.OnStateChangeFrom(state,
cancellationToken).ConfigureAwait(false);
+ var currentState = await reader.State.OnStateChangeFrom(state,
cancellationToken).ConfigureAwait(false);
return new ReaderStateChanged(reader, currentState);
}
@@ -77,7 +77,7 @@ public static class ReaderExtensions
/// </remarks>
public static async ValueTask<ReaderStateChanged> StateChangedFrom(this
IReader reader, ReaderState state, TimeSpan delay, CancellationToken
cancellationToken = default)
{
- var currentState = await reader.OnStateChangeFrom(state, delay,
cancellationToken).ConfigureAwait(false);
+ var currentState = await reader.State.OnStateChangeFrom(state, delay,
cancellationToken).ConfigureAwait(false);
return new ReaderStateChanged(reader, currentState);
}
}
diff --git a/src/DotPulsar/Extensions/StateExtensions.cs
b/src/DotPulsar/Extensions/StateExtensions.cs
index 0e07bb2..a04e6b3 100644
--- a/src/DotPulsar/Extensions/StateExtensions.cs
+++ b/src/DotPulsar/Extensions/StateExtensions.cs
@@ -100,112 +100,4 @@ public static class StateExtensions
}
}
}
-
- /// <summary>
- /// Will invoke the onStateLeft callback when the state if left (with
delay) and onStateReached when it's reached again.
- /// </summary>
- /// <returns>
- /// ValueTask that will run as long as a final state is not entered.
- /// </returns>
- public static async ValueTask DelayedStateMonitor<TEntity, TState,
TFaultContext>(
- this TEntity stateImplementer,
- TState state,
- TimeSpan delay,
- Func<TEntity, TState, CancellationToken, ValueTask<TFaultContext>>
onStateLeft,
- Func<TEntity, TState, TFaultContext, CancellationToken, ValueTask>
onStateReached,
- CancellationToken cancellationToken = default) where TEntity :
IState<TState> where TState : notnull where TFaultContext : class
- {
- while (true)
- {
- cancellationToken.ThrowIfCancellationRequested();
-
- var currentState = await stateImplementer.OnStateChangeFrom(state,
delay, cancellationToken).ConfigureAwait(false);
-
- TFaultContext? faultContext = null;
-
- try
- {
- faultContext = await onStateLeft(stateImplementer,
currentState, cancellationToken).ConfigureAwait(false);
- }
- catch
- {
- // Ignore
- }
-
- if (stateImplementer.IsFinalState(currentState))
- return;
-
- currentState = await stateImplementer.OnStateChangeTo(state,
cancellationToken).ConfigureAwait(false);
-
- if (stateImplementer.IsFinalState(currentState))
- return;
-
- try
- {
- if (faultContext is not null)
- await onStateReached(stateImplementer, currentState,
faultContext, cancellationToken).ConfigureAwait(false);
- }
- catch
- {
- // Ignore
- }
- }
- }
-
- /// <summary>
- /// Will invoke the onStateLeft callback when the state if left (with
delay) and onStateReached when it's reached again.
- /// </summary>
- /// <returns>
- /// ValueTask that will run as long as a final state is not entered.
- /// </returns>
- public static async ValueTask DelayedStateMonitor<TEntity, TState>(
- this TEntity stateImplementer,
- TState state,
- TimeSpan delay,
- Func<TEntity, TState, CancellationToken, ValueTask> onStateLeft,
- Func<TEntity, TState, CancellationToken, ValueTask> onStateReached,
- CancellationToken cancellationToken = default) where TEntity :
IState<TState> where TState : notnull
- {
- async ValueTask<string> onStateLeftFunction(TEntity entity, TState
state, CancellationToken cancellationToken)
- {
- await onStateLeft(entity, state,
cancellationToken).ConfigureAwait(false);
- return string.Empty;
- }
-
- async ValueTask onStateReachedFunction(TEntity entity, TState state,
string faultContext, CancellationToken cancellationToken)
- {
- await onStateReached(entity, state,
cancellationToken).ConfigureAwait(false);
- }
-
- await stateImplementer.DelayedStateMonitor(state, delay,
onStateLeftFunction, onStateReachedFunction,
cancellationToken).ConfigureAwait(false);
- }
-
- /// <summary>
- /// Will invoke the onStateLeft callback when the state if left (with
delay) and onStateReached when it's reached again.
- /// </summary>
- /// <returns>
- /// ValueTask that will run as long as a final state is not entered.
- /// </returns>
- public static async ValueTask DelayedStateMonitor<TEntity, TState>(
- this TEntity stateImplementer,
- TState state,
- TimeSpan delay,
- Action<TEntity, TState> onStateLeft,
- Action<TEntity, TState> onStateReached,
- CancellationToken cancellationToken = default) where TEntity :
IState<TState> where TState : notnull
- {
- ValueTask<string> onStateLeftFunction(TEntity entity, TState state,
CancellationToken cancellationToken)
- {
- onStateLeft(entity, state);
- return new ValueTask<string>(string.Empty);
- }
-
- ValueTask onStateReachedFunction(TEntity entity, TState state, string
faultContext, CancellationToken cancellationToken)
- {
- onStateReached(entity, state);
- return new ValueTask();
- }
-
- await stateImplementer.DelayedStateMonitor(state, delay,
onStateLeftFunction, onStateReachedFunction,
cancellationToken).ConfigureAwait(false);
- }
}
diff --git a/src/DotPulsar/Extensions/StateExtensions.cs
b/src/DotPulsar/Extensions/StateHolderExtensions.cs
similarity index 61%
copy from src/DotPulsar/Extensions/StateExtensions.cs
copy to src/DotPulsar/Extensions/StateHolderExtensions.cs
index 0e07bb2..0ca2631 100644
--- a/src/DotPulsar/Extensions/StateExtensions.cs
+++ b/src/DotPulsar/Extensions/StateHolderExtensions.cs
@@ -17,9 +17,9 @@ namespace DotPulsar.Extensions;
using DotPulsar.Abstractions;
/// <summary>
-/// Extensions for IState.
+/// Extensions for IStateHolder.
/// </summary>
-public static class StateExtensions
+public static class StateHolderExtensions
{
/// <summary>
/// Wait for the state to change to a specific state with a delay.
@@ -31,35 +31,11 @@ public static class StateExtensions
/// If the state change to a final state, then all awaiting tasks will
complete.
/// </remarks>
public static async ValueTask<TState> OnStateChangeTo<TState>(
- this IState<TState> stateChanged,
+ this IStateHolder<TState> stateHolder,
TState state,
TimeSpan delay,
CancellationToken cancellationToken = default) where TState : notnull
- {
- while (true)
- {
- var currentState = await stateChanged.OnStateChangeTo(state,
cancellationToken).ConfigureAwait(false);
- if (stateChanged.IsFinalState(currentState))
- return currentState;
-
- using var cts =
CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
- cts.CancelAfter(delay);
-
- try
- {
- currentState = await stateChanged.OnStateChangeFrom(state,
cts.Token).ConfigureAwait(false);
- if (stateChanged.IsFinalState(currentState))
- return currentState;
- }
- catch (OperationCanceledException)
- {
- if (cancellationToken.IsCancellationRequested)
- throw;
-
- return state;
- }
- }
- }
+ => await stateHolder.State.OnStateChangeTo(state, delay,
cancellationToken);
/// <summary>
/// Wait for the state to change from a specific state with a delay.
@@ -71,35 +47,11 @@ public static class StateExtensions
/// If the state change to a final state, then all awaiting tasks will
complete.
/// </remarks>
public static async ValueTask<TState> OnStateChangeFrom<TState>(
- this IState<TState> stateChanged,
+ this IStateHolder<TState> stateHolder,
TState state,
TimeSpan delay,
CancellationToken cancellationToken = default) where TState : notnull
- {
- while (true)
- {
- var currentState = await stateChanged.OnStateChangeFrom(state,
cancellationToken).ConfigureAwait(false);
- if (stateChanged.IsFinalState(currentState))
- return currentState;
-
- using var cts =
CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
- cts.CancelAfter(delay);
-
- try
- {
- currentState = await stateChanged.OnStateChangeTo(state,
cts.Token).ConfigureAwait(false);
- if (stateChanged.IsFinalState(currentState))
- return currentState;
- }
- catch (OperationCanceledException)
- {
- if (cancellationToken.IsCancellationRequested)
- throw;
-
- return currentState;
- }
- }
- }
+ => await stateHolder.State.OnStateChangeFrom(state, delay,
cancellationToken);
/// <summary>
/// Will invoke the onStateLeft callback when the state if left (with
delay) and onStateReached when it's reached again.
@@ -108,42 +60,42 @@ public static class StateExtensions
/// ValueTask that will run as long as a final state is not entered.
/// </returns>
public static async ValueTask DelayedStateMonitor<TEntity, TState,
TFaultContext>(
- this TEntity stateImplementer,
+ this TEntity stateHolder,
TState state,
TimeSpan delay,
Func<TEntity, TState, CancellationToken, ValueTask<TFaultContext>>
onStateLeft,
Func<TEntity, TState, TFaultContext, CancellationToken, ValueTask>
onStateReached,
- CancellationToken cancellationToken = default) where TEntity :
IState<TState> where TState : notnull where TFaultContext : class
+ CancellationToken cancellationToken = default) where TEntity :
IStateHolder<TState> where TState : notnull where TFaultContext : class
{
while (true)
{
cancellationToken.ThrowIfCancellationRequested();
- var currentState = await stateImplementer.OnStateChangeFrom(state,
delay, cancellationToken).ConfigureAwait(false);
+ var currentState = await stateHolder.OnStateChangeFrom(state,
delay, cancellationToken).ConfigureAwait(false);
TFaultContext? faultContext = null;
try
{
- faultContext = await onStateLeft(stateImplementer,
currentState, cancellationToken).ConfigureAwait(false);
+ faultContext = await onStateLeft(stateHolder, currentState,
cancellationToken).ConfigureAwait(false);
}
catch
{
// Ignore
}
- if (stateImplementer.IsFinalState(currentState))
+ if (stateHolder.State.IsFinalState(currentState))
return;
- currentState = await stateImplementer.OnStateChangeTo(state,
cancellationToken).ConfigureAwait(false);
+ currentState = await stateHolder.State.OnStateChangeTo(state,
cancellationToken).ConfigureAwait(false);
- if (stateImplementer.IsFinalState(currentState))
+ if (stateHolder.State.IsFinalState(currentState))
return;
try
{
if (faultContext is not null)
- await onStateReached(stateImplementer, currentState,
faultContext, cancellationToken).ConfigureAwait(false);
+ await onStateReached(stateHolder, currentState,
faultContext, cancellationToken).ConfigureAwait(false);
}
catch
{
@@ -159,12 +111,12 @@ public static class StateExtensions
/// ValueTask that will run as long as a final state is not entered.
/// </returns>
public static async ValueTask DelayedStateMonitor<TEntity, TState>(
- this TEntity stateImplementer,
+ this TEntity stateHolder,
TState state,
TimeSpan delay,
Func<TEntity, TState, CancellationToken, ValueTask> onStateLeft,
Func<TEntity, TState, CancellationToken, ValueTask> onStateReached,
- CancellationToken cancellationToken = default) where TEntity :
IState<TState> where TState : notnull
+ CancellationToken cancellationToken = default) where TEntity :
IStateHolder<TState> where TState : notnull
{
async ValueTask<string> onStateLeftFunction(TEntity entity, TState
state, CancellationToken cancellationToken)
{
@@ -177,7 +129,7 @@ public static class StateExtensions
await onStateReached(entity, state,
cancellationToken).ConfigureAwait(false);
}
- await stateImplementer.DelayedStateMonitor(state, delay,
onStateLeftFunction, onStateReachedFunction,
cancellationToken).ConfigureAwait(false);
+ await stateHolder.DelayedStateMonitor(state, delay,
onStateLeftFunction, onStateReachedFunction,
cancellationToken).ConfigureAwait(false);
}
/// <summary>
@@ -187,12 +139,12 @@ public static class StateExtensions
/// ValueTask that will run as long as a final state is not entered.
/// </returns>
public static async ValueTask DelayedStateMonitor<TEntity, TState>(
- this TEntity stateImplementer,
+ this TEntity stateHolder,
TState state,
TimeSpan delay,
Action<TEntity, TState> onStateLeft,
Action<TEntity, TState> onStateReached,
- CancellationToken cancellationToken = default) where TEntity :
IState<TState> where TState : notnull
+ CancellationToken cancellationToken = default) where TEntity :
IStateHolder<TState> where TState : notnull
{
ValueTask<string> onStateLeftFunction(TEntity entity, TState state,
CancellationToken cancellationToken)
{
@@ -206,6 +158,6 @@ public static class StateExtensions
return new ValueTask();
}
- await stateImplementer.DelayedStateMonitor(state, delay,
onStateLeftFunction, onStateReachedFunction,
cancellationToken).ConfigureAwait(false);
+ await stateHolder.DelayedStateMonitor(state, delay,
onStateLeftFunction, onStateReachedFunction,
cancellationToken).ConfigureAwait(false);
}
}
diff --git a/src/DotPulsar/Internal/Abstractions/IConnection.cs
b/src/DotPulsar/Internal/Abstractions/IConnection.cs
index 48cad09..2f2a5b8 100644
--- a/src/DotPulsar/Internal/Abstractions/IConnection.cs
+++ b/src/DotPulsar/Internal/Abstractions/IConnection.cs
@@ -17,7 +17,7 @@ namespace DotPulsar.Internal.Abstractions;
using DotPulsar.Abstractions;
using DotPulsar.Internal.PulsarApi;
-public interface IConnection : IState<ConnectionState>, IAsyncDisposable
+public interface IConnection : IStateHolder<ConnectionState>, IAsyncDisposable
{
public int MaxMessageSize { get; }
diff --git a/src/DotPulsar/Internal/Abstractions/IStateChanged.cs
b/src/DotPulsar/Internal/Abstractions/IStateChanged.cs
deleted file mode 100644
index 31f579f..0000000
--- a/src/DotPulsar/Internal/Abstractions/IStateChanged.cs
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-namespace DotPulsar.Internal.Abstractions;
-
-/// <summary>
-/// A state change monitoring abstraction.
-/// </summary>
-public interface IStateChanged<TState> where TState : notnull
-{
- /// <summary>
- /// Wait for the state to change to a specific state.
- /// </summary>
- /// <returns>
- /// The current state.
- /// </returns>
- /// <remarks>
- /// If the state change to a final state, then all awaiting tasks will
complete.
- /// </remarks>
- ValueTask<TState> StateChangedTo(TState state, CancellationToken
cancellationToken = default);
-
- /// <summary>
- /// Wait for the state to change from a specific state.
- /// </summary>
- /// <returns>
- /// The current state.
- /// </returns>
- /// <remarks>
- /// If the state change to a final state, then all awaiting tasks will
complete.
- /// </remarks>
- ValueTask<TState> StateChangedFrom(TState state, CancellationToken
cancellationToken = default);
-
- /// <summary>
- /// Ask whether the current state is final, meaning that it will never
change.
- /// </summary>
- /// <returns>
- /// True if it's final and False if it's not.
- /// </returns>
- bool IsFinalState();
-
- /// <summary>
- /// Ask whether the provided state is final, meaning that it will never
change.
- /// </summary>
- /// <returns>
- /// True if it's final and False if it's not.
- /// </returns>
- bool IsFinalState(TState state);
-}
diff --git a/src/DotPulsar/Internal/Abstractions/IStateManager.cs
b/src/DotPulsar/Internal/Abstractions/IStateManager.cs
index 135a2ac..9d5fe25 100644
--- a/src/DotPulsar/Internal/Abstractions/IStateManager.cs
+++ b/src/DotPulsar/Internal/Abstractions/IStateManager.cs
@@ -1,4 +1,4 @@
-/*
+/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
@@ -14,7 +14,9 @@
namespace DotPulsar.Internal.Abstractions;
-public interface IStateManager<TState> : IStateChanged<TState> where TState :
notnull
+using DotPulsar.Abstractions;
+
+public interface IStateManager<TState> : IState<TState> where TState : notnull
{
TState CurrentState { get; }
TState SetState(TState state);
diff --git a/src/DotPulsar/Internal/ChannelManager.cs
b/src/DotPulsar/Internal/ChannelManager.cs
index 7d4a1e8..dc7e14a 100644
--- a/src/DotPulsar/Internal/ChannelManager.cs
+++ b/src/DotPulsar/Internal/ChannelManager.cs
@@ -20,7 +20,7 @@ using DotPulsar.Internal.Extensions;
using DotPulsar.Internal.PulsarApi;
using System.Buffers;
-public sealed class ChannelManager : IState<ChannelManagerState>, IDisposable
+public sealed class ChannelManager : IStateHolder<ChannelManagerState>,
IDisposable
{
private readonly StateManager<ChannelManagerState> _stateManager;
private readonly RequestResponseHandler _requestResponseHandler;
@@ -28,6 +28,8 @@ public sealed class ChannelManager :
IState<ChannelManagerState>, IDisposable
private readonly IdLookup<IChannel> _producerChannels;
private readonly EnumLookup<BaseCommand.Type, Action<BaseCommand>>
_incoming;
+ public IState<ChannelManagerState> State => _stateManager;
+
public ChannelManager()
{
_stateManager = new
StateManager<ChannelManagerState>(ChannelManagerState.Inactive,
ChannelManagerState.Closed);
@@ -302,14 +304,4 @@ public sealed class ChannelManager :
IState<ChannelManagerState>, IDisposable
if (_consumerChannels.IsEmpty() && _producerChannels.IsEmpty())
_stateManager.SetState(ChannelManagerState.Inactive);
}
-
- public bool IsFinalState() => _stateManager.IsFinalState();
-
- public bool IsFinalState(ChannelManagerState state) =>
_stateManager.IsFinalState(state);
-
- public async ValueTask<ChannelManagerState>
OnStateChangeTo(ChannelManagerState state, CancellationToken cancellationToken
= default)
- => await _stateManager.StateChangedTo(state,
cancellationToken).ConfigureAwait(false);
-
- public async ValueTask<ChannelManagerState>
OnStateChangeFrom(ChannelManagerState state, CancellationToken
cancellationToken = default)
- => await _stateManager.StateChangedFrom(state,
cancellationToken).ConfigureAwait(false);
}
diff --git a/src/DotPulsar/Internal/Connection.cs
b/src/DotPulsar/Internal/Connection.cs
index 1c7771a..03f618e 100644
--- a/src/DotPulsar/Internal/Connection.cs
+++ b/src/DotPulsar/Internal/Connection.cs
@@ -61,6 +61,8 @@ public sealed class Connection : IConnection
public int MaxMessageSize { get; set; }
+ public IState<ConnectionState> State => _stateManager;
+
public async Task<ProducerResponse> Send(CommandProducer command, IChannel
channel, CancellationToken cancellationToken)
{
ThrowIfDisposed();
@@ -309,8 +311,8 @@ public sealed class Connection : IConnection
private async Task Setup(CancellationToken cancellationToken)
{
var incoming = ProcessIncomingFrames(cancellationToken);
- var channelManager =
_channelManager.OnStateChangeTo(ChannelManagerState.Inactive,
_closeOnInactiveInterval, cancellationToken).AsTask();
- var pingPongTimeOut =
_pingPongHandler.OnStateChangeTo(PingPongHandlerState.TimedOut,
cancellationToken).AsTask();
+ var channelManager =
_channelManager.State.OnStateChangeTo(ChannelManagerState.Inactive,
_closeOnInactiveInterval, cancellationToken).AsTask();
+ var pingPongTimeOut =
_pingPongHandler.State.OnStateChangeTo(PingPongHandlerState.TimedOut,
cancellationToken).AsTask();
_ = Task.Factory.StartNew(async () => await
KeepAlive(PingPongHandlerState.Active,
cancellationToken).ConfigureAwait(false));
await Task.WhenAny(incoming, channelManager,
pingPongTimeOut).ConfigureAwait(false);
_stateManager.SetState(ConnectionState.Disconnected);
@@ -320,7 +322,7 @@ public sealed class Connection : IConnection
{
while (!cancellationToken.IsCancellationRequested)
{
- state = await _pingPongHandler.OnStateChangeFrom(state,
cancellationToken).ConfigureAwait(false);
+ state = await _pingPongHandler.State.OnStateChangeFrom(state,
cancellationToken).ConfigureAwait(false);
if (state == PingPongHandlerState.TimedOut)
return;
if (state == PingPongHandlerState.Active)
@@ -381,14 +383,4 @@ public sealed class Connection : IConnection
if (_isDisposed != 0)
throw new ConnectionDisposedException();
}
-
- public bool IsFinalState() => _stateManager.IsFinalState();
-
- public bool IsFinalState(ConnectionState state) =>
_stateManager.IsFinalState(state);
-
- public async ValueTask<ConnectionState> OnStateChangeTo(ConnectionState
state, CancellationToken cancellationToken = default)
- => await _stateManager.StateChangedTo(state,
cancellationToken).ConfigureAwait(false);
-
- public async ValueTask<ConnectionState> OnStateChangeFrom(ConnectionState
state, CancellationToken cancellationToken = default)
- => await _stateManager.StateChangedFrom(state,
cancellationToken).ConfigureAwait(false);
}
diff --git a/src/DotPulsar/Internal/ConnectionPool.cs
b/src/DotPulsar/Internal/ConnectionPool.cs
index 8d794c4..0e0e540 100644
--- a/src/DotPulsar/Internal/ConnectionPool.cs
+++ b/src/DotPulsar/Internal/ConnectionPool.cs
@@ -152,7 +152,7 @@ public sealed class ConnectionPool : IConnectionPool
commandConnect = WithProxyToBroker(commandConnect, url.Logical);
var connection = Connection.Connect(new PulsarStream(stream),
_authentication, _keepAliveInterval, _closeInactiveConnectionsInterval);
- _ = connection.OnStateChangeFrom(ConnectionState.Connected,
CancellationToken.None).AsTask().ContinueWith(t => DisposeConnection(url,
connection), CancellationToken.None);
+ _ = connection.State.OnStateChangeFrom(ConnectionState.Connected,
CancellationToken.None).AsTask().ContinueWith(t => DisposeConnection(url,
connection), CancellationToken.None);
var response = await connection.Send(commandConnect,
cancellationToken).ConfigureAwait(false);
response.Expect(BaseCommand.Type.Connected);
_connections[url] = connection;
diff --git a/src/DotPulsar/Internal/Consumer.cs
b/src/DotPulsar/Internal/Consumer.cs
index 0e85429..dc45883 100644
--- a/src/DotPulsar/Internal/Consumer.cs
+++ b/src/DotPulsar/Internal/Consumer.cs
@@ -45,6 +45,7 @@ public sealed class Consumer<TMessage> : IConsumer<TMessage>
public string SubscriptionName { get; }
public SubscriptionType SubscriptionType { get; }
public string Topic { get; }
+ public IState<ConsumerState> State => _state;
public Consumer(
Uri serviceUrl,
@@ -111,7 +112,7 @@ public sealed class Consumer<TMessage> : IConsumer<TMessage>
_receiveTasks[i] = _emptyTaskCompletionSource.Task;
var topicName = _isPartitionedTopic ? GetPartitionedTopicName(i) :
Topic;
_subConsumers[i] = CreateSubConsumer(topicName);
- monitoringTasks[i] =
_subConsumers[i].OnStateChangeFrom(ConsumerState.Disconnected,
_cts.Token).AsTask();
+ monitoringTasks[i] =
_subConsumers[i].State.OnStateChangeFrom(ConsumerState.Disconnected,
_cts.Token).AsTask();
}
_allSubConsumersAreReady = true;
@@ -129,7 +130,7 @@ public sealed class Consumer<TMessage> : IConsumer<TMessage>
var state = task.Result;
states[i] = state;
- monitoringTasks[i] = _subConsumers[i].OnStateChangeFrom(state,
_cts.Token).AsTask();
+ monitoringTasks[i] =
_subConsumers[i].State.OnStateChangeFrom(state, _cts.Token).AsTask();
}
if (!_isPartitionedTopic)
@@ -151,18 +152,6 @@ public sealed class Consumer<TMessage> :
IConsumer<TMessage>
}
}
- public async ValueTask<ConsumerState> OnStateChangeTo(ConsumerState state,
CancellationToken cancellationToken)
- => await _state.StateChangedTo(state,
cancellationToken).ConfigureAwait(false);
-
- public async ValueTask<ConsumerState> OnStateChangeFrom(ConsumerState
state, CancellationToken cancellationToken)
- => await _state.StateChangedFrom(state,
cancellationToken).ConfigureAwait(false);
-
- public bool IsFinalState()
- => _state.IsFinalState();
-
- public bool IsFinalState(ConsumerState state)
- => _state.IsFinalState(state);
-
public async ValueTask DisposeAsync()
{
if (Interlocked.Exchange(ref _isDisposed, 1) != 0)
diff --git a/src/DotPulsar/Internal/MonitorState.cs
b/src/DotPulsar/Internal/MonitorState.cs
index 6cb264b..e20ab3c 100644
--- a/src/DotPulsar/Internal/MonitorState.cs
+++ b/src/DotPulsar/Internal/MonitorState.cs
@@ -25,7 +25,7 @@ public static class StateMonitor
var state = ProducerState.Disconnected;
- while (!producer.IsFinalState(state) &&
!handler.CancellationToken.IsCancellationRequested)
+ while (!producer.State.IsFinalState(state) &&
!handler.CancellationToken.IsCancellationRequested)
{
var stateChanged = await producer.StateChangedFrom(state,
handler.CancellationToken).ConfigureAwait(false);
state = stateChanged.ProducerState;
@@ -47,7 +47,7 @@ public static class StateMonitor
var state = ConsumerState.Disconnected;
- while (!consumer.IsFinalState(state) &&
!handler.CancellationToken.IsCancellationRequested)
+ while (!consumer.State.IsFinalState(state) &&
!handler.CancellationToken.IsCancellationRequested)
{
var stateChanged = await consumer.StateChangedFrom(state,
handler.CancellationToken).ConfigureAwait(false);
state = stateChanged.ConsumerState;
@@ -69,7 +69,7 @@ public static class StateMonitor
var state = ReaderState.Disconnected;
- while (!reader.IsFinalState(state) &&
!handler.CancellationToken.IsCancellationRequested)
+ while (!reader.State.IsFinalState(state) &&
!handler.CancellationToken.IsCancellationRequested)
{
var stateChanged = await reader.StateChangedFrom(state,
handler.CancellationToken).ConfigureAwait(false);
state = stateChanged.ReaderState;
diff --git a/src/DotPulsar/Internal/PingPongHandler.cs
b/src/DotPulsar/Internal/PingPongHandler.cs
index 4533552..1a036e6 100644
--- a/src/DotPulsar/Internal/PingPongHandler.cs
+++ b/src/DotPulsar/Internal/PingPongHandler.cs
@@ -18,7 +18,7 @@ using DotPulsar.Abstractions;
using DotPulsar.Internal.PulsarApi;
using System.Diagnostics;
-public sealed class PingPongHandler : IState<PingPongHandlerState>,
IAsyncDisposable
+public sealed class PingPongHandler : IStateHolder<PingPongHandlerState>,
IAsyncDisposable
{
private readonly CancellationTokenSource _cts;
private readonly StateManager<PingPongHandlerState> _stateManager;
@@ -26,6 +26,8 @@ public sealed class PingPongHandler :
IState<PingPongHandlerState>, IAsyncDispos
private long _lastCommand;
private bool _waitForPong;
+ public IState<PingPongHandlerState> State => _stateManager;
+
public PingPongHandler(TimeSpan keepAliveInterval)
{
_cts = new CancellationTokenSource();
@@ -85,14 +87,4 @@ public sealed class PingPongHandler :
IState<PingPongHandlerState>, IAsyncDispos
_stateManager.SetState(PingPongHandlerState.Closed);
return new ValueTask();
}
-
- public bool IsFinalState() => _stateManager.IsFinalState();
-
- public bool IsFinalState(PingPongHandlerState state) =>
_stateManager.IsFinalState(state);
-
- public async ValueTask<PingPongHandlerState>
OnStateChangeTo(PingPongHandlerState state, CancellationToken cancellationToken
= default)
- => await _stateManager.StateChangedTo(state,
cancellationToken).ConfigureAwait(false);
-
- public async ValueTask<PingPongHandlerState>
OnStateChangeFrom(PingPongHandlerState state, CancellationToken
cancellationToken = default)
- => await _stateManager.StateChangedFrom(state,
cancellationToken).ConfigureAwait(false);
}
diff --git a/src/DotPulsar/Internal/Producer.cs
b/src/DotPulsar/Internal/Producer.cs
index e9e2ad9..0f7534d 100644
--- a/src/DotPulsar/Internal/Producer.cs
+++ b/src/DotPulsar/Internal/Producer.cs
@@ -44,8 +44,8 @@ public sealed class Producer<TMessage> : IProducer<TMessage>,
IRegisterEvent
public Uri ServiceUrl { get; }
public string Topic { get; }
-
public ISendChannel<TMessage> SendChannel { get; }
+ public IState<ProducerState> State => _state;
public Producer(
Uri serviceUrl,
@@ -116,7 +116,7 @@ public sealed class Producer<TMessage> :
IProducer<TMessage>, IRegisterEvent
var topicName = isPartitionedTopic ? GetPartitionedTopicName(i) :
Topic;
var producer = CreateSubProducer(topicName, isPartitionedTopic ? i
: -1);
_ = _producers.TryAdd(i, producer);
- monitoringTasks[i] =
producer.OnStateChangeFrom(ProducerState.Disconnected, _cts.Token).AsTask();
+ monitoringTasks[i] =
producer.State.OnStateChangeFrom(ProducerState.Disconnected,
_cts.Token).AsTask();
}
Interlocked.Exchange(ref _producerCount, monitoringTasks.Length);
@@ -133,7 +133,7 @@ public sealed class Producer<TMessage> :
IProducer<TMessage>, IRegisterEvent
var state = task.Result;
states[i] = state;
- monitoringTasks[i] = _producers[i].OnStateChangeFrom(state,
_cts.Token).AsTask();
+ monitoringTasks[i] =
_producers[i].State.OnStateChangeFrom(state, _cts.Token).AsTask();
}
if (!isPartitionedTopic)
@@ -171,18 +171,6 @@ public sealed class Producer<TMessage> :
IProducer<TMessage>, IRegisterEvent
return producer;
}
- public bool IsFinalState()
- => _state.IsFinalState();
-
- public bool IsFinalState(ProducerState state)
- => _state.IsFinalState(state);
-
- public async ValueTask<ProducerState> OnStateChangeTo(ProducerState state,
CancellationToken cancellationToken)
- => await _state.StateChangedTo(state,
cancellationToken).ConfigureAwait(false);
-
- public async ValueTask<ProducerState> OnStateChangeFrom(ProducerState
state, CancellationToken cancellationToken)
- => await _state.StateChangedFrom(state,
cancellationToken).ConfigureAwait(false);
-
public async ValueTask DisposeAsync()
{
if (Interlocked.Exchange(ref _isDisposed, 1) != 0)
@@ -203,7 +191,7 @@ public sealed class Producer<TMessage> :
IProducer<TMessage>, IRegisterEvent
{
if (_producerCount == 0)
{
- var newState = await
_state.StateChangedFrom(ProducerState.Disconnected,
cancellationToken).ConfigureAwait(false);
+ var newState = await
_state.OnStateChangeFrom(ProducerState.Disconnected,
cancellationToken).ConfigureAwait(false);
if (_faultException is not null)
throw new ProducerFaultedException(_faultException);
if (newState == ProducerState.Closed)
diff --git a/src/DotPulsar/Internal/Reader.cs b/src/DotPulsar/Internal/Reader.cs
index 910e0c0..93f07f6 100644
--- a/src/DotPulsar/Internal/Reader.cs
+++ b/src/DotPulsar/Internal/Reader.cs
@@ -43,6 +43,7 @@ public sealed class Reader<TMessage> : IReader<TMessage>
public Uri ServiceUrl { get; }
public string Topic { get; }
+ public IState<ReaderState> State => _state;
public Reader(
Uri serviceUrl,
@@ -104,7 +105,7 @@ public sealed class Reader<TMessage> : IReader<TMessage>
_receiveTasks[i] = _emptyTaskCompletionSource.Task;
var topicName = _isPartitionedTopic ? GetPartitionedTopicName(i) :
Topic;
_subReaders[i] = CreateSubReader(topicName);
- monitoringTasks[i] =
_subReaders[i].OnStateChangeFrom(ReaderState.Disconnected, _cts.Token).AsTask();
+ monitoringTasks[i] =
_subReaders[i].State.OnStateChangeFrom(ReaderState.Disconnected,
_cts.Token).AsTask();
}
_allSubReadersAreReady = true;
@@ -122,7 +123,7 @@ public sealed class Reader<TMessage> : IReader<TMessage>
var state = task.Result;
states[i] = state;
- monitoringTasks[i] = _subReaders[i].OnStateChangeFrom(state,
_cts.Token).AsTask();
+ monitoringTasks[i] =
_subReaders[i].State.OnStateChangeFrom(state, _cts.Token).AsTask();
}
if (!_isPartitionedTopic)
@@ -140,18 +141,6 @@ public sealed class Reader<TMessage> : IReader<TMessage>
}
}
- public async ValueTask<ReaderState> OnStateChangeTo(ReaderState state,
CancellationToken cancellationToken)
- => await _state.StateChangedTo(state,
cancellationToken).ConfigureAwait(false);
-
- public async ValueTask<ReaderState> OnStateChangeFrom(ReaderState state,
CancellationToken cancellationToken)
- => await _state.StateChangedFrom(state,
cancellationToken).ConfigureAwait(false);
-
- public bool IsFinalState()
- => _state.IsFinalState();
-
- public bool IsFinalState(ReaderState state)
- => _state.IsFinalState(state);
-
public async ValueTask<IEnumerable<MessageId>>
GetLastMessageIds(CancellationToken cancellationToken)
{
await Guard(cancellationToken).ConfigureAwait(false);
diff --git a/src/DotPulsar/Internal/StateManager.cs
b/src/DotPulsar/Internal/StateManager.cs
index 7e980f4..7a42e65 100644
--- a/src/DotPulsar/Internal/StateManager.cs
+++ b/src/DotPulsar/Internal/StateManager.cs
@@ -51,7 +51,7 @@ public sealed class StateManager<TState> :
IStateManager<TState> where TState :
}
}
- public ValueTask<TState> StateChangedTo(TState state, CancellationToken
cancellationToken)
+ public ValueTask<TState> OnStateChangeTo(TState state, CancellationToken
cancellationToken)
{
lock (_lock)
{
@@ -61,7 +61,7 @@ public sealed class StateManager<TState> :
IStateManager<TState> where TState :
}
}
- public ValueTask<TState> StateChangedFrom(TState state, CancellationToken
cancellationToken)
+ public ValueTask<TState> OnStateChangeFrom(TState state, CancellationToken
cancellationToken)
{
lock (_lock)
{
diff --git a/src/DotPulsar/Internal/SubConsumer.cs
b/src/DotPulsar/Internal/SubConsumer.cs
index a6001fe..6be040d 100644
--- a/src/DotPulsar/Internal/SubConsumer.cs
+++ b/src/DotPulsar/Internal/SubConsumer.cs
@@ -29,7 +29,7 @@ public sealed class SubConsumer<TMessage> :
IConsumer<TMessage>, IContainsChanne
private IConsumerChannel<TMessage> _channel;
private readonly ObjectPool<CommandAck> _commandAckPool;
private readonly IExecute _executor;
- private readonly IStateChanged<ConsumerState> _state;
+ private readonly IState<ConsumerState> _state;
private readonly IConsumerChannelFactory<TMessage> _factory;
private int _isDisposed;
private Exception? _faultException;
@@ -38,6 +38,7 @@ public sealed class SubConsumer<TMessage> :
IConsumer<TMessage>, IContainsChanne
public string SubscriptionName { get; }
public SubscriptionType SubscriptionType { get; }
public string Topic { get; }
+ public IState<ConsumerState> State => _state;
public SubConsumer(
Guid correlationId,
@@ -48,7 +49,7 @@ public sealed class SubConsumer<TMessage> :
IConsumer<TMessage>, IContainsChanne
IRegisterEvent eventRegister,
IConsumerChannel<TMessage> initialChannel,
IExecute executor,
- IStateChanged<ConsumerState> state,
+ IState<ConsumerState> state,
IConsumerChannelFactory<TMessage> factory)
{
_correlationId = correlationId;
@@ -67,18 +68,6 @@ public sealed class SubConsumer<TMessage> :
IConsumer<TMessage>, IContainsChanne
_eventRegister.Register(new ConsumerCreated(_correlationId));
}
- public async ValueTask<ConsumerState> OnStateChangeTo(ConsumerState state,
CancellationToken cancellationToken)
- => await _state.StateChangedTo(state,
cancellationToken).ConfigureAwait(false);
-
- public async ValueTask<ConsumerState> OnStateChangeFrom(ConsumerState
state, CancellationToken cancellationToken)
- => await _state.StateChangedFrom(state,
cancellationToken).ConfigureAwait(false);
-
- public bool IsFinalState()
- => _state.IsFinalState();
-
- public bool IsFinalState(ConsumerState state)
- => _state.IsFinalState(state);
-
public async ValueTask DisposeAsync()
{
if (Interlocked.Exchange(ref _isDisposed, 1) != 0)
diff --git a/src/DotPulsar/Internal/SubProducer.cs
b/src/DotPulsar/Internal/SubProducer.cs
index c778a97..0554988 100644
--- a/src/DotPulsar/Internal/SubProducer.cs
+++ b/src/DotPulsar/Internal/SubProducer.cs
@@ -22,7 +22,7 @@ using DotPulsar.Internal.Exceptions;
using DotPulsar.Internal.Extensions;
using DotPulsar.Internal.PulsarApi;
-public sealed class SubProducer : IContainsChannel, IState<ProducerState>
+public sealed class SubProducer : IContainsChannel, IStateHolder<ProducerState>
{
private readonly AsyncQueueWithCursor<SendOp> _sendQueue;
private CancellationTokenSource? _dispatcherCts;
@@ -31,20 +31,21 @@ public sealed class SubProducer : IContainsChannel,
IState<ProducerState>
private readonly IRegisterEvent _eventRegister;
private IProducerChannel _channel;
private readonly IExecute _executor;
- private readonly IStateChanged<ProducerState> _state;
+ private readonly IState<ProducerState> _state;
private readonly IProducerChannelFactory _factory;
private readonly int _partition;
private int _isDisposed;
private Exception? _faultException;
public string Topic { get; }
+ public IState<ProducerState> State => _state;
public SubProducer(
Guid correlationId,
IRegisterEvent registerEvent,
IProducerChannel initialChannel,
IExecute executor,
- IStateChanged<ProducerState> state,
+ IState<ProducerState> state,
IProducerChannelFactory factory,
int partition,
uint maxPendingMessages,
@@ -64,18 +65,6 @@ public sealed class SubProducer : IContainsChannel,
IState<ProducerState>
_eventRegister.Register(new ProducerCreated(_correlationId));
}
- public async ValueTask<ProducerState> OnStateChangeTo(ProducerState state,
CancellationToken cancellationToken)
- => await _state.StateChangedTo(state,
cancellationToken).ConfigureAwait(false);
-
- public async ValueTask<ProducerState> OnStateChangeFrom(ProducerState
state, CancellationToken cancellationToken)
- => await _state.StateChangedFrom(state,
cancellationToken).ConfigureAwait(false);
-
- public bool IsFinalState()
- => _state.IsFinalState();
-
- public bool IsFinalState(ProducerState state)
- => _state.IsFinalState(state);
-
public async ValueTask DisposeAsync()
{
if (Interlocked.Exchange(ref _isDisposed, 1) != 0)
diff --git a/src/DotPulsar/Internal/SubReader.cs
b/src/DotPulsar/Internal/SubReader.cs
index e456a13..d0d6c72 100644
--- a/src/DotPulsar/Internal/SubReader.cs
+++ b/src/DotPulsar/Internal/SubReader.cs
@@ -26,13 +26,14 @@ public sealed class SubReader<TMessage> : IContainsChannel,
IReader<TMessage>
private readonly IRegisterEvent _eventRegister;
private IConsumerChannel<TMessage> _channel;
private readonly IExecute _executor;
- private readonly IStateChanged<ReaderState> _state;
+ private readonly IState<ReaderState> _state;
private readonly IConsumerChannelFactory<TMessage> _factory;
private int _isDisposed;
private Exception? _faultException;
public Uri ServiceUrl { get; }
public string Topic { get; }
+ public IState<ReaderState> State => _state;
public SubReader(
Guid correlationId,
@@ -41,7 +42,7 @@ public sealed class SubReader<TMessage> : IContainsChannel,
IReader<TMessage>
IRegisterEvent eventRegister,
IConsumerChannel<TMessage> initialChannel,
IExecute executor,
- IStateChanged<ReaderState> state,
+ IState<ReaderState> state,
IConsumerChannelFactory<TMessage> factory)
{
_correlationId = correlationId;
@@ -57,18 +58,6 @@ public sealed class SubReader<TMessage> : IContainsChannel,
IReader<TMessage>
_eventRegister.Register(new ReaderCreated(_correlationId));
}
- public async ValueTask<ReaderState> OnStateChangeTo(ReaderState state,
CancellationToken cancellationToken)
- => await _state.StateChangedTo(state,
cancellationToken).ConfigureAwait(false);
-
- public async ValueTask<ReaderState> OnStateChangeFrom(ReaderState state,
CancellationToken cancellationToken)
- => await _state.StateChangedFrom(state,
cancellationToken).ConfigureAwait(false);
-
- public bool IsFinalState()
- => _state.IsFinalState();
-
- public bool IsFinalState(ReaderState state)
- => _state.IsFinalState(state);
-
public async ValueTask<MessageId> GetLastMessageId(CancellationToken
cancellationToken)
{
var getLastMessageId = new CommandGetLastMessageId();
diff --git a/tests/DotPulsar.Tests/Extensions/StateExtensionsTests.cs
b/tests/DotPulsar.Tests/Extensions/StateExtensionsTests.cs
index dc76afd..376c455 100644
--- a/tests/DotPulsar.Tests/Extensions/StateExtensionsTests.cs
+++ b/tests/DotPulsar.Tests/Extensions/StateExtensionsTests.cs
@@ -182,48 +182,4 @@ public class StateExtensionsTests
// Assert
exception.Should().BeOfType<OperationCanceledException>();
}
-
- [Theory, Tests.AutoData]
- public async Task
DelayedStateMonitor_WhenChangingToFinalStateInitially_ShouldInvokeOnStateLeft([Frozen]
IState<ProducerState> uut)
- {
- // Arrange
- const ProducerState expected = ProducerState.Faulted;
- uut.OnStateChangeFrom(ProducerState.Connected,
Arg.Any<CancellationToken>()).Returns(expected);
- uut.IsFinalState(expected).Returns(true);
- ProducerState? stateLeft = null;
- ProducerState? stateReached = null;
- void onStateLeft(IState<ProducerState> _, ProducerState state) =>
stateLeft = state;
- void onStateReached(IState<ProducerState> _, ProducerState state) =>
stateReached = state;
-
- // Act
- await uut.DelayedStateMonitor(ProducerState.Connected,
TimeSpan.FromSeconds(1), onStateLeft, onStateReached);
-
- // Assert
- stateLeft.HasValue.Should().BeTrue();
- stateLeft!.Value.Should().Be(expected);
- stateReached.HasValue.Should().BeFalse();
- }
-
- [Theory, Tests.AutoData]
- public async Task
DelayedStateMonitor_WhenChangingToFinalStateWhileWaiting_ShouldInvokeOnStateLeft([Frozen]
IState<ProducerState> uut)
- {
- // Arrange
- const ProducerState expected = ProducerState.Disconnected;
- uut.OnStateChangeFrom(ProducerState.Connected,
Arg.Any<CancellationToken>()).Returns(expected);
- uut.IsFinalState(ProducerState.Disconnected).Returns(false);
- uut.OnStateChangeTo(ProducerState.Connected,
Arg.Any<CancellationToken>()).Returns(x => throw new
OperationCanceledException(), x => ProducerState.Faulted);
- uut.IsFinalState(ProducerState.Faulted).Returns(true);
- ProducerState? stateLeft = null;
- ProducerState? stateReached = null;
- void onStateLeft(IState<ProducerState> _, ProducerState state) =>
stateLeft = state;
- void onStateReached(IState<ProducerState> _, ProducerState state) =>
stateReached = state;
-
- // Act
- await uut.DelayedStateMonitor(ProducerState.Connected,
TimeSpan.FromSeconds(1), onStateLeft, onStateReached);
-
- // Assert
- stateLeft.HasValue.Should().BeTrue();
- stateLeft!.Value.Should().Be(expected);
- stateReached.HasValue.Should().BeFalse();
- }
}
diff --git a/tests/DotPulsar.Tests/Extensions/StateHolderExtensionsTests.cs
b/tests/DotPulsar.Tests/Extensions/StateHolderExtensionsTests.cs
new file mode 100644
index 0000000..e36e960
--- /dev/null
+++ b/tests/DotPulsar.Tests/Extensions/StateHolderExtensionsTests.cs
@@ -0,0 +1,68 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace DotPulsar.Tests.Extensions;
+
+using AutoFixture.Xunit2;
+using DotPulsar.Abstractions;
+using DotPulsar.Extensions;
+using System;
+
+[Trait("Category", "Unit")]
+public class StateHolderExtensionsTests
+{
+ [Theory, Tests.AutoData]
+ public async Task
DelayedStateMonitor_WhenChangingToFinalStateInitially_ShouldInvokeOnStateLeft([Frozen]
IStateHolder<ProducerState> uut)
+ {
+ // Arrange
+ const ProducerState expected = ProducerState.Faulted;
+ uut.State.OnStateChangeFrom(ProducerState.Connected,
Arg.Any<CancellationToken>()).Returns(expected);
+ uut.State.IsFinalState(expected).Returns(true);
+ ProducerState? stateLeft = null;
+ ProducerState? stateReached = null;
+ void onStateLeft(IStateHolder<ProducerState> _, ProducerState state)
=> stateLeft = state;
+ void onStateReached(IStateHolder<ProducerState> _, ProducerState
state) => stateReached = state;
+
+ // Act
+ await uut.DelayedStateMonitor(ProducerState.Connected,
TimeSpan.FromSeconds(1), onStateLeft, onStateReached);
+
+ // Assert
+ stateLeft.HasValue.Should().BeTrue();
+ stateLeft!.Value.Should().Be(expected);
+ stateReached.HasValue.Should().BeFalse();
+ }
+
+ [Theory, Tests.AutoData]
+ public async Task
DelayedStateMonitor_WhenChangingToFinalStateWhileWaiting_ShouldInvokeOnStateLeft([Frozen]
IStateHolder<ProducerState> uut)
+ {
+ // Arrange
+ const ProducerState expected = ProducerState.Disconnected;
+ uut.State.OnStateChangeFrom(ProducerState.Connected,
Arg.Any<CancellationToken>()).Returns(expected);
+ uut.State.IsFinalState(ProducerState.Disconnected).Returns(false);
+ uut.State.OnStateChangeTo(ProducerState.Connected,
Arg.Any<CancellationToken>()).Returns(x => throw new
OperationCanceledException(), x => ProducerState.Faulted);
+ uut.State.IsFinalState(ProducerState.Faulted).Returns(true);
+ ProducerState? stateLeft = null;
+ ProducerState? stateReached = null;
+ void onStateLeft(IStateHolder<ProducerState> _, ProducerState state)
=> stateLeft = state;
+ void onStateReached(IStateHolder<ProducerState> _, ProducerState
state) => stateReached = state;
+
+ // Act
+ await uut.DelayedStateMonitor(ProducerState.Connected,
TimeSpan.FromSeconds(1), onStateLeft, onStateReached);
+
+ // Assert
+ stateLeft.HasValue.Should().BeTrue();
+ stateLeft!.Value.Should().Be(expected);
+ stateReached.HasValue.Should().BeFalse();
+ }
+}
diff --git a/tests/DotPulsar.Tests/Internal/ConsumerTests.cs
b/tests/DotPulsar.Tests/Internal/ConsumerTests.cs
index 911c147..274e600 100644
--- a/tests/DotPulsar.Tests/Internal/ConsumerTests.cs
+++ b/tests/DotPulsar.Tests/Internal/ConsumerTests.cs
@@ -183,7 +183,7 @@ public sealed class ConsumerTests : IDisposable
await using var consumer = CreateConsumer(client, await
_fixture.CreateTopic(_cts.Token));
- await consumer.OnStateChangeTo(ConsumerState.Faulted, _cts.Token);
+ await consumer.State.OnStateChangeTo(ConsumerState.Faulted,
_cts.Token);
//Act
var exception = await
Record.ExceptionAsync(consumer.Receive(_cts.Token).AsTask);
@@ -257,7 +257,7 @@ public sealed class ConsumerTests : IDisposable
//Arrange
await using var client = CreateClient();
var consumer = CreateConsumer(client, await
_fixture.CreateTopic(_cts.Token));
- await consumer.OnStateChangeTo(ConsumerState.Active, _cts.Token);
+ await consumer.State.OnStateChangeTo(ConsumerState.Active, _cts.Token);
//Act
await using var connectionDown = await
_fixture.DisableThePulsarConnection();
@@ -277,14 +277,14 @@ public sealed class ConsumerTests : IDisposable
await using var producer = CreateProducer(client, topicName);
await using var consumer = CreateConsumer(client, topicName);
await ProduceMessages(producer, 1, "test-message", _cts.Token);
- await consumer.OnStateChangeTo(ConsumerState.Active, _cts.Token);
+ await consumer.State.OnStateChangeTo(ConsumerState.Active, _cts.Token);
//Act
await using (await _fixture.DisableThePulsarConnection())
{
await consumer.StateChangedTo(ConsumerState.Disconnected,
_cts.Token);
}
- await consumer.OnStateChangeTo(ConsumerState.Active, _cts.Token);
+ await consumer.State.OnStateChangeTo(ConsumerState.Active, _cts.Token);
var exception = await
Record.ExceptionAsync(consumer.Receive(_cts.Token).AsTask);
//Assert
diff --git a/tests/DotPulsar.Tests/Internal/PingPongHandlerTest.cs
b/tests/DotPulsar.Tests/Internal/PingPongHandlerTest.cs
index 42f0df9..dc4aecc 100644
--- a/tests/DotPulsar.Tests/Internal/PingPongHandlerTest.cs
+++ b/tests/DotPulsar.Tests/Internal/PingPongHandlerTest.cs
@@ -28,7 +28,7 @@ public class PingPongHandlerTest
var uut = new PingPongHandler(TimeSpan.FromSeconds(1));
// Act
- var actual = await uut.OnStateChangeTo(PingPongHandlerState.TimedOut);
+ var actual = await
uut.State.OnStateChangeTo(PingPongHandlerState.TimedOut);
// Assert
actual.Should().Be(expected);
@@ -42,9 +42,9 @@ public class PingPongHandlerTest
var uut = new PingPongHandler(TimeSpan.FromSeconds(1));
// Act
- _ = await uut.OnStateChangeTo(PingPongHandlerState.ThresholdExceeded);
+ _ = await
uut.State.OnStateChangeTo(PingPongHandlerState.ThresholdExceeded);
uut.Incoming(DotPulsar.Internal.PulsarApi.BaseCommand.Type.Ack);
- var actual = await uut.OnStateChangeTo(PingPongHandlerState.Active);
+ var actual = await
uut.State.OnStateChangeTo(PingPongHandlerState.Active);
// Assert
actual.Should().Be(expected);
@@ -58,7 +58,7 @@ public class PingPongHandlerTest
var uut = new PingPongHandler(TimeSpan.FromSeconds(1));
// Act
- var actual = uut.OnStateChangeTo(PingPongHandlerState.Closed);
+ var actual = uut.State.OnStateChangeTo(PingPongHandlerState.Closed);
await uut.DisposeAsync();
// Assert
diff --git a/tests/DotPulsar.Tests/Internal/ProducerTests.cs
b/tests/DotPulsar.Tests/Internal/ProducerTests.cs
index 39a8c89..c483d6e 100644
--- a/tests/DotPulsar.Tests/Internal/ProducerTests.cs
+++ b/tests/DotPulsar.Tests/Internal/ProducerTests.cs
@@ -107,14 +107,14 @@ public sealed class ProducerTests : IDisposable
var topicName = await _fixture.CreateTopic(_cts.Token);
await using var client = CreateClient();
await using var producer1 = CreateProducer(client, topicName,
accessModeForProducer1);
- await producer1.OnStateChangeTo(ProducerState.Connected, _cts.Token);
+ await producer1.State.OnStateChangeTo(ProducerState.Connected,
_cts.Token);
//Act
await using var producer2 = CreateProducer(client, topicName,
accessModeForProducer2);
if (accessModeForProducer2 == ProducerAccessMode.ExclusiveWithFencing)
// We need to send a message to trigger the state change
{
- await producer2.OnStateChangeTo(ProducerState.Connected,
_cts.Token);
+ await producer2.State.OnStateChangeTo(ProducerState.Connected,
_cts.Token);
try
{
@@ -126,8 +126,8 @@ public sealed class ProducerTests : IDisposable
}
}
- var actualStateForProducer1 = await
producer1.OnStateChangeTo(expectedStateForProducer1, _cts.Token);
- var actualStateForProducer2 = await
producer2.OnStateChangeTo(expectedStateForProducer2, _cts.Token);
+ var actualStateForProducer1 = await
producer1.State.OnStateChangeTo(expectedStateForProducer1, _cts.Token);
+ var actualStateForProducer2 = await
producer2.State.OnStateChangeTo(expectedStateForProducer2, _cts.Token);
//Assert
actualStateForProducer1.Should().Be(expectedStateForProducer1);
@@ -275,7 +275,7 @@ public sealed class ProducerTests : IDisposable
var topicName = await _fixture.CreateTopic(_cts.Token);
await using var client = CreateClient();
await using var producer = CreateProducer(client, topicName);
- await producer.OnStateChangeTo(ProducerState.Connected, _cts.Token);
+ await producer.State.OnStateChangeTo(ProducerState.Connected,
_cts.Token);
ValueTask<MessageId> sendTask;
await using (await _fixture.DisableThePulsarConnection())
@@ -283,7 +283,7 @@ public sealed class ProducerTests : IDisposable
await producer.StateChangedTo(ProducerState.Disconnected,
_cts.Token);
sendTask = producer.Send("test", _cts.Token);
}
- await producer.OnStateChangeTo(ProducerState.Connected, _cts.Token);
+ await producer.State.OnStateChangeTo(ProducerState.Connected,
_cts.Token);
//Act
var exception = await Record.ExceptionAsync(sendTask.AsTask);
@@ -330,7 +330,7 @@ public sealed class ProducerTests : IDisposable
//Arrange
await using var client = CreateClient();
await using var producer = CreateProducer(client, await
_fixture.CreateTopic(_cts.Token));
- await producer.OnStateChangeTo(ProducerState.Connected, _cts.Token);
+ await producer.State.OnStateChangeTo(ProducerState.Connected,
_cts.Token);
//Act
await using var connectionDown = await
_fixture.DisableThePulsarConnection();
@@ -347,14 +347,14 @@ public sealed class ProducerTests : IDisposable
//Arrange
await using var client = CreateClient();
await using var producer = CreateProducer(client, await
_fixture.CreateTopic(_cts.Token));
- await producer.OnStateChangeTo(ProducerState.Connected, _cts.Token);
+ await producer.State.OnStateChangeTo(ProducerState.Connected,
_cts.Token);
//Act
await using (await _fixture.DisableThePulsarConnection())
{
await producer.StateChangedTo(ProducerState.Disconnected,
_cts.Token);
}
- await producer.OnStateChangeTo(ProducerState.Connected, _cts.Token);
+ await producer.State.OnStateChangeTo(ProducerState.Connected,
_cts.Token);
var exception = await Record.ExceptionAsync(producer.Send("test",
_cts.Token).AsTask);
//Assert
diff --git a/tests/DotPulsar.Tests/Internal/ReaderTests.cs
b/tests/DotPulsar.Tests/Internal/ReaderTests.cs
index 4b31294..7e1e3b6 100644
--- a/tests/DotPulsar.Tests/Internal/ReaderTests.cs
+++ b/tests/DotPulsar.Tests/Internal/ReaderTests.cs
@@ -203,7 +203,7 @@ public sealed class ReaderTests : IDisposable
await using var reader = CreateReader(client, MessageId.Earliest,
await _fixture.CreateTopic(_cts.Token));
- await reader.OnStateChangeTo(ReaderState.Faulted, _cts.Token);
+ await reader.State.OnStateChangeTo(ReaderState.Faulted, _cts.Token);
//Act
var exception = await
Record.ExceptionAsync(reader.Receive(_cts.Token).AsTask);
@@ -277,7 +277,7 @@ public sealed class ReaderTests : IDisposable
//Arrange
await using var client = CreateClient();
var reader = CreateReader(client, MessageId.Earliest, await
_fixture.CreateTopic(_cts.Token));
- await reader.OnStateChangeTo(ReaderState.Connected, _cts.Token);
+ await reader.State.OnStateChangeTo(ReaderState.Connected, _cts.Token);
//Act
await using var connectionDown = await
_fixture.DisableThePulsarConnection();
@@ -297,14 +297,14 @@ public sealed class ReaderTests : IDisposable
await using var producer = CreateProducer(client, topicName);
await using var reader = CreateReader(client, MessageId.Earliest,
topicName);
await producer.Send("test-message", _cts.Token);
- await reader.OnStateChangeTo(ReaderState.Connected, _cts.Token);
+ await reader.State.OnStateChangeTo(ReaderState.Connected, _cts.Token);
//Act
await using (await _fixture.DisableThePulsarConnection())
{
await reader.StateChangedTo(ReaderState.Disconnected, _cts.Token);
}
- await reader.OnStateChangeTo(ReaderState.Connected, _cts.Token);
+ await reader.State.OnStateChangeTo(ReaderState.Connected, _cts.Token);
var exception = await
Record.ExceptionAsync(reader.Receive(_cts.Token).AsTask);
//Assert
diff --git a/tests/DotPulsar.Tests/Internal/StateManagerTests.cs
b/tests/DotPulsar.Tests/Internal/StateManagerTests.cs
index df252a1..a294066 100644
--- a/tests/DotPulsar.Tests/Internal/StateManagerTests.cs
+++ b/tests/DotPulsar.Tests/Internal/StateManagerTests.cs
@@ -66,7 +66,7 @@ public class StateManagerTests
{
//Arrange
var uut = new StateManager<ProducerState>(initialState,
ProducerState.Closed);
- var task = uut.StateChangedTo(newState, default);
+ var task = uut.OnStateChangeTo(newState, default);
//Act
_ = uut.SetState(newState);
@@ -84,7 +84,7 @@ public class StateManagerTests
{
//Arrange
var uut = new StateManager<ProducerState>(initialState,
ProducerState.Closed);
- var task = uut.StateChangedFrom(initialState, default);
+ var task = uut.OnStateChangeFrom(initialState, default);
//Act
_ = uut.SetState(newState);
@@ -97,13 +97,13 @@ public class StateManagerTests
[InlineData(ProducerState.Connected)]
[InlineData(ProducerState.Disconnected)]
[InlineData(ProducerState.Closed)]
- public void
StateChangedTo_GivenStateIsAlreadyWanted_ShouldCompleteTask(ProducerState state)
+ public void
OnStateChangeTo_GivenStateIsAlreadyWanted_ShouldCompleteTask(ProducerState
state)
{
//Arrange
var uut = new StateManager<ProducerState>(state, ProducerState.Closed);
//Act
- var task = uut.StateChangedTo(state, default);
+ var task = uut.OnStateChangeTo(state, default);
//Assert
Assert.True(task.IsCompleted);
@@ -114,13 +114,13 @@ public class StateManagerTests
[InlineData(ProducerState.Connected, ProducerState.Closed)]
[InlineData(ProducerState.Disconnected, ProducerState.Connected)]
[InlineData(ProducerState.Disconnected, ProducerState.Closed)]
- public void
StateChangedTo_GivenStateIsNotWanted_ShouldNotCompleteTask(ProducerState
initialState, ProducerState wantedState)
+ public void
OnStateChangeTo_GivenStateIsNotWanted_ShouldNotCompleteTask(ProducerState
initialState, ProducerState wantedState)
{
//Arrange
var uut = new StateManager<ProducerState>(initialState,
ProducerState.Closed);
//Act
- var task = uut.StateChangedTo(wantedState, default);
+ var task = uut.OnStateChangeTo(wantedState, default);
//Assert
task.IsCompleted.Should().BeFalse();
@@ -129,13 +129,13 @@ public class StateManagerTests
[Theory]
[InlineData(ProducerState.Connected)]
[InlineData(ProducerState.Disconnected)]
- public void
StateChangedTo_GivenStateIsFinal_ShouldCompleteTask(ProducerState state)
+ public void
OnStateChangeTo_GivenStateIsFinal_ShouldCompleteTask(ProducerState state)
{
//Arrange
var uut = new StateManager<ProducerState>(ProducerState.Closed,
ProducerState.Closed);
//Act
- var task = uut.StateChangedTo(state, default);
+ var task = uut.OnStateChangeTo(state, default);
//Assert
task.IsCompleted.Should().BeTrue();
@@ -144,13 +144,13 @@ public class StateManagerTests
[Theory]
[InlineData(ProducerState.Connected)]
[InlineData(ProducerState.Disconnected)]
- public void
StateChangedFrom_GivenStateHasNotChanged_ShouldNotCompleteTask(ProducerState
state)
+ public void
OnStateChangeFrom_GivenStateHasNotChanged_ShouldNotCompleteTask(ProducerState
state)
{
//Arrange
var uut = new StateManager<ProducerState>(state, ProducerState.Closed);
//Act
- var task = uut.StateChangedFrom(state, default);
+ var task = uut.OnStateChangeFrom(state, default);
//Assert
task.IsCompleted.Should().BeFalse();
@@ -161,13 +161,13 @@ public class StateManagerTests
[InlineData(ProducerState.Connected, ProducerState.Closed)]
[InlineData(ProducerState.Disconnected, ProducerState.Connected)]
[InlineData(ProducerState.Disconnected, ProducerState.Closed)]
- public void
StateChangedFrom_GivenStateHasChanged_ShouldCompleteTask(ProducerState
initialState, ProducerState fromState)
+ public void
OnStateChangeFrom_GivenStateHasChanged_ShouldCompleteTask(ProducerState
initialState, ProducerState fromState)
{
//Arrange
var uut = new StateManager<ProducerState>(initialState,
ProducerState.Closed);
//Act
- var task = uut.StateChangedFrom(fromState, default);
+ var task = uut.OnStateChangeFrom(fromState, default);
//Assert
task.IsCompleted.Should().BeTrue();
@@ -177,13 +177,13 @@ public class StateManagerTests
[InlineData(ProducerState.Connected)]
[InlineData(ProducerState.Disconnected)]
[InlineData(ProducerState.Closed)]
- public void
StateChangedFrom_GivenStateIsFinal_ShouldCompleteTask(ProducerState state)
+ public void
OnStateChangeFrom_GivenStateIsFinal_ShouldCompleteTask(ProducerState state)
{
//Arrange
var uut = new StateManager<ProducerState>(ProducerState.Closed,
ProducerState.Closed);
//Act
- var task = uut.StateChangedFrom(state, default);
+ var task = uut.OnStateChangeFrom(state, default);
//Assert
task.IsCompleted.Should().BeTrue();
@@ -198,7 +198,7 @@ public class StateManagerTests
var uut = new StateManager<ProducerState>(initialState,
ProducerState.Closed);
//Act
- var task = uut.StateChangedTo(wantedState, default);
+ var task = uut.OnStateChangeTo(wantedState, default);
_ = uut.SetState(ProducerState.Closed);
//Assert
@@ -214,7 +214,7 @@ public class StateManagerTests
var uut = new StateManager<ProducerState>(initialState,
ProducerState.Closed);
//Act
- var task = uut.StateChangedTo(wantedState, default);
+ var task = uut.OnStateChangeTo(wantedState, default);
_ = uut.SetState(newState);
//Assert
@@ -229,7 +229,7 @@ public class StateManagerTests
using var cts = new CancellationTokenSource();
//Act
- var task = uut.StateChangedFrom(ProducerState.Connected, cts.Token);
+ var task = uut.OnStateChangeFrom(ProducerState.Connected, cts.Token);
cts.Cancel();
var exception = await Record.ExceptionAsync(() => task.AsTask()); //
xUnit can't record ValueTask yet
@@ -246,7 +246,7 @@ public class StateManagerTests
//Act
cts.Cancel();
- var task = uut.StateChangedFrom(ProducerState.Connected, cts.Token);
+ var task = uut.OnStateChangeFrom(ProducerState.Connected, cts.Token);
var exception = await Record.ExceptionAsync(() => task.AsTask()); //
xUnit can't record ValueTask yet
//Assert
diff --git a/tests/DotPulsar.Tests/PulsarClientTests.cs
b/tests/DotPulsar.Tests/PulsarClientTests.cs
index 77b924a..3e74ff7 100644
--- a/tests/DotPulsar.Tests/PulsarClientTests.cs
+++ b/tests/DotPulsar.Tests/PulsarClientTests.cs
@@ -42,7 +42,7 @@ public sealed class PulsarClientTests : IDisposable
// Act
var exception = await Record.ExceptionAsync(() =>
producer.Send("Test", _cts.Token).AsTask());
- var state = await producer.OnStateChangeTo(ProducerState.Faulted,
_cts.Token);
+ var state = await
producer.State.OnStateChangeTo(ProducerState.Faulted, _cts.Token);
// Assert
exception.Should().BeOfType<ProducerFaultedException>();
@@ -68,7 +68,7 @@ public sealed class PulsarClientTests : IDisposable
// Act
_ = await producer.Send("Test", _cts.Token); // Make sure we have a
working connection
throwException = true;
- var state = await producer.OnStateChangeTo(ProducerState.Faulted,
_cts.Token);
+ var state = await
producer.State.OnStateChangeTo(ProducerState.Faulted, _cts.Token);
// Assert
state.Should().Be(ProducerState.Faulted);
@@ -88,7 +88,7 @@ public sealed class PulsarClientTests : IDisposable
// Act
var exception = await Record.ExceptionAsync(() =>
producer.Send("Test", _cts.Token).AsTask());
- var state = await producer.OnStateChangeTo(ProducerState.Faulted,
_cts.Token);
+ var state = await
producer.State.OnStateChangeTo(ProducerState.Faulted, _cts.Token);
// Assert
exception.Should().BeOfType<ProducerFaultedException>();
@@ -119,7 +119,7 @@ public sealed class PulsarClientTests : IDisposable
// Act
_ = await producer.Send("Test", _cts.Token); // Make sure we have a
working connection
await tcs.Task;
- var state = await producer.OnStateChangeTo(ProducerState.Connected,
_cts.Token);
+ var state = await
producer.State.OnStateChangeTo(ProducerState.Connected, _cts.Token);
// Assert
state.Should().Be(ProducerState.Connected);