This is an automated email from the ASF dual-hosted git repository. blankensteiner pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar-dotpulsar.git
commit 87b0d8e159d631c2be66b2415865f2e10342e09b Author: Sérgio Silveira <[email protected]> AuthorDate: Tue Mar 31 16:59:49 2020 +0200 code review change requests --- src/DotPulsar/Internal/ConsumerChannel.cs | 11 ++-------- src/DotPulsar/Internal/ProducerChannel.cs | 30 +++++++++------------------ src/DotPulsar/Internal/PulsarClientBuilder.cs | 10 +++------ src/DotPulsar/MessageMetadata.cs | 14 ++++++------- 4 files changed, 22 insertions(+), 43 deletions(-) diff --git a/src/DotPulsar/Internal/ConsumerChannel.cs b/src/DotPulsar/Internal/ConsumerChannel.cs index f1c3117..a40a7eb 100644 --- a/src/DotPulsar/Internal/ConsumerChannel.cs +++ b/src/DotPulsar/Internal/ConsumerChannel.cs @@ -135,10 +135,7 @@ namespace DotPulsar.Internal { _queue.Dispose(); - await _connection.Send(new CommandCloseConsumer - { - ConsumerId = _id - }, CancellationToken.None).ConfigureAwait(false); + await _connection.Send(new CommandCloseConsumer { ConsumerId = _id }, CancellationToken.None).ConfigureAwait(false); } catch { @@ -162,11 +159,7 @@ namespace DotPulsar.Internal private async Task RejectPackage(MessagePackage messagePackage, CancellationToken cancellationToken) { - var ack = new CommandAck - { - Type = CommandAck.AckType.Individual, - validation_error = CommandAck.ValidationError.ChecksumMismatch - }; + var ack = new CommandAck { Type = CommandAck.AckType.Individual, validation_error = CommandAck.ValidationError.ChecksumMismatch }; ack.MessageIds.Add(messagePackage.MessageId); diff --git a/src/DotPulsar/Internal/ProducerChannel.cs b/src/DotPulsar/Internal/ProducerChannel.cs index d6181e7..16542dd 100644 --- a/src/DotPulsar/Internal/ProducerChannel.cs +++ b/src/DotPulsar/Internal/ProducerChannel.cs @@ -32,16 +32,9 @@ namespace DotPulsar.Internal public ProducerChannel(ulong id, string name, SequenceId sequenceId, IConnection connection) { - _cachedMetadata = new MessageMetadata - { - ProducerName = name - }; + _cachedMetadata = new MessageMetadata { ProducerName = name }; - var 
commandSend = new CommandSend - { - ProducerId = id, - NumMessages = 1 - }; + var commandSend = new CommandSend { ProducerId = id, NumMessages = 1 }; _cachedSendPackage = new SendPackage(commandSend, _cachedMetadata); @@ -54,10 +47,7 @@ namespace DotPulsar.Internal { try { - await _connection.Send(new CommandCloseProducer - { - ProducerId = _id - }, CancellationToken.None).ConfigureAwait(false); + await _connection.Send(new CommandCloseProducer { ProducerId = _id }, CancellationToken.None).ConfigureAwait(false); } catch { @@ -65,19 +55,21 @@ namespace DotPulsar.Internal } } - public async Task<CommandSendReceipt> Send(ReadOnlySequence<byte> payload, CancellationToken cancellationToken) + public Task<CommandSendReceipt> Send(ReadOnlySequence<byte> payload, CancellationToken cancellationToken) { _cachedSendPackage.Metadata = _cachedMetadata; _cachedSendPackage.Payload = payload; - return await SendPackage(true, cancellationToken).ConfigureAwait(false); + + return SendPackage(true, cancellationToken); } - public async Task<CommandSendReceipt> Send(MessageMetadata metadata, ReadOnlySequence<byte> payload, CancellationToken cancellationToken) + public Task<CommandSendReceipt> Send(MessageMetadata metadata, ReadOnlySequence<byte> payload, CancellationToken cancellationToken) { metadata.ProducerName = _cachedMetadata.ProducerName; _cachedSendPackage.Metadata = metadata; _cachedSendPackage.Payload = payload; - return await SendPackage(metadata.SequenceId == 0, cancellationToken).ConfigureAwait(false); + + return SendPackage(metadata.SequenceId == 0, cancellationToken); } private async Task<CommandSendReceipt> SendPackage(bool autoAssignSequenceId, CancellationToken cancellationToken) @@ -92,9 +84,7 @@ namespace DotPulsar.Internal _cachedSendPackage.Metadata.SequenceId = _sequenceId.Current; } else - { _cachedSendPackage.Command.SequenceId = _cachedSendPackage.Metadata.SequenceId; - } var response = await _connection.Send(_cachedSendPackage, 
cancellationToken).ConfigureAwait(false); response.Expect(BaseCommand.Type.SendReceipt); @@ -108,7 +98,7 @@ namespace DotPulsar.Internal { // Reset in case the user reuse the MessageMetadata, but is not explicitly setting the sequenceId if (autoAssignSequenceId) - _cachedSendPackage.Metadata.SequenceId = 0; + _cachedSendPackage.Metadata.SequenceId = 0; } } } diff --git a/src/DotPulsar/Internal/PulsarClientBuilder.cs b/src/DotPulsar/Internal/PulsarClientBuilder.cs index 387e87d..201bf01 100644 --- a/src/DotPulsar/Internal/PulsarClientBuilder.cs +++ b/src/DotPulsar/Internal/PulsarClientBuilder.cs @@ -142,18 +142,14 @@ namespace DotPulsar.Internal $"The scheme of the ServiceUrl ({_serviceUrl}) is '{Constants.PulsarSslScheme}' and cannot be used with an encryption policy of 'EnforceUnencrypted'"); } else - { throw new InvalidSchemeException($"Invalid scheme '{scheme}'. Expected '{Constants.PulsarScheme}' or '{Constants.PulsarSslScheme}'"); - } + var connector = new Connector(_clientCertificates, _trustedCertificateAuthority, _verifyCertificateAuthority, _verifyCertificateName); var connectionPool = new ConnectionPool(_commandConnect, _serviceUrl, connector, _encryptionPolicy.Value, _closeInactiveConnectionsInterval); var processManager = new ProcessManager(connectionPool); - - var exceptionHandlerPipeline = new ExceptionHandlerPipeline(new List<IHandleException>(_exceptionHandlers) - { - new DefaultExceptionHandler(_retryInterval) - }); + var exceptionHandlers = new List<IHandleException>(_exceptionHandlers) { new DefaultExceptionHandler(_retryInterval) }; + var exceptionHandlerPipeline = new ExceptionHandlerPipeline(exceptionHandlers); return new PulsarClient(connectionPool, processManager, exceptionHandlerPipeline); } diff --git a/src/DotPulsar/MessageMetadata.cs b/src/DotPulsar/MessageMetadata.cs index 66b10d9..46421b1 100644 --- a/src/DotPulsar/MessageMetadata.cs +++ b/src/DotPulsar/MessageMetadata.cs @@ -71,8 +71,10 @@ namespace DotPulsar { get { - foreach (var 
prop in Metadata.Properties) + for (var i = 0; i < Metadata.Properties.Count; i++) { + var prop = Metadata.Properties[i]; + if (prop.Key == key) return prop.Value; } @@ -81,8 +83,10 @@ namespace DotPulsar } set { - foreach (var prop in Metadata.Properties) + for (var i = 0; i < Metadata.Properties.Count; i++) { + var prop = Metadata.Properties[i]; + if (prop.Key != key) continue; @@ -91,11 +95,7 @@ namespace DotPulsar return; } - Metadata.Properties.Add(new KeyValue - { - Key = key, - Value = value - }); + Metadata.Properties.Add(new KeyValue { Key = key, Value = value }); } }
