This is an automated email from the ASF dual-hosted git repository.

blankensteiner pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-dotpulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 7aa8d50  Created IStateHolder and have consumers, producers, and 
readers implement that instead of IState
7aa8d50 is described below

commit 7aa8d502af7114d21528d6aee40f60342b26baa4
Author: Daniel Blankensteiner <[email protected]>
AuthorDate: Wed Dec 11 11:44:51 2024 +0100

    Created IStateHolder and have consumers, producers, and readers implement 
that instead of IState
---
 CHANGELOG.md                                       |   6 ++
 src/DotPulsar/Abstractions/IConsumer.cs            |   2 +-
 src/DotPulsar/Abstractions/IProducer.cs            |   2 +-
 src/DotPulsar/Abstractions/IReader.cs              |   2 +-
 .../Abstractions/{IProducer.cs => IStateHolder.cs} |  13 +--
 src/DotPulsar/Extensions/ConsumerExtensions.cs     |   8 +-
 src/DotPulsar/Extensions/ProducerExtensions.cs     |   8 +-
 src/DotPulsar/Extensions/ReaderExtensions.cs       |   8 +-
 src/DotPulsar/Extensions/StateExtensions.cs        | 108 ---------------------
 ...StateExtensions.cs => StateHolderExtensions.cs} |  88 ++++-------------
 src/DotPulsar/Internal/Abstractions/IConnection.cs |   2 +-
 .../Internal/Abstractions/IStateChanged.cs         |  59 -----------
 .../Internal/Abstractions/IStateManager.cs         |   6 +-
 src/DotPulsar/Internal/ChannelManager.cs           |  14 +--
 src/DotPulsar/Internal/Connection.cs               |  18 +---
 src/DotPulsar/Internal/ConnectionPool.cs           |   2 +-
 src/DotPulsar/Internal/Consumer.cs                 |  17 +---
 src/DotPulsar/Internal/MonitorState.cs             |   6 +-
 src/DotPulsar/Internal/PingPongHandler.cs          |  14 +--
 src/DotPulsar/Internal/Producer.cs                 |  20 +---
 src/DotPulsar/Internal/Reader.cs                   |  17 +---
 src/DotPulsar/Internal/StateManager.cs             |   4 +-
 src/DotPulsar/Internal/SubConsumer.cs              |  17 +---
 src/DotPulsar/Internal/SubProducer.cs              |  19 +---
 src/DotPulsar/Internal/SubReader.cs                |  17 +---
 .../Extensions/StateExtensionsTests.cs             |  44 ---------
 .../Extensions/StateHolderExtensionsTests.cs       |  68 +++++++++++++
 tests/DotPulsar.Tests/Internal/ConsumerTests.cs    |   8 +-
 .../Internal/PingPongHandlerTest.cs                |   8 +-
 tests/DotPulsar.Tests/Internal/ProducerTests.cs    |  18 ++--
 tests/DotPulsar.Tests/Internal/ReaderTests.cs      |   8 +-
 .../DotPulsar.Tests/Internal/StateManagerTests.cs  |  36 +++----
 tests/DotPulsar.Tests/PulsarClientTests.cs         |   8 +-
 33 files changed, 198 insertions(+), 477 deletions(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 80d6122..787d041 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -4,6 +4,12 @@ All notable changes to this project will be documented in this 
file.
 
 The format is based on [Keep a 
Changelog](https://keepachangelog.com/en/1.1.0/) and this project adheres to 
[Semantic Versioning](https://semver.org/spec/v2.0.0.html).
 
+## [Unreleased]
+
+### Changed
+
+- **Breaking**: The consumer, reader, and producer now implement IStateHolder 
instead of IState
+
 ## [3.6.0] - 2024-12-09
 
 ### Added
diff --git a/src/DotPulsar/Abstractions/IConsumer.cs 
b/src/DotPulsar/Abstractions/IConsumer.cs
index 5f90a38..84ea708 100644
--- a/src/DotPulsar/Abstractions/IConsumer.cs
+++ b/src/DotPulsar/Abstractions/IConsumer.cs
@@ -17,7 +17,7 @@ namespace DotPulsar.Abstractions;
 /// <summary>
 /// A consumer abstraction.
 /// </summary>
-public interface IConsumer : IGetLastMessageIds, ISeek, IState<ConsumerState>, 
IAsyncDisposable
+public interface IConsumer : IGetLastMessageIds, ISeek, 
IStateHolder<ConsumerState>, IAsyncDisposable
 {
     /// <summary>
     /// Acknowledge the consumption of a single message using the MessageId.
diff --git a/src/DotPulsar/Abstractions/IProducer.cs 
b/src/DotPulsar/Abstractions/IProducer.cs
index 7f8b658..0f5d10f 100644
--- a/src/DotPulsar/Abstractions/IProducer.cs
+++ b/src/DotPulsar/Abstractions/IProducer.cs
@@ -17,7 +17,7 @@ namespace DotPulsar.Abstractions;
 /// <summary>
 /// A producer abstraction.
 /// </summary>
-public interface IProducer : IState<ProducerState>, IAsyncDisposable
+public interface IProducer : IStateHolder<ProducerState>, IAsyncDisposable
 {
     /// <summary>
     /// The producer's service url.
diff --git a/src/DotPulsar/Abstractions/IReader.cs 
b/src/DotPulsar/Abstractions/IReader.cs
index 812a4f0..931b014 100644
--- a/src/DotPulsar/Abstractions/IReader.cs
+++ b/src/DotPulsar/Abstractions/IReader.cs
@@ -17,7 +17,7 @@ namespace DotPulsar.Abstractions;
 /// <summary>
 /// A reader abstraction.
 /// </summary>
-public interface IReader : IGetLastMessageIds, ISeek, IState<ReaderState>, 
IAsyncDisposable
+public interface IReader : IGetLastMessageIds, ISeek, 
IStateHolder<ReaderState>, IAsyncDisposable
 {
     /// <summary>
     /// The reader's service url.
diff --git a/src/DotPulsar/Abstractions/IProducer.cs 
b/src/DotPulsar/Abstractions/IStateHolder.cs
similarity index 71%
copy from src/DotPulsar/Abstractions/IProducer.cs
copy to src/DotPulsar/Abstractions/IStateHolder.cs
index 7f8b658..34b5e37 100644
--- a/src/DotPulsar/Abstractions/IProducer.cs
+++ b/src/DotPulsar/Abstractions/IStateHolder.cs
@@ -15,17 +15,12 @@
 namespace DotPulsar.Abstractions;
 
 /// <summary>
-/// A producer abstraction.
+/// A state holder abstraction.
 /// </summary>
-public interface IProducer : IState<ProducerState>, IAsyncDisposable
+public interface IStateHolder<TState> where TState : notnull
 {
     /// <summary>
-    /// The producer's service url.
+    /// The state abstraction of the holder.
     /// </summary>
-    public Uri ServiceUrl { get; }
-
-    /// <summary>
-    /// The producer's topic.
-    /// </summary>
-    string Topic { get; }
+    IState<TState> State { get; }
 }
diff --git a/src/DotPulsar/Extensions/ConsumerExtensions.cs 
b/src/DotPulsar/Extensions/ConsumerExtensions.cs
index 1d4e200..ee00db2 100644
--- a/src/DotPulsar/Extensions/ConsumerExtensions.cs
+++ b/src/DotPulsar/Extensions/ConsumerExtensions.cs
@@ -67,7 +67,7 @@ public static class ConsumerExtensions
     /// </remarks>
     public static async ValueTask<ConsumerStateChanged> StateChangedTo(this 
IConsumer consumer, ConsumerState state, CancellationToken cancellationToken = 
default)
     {
-        var currentState = await consumer.OnStateChangeTo(state, 
cancellationToken).ConfigureAwait(false);
+        var currentState = await consumer.State.OnStateChangeTo(state, 
cancellationToken).ConfigureAwait(false);
         return new ConsumerStateChanged(consumer, currentState);
     }
 
@@ -82,7 +82,7 @@ public static class ConsumerExtensions
     /// </remarks>
     public static async ValueTask<ConsumerStateChanged> StateChangedTo(this 
IConsumer consumer, ConsumerState state, TimeSpan delay, CancellationToken 
cancellationToken = default)
     {
-        var currentState = await consumer.OnStateChangeTo(state, delay, 
cancellationToken).ConfigureAwait(false);
+        var currentState = await consumer.State.OnStateChangeTo(state, delay, 
cancellationToken).ConfigureAwait(false);
         return new ConsumerStateChanged(consumer, currentState);
     }
 
@@ -97,7 +97,7 @@ public static class ConsumerExtensions
     /// </remarks>
     public static async ValueTask<ConsumerStateChanged> StateChangedFrom(this 
IConsumer consumer, ConsumerState state, CancellationToken cancellationToken = 
default)
     {
-        var currentState = await consumer.OnStateChangeFrom(state, 
cancellationToken).ConfigureAwait(false);
+        var currentState = await consumer.State.OnStateChangeFrom(state, 
cancellationToken).ConfigureAwait(false);
         return new ConsumerStateChanged(consumer, currentState);
     }
 
@@ -112,7 +112,7 @@ public static class ConsumerExtensions
     /// </remarks>
     public static async ValueTask<ConsumerStateChanged> StateChangedFrom(this 
IConsumer consumer, ConsumerState state, TimeSpan delay, CancellationToken 
cancellationToken = default)
     {
-        var currentState = await consumer.OnStateChangeFrom(state, delay, 
cancellationToken).ConfigureAwait(false);
+        var currentState = await consumer.State.OnStateChangeFrom(state, 
delay, cancellationToken).ConfigureAwait(false);
         return new ConsumerStateChanged(consumer, currentState);
     }
 }
diff --git a/src/DotPulsar/Extensions/ProducerExtensions.cs 
b/src/DotPulsar/Extensions/ProducerExtensions.cs
index 885f948..f31e36e 100644
--- a/src/DotPulsar/Extensions/ProducerExtensions.cs
+++ b/src/DotPulsar/Extensions/ProducerExtensions.cs
@@ -39,7 +39,7 @@ public static class ProducerExtensions
     /// </remarks>
     public static async ValueTask<ProducerStateChanged> StateChangedTo(this 
IProducer producer, ProducerState state, CancellationToken cancellationToken = 
default)
     {
-        var currentState = await producer.OnStateChangeTo(state, 
cancellationToken).ConfigureAwait(false);
+        var currentState = await producer.State.OnStateChangeTo(state, 
cancellationToken).ConfigureAwait(false);
         return new ProducerStateChanged(producer, currentState);
     }
 
@@ -54,7 +54,7 @@ public static class ProducerExtensions
     /// </remarks>
     public static async ValueTask<ProducerStateChanged> StateChangedTo(this 
IProducer producer, ProducerState state, TimeSpan delay, CancellationToken 
cancellationToken = default)
     {
-        var currentState = await producer.OnStateChangeTo(state, delay, 
cancellationToken).ConfigureAwait(false);
+        var currentState = await producer.State.OnStateChangeTo(state, delay, 
cancellationToken).ConfigureAwait(false);
         return new ProducerStateChanged(producer, currentState);
     }
 
@@ -69,7 +69,7 @@ public static class ProducerExtensions
     /// </remarks>
     public static async ValueTask<ProducerStateChanged> StateChangedFrom(this 
IProducer producer, ProducerState state, CancellationToken cancellationToken = 
default)
     {
-        var currentState = await producer.OnStateChangeFrom(state, 
cancellationToken).ConfigureAwait(false);
+        var currentState = await producer.State.OnStateChangeFrom(state, 
cancellationToken).ConfigureAwait(false);
         return new ProducerStateChanged(producer, currentState);
     }
 
@@ -84,7 +84,7 @@ public static class ProducerExtensions
     /// </remarks>
     public static async ValueTask<ProducerStateChanged> StateChangedFrom(this 
IProducer producer, ProducerState state, TimeSpan delay, CancellationToken 
cancellationToken = default)
     {
-        var currentState = await producer.OnStateChangeFrom(state, delay, 
cancellationToken).ConfigureAwait(false);
+        var currentState = await producer.State.OnStateChangeFrom(state, 
delay, cancellationToken).ConfigureAwait(false);
         return new ProducerStateChanged(producer, currentState);
     }
 }
diff --git a/src/DotPulsar/Extensions/ReaderExtensions.cs 
b/src/DotPulsar/Extensions/ReaderExtensions.cs
index e8c109a..ee8041a 100644
--- a/src/DotPulsar/Extensions/ReaderExtensions.cs
+++ b/src/DotPulsar/Extensions/ReaderExtensions.cs
@@ -32,7 +32,7 @@ public static class ReaderExtensions
     /// </remarks>
     public static async ValueTask<ReaderStateChanged> StateChangedTo(this 
IReader reader, ReaderState state, CancellationToken cancellationToken = 
default)
     {
-        var currentState = await reader.OnStateChangeTo(state, 
cancellationToken).ConfigureAwait(false);
+        var currentState = await reader.State.OnStateChangeTo(state, 
cancellationToken).ConfigureAwait(false);
         return new ReaderStateChanged(reader, currentState);
     }
 
@@ -47,7 +47,7 @@ public static class ReaderExtensions
     /// </remarks>
     public static async ValueTask<ReaderStateChanged> StateChangedTo(this 
IReader reader, ReaderState state, TimeSpan delay, CancellationToken 
cancellationToken = default)
     {
-        var currentState = await reader.OnStateChangeTo(state, delay, 
cancellationToken).ConfigureAwait(false);
+        var currentState = await reader.State.OnStateChangeTo(state, delay, 
cancellationToken).ConfigureAwait(false);
         return new ReaderStateChanged(reader, currentState);
     }
 
@@ -62,7 +62,7 @@ public static class ReaderExtensions
     /// </remarks>
     public static async ValueTask<ReaderStateChanged> StateChangedFrom(this 
IReader reader, ReaderState state, CancellationToken cancellationToken = 
default)
     {
-        var currentState = await reader.OnStateChangeFrom(state, 
cancellationToken).ConfigureAwait(false);
+        var currentState = await reader.State.OnStateChangeFrom(state, 
cancellationToken).ConfigureAwait(false);
         return new ReaderStateChanged(reader, currentState);
     }
 
@@ -77,7 +77,7 @@ public static class ReaderExtensions
     /// </remarks>
     public static async ValueTask<ReaderStateChanged> StateChangedFrom(this 
IReader reader, ReaderState state, TimeSpan delay, CancellationToken 
cancellationToken = default)
     {
-        var currentState = await reader.OnStateChangeFrom(state, delay, 
cancellationToken).ConfigureAwait(false);
+        var currentState = await reader.State.OnStateChangeFrom(state, delay, 
cancellationToken).ConfigureAwait(false);
         return new ReaderStateChanged(reader, currentState);
     }
 }
diff --git a/src/DotPulsar/Extensions/StateExtensions.cs 
b/src/DotPulsar/Extensions/StateExtensions.cs
index 0e07bb2..a04e6b3 100644
--- a/src/DotPulsar/Extensions/StateExtensions.cs
+++ b/src/DotPulsar/Extensions/StateExtensions.cs
@@ -100,112 +100,4 @@ public static class StateExtensions
             }
         }
     }
