This is an automated email from the ASF dual-hosted git repository.

blankensteiner pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-dotpulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new d741d47  When monitoring state we now return a class giving us both the new state and the Consumer/Producer/Reader.
d741d47 is described below

commit d741d477a176b99040cc0679511fc14aae19a1ed
Author: Daniel Blankensteiner <[email protected]>
AuthorDate: Thu Apr 2 23:54:24 2020 +0200

    When monitoring state we now return a class giving us both the new state and the Consumer/Producer/Reader.
---
 samples/Consuming/Program.cs                       | 22 ++++++------
 samples/Producing/Program.cs                       | 18 +++++-----
 samples/Reading/Program.cs                         | 20 ++++++-----
 src/DotPulsar/Abstractions/IConsumer.cs            | 40 ++++++++++++++++++++-
 src/DotPulsar/Abstractions/IProducer.cs            | 40 ++++++++++++++++++++-
 src/DotPulsar/Abstractions/IReader.cs              | 41 +++++++++++++++++++++-
 .../IReader.cs => ConsumerStateChanged.cs}         | 26 ++++++--------
 .../{ => Internal}/Abstractions/IStateChanged.cs   |  4 +--
 src/DotPulsar/Internal/Consumer.cs                 | 14 +++++---
 src/DotPulsar/Internal/Producer.cs                 | 14 +++++---
 src/DotPulsar/Internal/Reader.cs                   | 14 +++++---
 .../IReader.cs => ProducerStateChanged.cs}         | 26 ++++++--------
 .../IReader.cs => ReaderStateChanged.cs}           | 26 ++++++--------
 13 files changed, 213 insertions(+), 92 deletions(-)

