entvex commented on code in PR #162:
URL: https://github.com/apache/pulsar-dotpulsar/pull/162#discussion_r1288243246
##########
src/DotPulsar/Internal/Reader.cs:
##########
@@ -72,86 +163,158 @@ public bool IsFinalState()
public bool IsFinalState(ReaderState state)
=> _state.IsFinalState(state);
+ [Obsolete("GetLastMessageId is obsolete. Please use GetLastMessageIds instead.")]
public async ValueTask<MessageId> GetLastMessageId(CancellationToken cancellationToken)
{
- var getLastMessageId = new CommandGetLastMessageId();
- return await _executor.Execute(() => InternalGetLastMessageId(getLastMessageId, cancellationToken), cancellationToken).ConfigureAwait(false);
+ await Guard(cancellationToken).ConfigureAwait(false);
+
+ if (_isPartitioned)
+ {
+ throw new NotSupportedException("GetLastMessageId can't be used on partitioned topics. Please use GetLastMessageIds");
+ }
+ else
+ {
+ return await _subReaders[Topic].GetLastMessageId(cancellationToken).ConfigureAwait(false);
+ }
}
- private async ValueTask<MessageId> InternalGetLastMessageId(CommandGetLastMessageId command, CancellationToken cancellationToken)
+ public async ValueTask<IEnumerable<MessageId>> GetLastMessageIds(CancellationToken cancellationToken)
{
- Guard();
- return await _channel.Send(command, cancellationToken).ConfigureAwait(false);
+ await Guard(cancellationToken).ConfigureAwait(false);
+
+ if (_isPartitioned)
+ {
+ Task<MessageId>[] getLastMessageIdsTasks = new Task<MessageId>[_numberOfPartitions];
Review Comment:
Fixed in both Reader and Consumer
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]