-
-    /// <summary>
-    /// Will invoke the onStateLeft callback when the state if left (with 
delay) and onStateReached when it's reached again.
-    /// </summary>
-    /// <returns>
-    /// ValueTask that will run as long as a final state is not entered.
-    /// </returns>
-    public static async ValueTask DelayedStateMonitor<TEntity, TState, 
TFaultContext>(
-        this TEntity stateImplementer,
-        TState state,
-        TimeSpan delay,
-        Func<TEntity, TState, CancellationToken, ValueTask<TFaultContext>> 
onStateLeft,
-        Func<TEntity, TState, TFaultContext, CancellationToken, ValueTask> 
onStateReached,
-        CancellationToken cancellationToken = default) where TEntity : 
IState<TState> where TState : notnull where TFaultContext : class
-    {
-        while (true)
-        {
-            cancellationToken.ThrowIfCancellationRequested();
-
-            var currentState = await stateImplementer.OnStateChangeFrom(state, 
delay, cancellationToken).ConfigureAwait(false);
-
-            TFaultContext? faultContext = null;
-
-            try
-            {
-                faultContext = await onStateLeft(stateImplementer, 
currentState, cancellationToken).ConfigureAwait(false);
-            }
-            catch
-            {
-                // Ignore
-            }
-
-            if (stateImplementer.IsFinalState(currentState))
-                return;
-
-            currentState = await stateImplementer.OnStateChangeTo(state, 
cancellationToken).ConfigureAwait(false);
-
-            if (stateImplementer.IsFinalState(currentState))
-                return;
-
-            try
-            {
-                if (faultContext is not null)
-                    await onStateReached(stateImplementer, currentState, 
faultContext, cancellationToken).ConfigureAwait(false);
-            }
-            catch
-            {
-                // Ignore
-            }
-        }
-    }
-
-    /// <summary>
-    /// Will invoke the onStateLeft callback when the state if left (with 
delay) and onStateReached when it's reached again.
-    /// </summary>
-    /// <returns>
-    /// ValueTask that will run as long as a final state is not entered.
-    /// </returns>
-    public static async ValueTask DelayedStateMonitor<TEntity, TState>(
-        this TEntity stateImplementer,
-        TState state,
-        TimeSpan delay,
-        Func<TEntity, TState, CancellationToken, ValueTask> onStateLeft,
-        Func<TEntity, TState, CancellationToken, ValueTask> onStateReached,
-        CancellationToken cancellationToken = default) where TEntity : 
IState<TState> where TState : notnull
-    {
-        async ValueTask<string> onStateLeftFunction(TEntity entity, TState 
state, CancellationToken cancellationToken)
-        {
-            await onStateLeft(entity, state, 
cancellationToken).ConfigureAwait(false);
-            return string.Empty;
-        }
-
-        async ValueTask onStateReachedFunction(TEntity entity, TState state, 
string faultContext, CancellationToken cancellationToken)
-        {
-            await onStateReached(entity, state, 
cancellationToken).ConfigureAwait(false);
-        }
-
-        await stateImplementer.DelayedStateMonitor(state, delay, 
onStateLeftFunction, onStateReachedFunction, 
cancellationToken).ConfigureAwait(false);
-    }
-
-    /// <summary>
-    /// Will invoke the onStateLeft callback when the state if left (with 
delay) and onStateReached when it's reached again.
-    /// </summary>
-    /// <returns>
-    /// ValueTask that will run as long as a final state is not entered.
-    /// </returns>
-    public static async ValueTask DelayedStateMonitor<TEntity, TState>(
-        this TEntity stateImplementer,
-        TState state,
-        TimeSpan delay,
-        Action<TEntity, TState> onStateLeft,
-        Action<TEntity, TState> onStateReached,
-        CancellationToken cancellationToken = default) where TEntity : 
IState<TState> where TState : notnull
-    {
-        ValueTask<string> onStateLeftFunction(TEntity entity, TState state, 
CancellationToken cancellationToken)
-        {
-            onStateLeft(entity, state);
-            return new ValueTask<string>(string.Empty);
-        }
-
-        ValueTask onStateReachedFunction(TEntity entity, TState state, string 
faultContext, CancellationToken cancellationToken)
-        {
-            onStateReached(entity, state);
-            return new ValueTask();
-        }
-
-        await stateImplementer.DelayedStateMonitor(state, delay, 
onStateLeftFunction, onStateReachedFunction, 
cancellationToken).ConfigureAwait(false);
-    }
 }
diff --git a/src/DotPulsar/Extensions/StateExtensions.cs 
b/src/DotPulsar/Extensions/StateHolderExtensions.cs
similarity index 61%
copy from src/DotPulsar/Extensions/StateExtensions.cs
copy to src/DotPulsar/Extensions/StateHolderExtensions.cs
index 0e07bb2..0ca2631 100644
--- a/src/DotPulsar/Extensions/StateExtensions.cs
+++ b/src/DotPulsar/Extensions/StateHolderExtensions.cs
@@ -17,9 +17,9 @@ namespace DotPulsar.Extensions;
 using DotPulsar.Abstractions;
 
 /// <summary>
-/// Extensions for IState.
+/// Extensions for IStateHolder.
 /// </summary>