diff --git a/samples/Consuming/Program.cs b/samples/Consuming/Program.cs
index 59fe108..36f8497 100644
--- a/samples/Consuming/Program.cs
+++ b/samples/Consuming/Program.cs
@@ -1,4 +1,4 @@
-/*
+/*
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * You may obtain a copy of the License at
@@ -79,20 +79,22 @@ namespace Consuming
 
             while (true)
             {
-                state = await 
consumer.StateChangedFrom(state).ConfigureAwait(false);
+                var stateChanged = await 
consumer.StateChangedFrom(state).ConfigureAwait(false);
+                state = stateChanged.ConsumerState;
 
                 var stateMessage = state switch
                 {
-                    ConsumerState.Active => "The consumer is active",
-                    ConsumerState.Inactive => "The consumer is inactive",
-                    ConsumerState.Disconnected => "The consumer is 
disconnected",
-                    ConsumerState.Closed => "The consumer has closed",
-                    ConsumerState.ReachedEndOfTopic => "The consumer has 
reached end of topic",
-                    ConsumerState.Faulted => "The consumer has faulted",
-                    _ => $"The consumer has an unknown state '{state}'"
+                    ConsumerState.Active => "is active",
+                    ConsumerState.Inactive => "is inactive",
+                    ConsumerState.Disconnected => "is disconnected",
+                    ConsumerState.Closed => "has closed",
+                    ConsumerState.ReachedEndOfTopic => "has reached end of 
topic",
+                    ConsumerState.Faulted => "has faulted",
+                    _ => $"has an unknown state '{state}'"
                 };
 
-                Console.WriteLine(stateMessage);
+                var topic = stateChanged.Consumer.Topic;
+                Console.WriteLine($"The consumer for topic '{topic}' " + 
stateMessage);
 
                 if (consumer.IsFinalState(state))
                     return;
diff --git a/samples/Producing/Program.cs b/samples/Producing/Program.cs
index 73ddff8..664793f 100644
--- a/samples/Producing/Program.cs
+++ b/samples/Producing/Program.cs
@@ -1,4 +1,4 @@
-/*
+/*
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * You may obtain a copy of the License at
@@ -78,18 +78,20 @@ namespace Producing
 
             while (true)
             {
-                state = await 
producer.StateChangedFrom(state).ConfigureAwait(false);
+                var stateChanged = await 
producer.StateChangedFrom(state).ConfigureAwait(false);
+                state = stateChanged.ProducerState;
 
                 var stateMessage = state switch
                 {
-                    ProducerState.Connected => "The producer is connected",
-                    ProducerState.Disconnected => "The producer is 
disconnected",
-                    ProducerState.Closed => "The producer has closed",
-                    ProducerState.Faulted => "The producer has faulted",
-                    _ => $"The producer has an unknown state '{state}'"
+                    ProducerState.Connected => "is connected",
+                    ProducerState.Disconnected => "is disconnected",
+                    ProducerState.Closed => "has closed",
+                    ProducerState.Faulted => "has faulted",
+                    _ => $"has an unknown state '{state}'"
                 };
 
-                Console.WriteLine(stateMessage);
+                var topic = stateChanged.Producer.Topic;
+                Console.WriteLine($"The producer for topic '{topic}' " + 
stateMessage);
 
                 if (producer.IsFinalState(state))
                     return;
diff --git a/samples/Reading/Program.cs b/samples/Reading/Program.cs
index b42275d..d7c9f6c 100644
--- a/samples/Reading/Program.cs
+++ b/samples/Reading/Program.cs
@@ -1,4 +1,4 @@
-/*
+/*
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * You may obtain a copy of the License at
@@ -78,19 +78,21 @@ namespace Reading
 
             while (true)
             {
-                state = await 
reader.StateChangedFrom(state).ConfigureAwait(false);
+                var stateChanged = await 
reader.StateChangedFrom(state).ConfigureAwait(false);
+                state = stateChanged.ReaderState;
 
                 var stateMessage = state switch
                 {
-                    ReaderState.Connected => "The reader is connected",
-                    ReaderState.Disconnected => "The reader is disconnected",
-                    ReaderState.Closed => "The reader has closed",
-                    ReaderState.ReachedEndOfTopic => "The reader has reached 
end of topic",
-                    ReaderState.Faulted => "The reader has faulted",
-                    _ => $"The reader has an unknown state '{state}'"
+                    ReaderState.Connected => "is connected",
+                    ReaderState.Disconnected => "is disconnected",
+                    ReaderState.Closed => "has closed",
+                    ReaderState.ReachedEndOfTopic => "has reached end of 
topic",
+                    ReaderState.Faulted => "has faulted",
+                    _ => $"has an unknown state '{state}'"
                 };
 
-                Console.WriteLine(stateMessage);
+                var topic = stateChanged.Reader.Topic;
+                Console.WriteLine($"The reader for topic '{topic}' " + 
stateMessage);
 
                 if (reader.IsFinalState(state))
                     return;
diff --git a/src/DotPulsar/Abstractions/IConsumer.cs 
b/src/DotPulsar/Abstractions/IConsumer.cs
index 4d33ab7..70bfb99 100644
--- a/src/DotPulsar/Abstractions/IConsumer.cs
+++ b/src/DotPulsar/Abstractions/IConsumer.cs
@@ -22,7 +22,7 @@ namespace DotPulsar.Abstractions
     /// <summary>
     /// A consumer abstraction.
     /// </summary>
-    public interface IConsumer : IStateChanged<ConsumerState>, IAsyncDisposable
+    public interface IConsumer : IAsyncDisposable
     {
         /// <summary>
         /// Acknowledge the consumption of a single message.
@@ -50,6 +50,22 @@ namespace DotPulsar.Abstractions
         ValueTask<MessageId> GetLastMessageId(CancellationToken 
cancellationToken = default);
 
         /// <summary>
+        /// Ask whether the current state is final, meaning that it will never 
change.
+        /// </summary>
+        /// <returns>
+        /// True if it's final and False if it's not.
+        /// </returns>
+        bool IsFinalState();
+
+        /// <summary>
+        /// Ask whether the provided state is final, meaning that it will 
never change.
+        /// </summary>
+        /// <returns>
+        /// True if it's final and False if it's not.
+        /// </returns>
+        bool IsFinalState(ConsumerState state);
+
+        /// <summary>
         /// Get an IAsyncEnumerable for consuming messages
         /// </summary>
         IAsyncEnumerable<Message> Messages(CancellationToken cancellationToken 
= default);
@@ -60,6 +76,28 @@ namespace DotPulsar.Abstractions
         ValueTask Seek(MessageId messageId, CancellationToken 
cancellationToken = default);
 
         /// <summary>
+        /// Wait for the state to change to a specific state.
+        /// </summary>
+        /// <returns>
+        /// The current state.
+        /// </returns>
+        /// <remarks>
+        /// If the state change to a final state, then all awaiting tasks will 
complete.
+        /// </remarks>
+        ValueTask<ConsumerStateChanged> StateChangedTo(ConsumerState state, 
CancellationToken cancellationToken = default);
+
+        /// <summary>
+        /// Wait for the state to change from a specific state.
+        /// </summary>
+        /// <returns>
+        /// The current state.
+        /// </returns>
+        /// <remarks>
+        /// If the state change to a final state, then all awaiting tasks will 
complete.
+        /// </remarks>
+        ValueTask<ConsumerStateChanged> StateChangedFrom(ConsumerState state, 
CancellationToken cancellationToken = default);
+
+        /// <summary>
         /// The topic of the consumer.
         /// </summary>
         string Topic { get; }
diff --git a/src/DotPulsar/Abstractions/IProducer.cs 
b/src/DotPulsar/Abstractions/IProducer.cs
index 11e46d1..bd37d46 100644
--- a/src/DotPulsar/Abstractions/IProducer.cs
+++ b/src/DotPulsar/Abstractions/IProducer.cs
@@ -22,9 +22,25 @@ namespace DotPulsar.Abstractions
     /// <summary>
     /// A producer abstraction.
     /// </summary>
-    public interface IProducer : IStateChanged<ProducerState>, IAsyncDisposable
+    public interface IProducer : IAsyncDisposable
     {
         /// <summary>
+        /// Ask whether the current state is final, meaning that it will never 
change.
+        /// </summary>
+        /// <returns>
+        /// True if it's final and False if it's not.
+        /// </returns>
+        bool IsFinalState();
+
+        /// <summary>
+        /// Ask whether the provided state is final, meaning that it will 
never change.
+        /// </summary>
+        /// <returns>
+        /// True if it's final and False if it's not.
+        /// </returns>
+        bool IsFinalState(ProducerState state);
+
+        /// <summary>
         /// Sends a message.
         /// </summary>
         ValueTask<MessageId> Send(byte[] data, CancellationToken 
cancellationToken = default);
@@ -55,6 +71,28 @@ namespace DotPulsar.Abstractions
         ValueTask<MessageId> Send(MessageMetadata metadata, 
ReadOnlySequence<byte> data, CancellationToken cancellationToken = default);
 
         /// <summary>
+        /// Wait for the state to change to a specific state.
+        /// </summary>
+        /// <returns>
+        /// The current state.
+        /// </returns>
+        /// <remarks>
+        /// If the state change to a final state, then all awaiting tasks will 
complete.
+        /// </remarks>
+        ValueTask<ProducerStateChanged> StateChangedTo(ProducerState state, 
CancellationToken cancellationToken = default);
+
+        /// <summary>
+        /// Wait for the state to change from a specific state.
+        /// </summary>
+        /// <returns>
+        /// The current state.
+        /// </returns>
+        /// <remarks>
+        /// If the state change to a final state, then all awaiting tasks will 
complete.
+        /// </remarks>
+        ValueTask<ProducerStateChanged> StateChangedFrom(ProducerState state, 
CancellationToken cancellationToken = default);
+
+        /// <summary>
         /// The topic of the producer.
         /// </summary>
         string Topic { get; }
diff --git a/src/DotPulsar/Abstractions/IReader.cs 
b/src/DotPulsar/Abstractions/IReader.cs
index 091551c..81bc9b4 100644
--- a/src/DotPulsar/Abstractions/IReader.cs
+++ b/src/DotPulsar/Abstractions/IReader.cs
@@ -17,18 +17,57 @@ namespace DotPulsar.Abstractions
     using System;
     using System.Collections.Generic;
     using System.Threading;
+    using System.Threading.Tasks;
 
     /// <summary>
     /// A reader abstraction.
     /// </summary>
-    public interface IReader : IStateChanged<ReaderState>, IAsyncDisposable
+    public interface IReader : IAsyncDisposable
     {
         /// <summary>
+        /// Ask whether the current state is final, meaning that it will never 
change.
+        /// </summary>
+        /// <returns>
+        /// True if it's final and False if it's not.
+        /// </returns>
+        bool IsFinalState();
+
+        /// <summary>
+        /// Ask whether the provided state is final, meaning that it will 
never change.
+        /// </summary>
+        /// <returns>
+        /// True if it's final and False if it's not.
+        /// </returns>
+        bool IsFinalState(ReaderState state);
+
+        /// <summary>
         /// Get an IAsyncEnumerable for reading messages
         /// </summary>
         IAsyncEnumerable<Message> Messages(CancellationToken cancellationToken 
= default);
 
         /// <summary>
+        /// Wait for the state to change to a specific state.
+        /// </summary>
+        /// <returns>
+        /// The current state.
+        /// </returns>
+        /// <remarks>
+        /// If the state change to a final state, then all awaiting tasks will 
complete.
+        /// </remarks>
+        ValueTask<ReaderStateChanged> StateChangedTo(ReaderState state, 
CancellationToken cancellationToken = default);
+
+        /// <summary>
+        /// Wait for the state to change from a specific state.
+        /// </summary>
+        /// <returns>
+        /// The current state.
+        /// </returns>
+        /// <remarks>
+        /// If the state change to a final state, then all awaiting tasks will 
complete.
+        /// </remarks>
+        ValueTask<ReaderStateChanged> StateChangedFrom(ReaderState state, 
CancellationToken cancellationToken = default);
+
+        /// <summary>
         /// The topic of the reader.
         /// </summary>
         string Topic { get; }
diff --git a/src/DotPulsar/Abstractions/IReader.cs 
b/src/DotPulsar/ConsumerStateChanged.cs
similarity index 50%
copy from src/DotPulsar/Abstractions/IReader.cs
copy to src/DotPulsar/ConsumerStateChanged.cs
index 091551c..1f0a80e 100644
--- a/src/DotPulsar/Abstractions/IReader.cs
+++ b/src/DotPulsar/ConsumerStateChanged.cs
@@ -12,25 +12,19 @@
  * limitations under the License.
  */
 
