entvex commented on code in PR #162:
URL: https://github.com/apache/pulsar-dotpulsar/pull/162#discussion_r1288389669
##########
src/DotPulsar/Internal/Reader.cs:
##########
@@ -72,86 +163,158 @@ public bool IsFinalState()
public bool IsFinalState(ReaderState state)
=> _state.IsFinalState(state);
+ [Obsolete("GetLastMessageId is obsolete. Please use GetLastMessageIds
instead.")]
public async ValueTask<MessageId> GetLastMessageId(CancellationToken
cancellationToken)
{
- var getLastMessageId = new CommandGetLastMessageId();
- return await _executor.Execute(() =>
InternalGetLastMessageId(getLastMessageId, cancellationToken),
cancellationToken).ConfigureAwait(false);
+ await Guard(cancellationToken).ConfigureAwait(false);
+
+ if (_isPartitioned)
+ {
+ throw new NotSupportedException("GetLastMessageId can't be used on
partitioned topics. Please use GetLastMessageIds");
+ }
+ else
+ {
+ return await
_subReaders[Topic].GetLastMessageId(cancellationToken).ConfigureAwait(false);
+ }
}
- private async ValueTask<MessageId>
InternalGetLastMessageId(CommandGetLastMessageId command, CancellationToken
cancellationToken)
+ public async ValueTask<IEnumerable<MessageId>>
GetLastMessageIds(CancellationToken cancellationToken)
{
- Guard();
- return await _channel.Send(command,
cancellationToken).ConfigureAwait(false);
+ await Guard(cancellationToken).ConfigureAwait(false);
+
+ if (_isPartitioned)
+ {
+ Task<MessageId>[] getLastMessageIdsTasks = new
Task<MessageId>[_numberOfPartitions];
+
+ for (var i = 0; i < _subReaders.Count; i++)
+ {
+ var getLastMessageIdTask =
_subReaders.Values.ElementAt(i).GetLastMessageId(cancellationToken);
+ getLastMessageIdsTasks[i] = getLastMessageIdTask.AsTask();
+ }
+
+ //await all of the tasks.
+ await Task.WhenAll(getLastMessageIdsTasks).ConfigureAwait(false);
+
+ //collect MessageIds
+ List<MessageId> messageIds = new List<MessageId>();
+ for (var i = 0; i < _subReaders.Count; i++)
+ {
+ messageIds.Add(getLastMessageIdsTasks[i].Result);
+ }
+ return messageIds;
+ }
+ else
+ {
+ MessageId[] messageIds = new MessageId[1];
+ messageIds[0] = await
_subReaders[Topic].GetLastMessageId(cancellationToken).ConfigureAwait(false);
+ return messageIds;
+ }
}
public async ValueTask<IMessage<TMessage>> Receive(CancellationToken
cancellationToken)
- => await _executor.Execute(() => InternalReceive(cancellationToken),
cancellationToken).ConfigureAwait(false);
-
- private async ValueTask<IMessage<TMessage>>
InternalReceive(CancellationToken cancellationToken)
{
- Guard();
- return await _channel.Receive(cancellationToken).ConfigureAwait(false);
+ await Guard(cancellationToken).ConfigureAwait(false);
+
+ if (_isPartitioned)
+ {
+ var iterations = 0;
+ while (true)
+ {
+ _subReaderIndex++;
+ iterations++;
+ if (_subReaderIndex == _subReaders.Count)
+ _subReaderIndex = 0;
+
+ var receiveTask =
_receiveTaskQueueForSubReaders[_subReaderIndex];
+ if (receiveTask == _emptyTaskCompletionSource.Task)
+ {
+ var receiveTaskValueTask =
_subReaders.ElementAt(_subReaderIndex).Value.Receive(cancellationToken);
+
+ if (receiveTaskValueTask.IsCompleted)
+ return receiveTaskValueTask.Result;
+
+ _receiveTaskQueueForSubReaders[_subReaderIndex] =
receiveTaskValueTask.AsTask();
+ }
+ else
+ {
+ if (receiveTask.IsCompleted)
+ {
+ _receiveTaskQueueForSubReaders[_subReaderIndex] =
_emptyTaskCompletionSource.Task;
+ return receiveTask.Result;
+ }
+ }
+ if (iterations == _subReaders.Count)
+ await
Task.WhenAny(_receiveTaskQueueForSubReaders).ConfigureAwait(false);
+ }
+ }
+ else
+ {
+ return await
_subReaders[Topic].Receive(cancellationToken).ConfigureAwait(false);
+ }
}
public async ValueTask Seek(MessageId messageId, CancellationToken
cancellationToken)
{
- var seek = new CommandSeek { MessageId = messageId.ToMessageIdData() };
- await _executor.Execute(() => InternalSeek(seek, cancellationToken),
cancellationToken).ConfigureAwait(false);
+ await Guard(cancellationToken).ConfigureAwait(false);
+
+ if (_isPartitioned)
+ {
+ var seekTasks = new Task[_numberOfPartitions];
+ for (var i = 0; i < _subReaders.Values.Count; i++)
+ {
+ seekTasks[i] = _subReaders.Values.ElementAt(i).Seek(messageId,
cancellationToken).AsTask();
+ }
+ await Task.WhenAll(seekTasks).ConfigureAwait(false);
+ }
+ else
+ {
+ await _subReaders[Topic].Seek(messageId,
cancellationToken).ConfigureAwait(false);
+ }
}
public async ValueTask Seek(ulong publishTime, CancellationToken
cancellationToken)
{
- var seek = new CommandSeek { MessagePublishTime = publishTime };
- await _executor.Execute(() => InternalSeek(seek, cancellationToken),
cancellationToken).ConfigureAwait(false);
+ await Guard(cancellationToken).ConfigureAwait(false);
+
+ if (_isPartitioned)
+ {
+ var seekTasks = new Task[_numberOfPartitions];
+ for (var i = 0; i < _subReaders.Values.Count; i++)
+ {
+ seekTasks[i] =
_subReaders.Values.ElementAt(i).Seek(publishTime, cancellationToken).AsTask();
+ }
+ await Task.WhenAll(seekTasks).ConfigureAwait(false);
+ }
+ else
+ {
+ await _subReaders[Topic].Seek(publishTime,
cancellationToken).ConfigureAwait(false);
+ }
}
public async ValueTask DisposeAsync()
{
if (Interlocked.Exchange(ref _isDisposed, 1) != 0)
return;
- _eventRegister.Register(new ReaderDisposed(_correlationId));
- await DisposeChannel().ConfigureAwait(false);
- }
-
- private async ValueTask DisposeChannel()
- {
- await
_channel.ClosedByClient(CancellationToken.None).ConfigureAwait(false);
- await _channel.DisposeAsync().ConfigureAwait(false);
+ foreach (var subConsumer in _subReaders.Values)
+ {
+ await subConsumer.DisposeAsync().ConfigureAwait(false);
+ }
}
- private async Task InternalSeek(CommandSeek command, CancellationToken
cancellationToken)
+ private StateManager<ReaderState> CreateStateManager()
{
- Guard();
- await _channel.Send(command, cancellationToken).ConfigureAwait(false);
+ return new StateManager<ReaderState>(ReaderState.Disconnected,
ReaderState.Closed, ReaderState.ReachedEndOfTopic, ReaderState.Faulted);
}
- public async Task EstablishNewChannel(CancellationToken cancellationToken)
+ private string GetPartitionedTopicName(int partitionNumber)
{
- var channel = await _executor.Execute(() =>
_factory.Create(cancellationToken), cancellationToken).ConfigureAwait(false);
-
- var oldChannel = _channel;
- if (oldChannel is not null)
- await oldChannel.DisposeAsync().ConfigureAwait(false);
-
- _channel = channel;
- }
-
- public async ValueTask CloseChannel(CancellationToken cancellationToken)
- => await
_channel.ClosedByClient(cancellationToken).ConfigureAwait(false);
-
- private void Guard()
- {
- if (_isDisposed != 0)
- throw new ReaderDisposedException(GetType().FullName!);
-
- if (_faultException is not null)
- throw new ReaderFaultedException(_faultException);
+ return $"{Topic}-partition-{partitionNumber}";
}
- public async ValueTask ChannelFaulted(Exception exception)
+ private async Task Guard(CancellationToken cancellationToken)
{
- _faultException = exception;
- await DisposeChannel().ConfigureAwait(false);
+ if (!_allSubReadersIsReady)
+ await
_semaphoreSlim.WaitAsync(cancellationToken).ConfigureAwait(false);
Review Comment:
Make the change also in consumer
##########
src/DotPulsar/Internal/Reader.cs:
##########
@@ -72,86 +163,158 @@ public bool IsFinalState()
public bool IsFinalState(ReaderState state)
=> _state.IsFinalState(state);
+ [Obsolete("GetLastMessageId is obsolete. Please use GetLastMessageIds
instead.")]
public async ValueTask<MessageId> GetLastMessageId(CancellationToken
cancellationToken)
{
- var getLastMessageId = new CommandGetLastMessageId();
- return await _executor.Execute(() =>
InternalGetLastMessageId(getLastMessageId, cancellationToken),
cancellationToken).ConfigureAwait(false);
+ await Guard(cancellationToken).ConfigureAwait(false);
+
+ if (_isPartitioned)
+ {
+ throw new NotSupportedException("GetLastMessageId can't be used on
partitioned topics. Please use GetLastMessageIds");
+ }
+ else
+ {
+ return await
_subReaders[Topic].GetLastMessageId(cancellationToken).ConfigureAwait(false);
+ }
}
- private async ValueTask<MessageId>
InternalGetLastMessageId(CommandGetLastMessageId command, CancellationToken
cancellationToken)
+ public async ValueTask<IEnumerable<MessageId>>
GetLastMessageIds(CancellationToken cancellationToken)
{
- Guard();
- return await _channel.Send(command,
cancellationToken).ConfigureAwait(false);
+ await Guard(cancellationToken).ConfigureAwait(false);
+
+ if (_isPartitioned)
+ {
+ Task<MessageId>[] getLastMessageIdsTasks = new
Task<MessageId>[_numberOfPartitions];
+
+ for (var i = 0; i < _subReaders.Count; i++)
+ {
+ var getLastMessageIdTask =
_subReaders.Values.ElementAt(i).GetLastMessageId(cancellationToken);
+ getLastMessageIdsTasks[i] = getLastMessageIdTask.AsTask();
+ }
+
+ //await all of the tasks.
+ await Task.WhenAll(getLastMessageIdsTasks).ConfigureAwait(false);
+
+ //collect MessageIds
+ List<MessageId> messageIds = new List<MessageId>();
+ for (var i = 0; i < _subReaders.Count; i++)
+ {
+ messageIds.Add(getLastMessageIdsTasks[i].Result);
+ }
+ return messageIds;
+ }
+ else
+ {
+ MessageId[] messageIds = new MessageId[1];
+ messageIds[0] = await
_subReaders[Topic].GetLastMessageId(cancellationToken).ConfigureAwait(false);
+ return messageIds;
+ }
}
public async ValueTask<IMessage<TMessage>> Receive(CancellationToken
cancellationToken)
- => await _executor.Execute(() => InternalReceive(cancellationToken),
cancellationToken).ConfigureAwait(false);
-
- private async ValueTask<IMessage<TMessage>>
InternalReceive(CancellationToken cancellationToken)
{
- Guard();
- return await _channel.Receive(cancellationToken).ConfigureAwait(false);
+ await Guard(cancellationToken).ConfigureAwait(false);
+
+ if (_isPartitioned)
+ {
+ var iterations = 0;
+ while (true)
+ {
+ _subReaderIndex++;
+ iterations++;
+ if (_subReaderIndex == _subReaders.Count)
+ _subReaderIndex = 0;
+
+ var receiveTask =
_receiveTaskQueueForSubReaders[_subReaderIndex];
+ if (receiveTask == _emptyTaskCompletionSource.Task)
+ {
+ var receiveTaskValueTask =
_subReaders.ElementAt(_subReaderIndex).Value.Receive(cancellationToken);
+
+ if (receiveTaskValueTask.IsCompleted)
+ return receiveTaskValueTask.Result;
+
+ _receiveTaskQueueForSubReaders[_subReaderIndex] =
receiveTaskValueTask.AsTask();
+ }
+ else
+ {
+ if (receiveTask.IsCompleted)
+ {
+ _receiveTaskQueueForSubReaders[_subReaderIndex] =
_emptyTaskCompletionSource.Task;
+ return receiveTask.Result;
+ }
+ }
+ if (iterations == _subReaders.Count)
+ await
Task.WhenAny(_receiveTaskQueueForSubReaders).ConfigureAwait(false);
+ }
+ }
+ else
+ {
+ return await
_subReaders[Topic].Receive(cancellationToken).ConfigureAwait(false);
+ }
}
public async ValueTask Seek(MessageId messageId, CancellationToken
cancellationToken)
{
- var seek = new CommandSeek { MessageId = messageId.ToMessageIdData() };
- await _executor.Execute(() => InternalSeek(seek, cancellationToken),
cancellationToken).ConfigureAwait(false);
+ await Guard(cancellationToken).ConfigureAwait(false);
+
+ if (_isPartitioned)
+ {
+ var seekTasks = new Task[_numberOfPartitions];
+ for (var i = 0; i < _subReaders.Values.Count; i++)
+ {
+ seekTasks[i] = _subReaders.Values.ElementAt(i).Seek(messageId,
cancellationToken).AsTask();
+ }
+ await Task.WhenAll(seekTasks).ConfigureAwait(false);
+ }
+ else
+ {
+ await _subReaders[Topic].Seek(messageId,
cancellationToken).ConfigureAwait(false);
+ }
}
public async ValueTask Seek(ulong publishTime, CancellationToken
cancellationToken)
{
- var seek = new CommandSeek { MessagePublishTime = publishTime };
- await _executor.Execute(() => InternalSeek(seek, cancellationToken),
cancellationToken).ConfigureAwait(false);
+ await Guard(cancellationToken).ConfigureAwait(false);
+
+ if (_isPartitioned)
+ {
+ var seekTasks = new Task[_numberOfPartitions];
+ for (var i = 0; i < _subReaders.Values.Count; i++)
+ {
+ seekTasks[i] =
_subReaders.Values.ElementAt(i).Seek(publishTime, cancellationToken).AsTask();
+ }
+ await Task.WhenAll(seekTasks).ConfigureAwait(false);
+ }
+ else
+ {
+ await _subReaders[Topic].Seek(publishTime,
cancellationToken).ConfigureAwait(false);
+ }
}
public async ValueTask DisposeAsync()
{
if (Interlocked.Exchange(ref _isDisposed, 1) != 0)
return;
- _eventRegister.Register(new ReaderDisposed(_correlationId));
- await DisposeChannel().ConfigureAwait(false);
- }
-
- private async ValueTask DisposeChannel()
- {
- await
_channel.ClosedByClient(CancellationToken.None).ConfigureAwait(false);
- await _channel.DisposeAsync().ConfigureAwait(false);
+ foreach (var subConsumer in _subReaders.Values)
+ {
+ await subConsumer.DisposeAsync().ConfigureAwait(false);
+ }
}
- private async Task InternalSeek(CommandSeek command, CancellationToken
cancellationToken)
+ private StateManager<ReaderState> CreateStateManager()
{
- Guard();
- await _channel.Send(command, cancellationToken).ConfigureAwait(false);
+ return new StateManager<ReaderState>(ReaderState.Disconnected,
ReaderState.Closed, ReaderState.ReachedEndOfTopic, ReaderState.Faulted);
}
- public async Task EstablishNewChannel(CancellationToken cancellationToken)
+ private string GetPartitionedTopicName(int partitionNumber)
{
- var channel = await _executor.Execute(() =>
_factory.Create(cancellationToken), cancellationToken).ConfigureAwait(false);
-
- var oldChannel = _channel;
- if (oldChannel is not null)
- await oldChannel.DisposeAsync().ConfigureAwait(false);
-
- _channel = channel;
- }
-
- public async ValueTask CloseChannel(CancellationToken cancellationToken)
- => await
_channel.ClosedByClient(cancellationToken).ConfigureAwait(false);
-
- private void Guard()
- {
- if (_isDisposed != 0)
- throw new ReaderDisposedException(GetType().FullName!);
-
- if (_faultException is not null)
- throw new ReaderFaultedException(_faultException);
+ return $"{Topic}-partition-{partitionNumber}";
}
- public async ValueTask ChannelFaulted(Exception exception)
+ private async Task Guard(CancellationToken cancellationToken)
{
- _faultException = exception;
- await DisposeChannel().ConfigureAwait(false);
+ if (!_allSubReadersIsReady)
+ await
_semaphoreSlim.WaitAsync(cancellationToken).ConfigureAwait(false);
Review Comment:
just like the consumer
##########
src/DotPulsar/Internal/Reader.cs:
##########
@@ -72,86 +163,158 @@ public bool IsFinalState()
public bool IsFinalState(ReaderState state)
=> _state.IsFinalState(state);
+ [Obsolete("GetLastMessageId is obsolete. Please use GetLastMessageIds
instead.")]
public async ValueTask<MessageId> GetLastMessageId(CancellationToken
cancellationToken)
{
- var getLastMessageId = new CommandGetLastMessageId();
- return await _executor.Execute(() =>
InternalGetLastMessageId(getLastMessageId, cancellationToken),
cancellationToken).ConfigureAwait(false);
+ await Guard(cancellationToken).ConfigureAwait(false);
+
+ if (_isPartitioned)
+ {
+ throw new NotSupportedException("GetLastMessageId can't be used on
partitioned topics. Please use GetLastMessageIds");
+ }
+ else
+ {
+ return await
_subReaders[Topic].GetLastMessageId(cancellationToken).ConfigureAwait(false);
+ }
}
- private async ValueTask<MessageId>
InternalGetLastMessageId(CommandGetLastMessageId command, CancellationToken
cancellationToken)
+ public async ValueTask<IEnumerable<MessageId>>
GetLastMessageIds(CancellationToken cancellationToken)
{
- Guard();
- return await _channel.Send(command,
cancellationToken).ConfigureAwait(false);
+ await Guard(cancellationToken).ConfigureAwait(false);
+
+ if (_isPartitioned)
+ {
+ Task<MessageId>[] getLastMessageIdsTasks = new
Task<MessageId>[_numberOfPartitions];
+
+ for (var i = 0; i < _subReaders.Count; i++)
+ {
+ var getLastMessageIdTask =
_subReaders.Values.ElementAt(i).GetLastMessageId(cancellationToken);
+ getLastMessageIdsTasks[i] = getLastMessageIdTask.AsTask();
+ }
+
+ //await all of the tasks.
+ await Task.WhenAll(getLastMessageIdsTasks).ConfigureAwait(false);
+
+ //collect MessageIds
+ List<MessageId> messageIds = new List<MessageId>();
+ for (var i = 0; i < _subReaders.Count; i++)
+ {
+ messageIds.Add(getLastMessageIdsTasks[i].Result);
+ }
+ return messageIds;
+ }
+ else
+ {
+ MessageId[] messageIds = new MessageId[1];
+ messageIds[0] = await
_subReaders[Topic].GetLastMessageId(cancellationToken).ConfigureAwait(false);
+ return messageIds;
+ }
}
public async ValueTask<IMessage<TMessage>> Receive(CancellationToken
cancellationToken)
- => await _executor.Execute(() => InternalReceive(cancellationToken),
cancellationToken).ConfigureAwait(false);
-
- private async ValueTask<IMessage<TMessage>>
InternalReceive(CancellationToken cancellationToken)
{
- Guard();
- return await _channel.Receive(cancellationToken).ConfigureAwait(false);
+ await Guard(cancellationToken).ConfigureAwait(false);
+
+ if (_isPartitioned)
+ {
+ var iterations = 0;
+ while (true)
+ {
+ _subReaderIndex++;
+ iterations++;
+ if (_subReaderIndex == _subReaders.Count)
+ _subReaderIndex = 0;
+
+ var receiveTask =
_receiveTaskQueueForSubReaders[_subReaderIndex];
+ if (receiveTask == _emptyTaskCompletionSource.Task)
+ {
+ var receiveTaskValueTask =
_subReaders.ElementAt(_subReaderIndex).Value.Receive(cancellationToken);
+
+ if (receiveTaskValueTask.IsCompleted)
+ return receiveTaskValueTask.Result;
+
+ _receiveTaskQueueForSubReaders[_subReaderIndex] =
receiveTaskValueTask.AsTask();
+ }
+ else
+ {
+ if (receiveTask.IsCompleted)
+ {
+ _receiveTaskQueueForSubReaders[_subReaderIndex] =
_emptyTaskCompletionSource.Task;
+ return receiveTask.Result;
+ }
+ }
+ if (iterations == _subReaders.Count)
+ await
Task.WhenAny(_receiveTaskQueueForSubReaders).ConfigureAwait(false);
+ }
+ }
+ else
+ {
+ return await
_subReaders[Topic].Receive(cancellationToken).ConfigureAwait(false);
+ }
}
public async ValueTask Seek(MessageId messageId, CancellationToken
cancellationToken)
{
- var seek = new CommandSeek { MessageId = messageId.ToMessageIdData() };
- await _executor.Execute(() => InternalSeek(seek, cancellationToken),
cancellationToken).ConfigureAwait(false);
+ await Guard(cancellationToken).ConfigureAwait(false);
+
+ if (_isPartitioned)
+ {
+ var seekTasks = new Task[_numberOfPartitions];
+ for (var i = 0; i < _subReaders.Values.Count; i++)
+ {
+ seekTasks[i] = _subReaders.Values.ElementAt(i).Seek(messageId,
cancellationToken).AsTask();
+ }
+ await Task.WhenAll(seekTasks).ConfigureAwait(false);
+ }
+ else
+ {
+ await _subReaders[Topic].Seek(messageId,
cancellationToken).ConfigureAwait(false);
+ }
}
public async ValueTask Seek(ulong publishTime, CancellationToken
cancellationToken)
{
- var seek = new CommandSeek { MessagePublishTime = publishTime };
- await _executor.Execute(() => InternalSeek(seek, cancellationToken),
cancellationToken).ConfigureAwait(false);
+ await Guard(cancellationToken).ConfigureAwait(false);
+
+ if (_isPartitioned)
+ {
+ var seekTasks = new Task[_numberOfPartitions];
+ for (var i = 0; i < _subReaders.Values.Count; i++)
+ {
+ seekTasks[i] =
_subReaders.Values.ElementAt(i).Seek(publishTime, cancellationToken).AsTask();
+ }
+ await Task.WhenAll(seekTasks).ConfigureAwait(false);
+ }
+ else
+ {
+ await _subReaders[Topic].Seek(publishTime,
cancellationToken).ConfigureAwait(false);
+ }
}
public async ValueTask DisposeAsync()
{
if (Interlocked.Exchange(ref _isDisposed, 1) != 0)
return;
- _eventRegister.Register(new ReaderDisposed(_correlationId));
- await DisposeChannel().ConfigureAwait(false);
- }
-
- private async ValueTask DisposeChannel()
- {
- await
_channel.ClosedByClient(CancellationToken.None).ConfigureAwait(false);
- await _channel.DisposeAsync().ConfigureAwait(false);
+ foreach (var subConsumer in _subReaders.Values)
+ {
+ await subConsumer.DisposeAsync().ConfigureAwait(false);
+ }
}
- private async Task InternalSeek(CommandSeek command, CancellationToken
cancellationToken)
+ private StateManager<ReaderState> CreateStateManager()
{
- Guard();
- await _channel.Send(command, cancellationToken).ConfigureAwait(false);
+ return new StateManager<ReaderState>(ReaderState.Disconnected,
ReaderState.Closed, ReaderState.ReachedEndOfTopic, ReaderState.Faulted);
}
- public async Task EstablishNewChannel(CancellationToken cancellationToken)
+ private string GetPartitionedTopicName(int partitionNumber)
{
- var channel = await _executor.Execute(() =>
_factory.Create(cancellationToken), cancellationToken).ConfigureAwait(false);
-
- var oldChannel = _channel;
- if (oldChannel is not null)
- await oldChannel.DisposeAsync().ConfigureAwait(false);
-
- _channel = channel;
- }
-
- public async ValueTask CloseChannel(CancellationToken cancellationToken)
- => await
_channel.ClosedByClient(cancellationToken).ConfigureAwait(false);
-
- private void Guard()
- {
- if (_isDisposed != 0)
- throw new ReaderDisposedException(GetType().FullName!);
-
- if (_faultException is not null)
- throw new ReaderFaultedException(_faultException);
+ return $"{Topic}-partition-{partitionNumber}";
}
- public async ValueTask ChannelFaulted(Exception exception)
+ private async Task Guard(CancellationToken cancellationToken)
{
- _faultException = exception;
- await DisposeChannel().ConfigureAwait(false);
+ if (!_allSubReadersIsReady)
+ await
_semaphoreSlim.WaitAsync(cancellationToken).ConfigureAwait(false);
Review Comment:
We release it at the end of the monitor method
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]