This is an automated email from the ASF dual-hosted git repository. blankensteiner pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar-dotpulsar.git
commit a24b31f38679ba3e44b8c04689eeccd1b693b01e Author: SeĢrgio Silveira <[email protected]> AuthorDate: Sun Mar 29 19:46:58 2020 +0200 More small tweaks. --- src/DotPulsar/Internal/ProcessManager.cs | 4 +--- src/DotPulsar/Internal/ProducerBuilder.cs | 6 ++---- src/DotPulsar/Internal/ProducerChannel.cs | 4 ++-- src/DotPulsar/Internal/PulsarClientBuilder.cs | 12 +++++------- src/DotPulsar/Internal/ReaderBuilder.cs | 6 ++---- src/DotPulsar/Internal/StateTask.cs | 11 +++-------- src/DotPulsar/Internal/StateTaskCollection.cs | 1 + 7 files changed, 16 insertions(+), 28 deletions(-) diff --git a/src/DotPulsar/Internal/ProcessManager.cs b/src/DotPulsar/Internal/ProcessManager.cs index 9be1653..29b1e0b 100644 --- a/src/DotPulsar/Internal/ProcessManager.cs +++ b/src/DotPulsar/Internal/ProcessManager.cs @@ -34,9 +34,7 @@ namespace DotPulsar.Internal public async ValueTask DisposeAsync() { - var processes = _processes.Values.ToArray(); - - foreach (var proc in processes) + foreach (var proc in _processes.Values.ToArray()) await proc.DisposeAsync().ConfigureAwait(false); await _connectionPool.DisposeAsync().ConfigureAwait(false); diff --git a/src/DotPulsar/Internal/ProducerBuilder.cs b/src/DotPulsar/Internal/ProducerBuilder.cs index 8916cd0..40f4599 100644 --- a/src/DotPulsar/Internal/ProducerBuilder.cs +++ b/src/DotPulsar/Internal/ProducerBuilder.cs @@ -53,13 +53,11 @@ namespace DotPulsar.Internal if (string.IsNullOrEmpty(_topic)) throw new ConfigurationException("ProducerOptions.Topic may not be null or empty"); - var options = new ProducerOptions(_topic!) + return _pulsarClient.CreateProducer(new ProducerOptions(_topic!) { InitialSequenceId = _initialSequenceId, ProducerName = _producerName - }; - - return _pulsarClient.CreateProducer(options); + }); } } } diff --git a/src/DotPulsar/Internal/ProducerChannel.cs b/src/DotPulsar/Internal/ProducerChannel.cs index 537104c..d6181e7 100644 --- a/src/DotPulsar/Internal/ProducerChannel.cs +++ b/src/DotPulsar/Internal/ProducerChannel.cs @@ -106,9 +106,9 @@ namespace DotPulsar.Internal } finally { + // Reset in case the user reuse the MessageMetadata, but is not explicitly setting the sequenceId if (autoAssignSequenceId) - _cachedSendPackage.Metadata.SequenceId = - 0; // Reset in case the user reuse the MessageMetadata, but is not explicitly setting the sequenceId + _cachedSendPackage.Metadata.SequenceId = 0; } } } diff --git a/src/DotPulsar/Internal/PulsarClientBuilder.cs b/src/DotPulsar/Internal/PulsarClientBuilder.cs index 95827e6..bd1b166 100644 --- a/src/DotPulsar/Internal/PulsarClientBuilder.cs +++ b/src/DotPulsar/Internal/PulsarClientBuilder.cs @@ -145,19 +145,17 @@ namespace DotPulsar.Internal { throw new InvalidSchemeException($"Invalid scheme '{scheme}'. Expected '{Constants.PulsarScheme}' or '{Constants.PulsarSslScheme}'"); } - + var connector = new Connector(_clientCertificates, _trustedCertificateAuthority, _verifyCertificateAuthority, _verifyCertificateName); var connectionPool = new ConnectionPool(_commandConnect, _serviceUrl, connector, _encryptionPolicy.Value, _closeInactiveConnectionsInterval); - var exceptionHandlers = new List<IHandleException>(_exceptionHandlers) + var processManager = new ProcessManager(connectionPool); + + var exceptionHandlerPipeline = new ExceptionHandlerPipeline(new List<IHandleException>(_exceptionHandlers) { new DefaultExceptionHandler(_retryInterval) - }; - - var exceptionHandlerPipeline = new ExceptionHandlerPipeline(exceptionHandlers); - - var processManager = new ProcessManager(connectionPool); + }); return new PulsarClient(connectionPool, processManager, exceptionHandlerPipeline); } diff --git a/src/DotPulsar/Internal/ReaderBuilder.cs b/src/DotPulsar/Internal/ReaderBuilder.cs index 41ff928..15b7c64 100644 --- a/src/DotPulsar/Internal/ReaderBuilder.cs +++ b/src/DotPulsar/Internal/ReaderBuilder.cs @@ -71,14 +71,12 @@ namespace DotPulsar.Internal if (string.IsNullOrEmpty(_topic)) throw new ConfigurationException("Topic may not be null or empty"); - var options = new ReaderOptions(_startMessageId, _topic!) + return _pulsarClient.CreateReader(new ReaderOptions(_startMessageId, _topic!) { MessagePrefetchCount = _messagePrefetchCount, ReadCompacted = _readCompacted, ReaderName = _readerName - }; - - return _pulsarClient.CreateReader(options); + }); } } } diff --git a/src/DotPulsar/Internal/StateTask.cs b/src/DotPulsar/Internal/StateTask.cs index dbd5a08..5943f09 100644 --- a/src/DotPulsar/Internal/StateTask.cs +++ b/src/DotPulsar/Internal/StateTask.cs @@ -30,15 +30,10 @@ namespace DotPulsar.Internal public CancelableCompletionSource<TState> CancelableCompletionSource { get; } - public void Dispose() - => CancelableCompletionSource.Dispose(); - public bool IsAwaiting(TState state) - { - if (_change == StateChanged.To) - return _state.Equals(state); + => _change == StateChanged.To ? _state.Equals(state) : !_state.Equals(state); - return !_state.Equals(state); - } + public void Dispose() + => CancelableCompletionSource.Dispose(); } } diff --git a/src/DotPulsar/Internal/StateTaskCollection.cs b/src/DotPulsar/Internal/StateTaskCollection.cs index d8d2547..7606f02 100644 --- a/src/DotPulsar/Internal/StateTaskCollection.cs +++ b/src/DotPulsar/Internal/StateTaskCollection.cs @@ -39,6 +39,7 @@ namespace DotPulsar.Internal } node.Value.CancelableCompletionSource.SetupCancellation(() => TaskWasCanceled(node), cancellationToken); + return node.Value.CancelableCompletionSource.Task; }