-namespace DotPulsar.Abstractions
+namespace DotPulsar
 {
-    using System;
-    using System.Collections.Generic;
-    using System.Threading;
+    using DotPulsar.Abstractions;
 
-    /// <summary>
-    /// A reader abstraction.
-    /// </summary>
-    public interface IReader : IStateChanged<ReaderState>, IAsyncDisposable
+    public sealed class ConsumerStateChanged
     {
-        /// <summary>
-        /// Get an IAsyncEnumerable for reading messages
-        /// </summary>
-        IAsyncEnumerable<Message> Messages(CancellationToken cancellationToken 
= default);
+        internal ConsumerStateChanged(IConsumer consumer, ConsumerState 
consumerState)
+        {
+            Consumer = consumer;
+            ConsumerState = consumerState;
+        }
 
-        /// <summary>
-        /// The topic of the reader.
-        /// </summary>
-        string Topic { get; }
+        public IConsumer Consumer { get; }
+        public ConsumerState ConsumerState { get; }
     }
 }
diff --git a/src/DotPulsar/Abstractions/IStateChanged.cs 
b/src/DotPulsar/Internal/Abstractions/IStateChanged.cs
similarity index 97%
rename from src/DotPulsar/Abstractions/IStateChanged.cs
rename to src/DotPulsar/Internal/Abstractions/IStateChanged.cs
index 079b9be..732f13e 100644
--- a/src/DotPulsar/Abstractions/IStateChanged.cs
+++ b/src/DotPulsar/Internal/Abstractions/IStateChanged.cs
@@ -1,4 +1,4 @@
-/*
+/*
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * You may obtain a copy of the License at
@@ -12,7 +12,7 @@
  * limitations under the License.
  */
 
