This is an automated email from the ASF dual-hosted git repository.

blankensteiner pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-dotpulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 83370ce  Added GetLastMessageId on the Reader. Fixed reconnection issues when seeking.
83370ce is described below

commit 83370ce00dfa69dc535e2e7f03dbbacfd65eadc9
Author: Daniel Blankensteiner <[email protected]>
AuthorDate: Tue Dec 15 15:59:55 2020 +0100

    Added GetLastMessageId on the Reader. Fixed reconnection issues when seeking.
---
 CHANGELOG.md                                       |  7 ++++++
 samples/Consuming/Program.cs                       |  4 ++--
 samples/Producing/Program.cs                       |  4 ++--
 samples/Reading/Program.cs                         |  4 ++--
 src/DotPulsar/Abstractions/IReader.cs              |  5 +++++
 .../Internal/Abstractions/IConsumerChannel.cs      |  1 +
 .../Internal/Abstractions/IProducerChannel.cs      |  1 +
 .../Internal/Abstractions/IReaderChannel.cs        |  2 ++
 src/DotPulsar/Internal/Consumer.cs                 | 18 ++++++++++-----
 src/DotPulsar/Internal/ConsumerChannel.cs          | 26 +++++++++++++---------
 src/DotPulsar/Internal/DefaultExceptionHandler.cs  |  2 ++
 src/DotPulsar/Internal/NotReadyChannel.cs          |  3 +++
 src/DotPulsar/Internal/Producer.cs                 |  2 +-
 src/DotPulsar/Internal/ProducerChannel.cs          |  6 +++--
 src/DotPulsar/Internal/Reader.cs                   | 24 +++++++++++++++-----
 15 files changed, 78 insertions(+), 31 deletions(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 0a558bf..e636dcc 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -16,11 +16,18 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
 - Added properties on Message to read EventTime and PublishTime as DateTime
 - Added methods on the IMessageBuilder to set EventTime and DeliverAt using DateTime
 - Added properties on MessageMetadata to set EventTime and DeliverAtTime using DateTime
+- Added seeking by MessageId on the Reader
+- Added seeking by message publish time on the Consumer and Reader
+- Added GetLastMessageId on the Reader
 
 ### Changed
 
 - The protobuf-net dependency is upgraded from 2.4.6 to 3.X
 
+### Fixed
+
+- Reconnection issues when seeking
+
 ## [0.9.7] - 2020-12-04
 
 ### Added
diff --git a/samples/Consuming/Program.cs b/samples/Consuming/Program.cs
index 0cc6e06..ad52b2b 100644
--- a/samples/Consuming/Program.cs
+++ b/samples/Consuming/Program.cs
@@ -29,11 +29,11 @@ namespace Consuming
         {
             const string myTopic = "persistent://public/default/mytopic";
 
-            var taskCompletionSource = new TaskCompletionSource<object>(TaskCreationOptions.RunContinuationsAsynchronously);
+            var taskCompletionSource = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);
 
             Console.CancelKeyPress += (sender, args) =>
             {
-                taskCompletionSource.SetResult(null);
+                taskCompletionSource.SetResult();
                 args.Cancel = true;
             };
 
diff --git a/samples/Producing/Program.cs b/samples/Producing/Program.cs
index 26f7f51..3eb4aae 100644
--- a/samples/Producing/Program.cs
+++ b/samples/Producing/Program.cs
@@ -28,11 +28,11 @@ namespace Producing
         {
             const string myTopic = "persistent://public/default/mytopic";
 
-            var taskCompletionSource = new TaskCompletionSource<object>(TaskCreationOptions.RunContinuationsAsynchronously);
+            var taskCompletionSource = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);
 
             Console.CancelKeyPress += (sender, args) =>
             {
-                taskCompletionSource.SetResult(null);
+                taskCompletionSource.SetResult();
                 args.Cancel = true;
             };
 
diff --git a/samples/Reading/Program.cs b/samples/Reading/Program.cs
index 26289d3..5bd6099 100644
--- a/samples/Reading/Program.cs
+++ b/samples/Reading/Program.cs
@@ -29,11 +29,11 @@ namespace Reading
         {
             const string myTopic = "persistent://public/default/mytopic";
 
-            var taskCompletionSource = new TaskCompletionSource<object>(TaskCreationOptions.RunContinuationsAsynchronously);
+            var taskCompletionSource = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);
 
             Console.CancelKeyPress += (sender, args) =>
             {
-                taskCompletionSource.SetResult(null);
+                taskCompletionSource.SetResult();
                 args.Cancel = true;
             };
 
diff --git a/src/DotPulsar/Abstractions/IReader.cs b/src/DotPulsar/Abstractions/IReader.cs
index 7b9502a..1023419 100644
--- a/src/DotPulsar/Abstractions/IReader.cs
+++ b/src/DotPulsar/Abstractions/IReader.cs
@@ -41,6 +41,11 @@ namespace DotPulsar.Abstractions
         bool IsFinalState(ReaderState state);
 
         /// <summary>
+        /// Get the MessageId of the last message on the topic.
+        /// </summary>
+        ValueTask<MessageId> GetLastMessageId(CancellationToken cancellationToken = default);
+
+        /// <summary>
         /// Get an IAsyncEnumerable for reading messages
         /// </summary>
         IAsyncEnumerable<Message> Messages(CancellationToken cancellationToken = default);
diff --git a/src/DotPulsar/Internal/Abstractions/IConsumerChannel.cs b/src/DotPulsar/Internal/Abstractions/IConsumerChannel.cs
index a86ff5a..4daf83e 100644
--- a/src/DotPulsar/Internal/Abstractions/IConsumerChannel.cs
+++ b/src/DotPulsar/Internal/Abstractions/IConsumerChannel.cs
@@ -27,5 +27,6 @@ namespace DotPulsar.Internal.Abstractions
         Task<CommandSuccess> Send(CommandSeek command, CancellationToken cancellationToken);
         Task<CommandGetLastMessageIdResponse> Send(CommandGetLastMessageId command, CancellationToken cancellationToken);
         ValueTask<Message> Receive(CancellationToken cancellationToken);
+        ValueTask ClosedByClient(CancellationToken cancellationToken = default);
     }
 }
