This is an automated email from the ASF dual-hosted git repository.
blankensteiner pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-dotpulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 2bd933e Refactoring and moving that can be moved to extension methods.
2bd933e is described below
commit 2bd933ea42607e5ce5fe448db530584287a01bd0
Author: Daniel Blankensteiner <[email protected]>
AuthorDate: Fri Feb 12 08:58:01 2021 +0100
Refactoring and moving that can be moved to extension methods.
---
src/DotPulsar/Abstractions/IConsumer.cs | 84 +---------------------
.../IGetLastMessageId.cs} | 17 ++---
src/DotPulsar/Abstractions/IHandleStateChanged.cs | 2 +-
src/DotPulsar/Abstractions/IProducer.cs | 73 +------------------
src/DotPulsar/Abstractions/IReader.cs | 73 +------------------
.../IReceive.cs} | 12 +++-
.../{IHandleStateChanged.cs => ISeek.cs} | 12 ++--
.../{IHandleStateChanged.cs => ISend.cs} | 13 ++--
.../Abstractions/{IProducer.cs => IState.cs} | 47 ++----------
src/DotPulsar/Extensions/ConsumerExtensions.cs | 68 ++++++++++++++++++
src/DotPulsar/Extensions/ProducerExtensions.cs | 35 +++++++++
src/DotPulsar/Extensions/PulsarClientExtensions.cs | 3 +
src/DotPulsar/Extensions/ReaderExtensions.cs | 56 +++++++++++++++
.../ReceiveExtensions.cs} | 23 +++---
src/DotPulsar/Extensions/SeekExtensions.cs | 39 ++++++++++
src/DotPulsar/Extensions/SendExtensions.cs | 52 ++++++++++++++
.../Internal/Abstractions/IConsumerChannel.cs | 6 +-
src/DotPulsar/Internal/Consumer.cs | 76 ++++++--------------
src/DotPulsar/Internal/ConsumerChannel.cs | 14 ++--
.../MessageIdDataExtensions.cs} | 21 +++---
src/DotPulsar/Internal/MessageBuilder.cs | 1 +
src/DotPulsar/Internal/MonitorState.cs | 1 +
src/DotPulsar/Internal/NotReadyChannel.cs | 8 +--
src/DotPulsar/Internal/Producer.cs | 29 ++------
src/DotPulsar/Internal/Reader.cs | 58 ++++-----------
src/DotPulsar/Internal/ReaderChannelFactory.cs | 8 +--
src/DotPulsar/Internal/ReaderProcess.cs | 4 +-
src/DotPulsar/MessageId.cs | 34 ++++-----
28 files changed, 401 insertions(+), 468 deletions(-)
diff --git a/src/DotPulsar/Abstractions/IConsumer.cs
b/src/DotPulsar/Abstractions/IConsumer.cs
index 48fe671..7fb4e16 100644
--- a/src/DotPulsar/Abstractions/IConsumer.cs
+++ b/src/DotPulsar/Abstractions/IConsumer.cs
@@ -22,97 +22,19 @@ namespace DotPulsar.Abstractions
/// <summary>
/// A consumer abstraction.
/// </summary>
- public interface IConsumer : IAsyncDisposable
+ public interface IConsumer : IGetLastMessageId, IReceive, ISeek,
IState<ConsumerState>, IAsyncDisposable
{
/// <summary>
- /// Acknowledge the consumption of a single message.
- /// </summary>
- ValueTask Acknowledge(Message message, CancellationToken
cancellationToken = default);
-
- /// <summary>
/// Acknowledge the consumption of a single message using the
MessageId.
/// </summary>
ValueTask Acknowledge(MessageId messageId, CancellationToken
cancellationToken = default);
/// <summary>
- /// Acknowledge the consumption of all the messages in the topic up to
and including the provided message.
- /// </summary>
- ValueTask AcknowledgeCumulative(Message message, CancellationToken
cancellationToken = default);
-
- /// <summary>
/// Acknowledge the consumption of all the messages in the topic up to
and including the provided MessageId.
/// </summary>
ValueTask AcknowledgeCumulative(MessageId messageId, CancellationToken
cancellationToken = default);
/// <summary>
- /// Get the MessageId of the last message on the topic.
- /// </summary>
- ValueTask<MessageId> GetLastMessageId(CancellationToken
cancellationToken = default);
-
- /// <summary>
- /// Ask whether the current state is final, meaning that it will never
change.
- /// </summary>
- /// <returns>
- /// True if it's final and False if it's not.
- /// </returns>
- bool IsFinalState();
-
- /// <summary>
- /// Ask whether the provided state is final, meaning that it will
never change.
- /// </summary>
- /// <returns>
- /// True if it's final and False if it's not.
- /// </returns>
- bool IsFinalState(ConsumerState state);
-
- /// <summary>
- /// Get an IAsyncEnumerable for consuming messages.
- /// </summary>
- IAsyncEnumerable<Message> Messages(CancellationToken cancellationToken
= default);
-
- /// <summary>
- /// Reset the subscription associated with this consumer to a specific
MessageId.
- /// </summary>
- ValueTask Seek(MessageId messageId, CancellationToken
cancellationToken = default);
-
- /// <summary>
- /// Reset the subscription associated with this consumer to a specific
message publish time using unix time in milliseconds.
- /// </summary>
- ValueTask Seek(ulong publishTime, CancellationToken cancellationToken
= default);
-
- /// <summary>
- /// Reset the subscription associated with this consumer to a specific
message publish time using an UTC DateTime.
- /// </summary>
- ValueTask Seek(DateTime publishTime, CancellationToken
cancellationToken = default);
-
- /// <summary>
- /// Reset the subscription associated with this consumer to a specific
message publish time using a DateTimeOffset.
- /// </summary>
- ValueTask Seek(DateTimeOffset publishTime, CancellationToken
cancellationToken = default);
-
- /// <summary>
- /// Wait for the state to change to a specific state.
- /// </summary>
- /// <returns>
- /// The current state.
- /// </returns>
- /// <remarks>
- /// If the state change to a final state, then all awaiting tasks will
complete.
- /// </remarks>
- ValueTask<ConsumerStateChanged> StateChangedTo(ConsumerState state,
CancellationToken cancellationToken = default);
-
- /// <summary>
- /// Wait for the state to change from a specific state.
- /// </summary>
- /// <returns>
- /// The current state.
- /// </returns>
- /// <remarks>
- /// If the state change to a final state, then all awaiting tasks will
complete.
- /// </remarks>
- ValueTask<ConsumerStateChanged> StateChangedFrom(ConsumerState state,
CancellationToken cancellationToken = default);
-
- /// <summary>
/// The topic of the consumer.
/// </summary>
string Topic { get; }
@@ -125,11 +47,11 @@ namespace DotPulsar.Abstractions
/// <summary>
/// Redeliver the pending messages that were pushed to this consumer
that are not yet acknowledged.
/// </summary>
- ValueTask RedeliverUnacknowledgedMessages(IEnumerable<MessageId>
messageIds, CancellationToken cancellationToken);
+ ValueTask RedeliverUnacknowledgedMessages(IEnumerable<MessageId>
messageIds, CancellationToken cancellationToken = default);
/// <summary>
/// Redeliver all pending messages that were pushed to this consumer
that are not yet acknowledged.
/// </summary>
- ValueTask RedeliverUnacknowledgedMessages(CancellationToken
cancellationToken);
+ ValueTask RedeliverUnacknowledgedMessages(CancellationToken
cancellationToken = default);
}
}
diff --git a/src/DotPulsar/Internal/Abstractions/IReaderChannel.cs
b/src/DotPulsar/Abstractions/IGetLastMessageId.cs
similarity index 55%
copy from src/DotPulsar/Internal/Abstractions/IReaderChannel.cs
copy to src/DotPulsar/Abstractions/IGetLastMessageId.cs
index 55135ea..0c3059a 100644
--- a/src/DotPulsar/Internal/Abstractions/IReaderChannel.cs
+++ b/src/DotPulsar/Abstractions/IGetLastMessageId.cs
@@ -12,18 +12,19 @@
* limitations under the License.
*/
-namespace DotPulsar.Internal.Abstractions
+namespace DotPulsar.Abstractions
{
- using DotPulsar.Internal.PulsarApi;
- using System;
using System.Threading;
using System.Threading.Tasks;
- public interface IReaderChannel : IAsyncDisposable
+ /// <summary>
+ /// An abstraction for getting the last message id.
+ /// </summary>
+ public interface IGetLastMessageId
{
- Task<CommandSuccess> Send(CommandSeek command, CancellationToken
cancellationToken);
- Task<CommandGetLastMessageIdResponse> Send(CommandGetLastMessageId
command, CancellationToken cancellationToken);
- ValueTask<Message> Receive(CancellationToken cancellationToken);
- ValueTask ClosedByClient(CancellationToken cancellationToken);
+ /// <summary>
+ /// Get the MessageId of the last message on the topic.
+ /// </summary>
+ ValueTask<MessageId> GetLastMessageId(CancellationToken
cancellationToken = default);
}
}
diff --git a/src/DotPulsar/Abstractions/IHandleStateChanged.cs
b/src/DotPulsar/Abstractions/IHandleStateChanged.cs
index 04d6b60..8be3a99 100644
--- a/src/DotPulsar/Abstractions/IHandleStateChanged.cs
+++ b/src/DotPulsar/Abstractions/IHandleStateChanged.cs
@@ -25,7 +25,7 @@ namespace DotPulsar.Abstractions
/// <summary>
/// Called after a state has changed.
/// </summary>
- ValueTask OnStateChanged(TStateChanged stateChanged, CancellationToken
cancellationToken);
+ ValueTask OnStateChanged(TStateChanged stateChanged, CancellationToken
cancellationToken = default);
/// <summary>
/// The cancellation token to use when waiting for and handling state
changes.
diff --git a/src/DotPulsar/Abstractions/IProducer.cs
b/src/DotPulsar/Abstractions/IProducer.cs
index 03ff03e..27c59ee 100644
--- a/src/DotPulsar/Abstractions/IProducer.cs
+++ b/src/DotPulsar/Abstractions/IProducer.cs
@@ -15,84 +15,13 @@
namespace DotPulsar.Abstractions
{
using System;
- using System.Buffers;
- using System.Threading;
- using System.Threading.Tasks;
/// <summary>
/// A producer abstraction.
/// </summary>
- public interface IProducer : IAsyncDisposable
+ public interface IProducer : ISend, IState<ProducerState>, IAsyncDisposable
{
/// <summary>
- /// Ask whether the current state is final, meaning that it will never
change.
- /// </summary>
- /// <returns>
- /// True if it's final and False if it's not.
- /// </returns>
- bool IsFinalState();
-
- /// <summary>
- /// Ask whether the provided state is final, meaning that it will
never change.
- /// </summary>
- /// <returns>
- /// True if it's final and False if it's not.
- /// </returns>
- bool IsFinalState(ProducerState state);
-
- /// <summary>
- /// Sends a message.
- /// </summary>
- ValueTask<MessageId> Send(byte[] data, CancellationToken
cancellationToken = default);
-
- /// <summary>
- /// Sends a message.
- /// </summary>
- ValueTask<MessageId> Send(ReadOnlyMemory<byte> data, CancellationToken
cancellationToken = default);
-
- /// <summary>
- /// Sends a message.
- /// </summary>
- ValueTask<MessageId> Send(ReadOnlySequence<byte> data,
CancellationToken cancellationToken = default);
-
- /// <summary>
- /// Sends a message with metadata.
- /// </summary>
- ValueTask<MessageId> Send(MessageMetadata metadata, byte[] data,
CancellationToken cancellationToken = default);
-
- /// <summary>
- /// Sends a message with metadata.
- /// </summary>
- ValueTask<MessageId> Send(MessageMetadata metadata,
ReadOnlyMemory<byte> data, CancellationToken cancellationToken = default);
-
- /// <summary>
- /// Sends a message with metadata.
- /// </summary>
- ValueTask<MessageId> Send(MessageMetadata metadata,
ReadOnlySequence<byte> data, CancellationToken cancellationToken = default);
-
- /// <summary>
- /// Wait for the state to change to a specific state.
- /// </summary>
- /// <returns>
- /// The current state.
- /// </returns>
- /// <remarks>
- /// If the state change to a final state, then all awaiting tasks will
complete.
- /// </remarks>
- ValueTask<ProducerStateChanged> StateChangedTo(ProducerState state,
CancellationToken cancellationToken = default);
-
- /// <summary>
- /// Wait for the state to change from a specific state.
- /// </summary>
- /// <returns>
- /// The current state.
- /// </returns>
- /// <remarks>
- /// If the state change to a final state, then all awaiting tasks will
complete.
- /// </remarks>
- ValueTask<ProducerStateChanged> StateChangedFrom(ProducerState state,
CancellationToken cancellationToken = default);
-
- /// <summary>
/// The topic of the producer.
/// </summary>
string Topic { get; }
diff --git a/src/DotPulsar/Abstractions/IReader.cs
b/src/DotPulsar/Abstractions/IReader.cs
index 1023419..6b4207b 100644
--- a/src/DotPulsar/Abstractions/IReader.cs
+++ b/src/DotPulsar/Abstractions/IReader.cs
@@ -15,84 +15,13 @@
namespace DotPulsar.Abstractions
{
using System;
- using System.Collections.Generic;
- using System.Threading;
- using System.Threading.Tasks;
/// <summary>
/// A reader abstraction.
/// </summary>
- public interface IReader : IAsyncDisposable
+ public interface IReader : IGetLastMessageId, IReceive, ISeek,
IState<ReaderState>, IAsyncDisposable
{
/// <summary>
- /// Ask whether the current state is final, meaning that it will never
change.
- /// </summary>
- /// <returns>
- /// True if it's final and False if it's not.
- /// </returns>
- bool IsFinalState();
-
- /// <summary>
- /// Ask whether the provided state is final, meaning that it will
never change.
- /// </summary>
- /// <returns>
- /// True if it's final and False if it's not.
- /// </returns>
- bool IsFinalState(ReaderState state);
-
- /// <summary>
- /// Get the MessageId of the last message on the topic.
- /// </summary>
- ValueTask<MessageId> GetLastMessageId(CancellationToken
cancellationToken = default);
-
- /// <summary>
- /// Get an IAsyncEnumerable for reading messages
- /// </summary>
- IAsyncEnumerable<Message> Messages(CancellationToken cancellationToken
= default);
-
- /// <summary>
- /// Reset the subscription associated with this reader to a specific
MessageId.
- /// </summary>
- ValueTask Seek(MessageId messageId, CancellationToken
cancellationToken = default);
-
- /// <summary>
- /// Reset the subscription associated with this reader to a specific
message publish time using unix time in milliseconds.
- /// </summary>
- ValueTask Seek(ulong publishTime, CancellationToken cancellationToken
= default);
-
- /// <summary>
- /// Reset the subscription associated with this reader to a specific
message publish time using an UTC DateTime.
- /// </summary>
- ValueTask Seek(DateTime publishTime, CancellationToken
cancellationToken = default);
-
- /// <summary>
- /// Reset the subscription associated with this reader to a specific
message publish time using a DateTimeOffset.
- /// </summary>
- ValueTask Seek(DateTimeOffset publishTime, CancellationToken
cancellationToken = default);
-
- /// <summary>
- /// Wait for the state to change to a specific state.
- /// </summary>
- /// <returns>
- /// The current state.
- /// </returns>
- /// <remarks>
- /// If the state change to a final state, then all awaiting tasks will
complete.
- /// </remarks>
- ValueTask<ReaderStateChanged> StateChangedTo(ReaderState state,
CancellationToken cancellationToken = default);
-
- /// <summary>
- /// Wait for the state to change from a specific state.
- /// </summary>
- /// <returns>
- /// The current state.
- /// </returns>
- /// <remarks>
- /// If the state change to a final state, then all awaiting tasks will
complete.
- /// </remarks>
- ValueTask<ReaderStateChanged> StateChangedFrom(ReaderState state,
CancellationToken cancellationToken = default);
-
- /// <summary>
/// The topic of the reader.
/// </summary>
string Topic { get; }
diff --git a/src/DotPulsar/Internal/Abstractions/IReaderChannelFactory.cs
b/src/DotPulsar/Abstractions/IReceive.cs
similarity index 66%
rename from src/DotPulsar/Internal/Abstractions/IReaderChannelFactory.cs
rename to src/DotPulsar/Abstractions/IReceive.cs
index c651dbf..2ff275c 100644
--- a/src/DotPulsar/Internal/Abstractions/IReaderChannelFactory.cs
+++ b/src/DotPulsar/Abstractions/IReceive.cs
@@ -12,13 +12,19 @@
* limitations under the License.
*/
-namespace DotPulsar.Internal.Abstractions
+namespace DotPulsar.Abstractions
{
using System.Threading;
using System.Threading.Tasks;
- public interface IReaderChannelFactory
+ /// <summary>
+ /// An abstraction for receiving a single message.
+ /// </summary>
+ public interface IReceive
{
- Task<IReaderChannel> Create(CancellationToken cancellationToken =
default);
+ /// <summary>
+ /// Receive a single message.
+ /// </summary>
+ ValueTask<Message> Receive(CancellationToken cancellationToken =
default);
}
}
diff --git a/src/DotPulsar/Abstractions/IHandleStateChanged.cs
b/src/DotPulsar/Abstractions/ISeek.cs
similarity index 62%
copy from src/DotPulsar/Abstractions/IHandleStateChanged.cs
copy to src/DotPulsar/Abstractions/ISeek.cs
index 04d6b60..373c619 100644
--- a/src/DotPulsar/Abstractions/IHandleStateChanged.cs
+++ b/src/DotPulsar/Abstractions/ISeek.cs
@@ -18,18 +18,18 @@ namespace DotPulsar.Abstractions
using System.Threading.Tasks;
/// <summary>
- /// An state change handling abstraction.
+ /// An abstraction for seeking.
/// </summary>
- public interface IHandleStateChanged<TStateChanged>
+ public interface ISeek
{
/// <summary>
- /// Called after a state has changed.
+ /// Reset the cursor associated with the consumer or reader to a
specific MessageId.
/// </summary>
- ValueTask OnStateChanged(TStateChanged stateChanged, CancellationToken
cancellationToken);
+ ValueTask Seek(MessageId messageId, CancellationToken
cancellationToken = default);
/// <summary>
- /// The cancellation token to use when waiting for and handling state
changes.
+ /// Reset the cursor associated with the consumer or reader to a
specific message publish time using unix time in milliseconds.
/// </summary>
- CancellationToken CancellationToken { get; }
+ ValueTask Seek(ulong publishTime, CancellationToken cancellationToken
= default);
}
}
diff --git a/src/DotPulsar/Abstractions/IHandleStateChanged.cs
b/src/DotPulsar/Abstractions/ISend.cs
similarity index 65%
copy from src/DotPulsar/Abstractions/IHandleStateChanged.cs
copy to src/DotPulsar/Abstractions/ISend.cs
index 04d6b60..2948c34 100644
--- a/src/DotPulsar/Abstractions/IHandleStateChanged.cs
+++ b/src/DotPulsar/Abstractions/ISend.cs
@@ -14,22 +14,23 @@
namespace DotPulsar.Abstractions
{
+ using System.Buffers;
using System.Threading;
using System.Threading.Tasks;
/// <summary>
- /// An state change handling abstraction.
+ /// An abstraction for sending a message.
/// </summary>
- public interface IHandleStateChanged<TStateChanged>
+ public interface ISend
{
/// <summary>
- /// Called after a state has changed.
+ /// Sends a message.
/// </summary>
- ValueTask OnStateChanged(TStateChanged stateChanged, CancellationToken
cancellationToken);
+ ValueTask<MessageId> Send(ReadOnlySequence<byte> data,
CancellationToken cancellationToken = default);
/// <summary>
- /// The cancellation token to use when waiting for and handling state
changes.
+ /// Sends a message with metadata.
/// </summary>
- CancellationToken CancellationToken { get; }
+ ValueTask<MessageId> Send(MessageMetadata metadata,
ReadOnlySequence<byte> data, CancellationToken cancellationToken = default);
}
}
diff --git a/src/DotPulsar/Abstractions/IProducer.cs
b/src/DotPulsar/Abstractions/IState.cs
similarity index 51%
copy from src/DotPulsar/Abstractions/IProducer.cs
copy to src/DotPulsar/Abstractions/IState.cs
index 03ff03e..48a975e 100644
--- a/src/DotPulsar/Abstractions/IProducer.cs
+++ b/src/DotPulsar/Abstractions/IState.cs
@@ -14,15 +14,13 @@
namespace DotPulsar.Abstractions
{
- using System;
- using System.Buffers;
using System.Threading;
using System.Threading.Tasks;
/// <summary>
- /// A producer abstraction.
+ /// A state change monitoring abstraction.
/// </summary>
- public interface IProducer : IAsyncDisposable
+ public interface IState<TState> where TState : notnull
{
/// <summary>
/// Ask whether the current state is final, meaning that it will never
change.
@@ -38,37 +36,7 @@ namespace DotPulsar.Abstractions
/// <returns>
/// True if it's final and False if it's not.
/// </returns>
- bool IsFinalState(ProducerState state);
-
- /// <summary>
- /// Sends a message.
- /// </summary>
- ValueTask<MessageId> Send(byte[] data, CancellationToken
cancellationToken = default);
-
- /// <summary>
- /// Sends a message.
- /// </summary>
- ValueTask<MessageId> Send(ReadOnlyMemory<byte> data, CancellationToken
cancellationToken = default);
-
- /// <summary>
- /// Sends a message.
- /// </summary>
- ValueTask<MessageId> Send(ReadOnlySequence<byte> data,
CancellationToken cancellationToken = default);
-
- /// <summary>
- /// Sends a message with metadata.
- /// </summary>
- ValueTask<MessageId> Send(MessageMetadata metadata, byte[] data,
CancellationToken cancellationToken = default);
-
- /// <summary>
- /// Sends a message with metadata.
- /// </summary>
- ValueTask<MessageId> Send(MessageMetadata metadata,
ReadOnlyMemory<byte> data, CancellationToken cancellationToken = default);
-
- /// <summary>
- /// Sends a message with metadata.
- /// </summary>
- ValueTask<MessageId> Send(MessageMetadata metadata,
ReadOnlySequence<byte> data, CancellationToken cancellationToken = default);
+ bool IsFinalState(TState state);
/// <summary>
/// Wait for the state to change to a specific state.
@@ -79,7 +47,7 @@ namespace DotPulsar.Abstractions
/// <remarks>
/// If the state change to a final state, then all awaiting tasks will
complete.
/// </remarks>
- ValueTask<ProducerStateChanged> StateChangedTo(ProducerState state,
CancellationToken cancellationToken = default);
+ ValueTask<TState> OnStateChangeTo(TState state, CancellationToken
cancellationToken = default);
/// <summary>
/// Wait for the state to change from a specific state.
@@ -90,11 +58,6 @@ namespace DotPulsar.Abstractions
/// <remarks>
/// If the state change to a final state, then all awaiting tasks will
complete.
/// </remarks>
- ValueTask<ProducerStateChanged> StateChangedFrom(ProducerState state,
CancellationToken cancellationToken = default);
-
- /// <summary>
- /// The topic of the producer.
- /// </summary>
- string Topic { get; }
+ ValueTask<TState> OnStateChangeFrom(TState state, CancellationToken
cancellationToken = default);
}
}
diff --git a/src/DotPulsar/Extensions/ConsumerExtensions.cs
b/src/DotPulsar/Extensions/ConsumerExtensions.cs
new file mode 100644
index 0000000..b61971c
--- /dev/null
+++ b/src/DotPulsar/Extensions/ConsumerExtensions.cs
@@ -0,0 +1,68 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace DotPulsar.Extensions
+{
+ using DotPulsar.Abstractions;
+ using System.Threading;
+ using System.Threading.Tasks;
+
+ /// <summary>
+ /// Extensions for IConsumer.
+ /// </summary>
+ public static class ConsumerExtensions
+ {
+ /// <summary>
+ /// Acknowledge the consumption of a single message.
+ /// </summary>
+ public static async ValueTask Acknowledge(this IConsumer consumer,
Message message, CancellationToken cancellationToken = default)
+ => await consumer.Acknowledge(message.MessageId,
cancellationToken).ConfigureAwait(false);
+
+ /// <summary>
+ /// Acknowledge the consumption of all the messages in the topic up to
and including the provided message.
+ /// </summary>
+ public static async ValueTask AcknowledgeCumulative(this IConsumer
consumer, Message message, CancellationToken cancellationToken = default)
+ => await consumer.AcknowledgeCumulative(message.MessageId,
cancellationToken).ConfigureAwait(false);
+
+ /// <summary>
+ /// Wait for the state to change to a specific state.
+ /// </summary>
+ /// <returns>
+ /// The current state.
+ /// </returns>
+ /// <remarks>
+ /// If the state change to a final state, then all awaiting tasks will
complete.
+ /// </remarks>
+ public static async ValueTask<ConsumerStateChanged>
StateChangedTo(this IConsumer consumer, ConsumerState state, CancellationToken
cancellationToken = default)
+ {
+ var newState = await consumer.OnStateChangeTo(state,
cancellationToken).ConfigureAwait(false);
+ return new ConsumerStateChanged(consumer, newState);
+ }
+
+ /// <summary>
+ /// Wait for the state to change from a specific state.
+ /// </summary>
+ /// <returns>
+ /// The current state.
+ /// </returns>
+ /// <remarks>
+ /// If the state change to a final state, then all awaiting tasks will
complete.
+ /// </remarks>
+ public static async ValueTask<ConsumerStateChanged>
StateChangedFrom(this IConsumer consumer, ConsumerState state,
CancellationToken cancellationToken = default)
+ {
+ var newState = await consumer.OnStateChangeFrom(state,
cancellationToken).ConfigureAwait(false);
+ return new ConsumerStateChanged(consumer, newState);
+ }
+ }
+}
diff --git a/src/DotPulsar/Extensions/ProducerExtensions.cs
b/src/DotPulsar/Extensions/ProducerExtensions.cs
index 9c3007c..8f02262 100644
--- a/src/DotPulsar/Extensions/ProducerExtensions.cs
+++ b/src/DotPulsar/Extensions/ProducerExtensions.cs
@@ -16,7 +16,12 @@ namespace DotPulsar.Extensions
{
using Abstractions;
using Internal;
+ using System.Threading;
+ using System.Threading.Tasks;
+ /// <summary>
+ /// Extensions for IProducer.
+ /// </summary>
public static class ProducerExtensions
{
/// <summary>
@@ -24,5 +29,35 @@ namespace DotPulsar.Extensions
/// </summary>
public static IMessageBuilder NewMessage(this IProducer producer)
=> new MessageBuilder(producer);
+
+ /// <summary>
+ /// Wait for the state to change to a specific state.
+ /// </summary>
+ /// <returns>
+ /// The current state.
+ /// </returns>
+ /// <remarks>
+ /// If the state change to a final state, then all awaiting tasks will
complete.
+ /// </remarks>
+ public static async ValueTask<ProducerStateChanged>
StateChangedTo(this IProducer producer, ProducerState state, CancellationToken
cancellationToken = default)
+ {
+ var newState = await producer.OnStateChangeTo(state,
cancellationToken).ConfigureAwait(false);
+ return new ProducerStateChanged(producer, newState);
+ }
+
+ /// <summary>
+ /// Wait for the state to change from a specific state.
+ /// </summary>
+ /// <returns>
+ /// The current state.
+ /// </returns>
+ /// <remarks>
+ /// If the state change to a final state, then all awaiting tasks will
complete.
+ /// </remarks>
+ public static async ValueTask<ProducerStateChanged>
StateChangedFrom(this IProducer producer, ProducerState state,
CancellationToken cancellationToken = default)
+ {
+ var newState = await producer.OnStateChangeFrom(state,
cancellationToken).ConfigureAwait(false);
+ return new ProducerStateChanged(producer, newState);
+ }
}
}
diff --git a/src/DotPulsar/Extensions/PulsarClientExtensions.cs
b/src/DotPulsar/Extensions/PulsarClientExtensions.cs
index 77e95aa..574bce1 100644
--- a/src/DotPulsar/Extensions/PulsarClientExtensions.cs
+++ b/src/DotPulsar/Extensions/PulsarClientExtensions.cs
@@ -17,6 +17,9 @@ namespace DotPulsar.Extensions
using Abstractions;
using Internal;
+ /// <summary>
+ /// Extensions for IPulsarClient.
+ /// </summary>
public static class PulsarClientExtensions
{
/// <summary>
diff --git a/src/DotPulsar/Extensions/ReaderExtensions.cs
b/src/DotPulsar/Extensions/ReaderExtensions.cs
new file mode 100644
index 0000000..86d093b
--- /dev/null
+++ b/src/DotPulsar/Extensions/ReaderExtensions.cs
@@ -0,0 +1,56 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace DotPulsar.Extensions
+{
+ using DotPulsar.Abstractions;
+ using System.Threading;
+ using System.Threading.Tasks;
+
+ /// <summary>
+ /// Extensions for IReader.
+ /// </summary>
+ public static class ReaderExtensions
+ {
+ /// <summary>
+ /// Wait for the state to change to a specific state.
+ /// </summary>
+ /// <returns>
+ /// The current state.
+ /// </returns>
+ /// <remarks>
+ /// If the state change to a final state, then all awaiting tasks will
complete.
+ /// </remarks>
+ public static async ValueTask<ReaderStateChanged> StateChangedTo(this
IReader reader, ReaderState state, CancellationToken cancellationToken =
default)
+ {
+ var newState = await reader.OnStateChangeTo(state,
cancellationToken).ConfigureAwait(false);
+ return new ReaderStateChanged(reader, newState);
+ }
+
+ /// <summary>
+ /// Wait for the state to change from a specific state.
+ /// </summary>
+ /// <returns>
+ /// The current state.
+ /// </returns>
+ /// <remarks>
+ /// If the state change to a final state, then all awaiting tasks will
complete.
+ /// </remarks>
+ public static async ValueTask<ReaderStateChanged>
StateChangedFrom(this IReader reader, ReaderState state, CancellationToken
cancellationToken = default)
+ {
+ var newState = await reader.OnStateChangeFrom(state,
cancellationToken).ConfigureAwait(false);
+ return new ReaderStateChanged(reader, newState);
+ }
+ }
+}
diff --git a/src/DotPulsar/Abstractions/IHandleStateChanged.cs
b/src/DotPulsar/Extensions/ReceiveExtensions.cs
similarity index 52%
copy from src/DotPulsar/Abstractions/IHandleStateChanged.cs
copy to src/DotPulsar/Extensions/ReceiveExtensions.cs
index 04d6b60..c2262c6 100644
--- a/src/DotPulsar/Abstractions/IHandleStateChanged.cs
+++ b/src/DotPulsar/Extensions/ReceiveExtensions.cs
@@ -12,24 +12,25 @@
* limitations under the License.
*/
-namespace DotPulsar.Abstractions
+namespace DotPulsar.Extensions
{
+ using DotPulsar.Abstractions;
+ using System.Collections.Generic;
+ using System.Runtime.CompilerServices;
using System.Threading;
- using System.Threading.Tasks;
/// <summary>
- /// An state change handling abstraction.
+ /// Extensions for IReceive.
/// </summary>
- public interface IHandleStateChanged<TStateChanged>
+ public static class ReceiveExtensions
{
/// <summary>
- /// Called after a state has changed.
+ /// Get an IAsyncEnumerable for receiving messages.
/// </summary>
- ValueTask OnStateChanged(TStateChanged stateChanged, CancellationToken
cancellationToken);
-
- /// <summary>
- /// The cancellation token to use when waiting for and handling state
changes.
- /// </summary>
- CancellationToken CancellationToken { get; }
+ public static async IAsyncEnumerable<Message> Messages(this IReceive
receiver, [EnumeratorCancellation] CancellationToken cancellationToken =
default)
+ {
+ while (!cancellationToken.IsCancellationRequested)
+ yield return await
receiver.Receive(cancellationToken).ConfigureAwait(false);
+ }
}
}
diff --git a/src/DotPulsar/Extensions/SeekExtensions.cs
b/src/DotPulsar/Extensions/SeekExtensions.cs
new file mode 100644
index 0000000..b4f0d2c
--- /dev/null
+++ b/src/DotPulsar/Extensions/SeekExtensions.cs
@@ -0,0 +1,39 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace DotPulsar.Extensions
+{
+ using DotPulsar.Abstractions;
+ using System;
+ using System.Threading;
+ using System.Threading.Tasks;
+
+ /// <summary>
+ /// Extensions for ISeek.
+ /// </summary>
+ public static class SeekExtensions
+ {
+ /// <summary>
+ /// Reset the cursor associated with the consumer or reader to a
specific message publish time using an UTC DateTime.
+ /// </summary>
+ public static async ValueTask Seek(this ISeek seeker, DateTime
publishTime, CancellationToken cancellationToken = default)
+ => await seeker.Seek((ulong) new
DateTimeOffset(publishTime).ToUnixTimeMilliseconds(),
cancellationToken).ConfigureAwait(false);
+
+ /// <summary>
+ /// Reset the cursor associated with the consumer or reader to a
specific message publish time using a DateTimeOffset.
+ /// </summary>
+ public static async ValueTask Seek(this ISeek seeker, DateTimeOffset
publishTime, CancellationToken cancellationToken = default)
+ => await seeker.Seek((ulong) publishTime.ToUnixTimeMilliseconds(),
cancellationToken).ConfigureAwait(false);
+ }
+}
diff --git a/src/DotPulsar/Extensions/SendExtensions.cs
b/src/DotPulsar/Extensions/SendExtensions.cs
new file mode 100644
index 0000000..41ace1e
--- /dev/null
+++ b/src/DotPulsar/Extensions/SendExtensions.cs
@@ -0,0 +1,52 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace DotPulsar.Extensions
+{
+ using Abstractions;
+ using System;
+ using System.Buffers;
+ using System.Threading;
+ using System.Threading.Tasks;
+
+ /// <summary>
+ /// Extensions for ISend.
+ /// </summary>
+ public static class SendExtensions
+ {
+ /// <summary>
+ /// Sends a message.
+ /// </summary>
+ public static async ValueTask<MessageId> Send(this ISend sender,
byte[] data, CancellationToken cancellationToken = default)
+ => await sender.Send(new ReadOnlySequence<byte>(data),
cancellationToken).ConfigureAwait(false);
+
+ /// <summary>
+ /// Sends a message.
+ /// </summary>
+ public static async ValueTask<MessageId> Send(this ISend sender,
ReadOnlyMemory<byte> data, CancellationToken cancellationToken = default)
+ => await sender.Send(new ReadOnlySequence<byte>(data),
cancellationToken).ConfigureAwait(false);
+
+ /// <summary>
+ /// Sends a message with metadata.
+ /// </summary>
+ public static async ValueTask<MessageId> Send(this ISend sender,
MessageMetadata metadata, byte[] data, CancellationToken cancellationToken =
default)
+ => await sender.Send(metadata, new ReadOnlySequence<byte>(data),
cancellationToken).ConfigureAwait(false);
+
+ /// <summary>
+ /// Sends a message with metadata.
+ /// </summary>
+ public static async ValueTask<MessageId> Send(this ISend sender,
MessageMetadata metadata, ReadOnlyMemory<byte> data, CancellationToken
cancellationToken = default)
+ => await sender.Send(metadata, new ReadOnlySequence<byte>(data),
cancellationToken).ConfigureAwait(false);
+ }
+}
diff --git a/src/DotPulsar/Internal/Abstractions/IConsumerChannel.cs
b/src/DotPulsar/Internal/Abstractions/IConsumerChannel.cs
index 31140b2..ef09431 100644
--- a/src/DotPulsar/Internal/Abstractions/IConsumerChannel.cs
+++ b/src/DotPulsar/Internal/Abstractions/IConsumerChannel.cs
@@ -23,9 +23,9 @@ namespace DotPulsar.Internal.Abstractions
{
Task Send(CommandAck command, CancellationToken cancellationToken);
Task Send(CommandRedeliverUnacknowledgedMessages command,
CancellationToken cancellationToken);
- Task<CommandSuccess> Send(CommandUnsubscribe command,
CancellationToken cancellationToken);
- Task<CommandSuccess> Send(CommandSeek command, CancellationToken
cancellationToken);
- Task<CommandGetLastMessageIdResponse> Send(CommandGetLastMessageId
command, CancellationToken cancellationToken);
+ Task Send(CommandUnsubscribe command, CancellationToken
cancellationToken);
+ Task Send(CommandSeek command, CancellationToken cancellationToken);
+ Task<MessageId> Send(CommandGetLastMessageId command,
CancellationToken cancellationToken);
ValueTask<Message> Receive(CancellationToken cancellationToken);
ValueTask ClosedByClient(CancellationToken cancellationToken);
}
diff --git a/src/DotPulsar/Internal/Consumer.cs
b/src/DotPulsar/Internal/Consumer.cs
index c6e9dfc..d263df2 100644
--- a/src/DotPulsar/Internal/Consumer.cs
+++ b/src/DotPulsar/Internal/Consumer.cs
@@ -17,13 +17,13 @@ namespace DotPulsar.Internal
using Abstractions;
using DotPulsar.Abstractions;
using DotPulsar.Exceptions;
+ using DotPulsar.Internal.Extensions;
using Events;
using Microsoft.Extensions.ObjectPool;
using PulsarApi;
using System;
using System.Collections.Generic;
using System.Linq;
- using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;
@@ -59,17 +59,11 @@ namespace DotPulsar.Internal
_eventRegister.Register(new ConsumerCreated(_correlationId, this));
}
- public async ValueTask<ConsumerStateChanged>
StateChangedTo(ConsumerState state, CancellationToken cancellationToken)
- {
- var newState = await _state.StateChangedTo(state,
cancellationToken).ConfigureAwait(false);
- return new ConsumerStateChanged(this, newState);
- }
+ public async ValueTask<ConsumerState> OnStateChangeTo(ConsumerState
state, CancellationToken cancellationToken)
+ => await _state.StateChangedTo(state,
cancellationToken).ConfigureAwait(false);
- public async ValueTask<ConsumerStateChanged>
StateChangedFrom(ConsumerState state, CancellationToken cancellationToken)
- {
- var newState = await _state.StateChangedFrom(state,
cancellationToken).ConfigureAwait(false);
- return new ConsumerStateChanged(this, newState);
- }
+ public async ValueTask<ConsumerState> OnStateChangeFrom(ConsumerState
state, CancellationToken cancellationToken)
+ => await _state.StateChangedFrom(state,
cancellationToken).ConfigureAwait(false);
public bool IsFinalState()
=> _state.IsFinalState();
@@ -87,35 +81,28 @@ namespace DotPulsar.Internal
await _channel.DisposeAsync().ConfigureAwait(false);
}
- public async IAsyncEnumerable<Message>
Messages([EnumeratorCancellation] CancellationToken cancellationToken)
+ public async ValueTask<Message> Receive(CancellationToken
cancellationToken)
{
ThrowIfDisposed();
- while (!cancellationToken.IsCancellationRequested)
- yield return await _executor.Execute(() =>
Receive(cancellationToken), cancellationToken).ConfigureAwait(false);
+ return await _executor.Execute(() =>
ReceiveMessage(cancellationToken), cancellationToken).ConfigureAwait(false);
}
- private async ValueTask<Message> Receive(CancellationToken
cancellationToken)
+ private async ValueTask<Message> ReceiveMessage(CancellationToken
cancellationToken)
=> await _channel.Receive(cancellationToken).ConfigureAwait(false);
- public async ValueTask Acknowledge(Message message, CancellationToken
cancellationToken)
- => await Acknowledge(message.MessageId.Data,
CommandAck.AckType.Individual, cancellationToken).ConfigureAwait(false);
-
public async ValueTask Acknowledge(MessageId messageId,
CancellationToken cancellationToken)
- => await Acknowledge(messageId.Data,
CommandAck.AckType.Individual, cancellationToken).ConfigureAwait(false);
-
- public async ValueTask AcknowledgeCumulative(Message message,
CancellationToken cancellationToken)
- => await Acknowledge(message.MessageId.Data,
CommandAck.AckType.Cumulative, cancellationToken).ConfigureAwait(false);
+ => await Acknowledge(messageId, CommandAck.AckType.Individual,
cancellationToken).ConfigureAwait(false);
public async ValueTask AcknowledgeCumulative(MessageId messageId,
CancellationToken cancellationToken)
- => await Acknowledge(messageId.Data,
CommandAck.AckType.Cumulative, cancellationToken).ConfigureAwait(false);
+ => await Acknowledge(messageId, CommandAck.AckType.Cumulative,
cancellationToken).ConfigureAwait(false);
public async ValueTask
RedeliverUnacknowledgedMessages(IEnumerable<MessageId> messageIds,
CancellationToken cancellationToken)
{
ThrowIfDisposed();
var command = new CommandRedeliverUnacknowledgedMessages();
- command.MessageIds.AddRange(messageIds.Select(m => m.Data));
+ command.MessageIds.AddRange(messageIds.Select(messageId =>
messageId.ToMessageIdData()));
await _executor.Execute(() =>
RedeliverUnacknowledgedMessages(command, cancellationToken),
cancellationToken).ConfigureAwait(false);
}
@@ -131,16 +118,14 @@ namespace DotPulsar.Internal
}
private async ValueTask Unsubscribe(CommandUnsubscribe command,
CancellationToken cancellationToken)
- {
- _ = await _channel.Send(command,
cancellationToken).ConfigureAwait(false);
- }
+ =>await _channel.Send(command,
cancellationToken).ConfigureAwait(false);
public async ValueTask Seek(MessageId messageId, CancellationToken
cancellationToken)
{
ThrowIfDisposed();
- var seek = new CommandSeek { MessageId = messageId.Data };
- _ = await _executor.Execute(() => Seek(seek, cancellationToken),
cancellationToken).ConfigureAwait(false);
+ var seek = new CommandSeek { MessageId =
messageId.ToMessageIdData() };
+ await _executor.Execute(() => Seek(seek, cancellationToken),
cancellationToken).ConfigureAwait(false);
}
public async ValueTask Seek(ulong publishTime, CancellationToken
cancellationToken)
@@ -148,23 +133,7 @@ namespace DotPulsar.Internal
ThrowIfDisposed();
var seek = new CommandSeek { MessagePublishTime = publishTime };
- _ = await _executor.Execute(() => Seek(seek, cancellationToken),
cancellationToken).ConfigureAwait(false);
- }
-
- public async ValueTask Seek(DateTime publishTime, CancellationToken
cancellationToken)
- {
- ThrowIfDisposed();
-
- var seek = new CommandSeek { MessagePublishTime = (ulong) new
DateTimeOffset(publishTime).ToUnixTimeMilliseconds() };
- _ = await _executor.Execute(() => Seek(seek, cancellationToken),
cancellationToken).ConfigureAwait(false);
- }
-
- public async ValueTask Seek(DateTimeOffset publishTime,
CancellationToken cancellationToken)
- {
- ThrowIfDisposed();
-
- var seek = new CommandSeek { MessagePublishTime = (ulong)
publishTime.ToUnixTimeMilliseconds() };
- _ = await _executor.Execute(() => Seek(seek, cancellationToken),
cancellationToken).ConfigureAwait(false);
+ await _executor.Execute(() => Seek(seek, cancellationToken),
cancellationToken).ConfigureAwait(false);
}
public async ValueTask<MessageId> GetLastMessageId(CancellationToken
cancellationToken)
@@ -176,22 +145,21 @@ namespace DotPulsar.Internal
}
private async ValueTask<MessageId>
GetLastMessageId(CommandGetLastMessageId command, CancellationToken
cancellationToken)
- {
- var response = await _channel.Send(command,
cancellationToken).ConfigureAwait(false);
- return new MessageId(response.LastMessageId);
- }
+ => await _channel.Send(command,
cancellationToken).ConfigureAwait(false);
- private async ValueTask<CommandSuccess> Seek(CommandSeek command,
CancellationToken cancellationToken)
+ private async Task Seek(CommandSeek command, CancellationToken
cancellationToken)
=> await _channel.Send(command,
cancellationToken).ConfigureAwait(false);
- private async ValueTask Acknowledge(MessageIdData messageIdData,
CommandAck.AckType ackType, CancellationToken cancellationToken)
+ private async ValueTask Acknowledge(MessageId messageId,
CommandAck.AckType ackType, CancellationToken cancellationToken)
{
ThrowIfDisposed();
var commandAck = _commandAckPool.Get();
commandAck.Type = ackType;
- commandAck.MessageIds.Clear();
- commandAck.MessageIds.Add(messageIdData);
+ if (commandAck.MessageIds.Count == 0)
+ commandAck.MessageIds.Add(messageId.ToMessageIdData());
+ else
+ commandAck.MessageIds[0].MapFrom(messageId);
try
{
diff --git a/src/DotPulsar/Internal/ConsumerChannel.cs
b/src/DotPulsar/Internal/ConsumerChannel.cs
index 775e240..de109dd 100644
--- a/src/DotPulsar/Internal/ConsumerChannel.cs
+++ b/src/DotPulsar/Internal/ConsumerChannel.cs
@@ -21,7 +21,7 @@ namespace DotPulsar.Internal
using System.Threading;
using System.Threading.Tasks;
- public sealed class ConsumerChannel : IConsumerChannel, IReaderChannel
+ public sealed class ConsumerChannel : IConsumerChannel
{
private readonly ulong _id;
private readonly AsyncQueue<MessagePackage> _queue;
@@ -88,7 +88,7 @@ namespace DotPulsar.Internal
return metadata.ShouldSerializeNumMessagesInBatch()
? _batchHandler.Add(messageId, redeliveryCount,
metadata, data)
- : MessageFactory.Create(new MessageId(messageId),
redeliveryCount, metadata, data);
+ : MessageFactory.Create(messageId.ToMessageId(),
redeliveryCount, metadata, data);
}
}
}
@@ -117,29 +117,27 @@ namespace DotPulsar.Internal
await _connection.Send(command,
cancellationToken).ConfigureAwait(false);
}
- public async Task<CommandSuccess> Send(CommandUnsubscribe command,
CancellationToken cancellationToken)
+ public async Task Send(CommandUnsubscribe command, CancellationToken
cancellationToken)
{
command.ConsumerId = _id;
var response = await _connection.Send(command,
cancellationToken).ConfigureAwait(false);
response.Expect(BaseCommand.Type.Success);
- return response.Success;
}
- public async Task<CommandSuccess> Send(CommandSeek command,
CancellationToken cancellationToken)
+ public async Task Send(CommandSeek command, CancellationToken
cancellationToken)
{
command.ConsumerId = _id;
var response = await _connection.Send(command,
cancellationToken).ConfigureAwait(false);
response.Expect(BaseCommand.Type.Success);
_batchHandler.Clear();
- return response.Success;
}
- public async Task<CommandGetLastMessageIdResponse>
Send(CommandGetLastMessageId command, CancellationToken cancellationToken)
+ public async Task<MessageId> Send(CommandGetLastMessageId command,
CancellationToken cancellationToken)
{
command.ConsumerId = _id;
var response = await _connection.Send(command,
cancellationToken).ConfigureAwait(false);
response.Expect(BaseCommand.Type.GetLastMessageIdResponse);
- return response.GetLastMessageIdResponse;
+ return
response.GetLastMessageIdResponse.LastMessageId.ToMessageId();
}
public async ValueTask DisposeAsync()
diff --git a/src/DotPulsar/Internal/Abstractions/IReaderChannel.cs
b/src/DotPulsar/Internal/Extensions/MessageIdDataExtensions.cs
similarity index 50%
rename from src/DotPulsar/Internal/Abstractions/IReaderChannel.cs
rename to src/DotPulsar/Internal/Extensions/MessageIdDataExtensions.cs
index 55135ea..857b00c 100644
--- a/src/DotPulsar/Internal/Abstractions/IReaderChannel.cs
+++ b/src/DotPulsar/Internal/Extensions/MessageIdDataExtensions.cs
@@ -12,18 +12,21 @@
* limitations under the License.
*/
-namespace DotPulsar.Internal.Abstractions
+namespace DotPulsar.Internal.Extensions
{
using DotPulsar.Internal.PulsarApi;
- using System;
- using System.Threading;
- using System.Threading.Tasks;
- public interface IReaderChannel : IAsyncDisposable
+ public static class MessageIdDataExtensions
{
- Task<CommandSuccess> Send(CommandSeek command, CancellationToken
cancellationToken);
- Task<CommandGetLastMessageIdResponse> Send(CommandGetLastMessageId
command, CancellationToken cancellationToken);
- ValueTask<Message> Receive(CancellationToken cancellationToken);
- ValueTask ClosedByClient(CancellationToken cancellationToken);
+ public static MessageId ToMessageId(this MessageIdData messageIdData)
+ => new MessageId(messageIdData.LedgerId, messageIdData.EntryId,
messageIdData.Partition, messageIdData.BatchIndex);
+
+ public static void MapFrom(this MessageIdData destination, MessageId
source)
+ {
+ destination.LedgerId = source.LedgerId;
+ destination.EntryId = source.EntryId;
+ destination.Partition = source.Partition;
+ destination.BatchIndex = source.BatchIndex;
+ }
}
}
diff --git a/src/DotPulsar/Internal/MessageBuilder.cs
b/src/DotPulsar/Internal/MessageBuilder.cs
index d7c7b02..c17ec19 100644
--- a/src/DotPulsar/Internal/MessageBuilder.cs
+++ b/src/DotPulsar/Internal/MessageBuilder.cs
@@ -15,6 +15,7 @@
namespace DotPulsar.Internal
{
using DotPulsar.Abstractions;
+ using DotPulsar.Extensions;
using Extensions;
using System;
using System.Threading;
diff --git a/src/DotPulsar/Internal/MonitorState.cs
b/src/DotPulsar/Internal/MonitorState.cs
index 28e0f9a..7a50044 100644
--- a/src/DotPulsar/Internal/MonitorState.cs
+++ b/src/DotPulsar/Internal/MonitorState.cs
@@ -15,6 +15,7 @@
namespace DotPulsar.Internal
{
using DotPulsar.Abstractions;
+ using DotPulsar.Extensions;
using System.Threading.Tasks;
public static class StateMonitor
diff --git a/src/DotPulsar/Internal/NotReadyChannel.cs
b/src/DotPulsar/Internal/NotReadyChannel.cs
index e463ff6..75e724f 100644
--- a/src/DotPulsar/Internal/NotReadyChannel.cs
+++ b/src/DotPulsar/Internal/NotReadyChannel.cs
@@ -22,7 +22,7 @@ namespace DotPulsar.Internal
using System.Threading;
using System.Threading.Tasks;
- public sealed class NotReadyChannel : IConsumerChannel, IProducerChannel,
IReaderChannel
+ public sealed class NotReadyChannel : IConsumerChannel, IProducerChannel
{
public ValueTask DisposeAsync()
=> new ValueTask();
@@ -42,13 +42,13 @@ namespace DotPulsar.Internal
public Task Send(CommandRedeliverUnacknowledgedMessages command,
CancellationToken cancellationToken)
=> throw GetException();
- public Task<CommandSuccess> Send(CommandUnsubscribe command,
CancellationToken cancellationToken)
+ public Task Send(CommandUnsubscribe command, CancellationToken
cancellationToken)
=> throw GetException();
- public Task<CommandSuccess> Send(CommandSeek command,
CancellationToken cancellationToken)
+ public Task Send(CommandSeek command, CancellationToken
cancellationToken)
=> throw GetException();
- public Task<CommandGetLastMessageIdResponse>
Send(CommandGetLastMessageId command, CancellationToken cancellationToken)
+ public Task<MessageId> Send(CommandGetLastMessageId command,
CancellationToken cancellationToken)
=> throw GetException();
private static Exception GetException()
diff --git a/src/DotPulsar/Internal/Producer.cs
b/src/DotPulsar/Internal/Producer.cs
index 2347eee..12e4625 100644
--- a/src/DotPulsar/Internal/Producer.cs
+++ b/src/DotPulsar/Internal/Producer.cs
@@ -17,6 +17,7 @@ namespace DotPulsar.Internal
using Abstractions;
using DotPulsar.Abstractions;
using DotPulsar.Exceptions;
+ using DotPulsar.Internal.Extensions;
using Events;
using Microsoft.Extensions.ObjectPool;
using System;
@@ -60,17 +61,11 @@ namespace DotPulsar.Internal
_eventRegister.Register(new ProducerCreated(_correlationId, this));
}
- public async ValueTask<ProducerStateChanged>
StateChangedTo(ProducerState state, CancellationToken cancellationToken)
- {
- var newState = await _state.StateChangedTo(state,
cancellationToken).ConfigureAwait(false);
- return new ProducerStateChanged(this, newState);
- }
+ public async ValueTask<ProducerState> OnStateChangeTo(ProducerState
state, CancellationToken cancellationToken)
+ => await _state.StateChangedTo(state,
cancellationToken).ConfigureAwait(false);
- public async ValueTask<ProducerStateChanged>
StateChangedFrom(ProducerState state, CancellationToken cancellationToken)
- {
- var newState = await _state.StateChangedFrom(state,
cancellationToken).ConfigureAwait(false);
- return new ProducerStateChanged(this, newState);
- }
+ public async ValueTask<ProducerState> OnStateChangeFrom(ProducerState
state, CancellationToken cancellationToken)
+ => await _state.StateChangedFrom(state,
cancellationToken).ConfigureAwait(false);
public bool IsFinalState()
=> _state.IsFinalState();
@@ -88,12 +83,6 @@ namespace DotPulsar.Internal
await _channel.DisposeAsync().ConfigureAwait(false);
}
- public ValueTask<MessageId> Send(byte[] data, CancellationToken
cancellationToken)
- => Send(new ReadOnlySequence<byte>(data), cancellationToken);
-
- public ValueTask<MessageId> Send(ReadOnlyMemory<byte> data,
CancellationToken cancellationToken)
- => Send(new ReadOnlySequence<byte>(data), cancellationToken);
-
public async ValueTask<MessageId> Send(ReadOnlySequence<byte> data,
CancellationToken cancellationToken)
{
ThrowIfDisposed();
@@ -110,12 +99,6 @@ namespace DotPulsar.Internal
}
}
- public ValueTask<MessageId> Send(MessageMetadata metadata, byte[]
data, CancellationToken cancellationToken)
- => Send(metadata, new ReadOnlySequence<byte>(data),
cancellationToken);
-
- public ValueTask<MessageId> Send(MessageMetadata metadata,
ReadOnlyMemory<byte> data, CancellationToken cancellationToken)
- => Send(metadata, new ReadOnlySequence<byte>(data),
cancellationToken);
-
public async ValueTask<MessageId> Send(MessageMetadata metadata,
ReadOnlySequence<byte> data, CancellationToken cancellationToken)
{
ThrowIfDisposed();
@@ -138,7 +121,7 @@ namespace DotPulsar.Internal
private async ValueTask<MessageId> Send(PulsarApi.MessageMetadata
metadata, ReadOnlySequence<byte> data, CancellationToken cancellationToken)
{
var response = await _channel.Send(metadata, data,
cancellationToken).ConfigureAwait(false);
- return new MessageId(response.MessageId);
+ return response.MessageId.ToMessageId();
}
internal async ValueTask SetChannel(IProducerChannel channel)
diff --git a/src/DotPulsar/Internal/Reader.cs b/src/DotPulsar/Internal/Reader.cs
index 571c5b6..ccbea59 100644
--- a/src/DotPulsar/Internal/Reader.cs
+++ b/src/DotPulsar/Internal/Reader.cs
@@ -20,8 +20,6 @@ namespace DotPulsar.Internal
using DotPulsar.Internal.PulsarApi;
using Events;
using System;
- using System.Collections.Generic;
- using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;
@@ -29,7 +27,7 @@ namespace DotPulsar.Internal
{
private readonly Guid _correlationId;
private readonly IRegisterEvent _eventRegister;
- private IReaderChannel _channel;
+ private IConsumerChannel _channel;
private readonly IExecute _executor;
private readonly IStateChanged<ReaderState> _state;
private int _isDisposed;
@@ -40,7 +38,7 @@ namespace DotPulsar.Internal
Guid correlationId,
string topic,
IRegisterEvent eventRegister,
- IReaderChannel initialChannel,
+ IConsumerChannel initialChannel,
IExecute executor,
IStateChanged<ReaderState> state)
{
@@ -55,17 +53,11 @@ namespace DotPulsar.Internal
_eventRegister.Register(new ReaderCreated(_correlationId, this));
}
- public async ValueTask<ReaderStateChanged> StateChangedTo(ReaderState
state, CancellationToken cancellationToken)
- {
- var newState = await _state.StateChangedTo(state,
cancellationToken).ConfigureAwait(false);
- return new ReaderStateChanged(this, newState);
- }
+ public async ValueTask<ReaderState> OnStateChangeTo(ReaderState state,
CancellationToken cancellationToken)
+ => await _state.StateChangedTo(state,
cancellationToken).ConfigureAwait(false);
- public async ValueTask<ReaderStateChanged>
StateChangedFrom(ReaderState state, CancellationToken cancellationToken)
- {
- var newState = await _state.StateChangedFrom(state,
cancellationToken).ConfigureAwait(false);
- return new ReaderStateChanged(this, newState);
- }
+ public async ValueTask<ReaderState> OnStateChangeFrom(ReaderState
state, CancellationToken cancellationToken)
+ => await _state.StateChangedFrom(state,
cancellationToken).ConfigureAwait(false);
public bool IsFinalState()
=> _state.IsFinalState();
@@ -82,28 +74,24 @@ namespace DotPulsar.Internal
}
private async ValueTask<MessageId>
GetLastMessageId(CommandGetLastMessageId command, CancellationToken
cancellationToken)
- {
- var response = await _channel.Send(command,
cancellationToken).ConfigureAwait(false);
- return new MessageId(response.LastMessageId);
- }
+ => await _channel.Send(command,
cancellationToken).ConfigureAwait(false);
- public async IAsyncEnumerable<Message>
Messages([EnumeratorCancellation] CancellationToken cancellationToken)
+ public async ValueTask<Message> Receive(CancellationToken
cancellationToken)
{
ThrowIfDisposed();
- while (!cancellationToken.IsCancellationRequested)
- yield return await _executor.Execute(() =>
Receive(cancellationToken), cancellationToken).ConfigureAwait(false);
+ return await _executor.Execute(() =>
ReceiveMessage(cancellationToken), cancellationToken).ConfigureAwait(false);
}
- private async ValueTask<Message> Receive(CancellationToken
cancellationToken)
+ private async ValueTask<Message> ReceiveMessage(CancellationToken
cancellationToken)
=> await _channel.Receive(cancellationToken).ConfigureAwait(false);
public async ValueTask Seek(MessageId messageId, CancellationToken
cancellationToken)
{
ThrowIfDisposed();
- var seek = new CommandSeek { MessageId = messageId.Data };
- _ = await _executor.Execute(() => Seek(seek, cancellationToken),
cancellationToken).ConfigureAwait(false);
+ var seek = new CommandSeek { MessageId =
messageId.ToMessageIdData() };
+ await _executor.Execute(() => Seek(seek, cancellationToken),
cancellationToken).ConfigureAwait(false);
}
public async ValueTask Seek(ulong publishTime, CancellationToken
cancellationToken)
@@ -111,23 +99,7 @@ namespace DotPulsar.Internal
ThrowIfDisposed();
var seek = new CommandSeek { MessagePublishTime = publishTime };
- _ = await _executor.Execute(() => Seek(seek, cancellationToken),
cancellationToken).ConfigureAwait(false);
- }
-
- public async ValueTask Seek(DateTime publishTime, CancellationToken
cancellationToken)
- {
- ThrowIfDisposed();
-
- var seek = new CommandSeek { MessagePublishTime = (ulong) new
DateTimeOffset(publishTime).ToUnixTimeMilliseconds() };
- _ = await _executor.Execute(() => Seek(seek, cancellationToken),
cancellationToken).ConfigureAwait(false);
- }
-
- public async ValueTask Seek(DateTimeOffset publishTime,
CancellationToken cancellationToken)
- {
- ThrowIfDisposed();
-
- var seek = new CommandSeek { MessagePublishTime = (ulong)
publishTime.ToUnixTimeMilliseconds() };
- _ = await _executor.Execute(() => Seek(seek, cancellationToken),
cancellationToken).ConfigureAwait(false);
+ await _executor.Execute(() => Seek(seek, cancellationToken),
cancellationToken).ConfigureAwait(false);
}
public async ValueTask DisposeAsync()
@@ -140,10 +112,10 @@ namespace DotPulsar.Internal
await _channel.DisposeAsync().ConfigureAwait(false);
}
- private async ValueTask<CommandSuccess> Seek(CommandSeek command,
CancellationToken cancellationToken)
+ private async Task Seek(CommandSeek command, CancellationToken
cancellationToken)
=> await _channel.Send(command,
cancellationToken).ConfigureAwait(false);
- internal async ValueTask SetChannel(IReaderChannel channel)
+ internal async ValueTask SetChannel(IConsumerChannel channel)
{
if (_isDisposed != 0)
{
diff --git a/src/DotPulsar/Internal/ReaderChannelFactory.cs
b/src/DotPulsar/Internal/ReaderChannelFactory.cs
index 9efe033..58b31e1 100644
--- a/src/DotPulsar/Internal/ReaderChannelFactory.cs
+++ b/src/DotPulsar/Internal/ReaderChannelFactory.cs
@@ -20,7 +20,7 @@ namespace DotPulsar.Internal
using System.Threading;
using System.Threading.Tasks;
- public sealed class ReaderChannelFactory : IReaderChannelFactory
+ public sealed class ReaderChannelFactory : IConsumerChannelFactory
{
private readonly Guid _correlationId;
private readonly IRegisterEvent _eventRegister;
@@ -48,7 +48,7 @@ namespace DotPulsar.Internal
ConsumerName = options.ReaderName,
Durable = false,
ReadCompacted = options.ReadCompacted,
- StartMessageId = options.StartMessageId.Data,
+ StartMessageId = options.StartMessageId.ToMessageIdData(),
Subscription = $"Reader-{Guid.NewGuid():N}",
Topic = options.Topic
};
@@ -56,10 +56,10 @@ namespace DotPulsar.Internal
_batchHandler = new BatchHandler(false);
}
- public async Task<IReaderChannel> Create(CancellationToken
cancellationToken)
+ public async Task<IConsumerChannel> Create(CancellationToken
cancellationToken)
=> await _executor.Execute(() => GetChannel(cancellationToken),
cancellationToken).ConfigureAwait(false);
- private async ValueTask<IReaderChannel> GetChannel(CancellationToken
cancellationToken)
+ private async ValueTask<IConsumerChannel> GetChannel(CancellationToken
cancellationToken)
{
var connection = await
_connectionPool.FindConnectionForTopic(_subscribe.Topic,
cancellationToken).ConfigureAwait(false);
var messageQueue = new AsyncQueue<MessagePackage>();
diff --git a/src/DotPulsar/Internal/ReaderProcess.cs
b/src/DotPulsar/Internal/ReaderProcess.cs
index 67763c5..404de7e 100644
--- a/src/DotPulsar/Internal/ReaderProcess.cs
+++ b/src/DotPulsar/Internal/ReaderProcess.cs
@@ -21,13 +21,13 @@ namespace DotPulsar.Internal
public sealed class ReaderProcess : Process
{
private readonly IStateManager<ReaderState> _stateManager;
- private readonly IReaderChannelFactory _factory;
+ private readonly IConsumerChannelFactory _factory;
private readonly Reader _reader;
public ReaderProcess(
Guid correlationId,
IStateManager<ReaderState> stateManager,
- IReaderChannelFactory factory,
+ IConsumerChannelFactory factory,
Reader reader) : base(correlationId)
{
_stateManager = stateManager;
diff --git a/src/DotPulsar/MessageId.cs b/src/DotPulsar/MessageId.cs
index cdcc2eb..bc32039 100644
--- a/src/DotPulsar/MessageId.cs
+++ b/src/DotPulsar/MessageId.cs
@@ -14,6 +14,7 @@
namespace DotPulsar
{
+ using DotPulsar.Internal.Extensions;
using Internal.PulsarApi;
using System;
@@ -38,42 +39,36 @@ namespace DotPulsar
/// </summary>
public static MessageId Latest { get; }
- internal MessageId(MessageIdData messageIdData)
- => Data = messageIdData;
-
/// <summary>
/// Initializes a new instance using the specified ledgerId, entryId,
partition and batchIndex.
/// </summary>
public MessageId(ulong ledgerId, ulong entryId, int partition, int
batchIndex)
- => Data = new MessageIdData
- {
- LedgerId = ledgerId,
- EntryId = entryId,
- Partition = partition,
- BatchIndex = batchIndex
- };
-
- internal MessageIdData Data { get; }
+ {
+ LedgerId = ledgerId;
+ EntryId = entryId;
+ Partition = partition;
+ BatchIndex = batchIndex;
+ }
/// <summary>
/// The id of the ledger.
/// </summary>
- public ulong LedgerId => Data.LedgerId;
+ public ulong LedgerId { get; }
/// <summary>
/// The id of the entry.
/// </summary>
- public ulong EntryId => Data.EntryId;
+ public ulong EntryId { get; }
/// <summary>
/// The partition.
/// </summary>
- public int Partition => Data.Partition;
+ public int Partition { get; }
/// <summary>
/// The batch index.
/// </summary>
- public int BatchIndex => Data.BatchIndex;
+ public int BatchIndex { get; }
public int CompareTo(MessageId? other)
{
@@ -124,5 +119,12 @@ namespace DotPulsar
public override string ToString()
=> $"{LedgerId}:{EntryId}:{Partition}:{BatchIndex}";
+
+ internal MessageIdData ToMessageIdData()
+ {
+ var messageIdData = new MessageIdData();
+ messageIdData.MapFrom(this);
+ return messageIdData;
+ }
}
}