This is an automated email from the ASF dual-hosted git repository.
blankensteiner pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-dotpulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 83370ce Added GetLastMessageId on the Reader. Fixed reconnection
issues when seeking.
83370ce is described below
commit 83370ce00dfa69dc535e2e7f03dbbacfd65eadc9
Author: Daniel Blankensteiner <[email protected]>
AuthorDate: Tue Dec 15 15:59:55 2020 +0100
Added GetLastMessageId on the Reader. Fixed reconnection issues when
seeking.
---
CHANGELOG.md | 7 ++++++
samples/Consuming/Program.cs | 4 ++--
samples/Producing/Program.cs | 4 ++--
samples/Reading/Program.cs | 4 ++--
src/DotPulsar/Abstractions/IReader.cs | 5 +++++
.../Internal/Abstractions/IConsumerChannel.cs | 1 +
.../Internal/Abstractions/IProducerChannel.cs | 1 +
.../Internal/Abstractions/IReaderChannel.cs | 2 ++
src/DotPulsar/Internal/Consumer.cs | 18 ++++++++++-----
src/DotPulsar/Internal/ConsumerChannel.cs | 26 +++++++++++++---------
src/DotPulsar/Internal/DefaultExceptionHandler.cs | 2 ++
src/DotPulsar/Internal/NotReadyChannel.cs | 3 +++
src/DotPulsar/Internal/Producer.cs | 2 +-
src/DotPulsar/Internal/ProducerChannel.cs | 6 +++--
src/DotPulsar/Internal/Reader.cs | 24 +++++++++++++++-----
15 files changed, 78 insertions(+), 31 deletions(-)
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 0a558bf..e636dcc 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -16,11 +16,18 @@ The format is based on [Keep a
Changelog](https://keepachangelog.com/en/1.0.0/),
- Added properties on Message to read EventTime and PublishTime as DateTime
- Added methods on the IMessageBuilder to set EventTime and DeliverAt using
DateTime
- Added properties on MessageMetadata to set EventTime and DeliverAtTime using
DateTime
+- Added seeking by MessageId on the Reader
+- Added seeking by message publish time on the Consumer and Reader
+- Added GetLastMessageId on the Reader
### Changed
- The protobuf-net dependency is upgraded from 2.4.6 to 3.X
+### Fixed
+
+- Reconnection issues when seeking
+
## [0.9.7] - 2020-12-04
### Added
diff --git a/samples/Consuming/Program.cs b/samples/Consuming/Program.cs
index 0cc6e06..ad52b2b 100644
--- a/samples/Consuming/Program.cs
+++ b/samples/Consuming/Program.cs
@@ -29,11 +29,11 @@ namespace Consuming
{
const string myTopic = "persistent://public/default/mytopic";
- var taskCompletionSource = new
TaskCompletionSource<object>(TaskCreationOptions.RunContinuationsAsynchronously);
+ var taskCompletionSource = new
TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);
Console.CancelKeyPress += (sender, args) =>
{
- taskCompletionSource.SetResult(null);
+ taskCompletionSource.SetResult();
args.Cancel = true;
};
diff --git a/samples/Producing/Program.cs b/samples/Producing/Program.cs
index 26f7f51..3eb4aae 100644
--- a/samples/Producing/Program.cs
+++ b/samples/Producing/Program.cs
@@ -28,11 +28,11 @@ namespace Producing
{
const string myTopic = "persistent://public/default/mytopic";
- var taskCompletionSource = new
TaskCompletionSource<object>(TaskCreationOptions.RunContinuationsAsynchronously);
+ var taskCompletionSource = new
TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);
Console.CancelKeyPress += (sender, args) =>
{
- taskCompletionSource.SetResult(null);
+ taskCompletionSource.SetResult();
args.Cancel = true;
};
diff --git a/samples/Reading/Program.cs b/samples/Reading/Program.cs
index 26289d3..5bd6099 100644
--- a/samples/Reading/Program.cs
+++ b/samples/Reading/Program.cs
@@ -29,11 +29,11 @@ namespace Reading
{
const string myTopic = "persistent://public/default/mytopic";
- var taskCompletionSource = new
TaskCompletionSource<object>(TaskCreationOptions.RunContinuationsAsynchronously);
+ var taskCompletionSource = new
TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);
Console.CancelKeyPress += (sender, args) =>
{
- taskCompletionSource.SetResult(null);
+ taskCompletionSource.SetResult();
args.Cancel = true;
};
diff --git a/src/DotPulsar/Abstractions/IReader.cs
b/src/DotPulsar/Abstractions/IReader.cs
index 7b9502a..1023419 100644
--- a/src/DotPulsar/Abstractions/IReader.cs
+++ b/src/DotPulsar/Abstractions/IReader.cs
@@ -41,6 +41,11 @@ namespace DotPulsar.Abstractions
bool IsFinalState(ReaderState state);
/// <summary>
+ /// Get the MessageId of the last message on the topic.
+ /// </summary>
+ ValueTask<MessageId> GetLastMessageId(CancellationToken
cancellationToken = default);
+
+ /// <summary>
/// Get an IAsyncEnumerable for reading messages
/// </summary>
IAsyncEnumerable<Message> Messages(CancellationToken cancellationToken
= default);
diff --git a/src/DotPulsar/Internal/Abstractions/IConsumerChannel.cs
b/src/DotPulsar/Internal/Abstractions/IConsumerChannel.cs
index a86ff5a..4daf83e 100644
--- a/src/DotPulsar/Internal/Abstractions/IConsumerChannel.cs
+++ b/src/DotPulsar/Internal/Abstractions/IConsumerChannel.cs
@@ -27,5 +27,6 @@ namespace DotPulsar.Internal.Abstractions
Task<CommandSuccess> Send(CommandSeek command, CancellationToken
cancellationToken);
Task<CommandGetLastMessageIdResponse> Send(CommandGetLastMessageId
command, CancellationToken cancellationToken);
ValueTask<Message> Receive(CancellationToken cancellationToken);
+ ValueTask ClosedByClient(CancellationToken cancellationToken =
default);
}
}
diff --git a/src/DotPulsar/Internal/Abstractions/IProducerChannel.cs
b/src/DotPulsar/Internal/Abstractions/IProducerChannel.cs
index b793181..dd806f4 100644
--- a/src/DotPulsar/Internal/Abstractions/IProducerChannel.cs
+++ b/src/DotPulsar/Internal/Abstractions/IProducerChannel.cs
@@ -23,5 +23,6 @@ namespace DotPulsar.Internal.Abstractions
public interface IProducerChannel : IAsyncDisposable
{
Task<CommandSendReceipt> Send(MessageMetadata metadata,
ReadOnlySequence<byte> payload, CancellationToken cancellationToken);
+ ValueTask ClosedByClient(CancellationToken cancellationToken =
default);
}
}
diff --git a/src/DotPulsar/Internal/Abstractions/IReaderChannel.cs
b/src/DotPulsar/Internal/Abstractions/IReaderChannel.cs
index a63762b..a692c18 100644
--- a/src/DotPulsar/Internal/Abstractions/IReaderChannel.cs
+++ b/src/DotPulsar/Internal/Abstractions/IReaderChannel.cs
@@ -22,6 +22,8 @@ namespace DotPulsar.Internal.Abstractions
public interface IReaderChannel : IAsyncDisposable
{
Task<CommandSuccess> Send(CommandSeek command, CancellationToken
cancellationToken);
+ Task<CommandGetLastMessageIdResponse> Send(CommandGetLastMessageId
command, CancellationToken cancellationToken);
ValueTask<Message> Receive(CancellationToken cancellationToken =
default);
+ ValueTask ClosedByClient(CancellationToken cancellationToken =
default);
}
}
diff --git a/src/DotPulsar/Internal/Consumer.cs
b/src/DotPulsar/Internal/Consumer.cs
index dad2416..2282711 100644
--- a/src/DotPulsar/Internal/Consumer.cs
+++ b/src/DotPulsar/Internal/Consumer.cs
@@ -83,6 +83,7 @@ namespace DotPulsar.Internal
return;
_eventRegister.Register(new ConsumerDisposed(_correlationId,
this));
+ await _channel.ClosedByClient().ConfigureAwait(false);
await _channel.DisposeAsync().ConfigureAwait(false);
}
@@ -91,9 +92,12 @@ namespace DotPulsar.Internal
ThrowIfDisposed();
while (!cancellationToken.IsCancellationRequested)
- yield return await _executor.Execute(() =>
_channel.Receive(cancellationToken), cancellationToken).ConfigureAwait(false);
+ yield return await _executor.Execute(() =>
Receive(cancellationToken), cancellationToken).ConfigureAwait(false);
}
+ private async ValueTask<Message> Receive(CancellationToken
cancellationToken)
+ => await _channel.Receive(cancellationToken).ConfigureAwait(false);
+
public async ValueTask Acknowledge(Message message, CancellationToken
cancellationToken)
=> await Acknowledge(message.MessageId.Data,
CommandAck.AckType.Individual, cancellationToken).ConfigureAwait(false);
@@ -125,7 +129,7 @@ namespace DotPulsar.Internal
ThrowIfDisposed();
var seek = new CommandSeek { MessageId = messageId.Data };
- _ = await _executor.Execute(() => _channel.Send(seek,
cancellationToken), cancellationToken).ConfigureAwait(false);
+ _ = await _executor.Execute(() => Seek(seek, cancellationToken),
cancellationToken).ConfigureAwait(false);
}
public async ValueTask Seek(ulong publishTime, CancellationToken
cancellationToken)
@@ -133,7 +137,7 @@ namespace DotPulsar.Internal
ThrowIfDisposed();
var seek = new CommandSeek { MessagePublishTime = publishTime };
- _ = await _executor.Execute(() => _channel.Send(seek,
cancellationToken), cancellationToken).ConfigureAwait(false);
+ _ = await _executor.Execute(() => Seek(seek, cancellationToken),
cancellationToken).ConfigureAwait(false);
}
public async ValueTask Seek(DateTime publishTime, CancellationToken
cancellationToken)
@@ -141,7 +145,7 @@ namespace DotPulsar.Internal
ThrowIfDisposed();
var seek = new CommandSeek { MessagePublishTime = (ulong) new
DateTimeOffset(publishTime).ToUnixTimeMilliseconds() };
- _ = await _executor.Execute(() => _channel.Send(seek,
cancellationToken), cancellationToken).ConfigureAwait(false);
+ _ = await _executor.Execute(() => Seek(seek, cancellationToken),
cancellationToken).ConfigureAwait(false);
}
public async ValueTask Seek(DateTimeOffset publishTime,
CancellationToken cancellationToken)
@@ -149,7 +153,7 @@ namespace DotPulsar.Internal
ThrowIfDisposed();
var seek = new CommandSeek { MessagePublishTime = (ulong)
publishTime.ToUnixTimeMilliseconds() };
- _ = await _executor.Execute(() => _channel.Send(seek,
cancellationToken), cancellationToken).ConfigureAwait(false);
+ _ = await _executor.Execute(() => Seek(seek, cancellationToken),
cancellationToken).ConfigureAwait(false);
}
public async ValueTask<MessageId> GetLastMessageId(CancellationToken
cancellationToken)
@@ -158,10 +162,12 @@ namespace DotPulsar.Internal
var getLastMessageId = new CommandGetLastMessageId();
var response = await _executor.Execute(() =>
_channel.Send(getLastMessageId, cancellationToken),
cancellationToken).ConfigureAwait(false);
-
return new MessageId(response.LastMessageId);
}
+ private async ValueTask<CommandSuccess> Seek(CommandSeek command,
CancellationToken cancellationToken)
+ => await _channel.Send(command,
cancellationToken).ConfigureAwait(false);
+
private async ValueTask Acknowledge(MessageIdData messageIdData,
CommandAck.AckType ackType, CancellationToken cancellationToken)
{
ThrowIfDisposed();
diff --git a/src/DotPulsar/Internal/ConsumerChannel.cs
b/src/DotPulsar/Internal/ConsumerChannel.cs
index eef5d8d..02c9316 100644
--- a/src/DotPulsar/Internal/ConsumerChannel.cs
+++ b/src/DotPulsar/Internal/ConsumerChannel.cs
@@ -144,17 +144,8 @@ namespace DotPulsar.Internal
public async ValueTask DisposeAsync()
{
- try
- {
- _queue.Dispose();
- await _lock.DisposeAsync();
- var closeConsumer = new CommandCloseConsumer { ConsumerId =
_id };
- await _connection.Send(closeConsumer,
CancellationToken.None).ConfigureAwait(false);
- }
- catch
- {
- // Ignore
- }
+ _queue.Dispose();
+ await _lock.DisposeAsync().ConfigureAwait(false);
}
private async ValueTask SendFlow(CancellationToken cancellationToken)
@@ -179,5 +170,18 @@ namespace DotPulsar.Internal
await Send(ack, cancellationToken).ConfigureAwait(false);
}
+
+ public async ValueTask ClosedByClient(CancellationToken
cancellationToken)
+ {
+ try
+ {
+ var closeConsumer = new CommandCloseConsumer { ConsumerId =
_id };
+ await _connection.Send(closeConsumer,
cancellationToken).ConfigureAwait(false);
+ }
+ catch
+ {
+ // Ignore
+ }
+ }
}
}
diff --git a/src/DotPulsar/Internal/DefaultExceptionHandler.cs
b/src/DotPulsar/Internal/DefaultExceptionHandler.cs
index abc5601..b4f26d8 100644
--- a/src/DotPulsar/Internal/DefaultExceptionHandler.cs
+++ b/src/DotPulsar/Internal/DefaultExceptionHandler.cs
@@ -45,6 +45,8 @@ namespace DotPulsar.Internal
TooManyRequestsException _ => FaultAction.Retry,
ChannelNotReadyException _ => FaultAction.Retry,
ServiceNotReadyException _ => FaultAction.Retry,
+ MetadataException _ => FaultAction.Retry,
+ ConsumerNotFoundException _ => FaultAction.Retry,
ConnectionDisposedException _ => FaultAction.Retry,
AsyncLockDisposedException _ => FaultAction.Retry,
PulsarStreamDisposedException _ => FaultAction.Retry,
diff --git a/src/DotPulsar/Internal/NotReadyChannel.cs
b/src/DotPulsar/Internal/NotReadyChannel.cs
index fee9bb4..60f0cbf 100644
--- a/src/DotPulsar/Internal/NotReadyChannel.cs
+++ b/src/DotPulsar/Internal/NotReadyChannel.cs
@@ -51,6 +51,9 @@ namespace DotPulsar.Internal
public Task<CommandGetLastMessageIdResponse>
Send(CommandGetLastMessageId command, CancellationToken cancellationToken)
=> throw GetException();
+ public ValueTask ClosedByClient(CancellationToken cancellationToken)
+ => throw GetException();
+
private static Exception GetException()
=> new ChannelNotReadyException();
}
diff --git a/src/DotPulsar/Internal/Producer.cs
b/src/DotPulsar/Internal/Producer.cs
index 9e81810..1ac6886 100644
--- a/src/DotPulsar/Internal/Producer.cs
+++ b/src/DotPulsar/Internal/Producer.cs
@@ -84,7 +84,7 @@ namespace DotPulsar.Internal
return;
_eventRegister.Register(new ProducerDisposed(_correlationId,
this));
-
+ await _channel.ClosedByClient().ConfigureAwait(false);
await _channel.DisposeAsync().ConfigureAwait(false);
}
diff --git a/src/DotPulsar/Internal/ProducerChannel.cs
b/src/DotPulsar/Internal/ProducerChannel.cs
index 05390a9..3b43351 100644
--- a/src/DotPulsar/Internal/ProducerChannel.cs
+++ b/src/DotPulsar/Internal/ProducerChannel.cs
@@ -39,12 +39,12 @@ namespace DotPulsar.Internal
_connection = connection;
}
- public async ValueTask DisposeAsync()
+ public async ValueTask ClosedByClient(CancellationToken
cancellationToken)
{
try
{
var closeProducer = new CommandCloseProducer { ProducerId =
_id };
- await _connection.Send(closeProducer,
CancellationToken.None).ConfigureAwait(false);
+ await _connection.Send(closeProducer,
cancellationToken).ConfigureAwait(false);
}
catch
{
@@ -52,6 +52,8 @@ namespace DotPulsar.Internal
}
}
+ public ValueTask DisposeAsync() => new ValueTask();
+
public async Task<CommandSendReceipt> Send(MessageMetadata metadata,
ReadOnlySequence<byte> payload, CancellationToken cancellationToken)
{
var sendPackage = _sendPackagePool.Get();
diff --git a/src/DotPulsar/Internal/Reader.cs b/src/DotPulsar/Internal/Reader.cs
index 3dccb73..eeb95f5 100644
--- a/src/DotPulsar/Internal/Reader.cs
+++ b/src/DotPulsar/Internal/Reader.cs
@@ -73,20 +73,30 @@ namespace DotPulsar.Internal
public bool IsFinalState(ReaderState state)
=> _state.IsFinalState(state);
+ public async ValueTask<MessageId> GetLastMessageId(CancellationToken
cancellationToken = default)
+ {
+ var getLastMessageId = new CommandGetLastMessageId();
+ var response = await _executor.Execute(() =>
_channel.Send(getLastMessageId, cancellationToken),
cancellationToken).ConfigureAwait(false);
+ return new MessageId(response.LastMessageId);
+ }
+
public async IAsyncEnumerable<Message>
Messages([EnumeratorCancellation] CancellationToken cancellationToken)
{
ThrowIfDisposed();
while (!cancellationToken.IsCancellationRequested)
- yield return await _executor.Execute(() =>
_channel.Receive(cancellationToken), cancellationToken).ConfigureAwait(false);
+ yield return await _executor.Execute(() =>
Receive(cancellationToken), cancellationToken).ConfigureAwait(false);
}
+ private async ValueTask<Message> Receive(CancellationToken
cancellationToken)
+ => await _channel.Receive(cancellationToken).ConfigureAwait(false);
+
public async ValueTask Seek(MessageId messageId, CancellationToken
cancellationToken)
{
ThrowIfDisposed();
var seek = new CommandSeek { MessageId = messageId.Data };
- _ = await _executor.Execute(() => _channel.Send(seek,
cancellationToken), cancellationToken).ConfigureAwait(false);
+ _ = await _executor.Execute(() => Seek(seek, cancellationToken),
cancellationToken).ConfigureAwait(false);
}
public async ValueTask Seek(ulong publishTime, CancellationToken
cancellationToken)
@@ -94,7 +104,7 @@ namespace DotPulsar.Internal
ThrowIfDisposed();
var seek = new CommandSeek { MessagePublishTime = publishTime };
- _ = await _executor.Execute(() => _channel.Send(seek,
cancellationToken), cancellationToken).ConfigureAwait(false);
+ _ = await _executor.Execute(() => Seek(seek, cancellationToken),
cancellationToken).ConfigureAwait(false);
}
public async ValueTask Seek(DateTime publishTime, CancellationToken
cancellationToken)
@@ -102,7 +112,7 @@ namespace DotPulsar.Internal
ThrowIfDisposed();
var seek = new CommandSeek { MessagePublishTime = (ulong) new
DateTimeOffset(publishTime).ToUnixTimeMilliseconds() };
- _ = await _executor.Execute(() => _channel.Send(seek,
cancellationToken), cancellationToken).ConfigureAwait(false);
+ _ = await _executor.Execute(() => Seek(seek, cancellationToken),
cancellationToken).ConfigureAwait(false);
}
public async ValueTask Seek(DateTimeOffset publishTime,
CancellationToken cancellationToken)
@@ -110,7 +120,7 @@ namespace DotPulsar.Internal
ThrowIfDisposed();
var seek = new CommandSeek { MessagePublishTime = (ulong)
publishTime.ToUnixTimeMilliseconds() };
- _ = await _executor.Execute(() => _channel.Send(seek,
cancellationToken), cancellationToken).ConfigureAwait(false);
+ _ = await _executor.Execute(() => Seek(seek, cancellationToken),
cancellationToken).ConfigureAwait(false);
}
public async ValueTask DisposeAsync()
@@ -119,9 +129,13 @@ namespace DotPulsar.Internal
return;
_eventRegister.Register(new ReaderDisposed(_correlationId, this));
+ await _channel.ClosedByClient().ConfigureAwait(false);
await _channel.DisposeAsync().ConfigureAwait(false);
}
+ private async ValueTask<CommandSuccess> Seek(CommandSeek command,
CancellationToken cancellationToken)
+ => await _channel.Send(command,
cancellationToken).ConfigureAwait(false);
+
internal async ValueTask SetChannel(IReaderChannel channel)
{
if (_isDisposed != 0)