diff --git a/src/DotPulsar/Internal/Abstractions/IProducerChannel.cs b/src/DotPulsar/Internal/Abstractions/IProducerChannel.cs
index b793181..dd806f4 100644
--- a/src/DotPulsar/Internal/Abstractions/IProducerChannel.cs
+++ b/src/DotPulsar/Internal/Abstractions/IProducerChannel.cs
@@ -23,5 +23,6 @@ namespace DotPulsar.Internal.Abstractions
     public interface IProducerChannel : IAsyncDisposable
     {
         Task<CommandSendReceipt> Send(MessageMetadata metadata, ReadOnlySequence<byte> payload, CancellationToken cancellationToken);
+        ValueTask ClosedByClient(CancellationToken cancellationToken = default);
     }
 }
diff --git a/src/DotPulsar/Internal/Abstractions/IReaderChannel.cs b/src/DotPulsar/Internal/Abstractions/IReaderChannel.cs
index a63762b..a692c18 100644
--- a/src/DotPulsar/Internal/Abstractions/IReaderChannel.cs
+++ b/src/DotPulsar/Internal/Abstractions/IReaderChannel.cs
@@ -22,6 +22,8 @@ namespace DotPulsar.Internal.Abstractions
     public interface IReaderChannel : IAsyncDisposable
     {
         Task<CommandSuccess> Send(CommandSeek command, CancellationToken cancellationToken);
+        Task<CommandGetLastMessageIdResponse> Send(CommandGetLastMessageId command, CancellationToken cancellationToken);
         ValueTask<Message> Receive(CancellationToken cancellationToken = default);
+        ValueTask ClosedByClient(CancellationToken cancellationToken = default);
     }
 }
diff --git a/src/DotPulsar/Internal/Consumer.cs b/src/DotPulsar/Internal/Consumer.cs
index dad2416..2282711 100644
--- a/src/DotPulsar/Internal/Consumer.cs
+++ b/src/DotPulsar/Internal/Consumer.cs
@@ -83,6 +83,7 @@ namespace DotPulsar.Internal
                 return;
 
             _eventRegister.Register(new ConsumerDisposed(_correlationId, this));
+            await _channel.ClosedByClient().ConfigureAwait(false);
             await _channel.DisposeAsync().ConfigureAwait(false);
         }
 
@@ -91,9 +92,12 @@ namespace DotPulsar.Internal
             ThrowIfDisposed();
 
             while (!cancellationToken.IsCancellationRequested)