-public static class StateExtensions
+public static class StateHolderExtensions
 {
     /// <summary>
     /// Wait for the state to change to a specific state with a delay.
@@ -31,35 +31,11 @@ public static class StateExtensions
     /// If the state change to a final state, then all awaiting tasks will 
complete.
     /// </remarks>
     public static async ValueTask<TState> OnStateChangeTo<TState>(
-        this IState<TState> stateChanged,
+        this IStateHolder<TState> stateHolder,
         TState state,
         TimeSpan delay,
         CancellationToken cancellationToken = default) where TState : notnull
-    {
-        while (true)
-        {
-            var currentState = await stateChanged.OnStateChangeTo(state, 
cancellationToken).ConfigureAwait(false);
-            if (stateChanged.IsFinalState(currentState))
-                return currentState;
-
-            using var cts = 
CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
-            cts.CancelAfter(delay);
-
-            try
-            {
-                currentState = await stateChanged.OnStateChangeFrom(state, 
cts.Token).ConfigureAwait(false);
-                if (stateChanged.IsFinalState(currentState))
-                    return currentState;
-            }
-            catch (OperationCanceledException)
-            {
-                if (cancellationToken.IsCancellationRequested)
-                    throw;
-
-                return state;
-            }
-        }
-    }
+        => await stateHolder.State.OnStateChangeTo(state, delay, 
cancellationToken);
 
     /// <summary>
     /// Wait for the state to change from a specific state with a delay.
@@ -71,35 +47,11 @@ public static class StateExtensions
     /// If the state change to a final state, then all awaiting tasks will 
complete.
     /// </remarks>
     public static async ValueTask<TState> OnStateChangeFrom<TState>(
-        this IState<TState> stateChanged,
+        this IStateHolder<TState> stateHolder,
         TState state,
         TimeSpan delay,
         CancellationToken cancellationToken = default) where TState : notnull
-    {
-        while (true)
-        {
-            var currentState = await stateChanged.OnStateChangeFrom(state, 
cancellationToken).ConfigureAwait(false);
-            if (stateChanged.IsFinalState(currentState))
-                return currentState;
-
-            using var cts = 
CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
-            cts.CancelAfter(delay);
-
-            try
-            {
-                currentState = await stateChanged.OnStateChangeTo(state, 
cts.Token).ConfigureAwait(false);
-                if (stateChanged.IsFinalState(currentState))
-                    return currentState;
-            }
-            catch (OperationCanceledException)
-            {
-                if (cancellationToken.IsCancellationRequested)
-                    throw;
-
-                return currentState;
-            }
-        }
-    }
+    => await stateHolder.State.OnStateChangeFrom(state, delay, 
cancellationToken);
 
     /// <summary>
     /// Will invoke the onStateLeft callback when the state if left (with 
delay) and onStateReached when it's reached again.
@@ -108,42 +60,42 @@ public static class StateExtensions
     /// ValueTask that will run as long as a final state is not entered.
     /// </returns>
     public static async ValueTask DelayedStateMonitor<TEntity, TState, 
TFaultContext>(
-        this TEntity stateImplementer,
+        this TEntity stateHolder,
         TState state,
         TimeSpan delay,
         Func<TEntity, TState, CancellationToken, ValueTask<TFaultContext>> 
onStateLeft,
         Func<TEntity, TState, TFaultContext, CancellationToken, ValueTask> 
onStateReached,
-        CancellationToken cancellationToken = default) where TEntity : 
IState<TState> where TState : notnull where TFaultContext : class
+        CancellationToken cancellationToken = default) where TEntity : 
IStateHolder<TState> where TState : notnull where TFaultContext : class
     {
         while (true)
         {
             cancellationToken.ThrowIfCancellationRequested();
 
-            var currentState = await stateImplementer.OnStateChangeFrom(state, 
delay, cancellationToken).ConfigureAwait(false);
+            var currentState = await stateHolder.OnStateChangeFrom(state, 
delay, cancellationToken).ConfigureAwait(false);
 
             TFaultContext? faultContext = null;
 
             try
             {
-                faultContext = await onStateLeft(stateImplementer, 
currentState, cancellationToken).ConfigureAwait(false);
+                faultContext = await onStateLeft(stateHolder, currentState, 
cancellationToken).ConfigureAwait(false);
             }
             catch
             {
                 // Ignore
             }
 
-            if (stateImplementer.IsFinalState(currentState))
+            if (stateHolder.State.IsFinalState(currentState))
                 return;
 
-            currentState = await stateImplementer.OnStateChangeTo(state, 
cancellationToken).ConfigureAwait(false);
+            currentState = await stateHolder.State.OnStateChangeTo(state, 
cancellationToken).ConfigureAwait(false);
 
-            if (stateImplementer.IsFinalState(currentState))
+            if (stateHolder.State.IsFinalState(currentState))
                 return;
 
             try
             {
                 if (faultContext is not null)
-                    await onStateReached(stateImplementer, currentState, 
faultContext, cancellationToken).ConfigureAwait(false);
+                    await onStateReached(stateHolder, currentState, 
faultContext, cancellationToken).ConfigureAwait(false);
             }
             catch
             {
@@ -159,12 +111,12 @@ public static class StateExtensions
     /// ValueTask that will run as long as a final state is not entered.
     /// </returns>
     public static async ValueTask DelayedStateMonitor<TEntity, TState>(
-        this TEntity stateImplementer,
+        this TEntity stateHolder,
         TState state,
         TimeSpan delay,
         Func<TEntity, TState, CancellationToken, ValueTask> onStateLeft,
         Func<TEntity, TState, CancellationToken, ValueTask> onStateReached,
-        CancellationToken cancellationToken = default) where TEntity : 
IState<TState> where TState : notnull
+        CancellationToken cancellationToken = default) where TEntity : 
IStateHolder<TState> where TState : notnull
     {
         async ValueTask<string> onStateLeftFunction(TEntity entity, TState 
state, CancellationToken cancellationToken)
         {
@@ -177,7 +129,7 @@ public static class StateExtensions
             await onStateReached(entity, state, 
cancellationToken).ConfigureAwait(false);
         }
 
-        await stateImplementer.DelayedStateMonitor(state, delay, 
onStateLeftFunction, onStateReachedFunction, 
cancellationToken).ConfigureAwait(false);
+        await stateHolder.DelayedStateMonitor(state, delay, 
onStateLeftFunction, onStateReachedFunction, 
cancellationToken).ConfigureAwait(false);
     }
 
     /// <summary>
@@ -187,12 +139,12 @@ public static class StateExtensions
     /// ValueTask that will run as long as a final state is not entered.
     /// </returns>
     public static async ValueTask DelayedStateMonitor<TEntity, TState>(
-        this TEntity stateImplementer,
+        this TEntity stateHolder,
         TState state,
         TimeSpan delay,
         Action<TEntity, TState> onStateLeft,
         Action<TEntity, TState> onStateReached,
-        CancellationToken cancellationToken = default) where TEntity : 
IState<TState> where TState : notnull
+        CancellationToken cancellationToken = default) where TEntity : 
IStateHolder<TState> where TState : notnull
     {
         ValueTask<string> onStateLeftFunction(TEntity entity, TState state, 
CancellationToken cancellationToken)
         {
@@ -206,6 +158,6 @@ public static class StateExtensions
             return new ValueTask();
         }
 
-        await stateImplementer.DelayedStateMonitor(state, delay, 
onStateLeftFunction, onStateReachedFunction, 
cancellationToken).ConfigureAwait(false);
+        await stateHolder.DelayedStateMonitor(state, delay, 
onStateLeftFunction, onStateReachedFunction, 
cancellationToken).ConfigureAwait(false);
     }
 }
diff --git a/src/DotPulsar/Internal/Abstractions/IConnection.cs 
b/src/DotPulsar/Internal/Abstractions/IConnection.cs
index 48cad09..2f2a5b8 100644
--- a/src/DotPulsar/Internal/Abstractions/IConnection.cs
+++ b/src/DotPulsar/Internal/Abstractions/IConnection.cs
@@ -17,7 +17,7 @@ namespace DotPulsar.Internal.Abstractions;
 using DotPulsar.Abstractions;
 using DotPulsar.Internal.PulsarApi;
 