-namespace DotPulsar.Abstractions
+namespace DotPulsar.Internal.Abstractions
 {
     using System.Threading;
     using System.Threading.Tasks;
diff --git a/src/DotPulsar/Internal/Consumer.cs 
b/src/DotPulsar/Internal/Consumer.cs
index 9fbdc37..2208606 100644
--- a/src/DotPulsar/Internal/Consumer.cs
+++ b/src/DotPulsar/Internal/Consumer.cs
@@ -57,11 +57,17 @@ namespace DotPulsar.Internal
             _eventRegister.Register(new ConsumerCreated(_correlationId, this));
         }
 
-        public async ValueTask<ConsumerState> StateChangedTo(ConsumerState 
state, CancellationToken cancellationToken)
-            => await _state.StateChangedTo(state, 
cancellationToken).ConfigureAwait(false);
+        public async ValueTask<ConsumerStateChanged> 
StateChangedTo(ConsumerState state, CancellationToken cancellationToken)
+        {
+            var newState = await _state.StateChangedTo(state, 
cancellationToken).ConfigureAwait(false);
+            return new ConsumerStateChanged(this, newState);
+        }
 
-        public async ValueTask<ConsumerState> StateChangedFrom(ConsumerState 
state, CancellationToken cancellationToken)
-            => await _state.StateChangedFrom(state, 
cancellationToken).ConfigureAwait(false);
+        public async ValueTask<ConsumerStateChanged> 
StateChangedFrom(ConsumerState state, CancellationToken cancellationToken)
+        {
+            var newState = await _state.StateChangedFrom(state, 
cancellationToken).ConfigureAwait(false);
+            return new ConsumerStateChanged(this, newState);
+        }
 
         public bool IsFinalState()
             => _state.IsFinalState();