-                yield return await _executor.Execute(() => _channel.Receive(cancellationToken), cancellationToken).ConfigureAwait(false);
+                yield return await _executor.Execute(() => Receive(cancellationToken), cancellationToken).ConfigureAwait(false);
         }
 
+        private async ValueTask<Message> Receive(CancellationToken cancellationToken)
+            => await _channel.Receive(cancellationToken).ConfigureAwait(false);
+
         public async ValueTask Acknowledge(Message message, CancellationToken cancellationToken)
             => await Acknowledge(message.MessageId.Data, CommandAck.AckType.Individual, cancellationToken).ConfigureAwait(false);
 
@@ -125,7 +129,7 @@ namespace DotPulsar.Internal
             ThrowIfDisposed();
 
             var seek = new CommandSeek { MessageId = messageId.Data };
-            _ = await _executor.Execute(() => _channel.Send(seek, cancellationToken), cancellationToken).ConfigureAwait(false);
+            _ = await _executor.Execute(() => Seek(seek, cancellationToken), cancellationToken).ConfigureAwait(false);
         }
 
         public async ValueTask Seek(ulong publishTime, CancellationToken cancellationToken)
@@ -133,7 +137,7 @@ namespace DotPulsar.Internal
             ThrowIfDisposed();
 
             var seek = new CommandSeek { MessagePublishTime = publishTime };
-            _ = await _executor.Execute(() => _channel.Send(seek, cancellationToken), cancellationToken).ConfigureAwait(false);
+            _ = await _executor.Execute(() => Seek(seek, cancellationToken), cancellationToken).ConfigureAwait(false);
         }
 
         public async ValueTask Seek(DateTime publishTime, CancellationToken cancellationToken)
@@ -141,7 +145,7 @@ namespace DotPulsar.Internal
             ThrowIfDisposed();
 
             var seek = new CommandSeek { MessagePublishTime = (ulong) new DateTimeOffset(publishTime).ToUnixTimeMilliseconds() };
-            _ = await _executor.Execute(() => _channel.Send(seek, cancellationToken), cancellationToken).ConfigureAwait(false);
+            _ = await _executor.Execute(() => Seek(seek, cancellationToken), cancellationToken).ConfigureAwait(false);
         }
 
         public async ValueTask Seek(DateTimeOffset publishTime, CancellationToken cancellationToken)
@@ -149,7 +153,7 @@ namespace DotPulsar.Internal
             ThrowIfDisposed();
 
             var seek = new CommandSeek { MessagePublishTime = (ulong) publishTime.ToUnixTimeMilliseconds() };
-            _ = await _executor.Execute(() => _channel.Send(seek, cancellationToken), cancellationToken).ConfigureAwait(false);
+            _ = await _executor.Execute(() => Seek(seek, cancellationToken), cancellationToken).ConfigureAwait(false);
         }
 
         public async ValueTask<MessageId> GetLastMessageId(CancellationToken cancellationToken)
@@ -158,10 +162,12 @@ namespace DotPulsar.Internal
 
             var getLastMessageId = new CommandGetLastMessageId();
             var response = await _executor.Execute(() => _channel.Send(getLastMessageId, cancellationToken), cancellationToken).ConfigureAwait(false);
-
             return new MessageId(response.LastMessageId);
         }
 
+        private async ValueTask<CommandSuccess> Seek(CommandSeek command, CancellationToken cancellationToken)
+            => await _channel.Send(command, cancellationToken).ConfigureAwait(false);
+
         private async ValueTask Acknowledge(MessageIdData messageIdData, CommandAck.AckType ackType, CancellationToken cancellationToken)
         {
             ThrowIfDisposed();
diff --git a/src/DotPulsar/Internal/ConsumerChannel.cs b/src/DotPulsar/Internal/ConsumerChannel.cs
index eef5d8d..02c9316 100644
--- a/src/DotPulsar/Internal/ConsumerChannel.cs
+++ b/src/DotPulsar/Internal/ConsumerChannel.cs
@@ -144,17 +144,8 @@ namespace DotPulsar.Internal
 
         public async ValueTask DisposeAsync()
         {
-            try
-            {
-                _queue.Dispose();
-                await _lock.DisposeAsync();
-                var closeConsumer = new CommandCloseConsumer { ConsumerId = _id };
-                await _connection.Send(closeConsumer, CancellationToken.None).ConfigureAwait(false);
-            }
-            catch
-            {
-                // Ignore
-            }
+            _queue.Dispose();
+            await _lock.DisposeAsync().ConfigureAwait(false);
         }
 
         private async ValueTask SendFlow(CancellationToken cancellationToken)
@@ -179,5 +170,18 @@ namespace DotPulsar.Internal
 
             await Send(ack, cancellationToken).ConfigureAwait(false);
         }