-public interface IConnection : IState<ConnectionState>, IAsyncDisposable
+public interface IConnection : IStateHolder<ConnectionState>, IAsyncDisposable
 {
     public int MaxMessageSize { get; }
 
diff --git a/src/DotPulsar/Internal/Abstractions/IStateChanged.cs 
b/src/DotPulsar/Internal/Abstractions/IStateChanged.cs
deleted file mode 100644
index 31f579f..0000000
--- a/src/DotPulsar/Internal/Abstractions/IStateChanged.cs
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-namespace DotPulsar.Internal.Abstractions;
-
-/// <summary>
-/// A state change monitoring abstraction.
-/// </summary>
-public interface IStateChanged<TState> where TState : notnull
-{
-    /// <summary>
-    /// Wait for the state to change to a specific state.
-    /// </summary>
-    /// <returns>
-    /// The current state.
-    /// </returns>
-    /// <remarks>
-    /// If the state change to a final state, then all awaiting tasks will 
complete.
-    /// </remarks>
-    ValueTask<TState> StateChangedTo(TState state, CancellationToken 
cancellationToken = default);
-
-    /// <summary>
-    /// Wait for the state to change from a specific state.
-    /// </summary>
-    /// <returns>
-    /// The current state.
-    /// </returns>
-    /// <remarks>
-    /// If the state change to a final state, then all awaiting tasks will 
complete.
-    /// </remarks>
-    ValueTask<TState> StateChangedFrom(TState state, CancellationToken 
cancellationToken = default);
-
-    /// <summary>
-    /// Ask whether the current state is final, meaning that it will never 
change.
-    /// </summary>
-    /// <returns>
-    /// True if it's final and False if it's not.
-    /// </returns>
-    bool IsFinalState();
-
-    /// <summary>
-    /// Ask whether the provided state is final, meaning that it will never 
change.
-    /// </summary>
-    /// <returns>
-    /// True if it's final and False if it's not.
-    /// </returns>
-    bool IsFinalState(TState state);
-}
diff --git a/src/DotPulsar/Internal/Abstractions/IStateManager.cs 
b/src/DotPulsar/Internal/Abstractions/IStateManager.cs
index 135a2ac..9d5fe25 100644
--- a/src/DotPulsar/Internal/Abstractions/IStateManager.cs
+++ b/src/DotPulsar/Internal/Abstractions/IStateManager.cs
@@ -1,4 +1,4 @@
-/*
+/*
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * You may obtain a copy of the License at
@@ -14,7 +14,9 @@
 
 namespace DotPulsar.Internal.Abstractions;
 
-public interface IStateManager<TState> : IStateChanged<TState> where TState : 
notnull
+using DotPulsar.Abstractions;
+
+public interface IStateManager<TState> : IState<TState> where TState : notnull
 {
     TState CurrentState { get; }
     TState SetState(TState state);
diff --git a/src/DotPulsar/Internal/ChannelManager.cs 
b/src/DotPulsar/Internal/ChannelManager.cs
index 7d4a1e8..dc7e14a 100644
--- a/src/DotPulsar/Internal/ChannelManager.cs
+++ b/src/DotPulsar/Internal/ChannelManager.cs
@@ -20,7 +20,7 @@ using DotPulsar.Internal.Extensions;
 using DotPulsar.Internal.PulsarApi;
 using System.Buffers;
 
-public sealed class ChannelManager : IState<ChannelManagerState>, IDisposable
+public sealed class ChannelManager : IStateHolder<ChannelManagerState>, 
IDisposable
 {
     private readonly StateManager<ChannelManagerState> _stateManager;
     private readonly RequestResponseHandler _requestResponseHandler;
@@ -28,6 +28,8 @@ public sealed class ChannelManager : 
IState<ChannelManagerState>, IDisposable
     private readonly IdLookup<IChannel> _producerChannels;
     private readonly EnumLookup<BaseCommand.Type, Action<BaseCommand>> 
_incoming;
 
+    public IState<ChannelManagerState> State => _stateManager;
+
     public ChannelManager()
     {
         _stateManager = new 
StateManager<ChannelManagerState>(ChannelManagerState.Inactive, 
ChannelManagerState.Closed);
@@ -302,14 +304,4 @@ public sealed class ChannelManager : 
IState<ChannelManagerState>, IDisposable
         if (_consumerChannels.IsEmpty() && _producerChannels.IsEmpty())
             _stateManager.SetState(ChannelManagerState.Inactive);
     }
-
-    public bool IsFinalState() => _stateManager.IsFinalState();
-
-    public bool IsFinalState(ChannelManagerState state) => 
_stateManager.IsFinalState(state);
-
-    public async ValueTask<ChannelManagerState> 
OnStateChangeTo(ChannelManagerState state, CancellationToken cancellationToken 
= default)
-        => await _stateManager.StateChangedTo(state, 
cancellationToken).ConfigureAwait(false);
-
-    public async ValueTask<ChannelManagerState> 
OnStateChangeFrom(ChannelManagerState state, CancellationToken 
cancellationToken = default)
-        => await _stateManager.StateChangedFrom(state, 
cancellationToken).ConfigureAwait(false);
 }
diff --git a/src/DotPulsar/Internal/Connection.cs 
b/src/DotPulsar/Internal/Connection.cs
index 1c7771a..03f618e 100644
--- a/src/DotPulsar/Internal/Connection.cs
+++ b/src/DotPulsar/Internal/Connection.cs
@@ -61,6 +61,8 @@ public sealed class Connection : IConnection
 
     public int MaxMessageSize { get; set; }
 
+    public IState<ConnectionState> State => _stateManager;
+
     public async Task<ProducerResponse> Send(CommandProducer command, IChannel 
channel, CancellationToken cancellationToken)
     {
         ThrowIfDisposed();
@@ -309,8 +311,8 @@ public sealed class Connection : IConnection
     private async Task Setup(CancellationToken cancellationToken)
     {
         var incoming = ProcessIncomingFrames(cancellationToken);
-        var channelManager = 
_channelManager.OnStateChangeTo(ChannelManagerState.Inactive, 
_closeOnInactiveInterval, cancellationToken).AsTask();
-        var pingPongTimeOut = 
_pingPongHandler.OnStateChangeTo(PingPongHandlerState.TimedOut, 
cancellationToken).AsTask();
+        var channelManager = 
_channelManager.State.OnStateChangeTo(ChannelManagerState.Inactive, 
_closeOnInactiveInterval, cancellationToken).AsTask();
+        var pingPongTimeOut = 
_pingPongHandler.State.OnStateChangeTo(PingPongHandlerState.TimedOut, 
cancellationToken).AsTask();
         _ = Task.Factory.StartNew(async () => await 
KeepAlive(PingPongHandlerState.Active, 
cancellationToken).ConfigureAwait(false));
         await Task.WhenAny(incoming, channelManager, 
pingPongTimeOut).ConfigureAwait(false);
         _stateManager.SetState(ConnectionState.Disconnected);
@@ -320,7 +322,7 @@ public sealed class Connection : IConnection
     {
         while (!cancellationToken.IsCancellationRequested)
         {
-            state = await _pingPongHandler.OnStateChangeFrom(state, 
cancellationToken).ConfigureAwait(false);
+            state = await _pingPongHandler.State.OnStateChangeFrom(state, 
cancellationToken).ConfigureAwait(false);
             if (state == PingPongHandlerState.TimedOut)
                 return;
             if (state == PingPongHandlerState.Active)
@@ -381,14 +383,4 @@ public sealed class Connection : IConnection
         if (_isDisposed != 0)
             throw new ConnectionDisposedException();
     }
-
-    public bool IsFinalState() => _stateManager.IsFinalState();
-
-    public bool IsFinalState(ConnectionState state) => 
_stateManager.IsFinalState(state);
-
-    public async ValueTask<ConnectionState> OnStateChangeTo(ConnectionState 
state, CancellationToken cancellationToken = default)
-        => await _stateManager.StateChangedTo(state, 
cancellationToken).ConfigureAwait(false);
-
-    public async ValueTask<ConnectionState> OnStateChangeFrom(ConnectionState 
state, CancellationToken cancellationToken = default)
-        => await _stateManager.StateChangedFrom(state, 
cancellationToken).ConfigureAwait(false);
 }
diff --git a/src/DotPulsar/Internal/ConnectionPool.cs 
b/src/DotPulsar/Internal/ConnectionPool.cs
index 8d794c4..0e0e540 100644
--- a/src/DotPulsar/Internal/ConnectionPool.cs
+++ b/src/DotPulsar/Internal/ConnectionPool.cs
@@ -152,7 +152,7 @@ public sealed class ConnectionPool : IConnectionPool
             commandConnect = WithProxyToBroker(commandConnect, url.Logical);
 
         var connection = Connection.Connect(new PulsarStream(stream), 
_authentication, _keepAliveInterval, _closeInactiveConnectionsInterval);
-        _ = connection.OnStateChangeFrom(ConnectionState.Connected, 
CancellationToken.None).AsTask().ContinueWith(t => DisposeConnection(url, 
connection), CancellationToken.None);
+        _ = connection.State.OnStateChangeFrom(ConnectionState.Connected, 
CancellationToken.None).AsTask().ContinueWith(t => DisposeConnection(url, 
connection), CancellationToken.None);
         var response = await connection.Send(commandConnect, 
cancellationToken).ConfigureAwait(false);
         response.Expect(BaseCommand.Type.Connected);
         _connections[url] = connection;
diff --git a/src/DotPulsar/Internal/Consumer.cs 
b/src/DotPulsar/Internal/Consumer.cs
index 0e85429..dc45883 100644
--- a/src/DotPulsar/Internal/Consumer.cs
+++ b/src/DotPulsar/Internal/Consumer.cs
@@ -45,6 +45,7 @@ public sealed class Consumer<TMessage> : IConsumer<TMessage>
     public string SubscriptionName { get; }
     public SubscriptionType SubscriptionType { get; }
     public string Topic { get; }
+    public IState<ConsumerState> State => _state;
 
     public Consumer(
         Uri serviceUrl,
@@ -111,7 +112,7 @@ public sealed class Consumer<TMessage> : IConsumer<TMessage>
             _receiveTasks[i] = _emptyTaskCompletionSource.Task;
             var topicName = _isPartitionedTopic ? GetPartitionedTopicName(i) : 
Topic;
             _subConsumers[i] = CreateSubConsumer(topicName);
-            monitoringTasks[i] = 
_subConsumers[i].OnStateChangeFrom(ConsumerState.Disconnected, 
_cts.Token).AsTask();
+            monitoringTasks[i] = 
_subConsumers[i].State.OnStateChangeFrom(ConsumerState.Disconnected, 
_cts.Token).AsTask();
         }
 
         _allSubConsumersAreReady = true;
@@ -129,7 +130,7 @@ public sealed class Consumer<TMessage> : IConsumer<TMessage>
 
                 var state = task.Result;
                 states[i] = state;
-                monitoringTasks[i] = _subConsumers[i].OnStateChangeFrom(state, 
_cts.Token).AsTask();
+                monitoringTasks[i] = 
_subConsumers[i].State.OnStateChangeFrom(state, _cts.Token).AsTask();
             }
 
             if (!_isPartitionedTopic)
@@ -151,18 +152,6 @@ public sealed class Consumer<TMessage> : 
IConsumer<TMessage>
         }
     }
 
-    public async ValueTask<ConsumerState> OnStateChangeTo(ConsumerState state, 
CancellationToken cancellationToken)
-        => await _state.StateChangedTo(state, 
cancellationToken).ConfigureAwait(false);
-
-    public async ValueTask<ConsumerState> OnStateChangeFrom(ConsumerState 
state, CancellationToken cancellationToken)
-        => await _state.StateChangedFrom(state, 
cancellationToken).ConfigureAwait(false);
-
-    public bool IsFinalState()
-        => _state.IsFinalState();
-
-    public bool IsFinalState(ConsumerState state)
-        => _state.IsFinalState(state);
-
     public async ValueTask DisposeAsync()
     {
         if (Interlocked.Exchange(ref _isDisposed, 1) != 0)
diff --git a/src/DotPulsar/Internal/MonitorState.cs 
b/src/DotPulsar/Internal/MonitorState.cs
index 6cb264b..e20ab3c 100644
--- a/src/DotPulsar/Internal/MonitorState.cs
+++ b/src/DotPulsar/Internal/MonitorState.cs
@@ -25,7 +25,7 @@ public static class StateMonitor
 
         var state = ProducerState.Disconnected;
 
-        while (!producer.IsFinalState(state) && 
!handler.CancellationToken.IsCancellationRequested)
+        while (!producer.State.IsFinalState(state) && 
!handler.CancellationToken.IsCancellationRequested)
         {
             var stateChanged = await producer.StateChangedFrom(state, 
handler.CancellationToken).ConfigureAwait(false);
             state = stateChanged.ProducerState;
@@ -47,7 +47,7 @@ public static class StateMonitor
 
         var state = ConsumerState.Disconnected;
 
-        while (!consumer.IsFinalState(state) && 
!handler.CancellationToken.IsCancellationRequested)
+        while (!consumer.State.IsFinalState(state) && 
!handler.CancellationToken.IsCancellationRequested)
         {
             var stateChanged = await consumer.StateChangedFrom(state, 
handler.CancellationToken).ConfigureAwait(false);
             state = stateChanged.ConsumerState;
@@ -69,7 +69,7 @@ public static class StateMonitor
 
         var state = ReaderState.Disconnected;
 
-        while (!reader.IsFinalState(state) && 
!handler.CancellationToken.IsCancellationRequested)
+        while (!reader.State.IsFinalState(state) && 
!handler.CancellationToken.IsCancellationRequested)
         {
             var stateChanged = await reader.StateChangedFrom(state, 
handler.CancellationToken).ConfigureAwait(false);
             state = stateChanged.ReaderState;
diff --git a/src/DotPulsar/Internal/PingPongHandler.cs 
b/src/DotPulsar/Internal/PingPongHandler.cs
index 4533552..1a036e6 100644
--- a/src/DotPulsar/Internal/PingPongHandler.cs
+++ b/src/DotPulsar/Internal/PingPongHandler.cs
@@ -18,7 +18,7 @@ using DotPulsar.Abstractions;
 using DotPulsar.Internal.PulsarApi;
 using System.Diagnostics;
 
-public sealed class PingPongHandler : IState<PingPongHandlerState>, 
IAsyncDisposable
+public sealed class PingPongHandler : IStateHolder<PingPongHandlerState>, 
IAsyncDisposable
 {
     private readonly CancellationTokenSource _cts;
     private readonly StateManager<PingPongHandlerState> _stateManager;
@@ -26,6 +26,8 @@ public sealed class PingPongHandler : 
IState<PingPongHandlerState>, IAsyncDispos
     private long _lastCommand;
     private bool _waitForPong;
 
+    public IState<PingPongHandlerState> State => _stateManager;
+
     public PingPongHandler(TimeSpan keepAliveInterval)
     {
         _cts = new CancellationTokenSource();
@@ -85,14 +87,4 @@ public sealed class PingPongHandler : 
IState<PingPongHandlerState>, IAsyncDispos
         _stateManager.SetState(PingPongHandlerState.Closed);
         return new ValueTask();
     }
-
-    public bool IsFinalState() => _stateManager.IsFinalState();
-
-    public bool IsFinalState(PingPongHandlerState state) => 
_stateManager.IsFinalState(state);
-
-    public async ValueTask<PingPongHandlerState> 
OnStateChangeTo(PingPongHandlerState state, CancellationToken cancellationToken 
= default)
-        => await _stateManager.StateChangedTo(state, 
cancellationToken).ConfigureAwait(false);
-
-    public async ValueTask<PingPongHandlerState> 
OnStateChangeFrom(PingPongHandlerState state, CancellationToken 
cancellationToken = default)
-        => await _stateManager.StateChangedFrom(state, 
cancellationToken).ConfigureAwait(false);
 }
diff --git a/src/DotPulsar/Internal/Producer.cs 
b/src/DotPulsar/Internal/Producer.cs
index e9e2ad9..0f7534d 100644
--- a/src/DotPulsar/Internal/Producer.cs
+++ b/src/DotPulsar/Internal/Producer.cs
@@ -44,8 +44,8 @@ public sealed class Producer<TMessage> : IProducer<TMessage>, 
IRegisterEvent
 
     public Uri ServiceUrl { get; }
     public string Topic { get; }
-
     public ISendChannel<TMessage> SendChannel { get; }
+    public IState<ProducerState> State => _state;
 
     public Producer(
         Uri serviceUrl,
@@ -116,7 +116,7 @@ public sealed class Producer<TMessage> : 
IProducer<TMessage>, IRegisterEvent
             var topicName = isPartitionedTopic ? GetPartitionedTopicName(i) : 
Topic;
             var producer = CreateSubProducer(topicName, isPartitionedTopic ? i 
: -1);
             _ = _producers.TryAdd(i, producer);
-            monitoringTasks[i] = 
producer.OnStateChangeFrom(ProducerState.Disconnected, _cts.Token).AsTask();
+            monitoringTasks[i] = 
producer.State.OnStateChangeFrom(ProducerState.Disconnected, 
_cts.Token).AsTask();
         }
 
         Interlocked.Exchange(ref _producerCount, monitoringTasks.Length);
@@ -133,7 +133,7 @@ public sealed class Producer<TMessage> : 
IProducer<TMessage>, IRegisterEvent
 
                 var state = task.Result;
                 states[i] = state;
-                monitoringTasks[i] = _producers[i].OnStateChangeFrom(state, 
_cts.Token).AsTask();
+                monitoringTasks[i] = 
_producers[i].State.OnStateChangeFrom(state, _cts.Token).AsTask();
             }
 
             if (!isPartitionedTopic)
@@ -171,18 +171,6 @@ public sealed class Producer<TMessage> : 
IProducer<TMessage>, IRegisterEvent
         return producer;
     }
 
-    public bool IsFinalState()
-        => _state.IsFinalState();
-
-    public bool IsFinalState(ProducerState state)
-        => _state.IsFinalState(state);
-
-    public async ValueTask<ProducerState> OnStateChangeTo(ProducerState state, 
CancellationToken cancellationToken)
-        => await _state.StateChangedTo(state, 
cancellationToken).ConfigureAwait(false);
-
-    public async ValueTask<ProducerState> OnStateChangeFrom(ProducerState 
state, CancellationToken cancellationToken)
-        => await _state.StateChangedFrom(state, 
cancellationToken).ConfigureAwait(false);
-
     public async ValueTask DisposeAsync()
     {
         if (Interlocked.Exchange(ref _isDisposed, 1) != 0)
@@ -203,7 +191,7 @@ public sealed class Producer<TMessage> : 
IProducer<TMessage>, IRegisterEvent
     {
         if (_producerCount == 0)
         {
-            var newState = await 
_state.StateChangedFrom(ProducerState.Disconnected, 
cancellationToken).ConfigureAwait(false);
+            var newState = await 
_state.OnStateChangeFrom(ProducerState.Disconnected, 
cancellationToken).ConfigureAwait(false);
             if (_faultException is not null)
                 throw new ProducerFaultedException(_faultException);
             if (newState == ProducerState.Closed)
diff --git a/src/DotPulsar/Internal/Reader.cs b/src/DotPulsar/Internal/Reader.cs
index 910e0c0..93f07f6 100644
--- a/src/DotPulsar/Internal/Reader.cs
+++ b/src/DotPulsar/Internal/Reader.cs
@@ -43,6 +43,7 @@ public sealed class Reader<TMessage> : IReader<TMessage>
 
     public Uri ServiceUrl { get; }
     public string Topic { get; }
+    public IState<ReaderState> State => _state;
 
     public Reader(
         Uri serviceUrl,
@@ -104,7 +105,7 @@ public sealed class Reader<TMessage> : IReader<TMessage>
             _receiveTasks[i] = _emptyTaskCompletionSource.Task;
             var topicName = _isPartitionedTopic ? GetPartitionedTopicName(i) : 
Topic;
             _subReaders[i] = CreateSubReader(topicName);
-            monitoringTasks[i] = 
_subReaders[i].OnStateChangeFrom(ReaderState.Disconnected, _cts.Token).AsTask();
+            monitoringTasks[i] = 
_subReaders[i].State.OnStateChangeFrom(ReaderState.Disconnected, 
_cts.Token).AsTask();
         }
 
         _allSubReadersAreReady = true;
@@ -122,7 +123,7 @@ public sealed class Reader<TMessage> : IReader<TMessage>
 
                 var state = task.Result;
                 states[i] = state;
-                monitoringTasks[i] = _subReaders[i].OnStateChangeFrom(state, 
_cts.Token).AsTask();
+                monitoringTasks[i] = 
_subReaders[i].State.OnStateChangeFrom(state, _cts.Token).AsTask();
             }
 
             if (!_isPartitionedTopic)
@@ -140,18 +141,6 @@ public sealed class Reader<TMessage> : IReader<TMessage>
         }
     }
 
-    public async ValueTask<ReaderState> OnStateChangeTo(ReaderState state, 
CancellationToken cancellationToken)
-        => await _state.StateChangedTo(state, 
cancellationToken).ConfigureAwait(false);
-
-    public async ValueTask<ReaderState> OnStateChangeFrom(ReaderState state, 
CancellationToken cancellationToken)
-        => await _state.StateChangedFrom(state, 
cancellationToken).ConfigureAwait(false);
-
-    public bool IsFinalState()
-        => _state.IsFinalState();
-
-    public bool IsFinalState(ReaderState state)
-        => _state.IsFinalState(state);
-
     public async ValueTask<IEnumerable<MessageId>> 
GetLastMessageIds(CancellationToken cancellationToken)
     {
         await Guard(cancellationToken).ConfigureAwait(false);
diff --git a/src/DotPulsar/Internal/StateManager.cs 
b/src/DotPulsar/Internal/StateManager.cs
index 7e980f4..7a42e65 100644
--- a/src/DotPulsar/Internal/StateManager.cs
+++ b/src/DotPulsar/Internal/StateManager.cs
@@ -51,7 +51,7 @@ public sealed class StateManager<TState> : 
IStateManager<TState> where TState :
         }
     }
 
-    public ValueTask<TState> StateChangedTo(TState state, CancellationToken 
cancellationToken)
+    public ValueTask<TState> OnStateChangeTo(TState state, CancellationToken 
cancellationToken)
     {
         lock (_lock)
         {
@@ -61,7 +61,7 @@ public sealed class StateManager<TState> : 
IStateManager<TState> where TState :
         }
     }
 
-    public ValueTask<TState> StateChangedFrom(TState state, CancellationToken 
cancellationToken)
+    public ValueTask<TState> OnStateChangeFrom(TState state, CancellationToken 
cancellationToken)
     {
         lock (_lock)
         {
diff --git a/src/DotPulsar/Internal/SubConsumer.cs 
b/src/DotPulsar/Internal/SubConsumer.cs
index a6001fe..6be040d 100644
--- a/src/DotPulsar/Internal/SubConsumer.cs
+++ b/src/DotPulsar/Internal/SubConsumer.cs
@@ -29,7 +29,7 @@ public sealed class SubConsumer<TMessage> : 
IConsumer<TMessage>, IContainsChanne
     private IConsumerChannel<TMessage> _channel;
     private readonly ObjectPool<CommandAck> _commandAckPool;
     private readonly IExecute _executor;
-    private readonly IStateChanged<ConsumerState> _state;
+    private readonly IState<ConsumerState> _state;
     private readonly IConsumerChannelFactory<TMessage> _factory;
     private int _isDisposed;
     private Exception? _faultException;
@@ -38,6 +38,7 @@ public sealed class SubConsumer<TMessage> : 
IConsumer<TMessage>, IContainsChanne
     public string SubscriptionName { get; }
     public SubscriptionType SubscriptionType { get; }
     public string Topic { get; }
+    public IState<ConsumerState> State => _state;
 
     public SubConsumer(
         Guid correlationId,
@@ -48,7 +49,7 @@ public sealed class SubConsumer<TMessage> : 
IConsumer<TMessage>, IContainsChanne
         IRegisterEvent eventRegister,
         IConsumerChannel<TMessage> initialChannel,
         IExecute executor,
-        IStateChanged<ConsumerState> state,
+        IState<ConsumerState> state,
         IConsumerChannelFactory<TMessage> factory)
     {
         _correlationId = correlationId;
@@ -67,18 +68,6 @@ public sealed class SubConsumer<TMessage> : 
IConsumer<TMessage>, IContainsChanne
         _eventRegister.Register(new ConsumerCreated(_correlationId));
     }
 
-    public async ValueTask<ConsumerState> OnStateChangeTo(ConsumerState state, 
CancellationToken cancellationToken)
-        => await _state.StateChangedTo(state, 
cancellationToken).ConfigureAwait(false);
-
-    public async ValueTask<ConsumerState> OnStateChangeFrom(ConsumerState 
state, CancellationToken cancellationToken)
-        => await _state.StateChangedFrom(state, 
cancellationToken).ConfigureAwait(false);
-
-    public bool IsFinalState()
-        => _state.IsFinalState();
-
-    public bool IsFinalState(ConsumerState state)
-        => _state.IsFinalState(state);
-
     public async ValueTask DisposeAsync()
     {
         if (Interlocked.Exchange(ref _isDisposed, 1) != 0)
diff --git a/src/DotPulsar/Internal/SubProducer.cs 
b/src/DotPulsar/Internal/SubProducer.cs
index c778a97..0554988 100644
--- a/src/DotPulsar/Internal/SubProducer.cs
+++ b/src/DotPulsar/Internal/SubProducer.cs
@@ -22,7 +22,7 @@ using DotPulsar.Internal.Exceptions;
 using DotPulsar.Internal.Extensions;
 using DotPulsar.Internal.PulsarApi;
 
-public sealed class SubProducer : IContainsChannel, IState<ProducerState>
+public sealed class SubProducer : IContainsChannel, IStateHolder<ProducerState>
 {
     private readonly AsyncQueueWithCursor<SendOp> _sendQueue;
     private CancellationTokenSource? _dispatcherCts;
@@ -31,20 +31,21 @@ public sealed class SubProducer : IContainsChannel, 
IState<ProducerState>
     private readonly IRegisterEvent _eventRegister;
     private IProducerChannel _channel;
     private readonly IExecute _executor;
-    private readonly IStateChanged<ProducerState> _state;
+    private readonly IState<ProducerState> _state;
     private readonly IProducerChannelFactory _factory;
     private readonly int _partition;
     private int _isDisposed;
     private Exception? _faultException;
 
     public string Topic { get; }
+    public IState<ProducerState> State => _state;
 
     public SubProducer(
         Guid correlationId,
         IRegisterEvent registerEvent,
         IProducerChannel initialChannel,
         IExecute executor,
-        IStateChanged<ProducerState> state,
+        IState<ProducerState> state,
         IProducerChannelFactory factory,
         int partition,
         uint maxPendingMessages,
@@ -64,18 +65,6 @@ public sealed class SubProducer : IContainsChannel, 
IState<ProducerState>
         _eventRegister.Register(new ProducerCreated(_correlationId));
     }
 
-    public async ValueTask<ProducerState> OnStateChangeTo(ProducerState state, 
CancellationToken cancellationToken)
-        => await _state.StateChangedTo(state, 
cancellationToken).ConfigureAwait(false);
-
-    public async ValueTask<ProducerState> OnStateChangeFrom(ProducerState 
state, CancellationToken cancellationToken)
-        => await _state.StateChangedFrom(state, 
cancellationToken).ConfigureAwait(false);
-
-    public bool IsFinalState()
-        => _state.IsFinalState();
-
-    public bool IsFinalState(ProducerState state)
-        => _state.IsFinalState(state);
-
     public async ValueTask DisposeAsync()
     {
         if (Interlocked.Exchange(ref _isDisposed, 1) != 0)
diff --git a/src/DotPulsar/Internal/SubReader.cs 
b/src/DotPulsar/Internal/SubReader.cs
index e456a13..d0d6c72 100644
--- a/src/DotPulsar/Internal/SubReader.cs
+++ b/src/DotPulsar/Internal/SubReader.cs
@@ -26,13 +26,14 @@ public sealed class SubReader<TMessage> : IContainsChannel, 
IReader<TMessage>
     private readonly IRegisterEvent _eventRegister;
     private IConsumerChannel<TMessage> _channel;
     private readonly IExecute _executor;
-    private readonly IStateChanged<ReaderState> _state;
+    private readonly IState<ReaderState> _state;
     private readonly IConsumerChannelFactory<TMessage> _factory;
     private int _isDisposed;
     private Exception? _faultException;
 
     public Uri ServiceUrl { get; }
     public string Topic { get; }
+    public IState<ReaderState> State => _state;
 
     public SubReader(
         Guid correlationId,
@@ -41,7 +42,7 @@ public sealed class SubReader<TMessage> : IContainsChannel, 
IReader<TMessage>
         IRegisterEvent eventRegister,
         IConsumerChannel<TMessage> initialChannel,
         IExecute executor,
-        IStateChanged<ReaderState> state,
+        IState<ReaderState> state,
         IConsumerChannelFactory<TMessage> factory)
     {
         _correlationId = correlationId;
@@ -57,18 +58,6 @@ public sealed class SubReader<TMessage> : IContainsChannel, 
IReader<TMessage>
         _eventRegister.Register(new ReaderCreated(_correlationId));
     }
 
-    public async ValueTask<ReaderState> OnStateChangeTo(ReaderState state, 
CancellationToken cancellationToken)
-        => await _state.StateChangedTo(state, 
cancellationToken).ConfigureAwait(false);
-
-    public async ValueTask<ReaderState> OnStateChangeFrom(ReaderState state, 
CancellationToken cancellationToken)
-        => await _state.StateChangedFrom(state, 
cancellationToken).ConfigureAwait(false);
-
-    public bool IsFinalState()
-        => _state.IsFinalState();
-
-    public bool IsFinalState(ReaderState state)
-        => _state.IsFinalState(state);
-
     public async ValueTask<MessageId> GetLastMessageId(CancellationToken 
cancellationToken)
     {
         var getLastMessageId = new CommandGetLastMessageId();
diff --git a/tests/DotPulsar.Tests/Extensions/StateExtensionsTests.cs 
b/tests/DotPulsar.Tests/Extensions/StateExtensionsTests.cs
index dc76afd..376c455 100644
--- a/tests/DotPulsar.Tests/Extensions/StateExtensionsTests.cs
+++ b/tests/DotPulsar.Tests/Extensions/StateExtensionsTests.cs
@@ -182,48 +182,4 @@ public class StateExtensionsTests
         // Assert
         exception.Should().BeOfType<OperationCanceledException>();
     }
-
-    [Theory, Tests.AutoData]
-    public async Task 
DelayedStateMonitor_WhenChangingToFinalStateInitially_ShouldInvokeOnStateLeft([Frozen]
 IState<ProducerState> uut)
-    {
-        // Arrange
-        const ProducerState expected = ProducerState.Faulted;
-        uut.OnStateChangeFrom(ProducerState.Connected, 
Arg.Any<CancellationToken>()).Returns(expected);
-        uut.IsFinalState(expected).Returns(true);
-        ProducerState? stateLeft = null;
-        ProducerState? stateReached = null;
-        void onStateLeft(IState<ProducerState> _, ProducerState state) => 
stateLeft = state;
-        void onStateReached(IState<ProducerState> _, ProducerState state) => 
stateReached = state;
-
-        // Act
-        await uut.DelayedStateMonitor(ProducerState.Connected, 
TimeSpan.FromSeconds(1), onStateLeft, onStateReached);
-
-        // Assert
-        stateLeft.HasValue.Should().BeTrue();
-        stateLeft!.Value.Should().Be(expected);
-        stateReached.HasValue.Should().BeFalse();
-    }
-
-    [Theory, Tests.AutoData]
-    public async Task 
DelayedStateMonitor_WhenChangingToFinalStateWhileWaiting_ShouldInvokeOnStateLeft([Frozen]
 IState<ProducerState> uut)
-    {
-        // Arrange
-        const ProducerState expected = ProducerState.Disconnected;
-        uut.OnStateChangeFrom(ProducerState.Connected, 
Arg.Any<CancellationToken>()).Returns(expected);
-        uut.IsFinalState(ProducerState.Disconnected).Returns(false);
-        uut.OnStateChangeTo(ProducerState.Connected, 
Arg.Any<CancellationToken>()).Returns(x => throw new 
OperationCanceledException(), x => ProducerState.Faulted);
-        uut.IsFinalState(ProducerState.Faulted).Returns(true);
-        ProducerState? stateLeft = null;
-        ProducerState? stateReached = null;
-        void onStateLeft(IState<ProducerState> _, ProducerState state) => 
stateLeft = state;
-        void onStateReached(IState<ProducerState> _, ProducerState state) => 
stateReached = state;
-
-        // Act
-        await uut.DelayedStateMonitor(ProducerState.Connected, 
TimeSpan.FromSeconds(1), onStateLeft, onStateReached);
-
-        // Assert
-        stateLeft.HasValue.Should().BeTrue();
-        stateLeft!.Value.Should().Be(expected);
-        stateReached.HasValue.Should().BeFalse();
-    }
 }
diff --git a/tests/DotPulsar.Tests/Extensions/StateHolderExtensionsTests.cs 
b/tests/DotPulsar.Tests/Extensions/StateHolderExtensionsTests.cs
new file mode 100644
index 0000000..e36e960
--- /dev/null
+++ b/tests/DotPulsar.Tests/Extensions/StateHolderExtensionsTests.cs
@@ -0,0 +1,68 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace DotPulsar.Tests.Extensions;
+
+using AutoFixture.Xunit2;
+using DotPulsar.Abstractions;
+using DotPulsar.Extensions;
+using System;
+
+[Trait("Category", "Unit")]
+public class StateHolderExtensionsTests
+{
+    [Theory, Tests.AutoData]
+    public async Task 
DelayedStateMonitor_WhenChangingToFinalStateInitially_ShouldInvokeOnStateLeft([Frozen]
 IStateHolder<ProducerState> uut)
+    {
+        // Arrange
+        const ProducerState expected = ProducerState.Faulted;
+        uut.State.OnStateChangeFrom(ProducerState.Connected, 
Arg.Any<CancellationToken>()).Returns(expected);
+        uut.State.IsFinalState(expected).Returns(true);
+        ProducerState? stateLeft = null;
+        ProducerState? stateReached = null;
+        void onStateLeft(IStateHolder<ProducerState> _, ProducerState state) 
=> stateLeft = state;
+        void onStateReached(IStateHolder<ProducerState> _, ProducerState 
state) => stateReached = state;
+
+        // Act
+        await uut.DelayedStateMonitor(ProducerState.Connected, 
TimeSpan.FromSeconds(1), onStateLeft, onStateReached);
+
+        // Assert
+        stateLeft.HasValue.Should().BeTrue();
+        stateLeft!.Value.Should().Be(expected);
+        stateReached.HasValue.Should().BeFalse();
+    }
+
+    [Theory, Tests.AutoData]
+    public async Task 
DelayedStateMonitor_WhenChangingToFinalStateWhileWaiting_ShouldInvokeOnStateLeft([Frozen]
 IStateHolder<ProducerState> uut)
+    {
+        // Arrange
+        const ProducerState expected = ProducerState.Disconnected;
+        uut.State.OnStateChangeFrom(ProducerState.Connected, 
Arg.Any<CancellationToken>()).Returns(expected);
+        uut.State.IsFinalState(ProducerState.Disconnected).Returns(false);
+        uut.State.OnStateChangeTo(ProducerState.Connected, 
Arg.Any<CancellationToken>()).Returns(x => throw new 
OperationCanceledException(), x => ProducerState.Faulted);
+        uut.State.IsFinalState(ProducerState.Faulted).Returns(true);
+        ProducerState? stateLeft = null;
+        ProducerState? stateReached = null;
+        void onStateLeft(IStateHolder<ProducerState> _, ProducerState state) 
=> stateLeft = state;
+        void onStateReached(IStateHolder<ProducerState> _, ProducerState 
state) => stateReached = state;
+
+        // Act
+        await uut.DelayedStateMonitor(ProducerState.Connected, 
TimeSpan.FromSeconds(1), onStateLeft, onStateReached);
+
+        // Assert
+        stateLeft.HasValue.Should().BeTrue();
+        stateLeft!.Value.Should().Be(expected);
+        stateReached.HasValue.Should().BeFalse();
+    }
+}
diff --git a/tests/DotPulsar.Tests/Internal/ConsumerTests.cs 
b/tests/DotPulsar.Tests/Internal/ConsumerTests.cs
index 911c147..274e600 100644
--- a/tests/DotPulsar.Tests/Internal/ConsumerTests.cs
+++ b/tests/DotPulsar.Tests/Internal/ConsumerTests.cs
@@ -183,7 +183,7 @@ public sealed class ConsumerTests : IDisposable
 
         await using var consumer = CreateConsumer(client, await 
_fixture.CreateTopic(_cts.Token));
 
-        await consumer.OnStateChangeTo(ConsumerState.Faulted, _cts.Token);
+        await consumer.State.OnStateChangeTo(ConsumerState.Faulted, 
_cts.Token);
 
         //Act
         var exception = await 
Record.ExceptionAsync(consumer.Receive(_cts.Token).AsTask);
@@ -257,7 +257,7 @@ public sealed class ConsumerTests : IDisposable
         //Arrange
         await using var client = CreateClient();
         var consumer = CreateConsumer(client, await 
_fixture.CreateTopic(_cts.Token));
-        await consumer.OnStateChangeTo(ConsumerState.Active, _cts.Token);
+        await consumer.State.OnStateChangeTo(ConsumerState.Active, _cts.Token);
 
         //Act
         await using var connectionDown = await 
_fixture.DisableThePulsarConnection();
@@ -277,14 +277,14 @@ public sealed class ConsumerTests : IDisposable
         await using var producer = CreateProducer(client, topicName);
         await using var consumer = CreateConsumer(client, topicName);
         await ProduceMessages(producer, 1, "test-message", _cts.Token);
-        await consumer.OnStateChangeTo(ConsumerState.Active, _cts.Token);
+        await consumer.State.OnStateChangeTo(ConsumerState.Active, _cts.Token);
 
         //Act
         await using (await _fixture.DisableThePulsarConnection())
         {
             await consumer.StateChangedTo(ConsumerState.Disconnected, 
_cts.Token);
         }
-        await consumer.OnStateChangeTo(ConsumerState.Active, _cts.Token);
+        await consumer.State.OnStateChangeTo(ConsumerState.Active, _cts.Token);
         var exception = await 
Record.ExceptionAsync(consumer.Receive(_cts.Token).AsTask);
 
         //Assert
diff --git a/tests/DotPulsar.Tests/Internal/PingPongHandlerTest.cs 
b/tests/DotPulsar.Tests/Internal/PingPongHandlerTest.cs
index 42f0df9..dc4aecc 100644
--- a/tests/DotPulsar.Tests/Internal/PingPongHandlerTest.cs
+++ b/tests/DotPulsar.Tests/Internal/PingPongHandlerTest.cs
@@ -28,7 +28,7 @@ public class PingPongHandlerTest
         var uut = new PingPongHandler(TimeSpan.FromSeconds(1));
 
         // Act
-        var actual = await uut.OnStateChangeTo(PingPongHandlerState.TimedOut);
+        var actual = await 
uut.State.OnStateChangeTo(PingPongHandlerState.TimedOut);
 
         // Assert
         actual.Should().Be(expected);
@@ -42,9 +42,9 @@ public class PingPongHandlerTest
         var uut = new PingPongHandler(TimeSpan.FromSeconds(1));
 
         // Act
-        _ = await uut.OnStateChangeTo(PingPongHandlerState.ThresholdExceeded);
+        _ = await 
uut.State.OnStateChangeTo(PingPongHandlerState.ThresholdExceeded);
         uut.Incoming(DotPulsar.Internal.PulsarApi.BaseCommand.Type.Ack);
-        var actual = await uut.OnStateChangeTo(PingPongHandlerState.Active);
+        var actual = await 
uut.State.OnStateChangeTo(PingPongHandlerState.Active);
 
         // Assert
         actual.Should().Be(expected);
@@ -58,7 +58,7 @@ public class PingPongHandlerTest
         var uut = new PingPongHandler(TimeSpan.FromSeconds(1));
 
         // Act
-        var actual = uut.OnStateChangeTo(PingPongHandlerState.Closed);
+        var actual = uut.State.OnStateChangeTo(PingPongHandlerState.Closed);
         await uut.DisposeAsync();
 
         // Assert
diff --git a/tests/DotPulsar.Tests/Internal/ProducerTests.cs 
b/tests/DotPulsar.Tests/Internal/ProducerTests.cs
index 39a8c89..c483d6e 100644
--- a/tests/DotPulsar.Tests/Internal/ProducerTests.cs
+++ b/tests/DotPulsar.Tests/Internal/ProducerTests.cs
@@ -107,14 +107,14 @@ public sealed class ProducerTests : IDisposable
         var topicName = await _fixture.CreateTopic(_cts.Token);
         await using var client = CreateClient();
         await using var producer1 = CreateProducer(client, topicName, 
accessModeForProducer1);
-        await producer1.OnStateChangeTo(ProducerState.Connected, _cts.Token);
+        await producer1.State.OnStateChangeTo(ProducerState.Connected, 
_cts.Token);
 
         //Act
         await using var producer2 = CreateProducer(client, topicName, 
accessModeForProducer2);
 
         if (accessModeForProducer2 == ProducerAccessMode.ExclusiveWithFencing) // We need to send a message to trigger the state change
         {
-            await producer2.OnStateChangeTo(ProducerState.Connected, 
_cts.Token);
+            await producer2.State.OnStateChangeTo(ProducerState.Connected, 
_cts.Token);
 
             try
             {
@@ -126,8 +126,8 @@ public sealed class ProducerTests : IDisposable
             }
         }
 
-        var actualStateForProducer1 = await 
producer1.OnStateChangeTo(expectedStateForProducer1, _cts.Token);
-        var actualStateForProducer2 = await 
producer2.OnStateChangeTo(expectedStateForProducer2, _cts.Token);
+        var actualStateForProducer1 = await 
producer1.State.OnStateChangeTo(expectedStateForProducer1, _cts.Token);
+        var actualStateForProducer2 = await 
producer2.State.OnStateChangeTo(expectedStateForProducer2, _cts.Token);
 
         //Assert
         actualStateForProducer1.Should().Be(expectedStateForProducer1);
@@ -275,7 +275,7 @@ public sealed class ProducerTests : IDisposable
         var topicName = await _fixture.CreateTopic(_cts.Token);
         await using var client = CreateClient();
         await using var producer = CreateProducer(client, topicName);
-        await producer.OnStateChangeTo(ProducerState.Connected, _cts.Token);
+        await producer.State.OnStateChangeTo(ProducerState.Connected, 
_cts.Token);
 
         ValueTask<MessageId> sendTask;
         await using (await _fixture.DisableThePulsarConnection())
@@ -283,7 +283,7 @@ public sealed class ProducerTests : IDisposable
             await producer.StateChangedTo(ProducerState.Disconnected, 
_cts.Token);
             sendTask = producer.Send("test", _cts.Token);
         }
-        await producer.OnStateChangeTo(ProducerState.Connected, _cts.Token);
+        await producer.State.OnStateChangeTo(ProducerState.Connected, 
_cts.Token);
 
         //Act
         var exception = await Record.ExceptionAsync(sendTask.AsTask);
@@ -330,7 +330,7 @@ public sealed class ProducerTests : IDisposable
         //Arrange
         await using var client = CreateClient();
         await using var producer = CreateProducer(client, await 
_fixture.CreateTopic(_cts.Token));
-        await producer.OnStateChangeTo(ProducerState.Connected, _cts.Token);
+        await producer.State.OnStateChangeTo(ProducerState.Connected, 
_cts.Token);
 
         //Act
         await using var connectionDown = await 
_fixture.DisableThePulsarConnection();
@@ -347,14 +347,14 @@ public sealed class ProducerTests : IDisposable
         //Arrange
         await using var client = CreateClient();
         await using var producer = CreateProducer(client, await 
_fixture.CreateTopic(_cts.Token));
-        await producer.OnStateChangeTo(ProducerState.Connected, _cts.Token);
+        await producer.State.OnStateChangeTo(ProducerState.Connected, 
_cts.Token);
 
         //Act
         await using (await _fixture.DisableThePulsarConnection())
         {
             await producer.StateChangedTo(ProducerState.Disconnected, 
_cts.Token);
         }
-        await producer.OnStateChangeTo(ProducerState.Connected, _cts.Token);
+        await producer.State.OnStateChangeTo(ProducerState.Connected, 
_cts.Token);
         var exception = await Record.ExceptionAsync(producer.Send("test", 
_cts.Token).AsTask);
 
         //Assert
diff --git a/tests/DotPulsar.Tests/Internal/ReaderTests.cs 
b/tests/DotPulsar.Tests/Internal/ReaderTests.cs
index 4b31294..7e1e3b6 100644
--- a/tests/DotPulsar.Tests/Internal/ReaderTests.cs
+++ b/tests/DotPulsar.Tests/Internal/ReaderTests.cs
@@ -203,7 +203,7 @@ public sealed class ReaderTests : IDisposable
 
         await using var reader = CreateReader(client, MessageId.Earliest, 
await _fixture.CreateTopic(_cts.Token));
 
-        await reader.OnStateChangeTo(ReaderState.Faulted, _cts.Token);
+        await reader.State.OnStateChangeTo(ReaderState.Faulted, _cts.Token);
 
         //Act
         var exception = await 
Record.ExceptionAsync(reader.Receive(_cts.Token).AsTask);
@@ -277,7 +277,7 @@ public sealed class ReaderTests : IDisposable
         //Arrange
         await using var client = CreateClient();
         var reader = CreateReader(client, MessageId.Earliest, await 
_fixture.CreateTopic(_cts.Token));
-        await reader.OnStateChangeTo(ReaderState.Connected, _cts.Token);
+        await reader.State.OnStateChangeTo(ReaderState.Connected, _cts.Token);
 
         //Act
         await using var connectionDown = await 
_fixture.DisableThePulsarConnection();
@@ -297,14 +297,14 @@ public sealed class ReaderTests : IDisposable
         await using var producer = CreateProducer(client, topicName);
         await using var reader = CreateReader(client, MessageId.Earliest, 
topicName);
         await producer.Send("test-message", _cts.Token);
-        await reader.OnStateChangeTo(ReaderState.Connected, _cts.Token);
+        await reader.State.OnStateChangeTo(ReaderState.Connected, _cts.Token);
 
         //Act
         await using (await _fixture.DisableThePulsarConnection())
         {
             await reader.StateChangedTo(ReaderState.Disconnected, _cts.Token);
         }
-        await reader.OnStateChangeTo(ReaderState.Connected, _cts.Token);
+        await reader.State.OnStateChangeTo(ReaderState.Connected, _cts.Token);
         var exception = await 
Record.ExceptionAsync(reader.Receive(_cts.Token).AsTask);
 
         //Assert
diff --git a/tests/DotPulsar.Tests/Internal/StateManagerTests.cs 
b/tests/DotPulsar.Tests/Internal/StateManagerTests.cs
index df252a1..a294066 100644
--- a/tests/DotPulsar.Tests/Internal/StateManagerTests.cs
+++ b/tests/DotPulsar.Tests/Internal/StateManagerTests.cs
@@ -66,7 +66,7 @@ public class StateManagerTests
     {
         //Arrange
         var uut = new StateManager<ProducerState>(initialState, 
ProducerState.Closed);
-        var task = uut.StateChangedTo(newState, default);
+        var task = uut.OnStateChangeTo(newState, default);
 
         //Act
         _ = uut.SetState(newState);
@@ -84,7 +84,7 @@ public class StateManagerTests
     {
         //Arrange
         var uut = new StateManager<ProducerState>(initialState, 
ProducerState.Closed);
-        var task = uut.StateChangedFrom(initialState, default);
+        var task = uut.OnStateChangeFrom(initialState, default);
 
         //Act
         _ = uut.SetState(newState);
@@ -97,13 +97,13 @@ public class StateManagerTests
     [InlineData(ProducerState.Connected)]
     [InlineData(ProducerState.Disconnected)]
     [InlineData(ProducerState.Closed)]
-    public void 
StateChangedTo_GivenStateIsAlreadyWanted_ShouldCompleteTask(ProducerState state)
+    public void 
OnStateChangeTo_GivenStateIsAlreadyWanted_ShouldCompleteTask(ProducerState 
state)
     {
         //Arrange
         var uut = new StateManager<ProducerState>(state, ProducerState.Closed);
 
         //Act
-        var task = uut.StateChangedTo(state, default);
+        var task = uut.OnStateChangeTo(state, default);
 
         //Assert
         Assert.True(task.IsCompleted);
@@ -114,13 +114,13 @@ public class StateManagerTests
     [InlineData(ProducerState.Connected, ProducerState.Closed)]
     [InlineData(ProducerState.Disconnected, ProducerState.Connected)]
     [InlineData(ProducerState.Disconnected, ProducerState.Closed)]
-    public void 
StateChangedTo_GivenStateIsNotWanted_ShouldNotCompleteTask(ProducerState 
initialState, ProducerState wantedState)
+    public void 
OnStateChangeTo_GivenStateIsNotWanted_ShouldNotCompleteTask(ProducerState 
initialState, ProducerState wantedState)
     {
         //Arrange
         var uut = new StateManager<ProducerState>(initialState, 
ProducerState.Closed);
 
         //Act
-        var task = uut.StateChangedTo(wantedState, default);
+        var task = uut.OnStateChangeTo(wantedState, default);
 
         //Assert
         task.IsCompleted.Should().BeFalse();
@@ -129,13 +129,13 @@ public class StateManagerTests
     [Theory]
     [InlineData(ProducerState.Connected)]
     [InlineData(ProducerState.Disconnected)]
-    public void 
StateChangedTo_GivenStateIsFinal_ShouldCompleteTask(ProducerState state)
+    public void 
OnStateChangeTo_GivenStateIsFinal_ShouldCompleteTask(ProducerState state)
     {
         //Arrange
         var uut = new StateManager<ProducerState>(ProducerState.Closed, 
ProducerState.Closed);
 
         //Act
-        var task = uut.StateChangedTo(state, default);
+        var task = uut.OnStateChangeTo(state, default);
 
         //Assert
         task.IsCompleted.Should().BeTrue();
@@ -144,13 +144,13 @@ public class StateManagerTests
     [Theory]
     [InlineData(ProducerState.Connected)]
     [InlineData(ProducerState.Disconnected)]
-    public void 
StateChangedFrom_GivenStateHasNotChanged_ShouldNotCompleteTask(ProducerState 
state)
+    public void 
OnStateChangeFrom_GivenStateHasNotChanged_ShouldNotCompleteTask(ProducerState 
state)
     {
         //Arrange
         var uut = new StateManager<ProducerState>(state, ProducerState.Closed);
 
         //Act
-        var task = uut.StateChangedFrom(state, default);
+        var task = uut.OnStateChangeFrom(state, default);
 
         //Assert
         task.IsCompleted.Should().BeFalse();
@@ -161,13 +161,13 @@ public class StateManagerTests
     [InlineData(ProducerState.Connected, ProducerState.Closed)]
     [InlineData(ProducerState.Disconnected, ProducerState.Connected)]
     [InlineData(ProducerState.Disconnected, ProducerState.Closed)]
-    public void 
StateChangedFrom_GivenStateHasChanged_ShouldCompleteTask(ProducerState 
initialState, ProducerState fromState)
+    public void 
OnStateChangeFrom_GivenStateHasChanged_ShouldCompleteTask(ProducerState 
initialState, ProducerState fromState)
     {
         //Arrange
         var uut = new StateManager<ProducerState>(initialState, 
ProducerState.Closed);
 
         //Act
-        var task = uut.StateChangedFrom(fromState, default);
+        var task = uut.OnStateChangeFrom(fromState, default);
 
         //Assert
         task.IsCompleted.Should().BeTrue();
@@ -177,13 +177,13 @@ public class StateManagerTests
     [InlineData(ProducerState.Connected)]
     [InlineData(ProducerState.Disconnected)]
     [InlineData(ProducerState.Closed)]
-    public void 
StateChangedFrom_GivenStateIsFinal_ShouldCompleteTask(ProducerState state)
+    public void 
OnStateChangeFrom_GivenStateIsFinal_ShouldCompleteTask(ProducerState state)
     {
         //Arrange
         var uut = new StateManager<ProducerState>(ProducerState.Closed, 
ProducerState.Closed);
 
         //Act
-        var task = uut.StateChangedFrom(state, default);
+        var task = uut.OnStateChangeFrom(state, default);
 
         //Assert
         task.IsCompleted.Should().BeTrue();
@@ -198,7 +198,7 @@ public class StateManagerTests
         var uut = new StateManager<ProducerState>(initialState, 
ProducerState.Closed);
 
         //Act
-        var task = uut.StateChangedTo(wantedState, default);
+        var task = uut.OnStateChangeTo(wantedState, default);
         _ = uut.SetState(ProducerState.Closed);
 
         //Assert
@@ -214,7 +214,7 @@ public class StateManagerTests
         var uut = new StateManager<ProducerState>(initialState, 
ProducerState.Closed);
 
         //Act
-        var task = uut.StateChangedTo(wantedState, default);
+        var task = uut.OnStateChangeTo(wantedState, default);
         _ = uut.SetState(newState);
 
         //Assert
@@ -229,7 +229,7 @@ public class StateManagerTests
         using var cts = new CancellationTokenSource();
 
         //Act
-        var task = uut.StateChangedFrom(ProducerState.Connected, cts.Token);
+        var task = uut.OnStateChangeFrom(ProducerState.Connected, cts.Token);
         cts.Cancel();
         var exception = await Record.ExceptionAsync(() => task.AsTask()); // xUnit can't record ValueTask yet
 
@@ -246,7 +246,7 @@ public class StateManagerTests
 
         //Act
         cts.Cancel();
-        var task = uut.StateChangedFrom(ProducerState.Connected, cts.Token);
+        var task = uut.OnStateChangeFrom(ProducerState.Connected, cts.Token);
         var exception = await Record.ExceptionAsync(() => task.AsTask()); // xUnit can't record ValueTask yet
 
         //Assert
diff --git a/tests/DotPulsar.Tests/PulsarClientTests.cs 
b/tests/DotPulsar.Tests/PulsarClientTests.cs
index 77b924a..3e74ff7 100644
--- a/tests/DotPulsar.Tests/PulsarClientTests.cs
+++ b/tests/DotPulsar.Tests/PulsarClientTests.cs
@@ -42,7 +42,7 @@ public sealed class PulsarClientTests : IDisposable
 
         // Act
         var exception = await Record.ExceptionAsync(() => 
producer.Send("Test", _cts.Token).AsTask());
-        var state = await producer.OnStateChangeTo(ProducerState.Faulted, 
_cts.Token);
+        var state = await 
producer.State.OnStateChangeTo(ProducerState.Faulted, _cts.Token);
 
         // Assert
         exception.Should().BeOfType<ProducerFaultedException>();
@@ -68,7 +68,7 @@ public sealed class PulsarClientTests : IDisposable
         // Act
         _ = await producer.Send("Test", _cts.Token); // Make sure we have a working connection
         throwException = true;
-        var state = await producer.OnStateChangeTo(ProducerState.Faulted, 
_cts.Token);
+        var state = await 
producer.State.OnStateChangeTo(ProducerState.Faulted, _cts.Token);
 
         // Assert
         state.Should().Be(ProducerState.Faulted);
@@ -88,7 +88,7 @@ public sealed class PulsarClientTests : IDisposable
 
         // Act
         var exception = await Record.ExceptionAsync(() => 
producer.Send("Test", _cts.Token).AsTask());
-        var state = await producer.OnStateChangeTo(ProducerState.Faulted, 
_cts.Token);
+        var state = await 
producer.State.OnStateChangeTo(ProducerState.Faulted, _cts.Token);
 
         // Assert
         exception.Should().BeOfType<ProducerFaultedException>();
@@ -119,7 +119,7 @@ public sealed class PulsarClientTests : IDisposable
         // Act
         _ = await producer.Send("Test", _cts.Token); // Make sure we have a working connection
         await tcs.Task;
-        var state = await producer.OnStateChangeTo(ProducerState.Connected, 
_cts.Token);
+        var state = await 
producer.State.OnStateChangeTo(ProducerState.Connected, _cts.Token);
 
         // Assert
         state.Should().Be(ProducerState.Connected);

Reply via email to