diff --git a/src/DotPulsar/Internal/Producer.cs 
b/src/DotPulsar/Internal/Producer.cs
index d9e5dd0..28e0fe2 100644
--- a/src/DotPulsar/Internal/Producer.cs
+++ b/src/DotPulsar/Internal/Producer.cs
@@ -53,11 +53,17 @@ namespace DotPulsar.Internal
             _eventRegister.Register(new ProducerCreated(_correlationId, this));
         }
 
-        public ValueTask<ProducerState> StateChangedTo(ProducerState state, 
CancellationToken cancellationToken)
-            => _state.StateChangedTo(state, cancellationToken);
+        public async ValueTask<ProducerStateChanged> 
StateChangedTo(ProducerState state, CancellationToken cancellationToken)
+        {
+            var newState = await _state.StateChangedTo(state, 
cancellationToken).ConfigureAwait(false);
+            return new ProducerStateChanged(this, newState);
+        }
 
-        public ValueTask<ProducerState> StateChangedFrom(ProducerState state, 
CancellationToken cancellationToken)
-            => _state.StateChangedFrom(state, cancellationToken);
+        public async ValueTask<ProducerStateChanged> 
StateChangedFrom(ProducerState state, CancellationToken cancellationToken)
+        {
+            var newState = await _state.StateChangedFrom(state, 
cancellationToken).ConfigureAwait(false);
+            return new ProducerStateChanged(this, newState);
+        }
 
         public bool IsFinalState()
             => _state.IsFinalState();
diff --git a/src/DotPulsar/Internal/Reader.cs b/src/DotPulsar/Internal/Reader.cs
index 5b7d740..bb659e7 100644
--- a/src/DotPulsar/Internal/Reader.cs
+++ b/src/DotPulsar/Internal/Reader.cs
@@ -54,11 +54,17 @@ namespace DotPulsar.Internal
             _eventRegister.Register(new ReaderCreated(_correlationId, this));
         }
 