+
+        public async ValueTask ClosedByClient(CancellationToken cancellationToken)
+        {
+            try
+            {
+                var closeConsumer = new CommandCloseConsumer { ConsumerId = _id };
+                await _connection.Send(closeConsumer, cancellationToken).ConfigureAwait(false);
+            }
+            catch
+            {
+                // Ignore
+            }
+        }
     }
 }
diff --git a/src/DotPulsar/Internal/DefaultExceptionHandler.cs b/src/DotPulsar/Internal/DefaultExceptionHandler.cs
index abc5601..b4f26d8 100644
--- a/src/DotPulsar/Internal/DefaultExceptionHandler.cs
+++ b/src/DotPulsar/Internal/DefaultExceptionHandler.cs
@@ -45,6 +45,8 @@ namespace DotPulsar.Internal
                 TooManyRequestsException _ => FaultAction.Retry,
                 ChannelNotReadyException _ => FaultAction.Retry,
                 ServiceNotReadyException _ => FaultAction.Retry,
+                MetadataException _ => FaultAction.Retry,
+                ConsumerNotFoundException _ => FaultAction.Retry,
                 ConnectionDisposedException _ => FaultAction.Retry,
                 AsyncLockDisposedException _ => FaultAction.Retry,
                 PulsarStreamDisposedException _ => FaultAction.Retry,
diff --git a/src/DotPulsar/Internal/NotReadyChannel.cs b/src/DotPulsar/Internal/NotReadyChannel.cs
index fee9bb4..60f0cbf 100644
--- a/src/DotPulsar/Internal/NotReadyChannel.cs
+++ b/src/DotPulsar/Internal/NotReadyChannel.cs
@@ -51,6 +51,9 @@ namespace DotPulsar.Internal
         public Task<CommandGetLastMessageIdResponse> Send(CommandGetLastMessageId command, CancellationToken cancellationToken)
             => throw GetException();
 
+        public ValueTask ClosedByClient(CancellationToken cancellationToken)
+            => throw GetException();
+
         private static Exception GetException()
             => new ChannelNotReadyException();
     }
diff --git a/src/DotPulsar/Internal/Producer.cs b/src/DotPulsar/Internal/Producer.cs
index 9e81810..1ac6886 100644
--- a/src/DotPulsar/Internal/Producer.cs
+++ b/src/DotPulsar/Internal/Producer.cs
@@ -84,7 +84,7 @@ namespace DotPulsar.Internal
                 return;
 
             _eventRegister.Register(new ProducerDisposed(_correlationId, this));
-
+            await _channel.ClosedByClient().ConfigureAwait(false);
             await _channel.DisposeAsync().ConfigureAwait(false);
         }
 
diff --git a/src/DotPulsar/Internal/ProducerChannel.cs b/src/DotPulsar/Internal/ProducerChannel.cs
index 05390a9..3b43351 100644
--- a/src/DotPulsar/Internal/ProducerChannel.cs
+++ b/src/DotPulsar/Internal/ProducerChannel.cs
@@ -39,12 +39,12 @@ namespace DotPulsar.Internal
             _connection = connection;
         }
 
-        public async ValueTask DisposeAsync()
+        public async ValueTask ClosedByClient(CancellationToken cancellationToken)
         {
             try
             {
                 var closeProducer = new CommandCloseProducer { ProducerId = _id };
-                await _connection.Send(closeProducer, CancellationToken.None).ConfigureAwait(false);
+                await _connection.Send(closeProducer, cancellationToken).ConfigureAwait(false);
             }
             catch
             {
@@ -52,6 +52,8 @@ namespace DotPulsar.Internal
             }
         }
 
