This is an automated email from the ASF dual-hosted git repository.
blankensteiner pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-dotpulsar.git
The following commit(s) were added to refs/heads/master by this push:
new d741d47 When monitoring state we now return a class giving us both
the new state and the Consumer/Producer/Reader.
d741d47 is described below
commit d741d477a176b99040cc0679511fc14aae19a1ed
Author: Daniel Blankensteiner <[email protected]>
AuthorDate: Thu Apr 2 23:54:24 2020 +0200
When monitoring state we now return a class giving us both the new state
and the Consumer/Producer/Reader.
---
samples/Consuming/Program.cs | 22 ++++++------
samples/Producing/Program.cs | 18 +++++-----
samples/Reading/Program.cs | 20 ++++++-----
src/DotPulsar/Abstractions/IConsumer.cs | 40 ++++++++++++++++++++-
src/DotPulsar/Abstractions/IProducer.cs | 40 ++++++++++++++++++++-
src/DotPulsar/Abstractions/IReader.cs | 41 +++++++++++++++++++++-
.../IReader.cs => ConsumerStateChanged.cs} | 26 ++++++--------
.../{ => Internal}/Abstractions/IStateChanged.cs | 4 +--
src/DotPulsar/Internal/Consumer.cs | 14 +++++---
src/DotPulsar/Internal/Producer.cs | 14 +++++---
src/DotPulsar/Internal/Reader.cs | 14 +++++---
.../IReader.cs => ProducerStateChanged.cs} | 26 ++++++--------
.../IReader.cs => ReaderStateChanged.cs} | 26 ++++++--------
13 files changed, 213 insertions(+), 92 deletions(-)
diff --git a/samples/Consuming/Program.cs b/samples/Consuming/Program.cs
index 59fe108..36f8497 100644
--- a/samples/Consuming/Program.cs
+++ b/samples/Consuming/Program.cs
@@ -1,4 +1,4 @@
-/*
+/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
@@ -79,20 +79,22 @@ namespace Consuming
while (true)
{
- state = await
consumer.StateChangedFrom(state).ConfigureAwait(false);
+ var stateChanged = await
consumer.StateChangedFrom(state).ConfigureAwait(false);
+ state = stateChanged.ConsumerState;
var stateMessage = state switch
{
- ConsumerState.Active => "The consumer is active",
- ConsumerState.Inactive => "The consumer is inactive",
- ConsumerState.Disconnected => "The consumer is
disconnected",
- ConsumerState.Closed => "The consumer has closed",
- ConsumerState.ReachedEndOfTopic => "The consumer has
reached end of topic",
- ConsumerState.Faulted => "The consumer has faulted",
- _ => $"The consumer has an unknown state '{state}'"
+ ConsumerState.Active => "is active",
+ ConsumerState.Inactive => "is inactive",
+ ConsumerState.Disconnected => "is disconnected",
+ ConsumerState.Closed => "has closed",
+ ConsumerState.ReachedEndOfTopic => "has reached end of
topic",
+ ConsumerState.Faulted => "has faulted",
+ _ => $"has an unknown state '{state}'"
};
- Console.WriteLine(stateMessage);
+ var topic = stateChanged.Consumer.Topic;
+ Console.WriteLine($"The consumer for topic '{topic}' " +
stateMessage);
if (consumer.IsFinalState(state))
return;
diff --git a/samples/Producing/Program.cs b/samples/Producing/Program.cs
index 73ddff8..664793f 100644
--- a/samples/Producing/Program.cs
+++ b/samples/Producing/Program.cs
@@ -1,4 +1,4 @@
-/*
+/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
@@ -78,18 +78,20 @@ namespace Producing
while (true)
{
- state = await
producer.StateChangedFrom(state).ConfigureAwait(false);
+ var stateChanged = await
producer.StateChangedFrom(state).ConfigureAwait(false);
+ state = stateChanged.ProducerState;
var stateMessage = state switch
{
- ProducerState.Connected => "The producer is connected",
- ProducerState.Disconnected => "The producer is
disconnected",
- ProducerState.Closed => "The producer has closed",
- ProducerState.Faulted => "The producer has faulted",
- _ => $"The producer has an unknown state '{state}'"
+ ProducerState.Connected => "is connected",
+ ProducerState.Disconnected => "is disconnected",
+ ProducerState.Closed => "has closed",
+ ProducerState.Faulted => "has faulted",
+ _ => $"has an unknown state '{state}'"
};
- Console.WriteLine(stateMessage);
+ var topic = stateChanged.Producer.Topic;
+ Console.WriteLine($"The producer for topic '{topic}' " +
stateMessage);
if (producer.IsFinalState(state))
return;
diff --git a/samples/Reading/Program.cs b/samples/Reading/Program.cs
index b42275d..d7c9f6c 100644
--- a/samples/Reading/Program.cs
+++ b/samples/Reading/Program.cs
@@ -1,4 +1,4 @@
-/*
+/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
@@ -78,19 +78,21 @@ namespace Reading
while (true)
{
- state = await
reader.StateChangedFrom(state).ConfigureAwait(false);
+ var stateChanged = await
reader.StateChangedFrom(state).ConfigureAwait(false);
+ state = stateChanged.ReaderState;
var stateMessage = state switch
{
- ReaderState.Connected => "The reader is connected",
- ReaderState.Disconnected => "The reader is disconnected",
- ReaderState.Closed => "The reader has closed",
- ReaderState.ReachedEndOfTopic => "The reader has reached
end of topic",
- ReaderState.Faulted => "The reader has faulted",
- _ => $"The reader has an unknown state '{state}'"
+ ReaderState.Connected => "is connected",
+ ReaderState.Disconnected => "is disconnected",
+ ReaderState.Closed => "has closed",
+ ReaderState.ReachedEndOfTopic => "has reached end of
topic",
+ ReaderState.Faulted => "has faulted",
+ _ => $"has an unknown state '{state}'"
};
- Console.WriteLine(stateMessage);
+ var topic = stateChanged.Reader.Topic;
+ Console.WriteLine($"The reader for topic '{topic}' " +
stateMessage);
if (reader.IsFinalState(state))
return;
diff --git a/src/DotPulsar/Abstractions/IConsumer.cs
b/src/DotPulsar/Abstractions/IConsumer.cs
index 4d33ab7..70bfb99 100644
--- a/src/DotPulsar/Abstractions/IConsumer.cs
+++ b/src/DotPulsar/Abstractions/IConsumer.cs
@@ -22,7 +22,7 @@ namespace DotPulsar.Abstractions
/// <summary>
/// A consumer abstraction.
/// </summary>
- public interface IConsumer : IStateChanged<ConsumerState>, IAsyncDisposable
+ public interface IConsumer : IAsyncDisposable
{
/// <summary>
/// Acknowledge the consumption of a single message.
@@ -50,6 +50,22 @@ namespace DotPulsar.Abstractions
ValueTask<MessageId> GetLastMessageId(CancellationToken
cancellationToken = default);
/// <summary>
+ /// Ask whether the current state is final, meaning that it will never
change.
+ /// </summary>
+ /// <returns>
+ /// True if it's final and False if it's not.
+ /// </returns>
+ bool IsFinalState();
+
+ /// <summary>
+ /// Ask whether the provided state is final, meaning that it will
never change.
+ /// </summary>
+ /// <returns>
+ /// True if it's final and False if it's not.
+ /// </returns>
+ bool IsFinalState(ConsumerState state);
+
+ /// <summary>
/// Get an IAsyncEnumerable for consuming messages
/// </summary>
IAsyncEnumerable<Message> Messages(CancellationToken cancellationToken
= default);
@@ -60,6 +76,28 @@ namespace DotPulsar.Abstractions
ValueTask Seek(MessageId messageId, CancellationToken
cancellationToken = default);
/// <summary>
+ /// Wait for the state to change to a specific state.
+ /// </summary>
+ /// <returns>
+ /// The consumer together with its current state.
+ /// </returns>
+ /// <remarks>
+ /// If the state changes to a final state, then all awaiting tasks will
complete.
+ /// </remarks>
+ ValueTask<ConsumerStateChanged> StateChangedTo(ConsumerState state,
CancellationToken cancellationToken = default);
+
+ /// <summary>
+ /// Wait for the state to change from a specific state.
+ /// </summary>
+ /// <returns>
+ /// The consumer together with its current state.
+ /// </returns>
+ /// <remarks>
+ /// If the state changes to a final state, then all awaiting tasks will
complete.
+ /// </remarks>
+ ValueTask<ConsumerStateChanged> StateChangedFrom(ConsumerState state,
CancellationToken cancellationToken = default);
+
+ /// <summary>
/// The topic of the consumer.
/// </summary>
string Topic { get; }
diff --git a/src/DotPulsar/Abstractions/IProducer.cs
b/src/DotPulsar/Abstractions/IProducer.cs
index 11e46d1..bd37d46 100644
--- a/src/DotPulsar/Abstractions/IProducer.cs
+++ b/src/DotPulsar/Abstractions/IProducer.cs
@@ -22,9 +22,25 @@ namespace DotPulsar.Abstractions
/// <summary>
/// A producer abstraction.
/// </summary>
- public interface IProducer : IStateChanged<ProducerState>, IAsyncDisposable
+ public interface IProducer : IAsyncDisposable
{
/// <summary>
+ /// Ask whether the current state is final, meaning that it will never
change.
+ /// </summary>
+ /// <returns>
+ /// True if it's final and False if it's not.
+ /// </returns>
+ bool IsFinalState();
+
+ /// <summary>
+ /// Ask whether the provided state is final, meaning that it will
never change.
+ /// </summary>
+ /// <returns>
+ /// True if it's final and False if it's not.
+ /// </returns>
+ bool IsFinalState(ProducerState state);
+
+ /// <summary>
/// Sends a message.
/// </summary>
ValueTask<MessageId> Send(byte[] data, CancellationToken
cancellationToken = default);
@@ -55,6 +71,28 @@ namespace DotPulsar.Abstractions
ValueTask<MessageId> Send(MessageMetadata metadata,
ReadOnlySequence<byte> data, CancellationToken cancellationToken = default);
/// <summary>
+ /// Wait for the state to change to a specific state.
+ /// </summary>
+ /// <returns>
+ /// The producer together with its current state.
+ /// </returns>
+ /// <remarks>
+ /// If the state changes to a final state, then all awaiting tasks will
complete.
+ /// </remarks>
+ ValueTask<ProducerStateChanged> StateChangedTo(ProducerState state,
CancellationToken cancellationToken = default);
+
+ /// <summary>
+ /// Wait for the state to change from a specific state.
+ /// </summary>
+ /// <returns>
+ /// The producer together with its current state.
+ /// </returns>
+ /// <remarks>
+ /// If the state changes to a final state, then all awaiting tasks will
complete.
+ /// </remarks>
+ ValueTask<ProducerStateChanged> StateChangedFrom(ProducerState state,
CancellationToken cancellationToken = default);
+
+ /// <summary>
/// The topic of the producer.
/// </summary>
string Topic { get; }
diff --git a/src/DotPulsar/Abstractions/IReader.cs
b/src/DotPulsar/Abstractions/IReader.cs
index 091551c..81bc9b4 100644
--- a/src/DotPulsar/Abstractions/IReader.cs
+++ b/src/DotPulsar/Abstractions/IReader.cs
@@ -17,18 +17,57 @@ namespace DotPulsar.Abstractions
using System;
using System.Collections.Generic;
using System.Threading;
+ using System.Threading.Tasks;
/// <summary>
/// A reader abstraction.
/// </summary>
- public interface IReader : IStateChanged<ReaderState>, IAsyncDisposable
+ public interface IReader : IAsyncDisposable
{
/// <summary>
+ /// Ask whether the current state is final, meaning that it will never
change.
+ /// </summary>
+ /// <returns>
+ /// True if it's final and False if it's not.
+ /// </returns>
+ bool IsFinalState();
+
+ /// <summary>
+ /// Ask whether the provided state is final, meaning that it will
never change.
+ /// </summary>
+ /// <returns>
+ /// True if it's final and False if it's not.
+ /// </returns>
+ bool IsFinalState(ReaderState state);
+
+ /// <summary>
/// Get an IAsyncEnumerable for reading messages
/// </summary>
IAsyncEnumerable<Message> Messages(CancellationToken cancellationToken
= default);
/// <summary>
+ /// Wait for the state to change to a specific state.
+ /// </summary>
+ /// <returns>
+ /// The reader together with its current state.
+ /// </returns>
+ /// <remarks>
+ /// If the state changes to a final state, then all awaiting tasks will
complete.
+ /// </remarks>
+ ValueTask<ReaderStateChanged> StateChangedTo(ReaderState state,
CancellationToken cancellationToken = default);
+
+ /// <summary>
+ /// Wait for the state to change from a specific state.
+ /// </summary>
+ /// <returns>
+ /// The reader together with its current state.
+ /// </returns>
+ /// <remarks>
+ /// If the state changes to a final state, then all awaiting tasks will
complete.
+ /// </remarks>
+ ValueTask<ReaderStateChanged> StateChangedFrom(ReaderState state,
CancellationToken cancellationToken = default);
+
+ /// <summary>
/// The topic of the reader.
/// </summary>
string Topic { get; }
diff --git a/src/DotPulsar/Abstractions/IReader.cs
b/src/DotPulsar/ConsumerStateChanged.cs
similarity index 50%
copy from src/DotPulsar/Abstractions/IReader.cs
copy to src/DotPulsar/ConsumerStateChanged.cs
index 091551c..1f0a80e 100644
--- a/src/DotPulsar/Abstractions/IReader.cs
+++ b/src/DotPulsar/ConsumerStateChanged.cs
@@ -12,25 +12,19 @@
* limitations under the License.
*/
-namespace DotPulsar.Abstractions
+namespace DotPulsar
{
- using System;
- using System.Collections.Generic;
- using System.Threading;
+ using DotPulsar.Abstractions;
- /// <summary>
- /// A reader abstraction.
- /// </summary>
- public interface IReader : IStateChanged<ReaderState>, IAsyncDisposable
+ public sealed class ConsumerStateChanged
{
- /// <summary>
- /// Get an IAsyncEnumerable for reading messages
- /// </summary>
- IAsyncEnumerable<Message> Messages(CancellationToken cancellationToken
= default);
+ internal ConsumerStateChanged(IConsumer consumer, ConsumerState
consumerState)
+ {
+ Consumer = consumer;
+ ConsumerState = consumerState;
+ }
- /// <summary>
- /// The topic of the reader.
- /// </summary>
- string Topic { get; }
+ public IConsumer Consumer { get; }
+ public ConsumerState ConsumerState { get; }
}
}
diff --git a/src/DotPulsar/Abstractions/IStateChanged.cs
b/src/DotPulsar/Internal/Abstractions/IStateChanged.cs
similarity index 97%
rename from src/DotPulsar/Abstractions/IStateChanged.cs
rename to src/DotPulsar/Internal/Abstractions/IStateChanged.cs
index 079b9be..732f13e 100644
--- a/src/DotPulsar/Abstractions/IStateChanged.cs
+++ b/src/DotPulsar/Internal/Abstractions/IStateChanged.cs
@@ -1,4 +1,4 @@
-/*
+/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
@@ -12,7 +12,7 @@
* limitations under the License.
*/
-namespace DotPulsar.Abstractions
+namespace DotPulsar.Internal.Abstractions
{
using System.Threading;
using System.Threading.Tasks;
diff --git a/src/DotPulsar/Internal/Consumer.cs
b/src/DotPulsar/Internal/Consumer.cs
index 9fbdc37..2208606 100644
--- a/src/DotPulsar/Internal/Consumer.cs
+++ b/src/DotPulsar/Internal/Consumer.cs
@@ -57,11 +57,17 @@ namespace DotPulsar.Internal
_eventRegister.Register(new ConsumerCreated(_correlationId, this));
}
- public async ValueTask<ConsumerState> StateChangedTo(ConsumerState
state, CancellationToken cancellationToken)
- => await _state.StateChangedTo(state,
cancellationToken).ConfigureAwait(false);
+ public async ValueTask<ConsumerStateChanged>
StateChangedTo(ConsumerState state, CancellationToken cancellationToken)
+ {
+ var newState = await _state.StateChangedTo(state,
cancellationToken).ConfigureAwait(false);
+ return new ConsumerStateChanged(this, newState);
+ }
- public async ValueTask<ConsumerState> StateChangedFrom(ConsumerState
state, CancellationToken cancellationToken)
- => await _state.StateChangedFrom(state,
cancellationToken).ConfigureAwait(false);
+ public async ValueTask<ConsumerStateChanged>
StateChangedFrom(ConsumerState state, CancellationToken cancellationToken)
+ {
+ var newState = await _state.StateChangedFrom(state,
cancellationToken).ConfigureAwait(false);
+ return new ConsumerStateChanged(this, newState);
+ }
public bool IsFinalState()
=> _state.IsFinalState();
diff --git a/src/DotPulsar/Internal/Producer.cs
b/src/DotPulsar/Internal/Producer.cs
index d9e5dd0..28e0fe2 100644
--- a/src/DotPulsar/Internal/Producer.cs
+++ b/src/DotPulsar/Internal/Producer.cs
@@ -53,11 +53,17 @@ namespace DotPulsar.Internal
_eventRegister.Register(new ProducerCreated(_correlationId, this));
}
- public ValueTask<ProducerState> StateChangedTo(ProducerState state,
CancellationToken cancellationToken)
- => _state.StateChangedTo(state, cancellationToken);
+ public async ValueTask<ProducerStateChanged>
StateChangedTo(ProducerState state, CancellationToken cancellationToken)
+ {
+ var newState = await _state.StateChangedTo(state,
cancellationToken).ConfigureAwait(false);
+ return new ProducerStateChanged(this, newState);
+ }
- public ValueTask<ProducerState> StateChangedFrom(ProducerState state,
CancellationToken cancellationToken)
- => _state.StateChangedFrom(state, cancellationToken);
+ public async ValueTask<ProducerStateChanged>
StateChangedFrom(ProducerState state, CancellationToken cancellationToken)
+ {
+ var newState = await _state.StateChangedFrom(state,
cancellationToken).ConfigureAwait(false);
+ return new ProducerStateChanged(this, newState);
+ }
public bool IsFinalState()
=> _state.IsFinalState();
diff --git a/src/DotPulsar/Internal/Reader.cs b/src/DotPulsar/Internal/Reader.cs
index 5b7d740..bb659e7 100644
--- a/src/DotPulsar/Internal/Reader.cs
+++ b/src/DotPulsar/Internal/Reader.cs
@@ -54,11 +54,17 @@ namespace DotPulsar.Internal
_eventRegister.Register(new ReaderCreated(_correlationId, this));
}
- public async ValueTask<ReaderState> StateChangedTo(ReaderState state,
CancellationToken cancellationToken)
- => await _state.StateChangedTo(state,
cancellationToken).ConfigureAwait(false);
+ public async ValueTask<ReaderStateChanged> StateChangedTo(ReaderState
state, CancellationToken cancellationToken)
+ {
+ var newState = await _state.StateChangedTo(state,
cancellationToken).ConfigureAwait(false);
+ return new ReaderStateChanged(this, newState);
+ }
- public async ValueTask<ReaderState> StateChangedFrom(ReaderState
state, CancellationToken cancellationToken)
- => await _state.StateChangedFrom(state,
cancellationToken).ConfigureAwait(false);
+ public async ValueTask<ReaderStateChanged>
StateChangedFrom(ReaderState state, CancellationToken cancellationToken)
+ {
+ var newState = await _state.StateChangedFrom(state,
cancellationToken).ConfigureAwait(false);
+ return new ReaderStateChanged(this, newState);
+ }
public bool IsFinalState()
=> _state.IsFinalState();
diff --git a/src/DotPulsar/Abstractions/IReader.cs
b/src/DotPulsar/ProducerStateChanged.cs
similarity index 50%
copy from src/DotPulsar/Abstractions/IReader.cs
copy to src/DotPulsar/ProducerStateChanged.cs
index 091551c..f866954 100644
--- a/src/DotPulsar/Abstractions/IReader.cs
+++ b/src/DotPulsar/ProducerStateChanged.cs
@@ -12,25 +12,19 @@
* limitations under the License.
*/
-namespace DotPulsar.Abstractions
+namespace DotPulsar
{
- using System;
- using System.Collections.Generic;
- using System.Threading;
+ using DotPulsar.Abstractions;
- /// <summary>
- /// A reader abstraction.
- /// </summary>
- public interface IReader : IStateChanged<ReaderState>, IAsyncDisposable
+ public sealed class ProducerStateChanged
{
- /// <summary>
- /// Get an IAsyncEnumerable for reading messages
- /// </summary>
- IAsyncEnumerable<Message> Messages(CancellationToken cancellationToken
= default);
+ internal ProducerStateChanged(IProducer producer, ProducerState
producerState)
+ {
+ Producer = producer;
+ ProducerState = producerState;
+ }
- /// <summary>
- /// The topic of the reader.
- /// </summary>
- string Topic { get; }
+ public IProducer Producer { get; }
+ public ProducerState ProducerState { get; }
}
}
diff --git a/src/DotPulsar/Abstractions/IReader.cs
b/src/DotPulsar/ReaderStateChanged.cs
similarity index 50%
copy from src/DotPulsar/Abstractions/IReader.cs
copy to src/DotPulsar/ReaderStateChanged.cs
index 091551c..4133abd 100644
--- a/src/DotPulsar/Abstractions/IReader.cs
+++ b/src/DotPulsar/ReaderStateChanged.cs
@@ -12,25 +12,19 @@
* limitations under the License.
*/
-namespace DotPulsar.Abstractions
+namespace DotPulsar
{
- using System;
- using System.Collections.Generic;
- using System.Threading;
+ using DotPulsar.Abstractions;
- /// <summary>
- /// A reader abstraction.
- /// </summary>
- public interface IReader : IStateChanged<ReaderState>, IAsyncDisposable
+ public sealed class ReaderStateChanged
{
- /// <summary>
- /// Get an IAsyncEnumerable for reading messages
- /// </summary>
- IAsyncEnumerable<Message> Messages(CancellationToken cancellationToken
= default);
+ internal ReaderStateChanged(IReader reader, ReaderState readerState)
+ {
+ Reader = reader;
+ ReaderState = readerState;
+ }
- /// <summary>
- /// The topic of the reader.
- /// </summary>
- string Topic { get; }
+ public IReader Reader { get; }
+ public ReaderState ReaderState { get; }
}
}