-        public async ValueTask<ReaderState> StateChangedTo(ReaderState state, 
CancellationToken cancellationToken)
-            => await _state.StateChangedTo(state, 
cancellationToken).ConfigureAwait(false);
+        public async ValueTask<ReaderStateChanged> StateChangedTo(ReaderState 
state, CancellationToken cancellationToken)
+        {
+            var newState = await _state.StateChangedTo(state, 
cancellationToken).ConfigureAwait(false);
+            return new ReaderStateChanged(this, newState);
+        }
 
-        public async ValueTask<ReaderState> StateChangedFrom(ReaderState 
state, CancellationToken cancellationToken)
-            => await _state.StateChangedFrom(state, 
cancellationToken).ConfigureAwait(false);
+        public async ValueTask<ReaderStateChanged> 
StateChangedFrom(ReaderState state, CancellationToken cancellationToken)
+        {
+            var newState = await _state.StateChangedFrom(state, 
cancellationToken).ConfigureAwait(false);
+            return new ReaderStateChanged(this, newState);
+        }
 
         public bool IsFinalState()
             => _state.IsFinalState();
diff --git a/src/DotPulsar/Abstractions/IReader.cs 
b/src/DotPulsar/ProducerStateChanged.cs
similarity index 50%
copy from src/DotPulsar/Abstractions/IReader.cs
copy to src/DotPulsar/ProducerStateChanged.cs
index 091551c..f866954 100644
--- a/src/DotPulsar/Abstractions/IReader.cs
+++ b/src/DotPulsar/ProducerStateChanged.cs
@@ -12,25 +12,19 @@
  * limitations under the License.
  */
 
-namespace DotPulsar.Abstractions
+namespace DotPulsar
 {
-    using System;
-    using System.Collections.Generic;
-    using System.Threading;
+    using DotPulsar.Abstractions;
 
-    /// <summary>
-    /// A reader abstraction.
-    /// </summary>
-    public interface IReader : IStateChanged<ReaderState>, IAsyncDisposable
+    public sealed class ProducerStateChanged
     {
-        /// <summary>
-        /// Get an IAsyncEnumerable for reading messages
-        /// </summary>
-        IAsyncEnumerable<Message> Messages(CancellationToken cancellationToken 
= default);
+        internal ProducerStateChanged(IProducer producer, ProducerState 
producerState)
+        {
+            Producer = producer;
+            ProducerState = producerState;
+        }
 
-        /// <summary>
-        /// The topic of the reader.
-        /// </summary>
-        string Topic { get; }
+        public IProducer Producer { get; }
+        public ProducerState ProducerState { get; }
     }
 }
diff --git a/src/DotPulsar/Abstractions/IReader.cs 
b/src/DotPulsar/ReaderStateChanged.cs
similarity index 50%
copy from src/DotPulsar/Abstractions/IReader.cs
copy to src/DotPulsar/ReaderStateChanged.cs
index 091551c..4133abd 100644
--- a/src/DotPulsar/Abstractions/IReader.cs
+++ b/src/DotPulsar/ReaderStateChanged.cs
@@ -12,25 +12,19 @@
  * limitations under the License.
  */
 
-namespace DotPulsar.Abstractions
+namespace DotPulsar
 {
-    using System;
-    using System.Collections.Generic;
-    using System.Threading;
+    using DotPulsar.Abstractions;
 
-    /// <summary>
-    /// A reader abstraction.
-    /// </summary>
-    public interface IReader : IStateChanged<ReaderState>, IAsyncDisposable
+    public sealed class ReaderStateChanged
     {
-        /// <summary>
-        /// Get an IAsyncEnumerable for reading messages
-        /// </summary>
-        IAsyncEnumerable<Message> Messages(CancellationToken cancellationToken 
= default);
+        internal ReaderStateChanged(IReader reader, ReaderState readerState)
+        {
+            Reader = reader;
+            ReaderState = readerState;
+        }
 
-        /// <summary>
-        /// The topic of the reader.
-        /// </summary>
-        string Topic { get; }
+        public IReader Reader { get; }
+        public ReaderState ReaderState { get; }
     }
 }

Reply via email to