+        public ValueTask DisposeAsync() => new ValueTask();
+
         public async Task<CommandSendReceipt> Send(MessageMetadata metadata, ReadOnlySequence<byte> payload, CancellationToken cancellationToken)
         {
             var sendPackage = _sendPackagePool.Get();
diff --git a/src/DotPulsar/Internal/Reader.cs b/src/DotPulsar/Internal/Reader.cs
index 3dccb73..eeb95f5 100644
--- a/src/DotPulsar/Internal/Reader.cs
+++ b/src/DotPulsar/Internal/Reader.cs
@@ -73,20 +73,30 @@ namespace DotPulsar.Internal
         public bool IsFinalState(ReaderState state)
             => _state.IsFinalState(state);
 
+        public async ValueTask<MessageId> GetLastMessageId(CancellationToken cancellationToken = default)
+        {
+            var getLastMessageId = new CommandGetLastMessageId();
+            var response = await _executor.Execute(() => _channel.Send(getLastMessageId, cancellationToken), cancellationToken).ConfigureAwait(false);
+            return new MessageId(response.LastMessageId);
+        }
+
         public async IAsyncEnumerable<Message> Messages([EnumeratorCancellation] CancellationToken cancellationToken)
         {
             ThrowIfDisposed();
 
             while (!cancellationToken.IsCancellationRequested)
-                yield return await _executor.Execute(() => _channel.Receive(cancellationToken), cancellationToken).ConfigureAwait(false);
+                yield return await _executor.Execute(() => Receive(cancellationToken), cancellationToken).ConfigureAwait(false);
         }
 
+        private async ValueTask<Message> Receive(CancellationToken cancellationToken)
+            => await _channel.Receive(cancellationToken).ConfigureAwait(false);
+
         public async ValueTask Seek(MessageId messageId, CancellationToken cancellationToken)
         {
             ThrowIfDisposed();
 
             var seek = new CommandSeek { MessageId = messageId.Data };
-            _ = await _executor.Execute(() => _channel.Send(seek, cancellationToken), cancellationToken).ConfigureAwait(false);
+            _ = await _executor.Execute(() => Seek(seek, cancellationToken), cancellationToken).ConfigureAwait(false);
         }
 
         public async ValueTask Seek(ulong publishTime, CancellationToken cancellationToken)
@@ -94,7 +104,7 @@ namespace DotPulsar.Internal
             ThrowIfDisposed();
 
             var seek = new CommandSeek { MessagePublishTime = publishTime };
-            _ = await _executor.Execute(() => _channel.Send(seek, cancellationToken), cancellationToken).ConfigureAwait(false);
+            _ = await _executor.Execute(() => Seek(seek, cancellationToken), cancellationToken).ConfigureAwait(false);
         }
 
         public async ValueTask Seek(DateTime publishTime, CancellationToken cancellationToken)
@@ -102,7 +112,7 @@ namespace DotPulsar.Internal
             ThrowIfDisposed();
 
             var seek = new CommandSeek { MessagePublishTime = (ulong) new DateTimeOffset(publishTime).ToUnixTimeMilliseconds() };
-            _ = await _executor.Execute(() => _channel.Send(seek, cancellationToken), cancellationToken).ConfigureAwait(false);
+            _ = await _executor.Execute(() => Seek(seek, cancellationToken), cancellationToken).ConfigureAwait(false);
         }
 
         public async ValueTask Seek(DateTimeOffset publishTime, CancellationToken cancellationToken)
@@ -110,7 +120,7 @@ namespace DotPulsar.Internal
             ThrowIfDisposed();
 
             var seek = new CommandSeek { MessagePublishTime = (ulong) publishTime.ToUnixTimeMilliseconds() };
-            _ = await _executor.Execute(() => _channel.Send(seek, cancellationToken), cancellationToken).ConfigureAwait(false);
+            _ = await _executor.Execute(() => Seek(seek, cancellationToken), cancellationToken).ConfigureAwait(false);
         }
 
         public async ValueTask DisposeAsync()
@@ -119,9 +129,13 @@ namespace DotPulsar.Internal
                 return;
 
             _eventRegister.Register(new ReaderDisposed(_correlationId, this));
+            await _channel.ClosedByClient().ConfigureAwait(false);
             await _channel.DisposeAsync().ConfigureAwait(false);
         }
 
+        private async ValueTask<CommandSuccess> Seek(CommandSeek command, CancellationToken cancellationToken)
+            => await _channel.Send(command, cancellationToken).ConfigureAwait(false);
+
         internal async ValueTask SetChannel(IReaderChannel channel)
         {
             if (_isDisposed != 0)

Reply via email to