This is an automated email from the ASF dual-hosted git repository. blankensteiner pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar-dotpulsar.git
commit 2b79e9e4a88f60f167ca7c128c151c6ecff6e6ec Author: Sérgio Silveira <[email protected]> AuthorDate: Sun Mar 29 19:19:14 2020 +0200 removed warnings and improved some code constructs with newer versions. removed some awaits. interpolated all strings. --- DotPulsar.sln.DotSettings | 3 + samples/Consuming/Program.cs | 2 +- samples/Producing/Program.cs | 2 +- samples/Reading/Program.cs | 2 +- src/DotPulsar/Internal/AsyncLock.cs | 2 +- src/DotPulsar/Internal/AsyncLockExecutor.cs | 4 +- src/DotPulsar/Internal/AsyncQueue.cs | 5 +- src/DotPulsar/Internal/ChannelManager.cs | 34 ++----- src/DotPulsar/Internal/Connection.cs | 19 ++-- src/DotPulsar/Internal/ConnectionPool.cs | 6 +- src/DotPulsar/Internal/Constants.cs | 7 +- src/DotPulsar/Internal/Consumer.cs | 6 +- src/DotPulsar/Internal/ConsumerChannel.cs | 20 +++- src/DotPulsar/Internal/DefaultExceptionHandler.cs | 41 +++----- .../Extensions/ReadOnlySequenceExtensions.cs | 2 +- src/DotPulsar/Internal/Process.cs | 2 +- src/DotPulsar/Internal/ProcessManager.cs | 6 +- src/DotPulsar/Internal/Producer.cs | 26 +++--- src/DotPulsar/Internal/ProducerBuilder.cs | 6 +- src/DotPulsar/Internal/ProducerChannel.cs | 16 +++- src/DotPulsar/Internal/ProducerChannelFactory.cs | 6 +- src/DotPulsar/Internal/PulsarClientBuilder.cs | 24 +++-- src/DotPulsar/Internal/PulsarStream.cs | 9 +- src/DotPulsar/Internal/ReaderBuilder.cs | 7 +- src/DotPulsar/Internal/ReaderChannelFactory.cs | 2 +- src/DotPulsar/Internal/RequestResponseHandler.cs | 62 +++++-------- src/DotPulsar/Internal/Serializer.cs | 10 +- src/DotPulsar/MessageId.cs | 10 +- src/DotPulsar/MessageMetadata.cs | 24 ++--- tests/DotPulsar.StressTests/ConnectionTests.cs | 2 +- .../Fixtures/StandaloneClusterFixture.cs | 11 ++- tests/DotPulsar.Tests/Internal/Crc32CTests.cs | 27 +++++- .../Extensions/ReadOnlySequenceExtensionsTests.cs | 103 +++++++++++++++++---- .../Internal/SequenceBuilderTests.cs | 14 ++- tests/DotPulsar.Tests/Internal/SerializerTests.cs | 1 + 35 files changed, 323 
insertions(+), 200 deletions(-) diff --git a/DotPulsar.sln.DotSettings b/DotPulsar.sln.DotSettings index c011518..16b30d6 100644 --- a/DotPulsar.sln.DotSettings +++ b/DotPulsar.sln.DotSettings @@ -2,6 +2,7 @@ <s:String x:Key="/Default/CodeInspection/Highlighting/InspectionSeverities/=ArrangeAccessorOwnerBody/@EntryIndexedValue">DO_NOT_SHOW</s:String> <s:String x:Key="/Default/CodeInspection/Highlighting/InspectionSeverities/=ArrangeConstructorOrDestructorBody/@EntryIndexedValue"></s:String> <s:Boolean x:Key="/Default/CodeInspection/Highlighting/InspectionSeverities/=ArrangeConstructorOrDestructorBody/@EntryIndexRemoved">True</s:Boolean> + <s:String x:Key="/Default/CodeInspection/Highlighting/InspectionSeverities/=ArrangeRedundantParentheses/@EntryIndexedValue">DO_NOT_SHOW</s:String> <s:String x:Key="/Default/CodeInspection/Highlighting/InspectionSeverities/=SuggestDiscardDeclarationVarStyle/@EntryIndexedValue">DO_NOT_SHOW</s:String> <s:String x:Key="/Default/CodeStyle/CodeCleanup/Profiles/=DotPulsar_003A_0020Full_0020Cleanup/@EntryIndexedValue"><?xml version="1.0" encoding="utf-16"?><Profile name="DotPulsar: Full Cleanup"><XMLReformatCode>True</XMLReformatCode><CSCodeStyleAttributes ArrangeTypeAccessModifier="True" ArrangeTypeMemberAccessModifier="True" SortModifiers="True" RemoveRedundantParentheses="True" AddMissingParentheses="True" ArrangeBraces="True" ArrangeAttributes="True" Arrange [...] 
&lt;option name="myName" value="DotPulsar: Full Cleanup" /&gt; @@ -50,4 +51,6 @@ <s:Boolean x:Key="/Default/CodeStyle/CodeFormatting/CSharpFormat/WRAP_BEFORE_FIRST_TYPE_PARAMETER_CONSTRAINT/@EntryValue">True</s:Boolean> <s:String x:Key="/Default/CodeStyle/CodeFormatting/CSharpFormat/WRAP_OBJECT_AND_COLLECTION_INITIALIZER_STYLE/@EntryValue">CHOP_ALWAYS</s:String> <s:Boolean x:Key="/Default/CodeStyle/CSharpUsing/AddImportsToDeepestScope/@EntryValue">True</s:Boolean> + <s:String x:Key="/Default/CodeStyle/Naming/CSharpNaming/Abbreviations/=SSL/@EntryIndexedValue">SSL</s:String> + <s:Boolean x:Key="/Default/UserDictionary/Words/=Xunit/@EntryIndexedValue">True</s:Boolean> </wpf:ResourceDictionary> diff --git a/samples/Consuming/Program.cs b/samples/Consuming/Program.cs index b792652..59fe108 100644 --- a/samples/Consuming/Program.cs +++ b/samples/Consuming/Program.cs @@ -23,7 +23,7 @@ namespace Consuming using DotPulsar.Abstractions; using DotPulsar.Extensions; - internal class Program + internal static class Program { private static async Task Main(string[] args) { diff --git a/samples/Producing/Program.cs b/samples/Producing/Program.cs index b268c6a..73ddff8 100644 --- a/samples/Producing/Program.cs +++ b/samples/Producing/Program.cs @@ -22,7 +22,7 @@ namespace Producing using DotPulsar.Abstractions; using DotPulsar.Extensions; - internal class Program + internal static class Program { private static async Task Main(string[] args) { diff --git a/samples/Reading/Program.cs b/samples/Reading/Program.cs index 3dbe68f..b42275d 100644 --- a/samples/Reading/Program.cs +++ b/samples/Reading/Program.cs @@ -23,7 +23,7 @@ namespace Reading using DotPulsar.Abstractions; using DotPulsar.Extensions; - internal class Program + internal static class Program { private static async Task Main(string[] args) { diff --git a/src/DotPulsar/Internal/AsyncLock.cs b/src/DotPulsar/Internal/AsyncLock.cs index cad8312..f2732f2 100644 --- a/src/DotPulsar/Internal/AsyncLock.cs +++ 
b/src/DotPulsar/Internal/AsyncLock.cs @@ -46,7 +46,7 @@ namespace DotPulsar.Internal if (_semaphoreSlim.CurrentCount == 1) //Lock is free { - _semaphoreSlim.Wait(); //Will never block + _semaphoreSlim.Wait(cancellationToken); //Will never block return _completedTask; } diff --git a/src/DotPulsar/Internal/AsyncLockExecutor.cs b/src/DotPulsar/Internal/AsyncLockExecutor.cs index 9258784..690c1ed 100644 --- a/src/DotPulsar/Internal/AsyncLockExecutor.cs +++ b/src/DotPulsar/Internal/AsyncLockExecutor.cs @@ -30,8 +30,8 @@ namespace DotPulsar.Internal _executor = executor; } - public async ValueTask DisposeAsync() - => await _lock.DisposeAsync().ConfigureAwait(false); + public ValueTask DisposeAsync() + => _lock.DisposeAsync(); public async ValueTask Execute(Action action, CancellationToken cancellationToken) { diff --git a/src/DotPulsar/Internal/AsyncQueue.cs b/src/DotPulsar/Internal/AsyncQueue.cs index 5d9286e..800cf9f 100644 --- a/src/DotPulsar/Internal/AsyncQueue.cs +++ b/src/DotPulsar/Internal/AsyncQueue.cs @@ -96,7 +96,10 @@ namespace DotPulsar.Internal node.Value.Dispose(); _pendingDequeues.Remove(node); } - catch { } + catch + { + // ignored + } } } diff --git a/src/DotPulsar/Internal/ChannelManager.cs b/src/DotPulsar/Internal/ChannelManager.cs index b0d6b27..9667d53 100644 --- a/src/DotPulsar/Internal/ChannelManager.cs +++ b/src/DotPulsar/Internal/ChannelManager.cs @@ -38,6 +38,7 @@ namespace DotPulsar.Internal public Task<ProducerResponse> Outgoing(CommandProducer command, Task<BaseCommand> response, IChannel channel) { var producerId = _producerChannels.Add(channel); + command.ProducerId = producerId; return response.ContinueWith(result => @@ -49,6 +50,7 @@ namespace DotPulsar.Internal } channel.Connected(); + return new ProducerResponse(producerId, result.Result.ProducerSuccess.ProducerName); }, TaskContinuationOptions.OnlyOnRanToCompletion); } @@ -56,6 +58,7 @@ namespace DotPulsar.Internal public Task<SubscribeResponse> Outgoing(CommandSubscribe command, 
Task<BaseCommand> response, IChannel channel) { var consumerId = _consumerChannels.Add(channel); + command.ConsumerId = consumerId; return response.ContinueWith(result => @@ -103,27 +106,16 @@ namespace DotPulsar.Internal { var channel = _consumerChannels.Remove(consumerId); - if (channel != null) - channel.Unsubscribed(); + channel?.Unsubscribed(); } }, TaskContinuationOptions.OnlyOnRanToCompletion); } public void Incoming(CommandCloseConsumer command) - { - var channel = _consumerChannels.Remove(command.ConsumerId); - - if (channel != null) - channel.ClosedByServer(); - } + => _consumerChannels.Remove(command.ConsumerId)?.ClosedByServer(); public void Incoming(CommandCloseProducer command) - { - var inbox = _producerChannels.Remove(command.ProducerId); - - if (inbox != null) - inbox.ClosedByServer(); - } + => _producerChannels.Remove(command.ProducerId)?.ClosedByServer(); public void Incoming(CommandActiveConsumerChange command) { @@ -139,20 +131,10 @@ namespace DotPulsar.Internal } public void Incoming(CommandReachedEndOfTopic command) - { - var channel = _consumerChannels[command.ConsumerId]; - - if (channel != null) - channel.ReachedEndOfTopic(); - } + => _consumerChannels[command.ConsumerId]?.ReachedEndOfTopic(); public void Incoming(CommandMessage command, ReadOnlySequence<byte> data) - { - var consumer = _consumerChannels[command.ConsumerId]; - - if (consumer != null) - consumer.Received(new MessagePackage(command.MessageId, data)); - } + => _consumerChannels[command.ConsumerId]?.Received(new MessagePackage(command.MessageId, data)); public void Dispose() { diff --git a/src/DotPulsar/Internal/Connection.cs b/src/DotPulsar/Internal/Connection.cs index 6446817..5ec24ff 100644 --- a/src/DotPulsar/Internal/Connection.cs +++ b/src/DotPulsar/Internal/Connection.cs @@ -53,7 +53,7 @@ namespace DotPulsar.Internal { ThrowIfDisposed(); - Task<ProducerResponse>? responseTask = null; + Task<ProducerResponse>? 
responseTask; using (await _lock.Lock(cancellationToken).ConfigureAwait(false)) { @@ -71,7 +71,7 @@ namespace DotPulsar.Internal { ThrowIfDisposed(); - Task<SubscribeResponse>? responseTask = null; + Task<SubscribeResponse>? responseTask; using (await _lock.Lock(cancellationToken).ConfigureAwait(false)) { @@ -101,7 +101,7 @@ namespace DotPulsar.Internal { ThrowIfDisposed(); - Task<BaseCommand>? responseTask = null; + Task<BaseCommand>? responseTask; using (await _lock.Lock(cancellationToken).ConfigureAwait(false)) { @@ -131,7 +131,7 @@ namespace DotPulsar.Internal { ThrowIfDisposed(); - Task<BaseCommand>? responseTask = null; + Task<BaseCommand>? responseTask; using (await _lock.Lock(cancellationToken).ConfigureAwait(false)) { @@ -149,7 +149,7 @@ namespace DotPulsar.Internal { ThrowIfDisposed(); - Task<BaseCommand>? responseTask = null; + Task<BaseCommand>? responseTask; using (await _lock.Lock(cancellationToken).ConfigureAwait(false)) { @@ -167,7 +167,7 @@ namespace DotPulsar.Internal { ThrowIfDisposed(); - Task<BaseCommand>? response = null; + Task<BaseCommand>? response; using (await _lock.Lock(cancellationToken).ConfigureAwait(false)) { @@ -184,7 +184,7 @@ namespace DotPulsar.Internal { ThrowIfDisposed(); - Task<BaseCommand>? response = null; + Task<BaseCommand>? response; using (await _lock.Lock(cancellationToken).ConfigureAwait(false)) { @@ -244,7 +244,10 @@ namespace DotPulsar.Internal } } } - catch { } + catch + { + // ignored + } } public async ValueTask DisposeAsync() diff --git a/src/DotPulsar/Internal/ConnectionPool.cs b/src/DotPulsar/Internal/ConnectionPool.cs index 8e5bc8c..03773c4 100644 --- a/src/DotPulsar/Internal/ConnectionPool.cs +++ b/src/DotPulsar/Internal/ConnectionPool.cs @@ -111,7 +111,6 @@ namespace DotPulsar.Internal return response.BrokerServiceUrl; case EncryptionPolicy.PreferEncrypted: return hasBrokerServiceUrlTls ? 
response.BrokerServiceUrlTls : response.BrokerServiceUrl; - case EncryptionPolicy.PreferUnencrypted: default: return hasBrokerServiceUrl ? response.BrokerServiceUrl : response.BrokerServiceUrlTls; } @@ -170,7 +169,10 @@ namespace DotPulsar.Internal } } } - catch { } + catch + { + // ignored + } } } } diff --git a/src/DotPulsar/Internal/Constants.cs b/src/DotPulsar/Internal/Constants.cs index 6e6a51c..4ff4758 100644 --- a/src/DotPulsar/Internal/Constants.cs +++ b/src/DotPulsar/Internal/Constants.cs @@ -21,13 +21,16 @@ namespace DotPulsar.Internal static Constants() { var assemblyName = Assembly.GetCallingAssembly().GetName(); - ClientVersion = assemblyName.Name + " " + assemblyName.Version.ToString(3); + ClientVersion = $"{assemblyName.Name} {assemblyName.Version.ToString(3)}"; ProtocolVersion = 14; PulsarScheme = "pulsar"; PulsarSslScheme = "pulsar+ssl"; DefaultPulsarPort = 6650; DefaultPulsarSSLPort = 6651; - MagicNumber = new byte[] { 0x0e, 0x01 }; + MagicNumber = new byte[] + { + 0x0e, 0x01 + }; MetadataSizeOffset = 6; MetadataOffset = 10; } diff --git a/src/DotPulsar/Internal/Consumer.cs b/src/DotPulsar/Internal/Consumer.cs index 5f0e763..5fae580 100644 --- a/src/DotPulsar/Internal/Consumer.cs +++ b/src/DotPulsar/Internal/Consumer.cs @@ -103,7 +103,11 @@ namespace DotPulsar.Internal public async ValueTask Seek(MessageId messageId, CancellationToken cancellationToken) { ThrowIfDisposed(); - var seek = new CommandSeek { MessageId = messageId.Data }; + + var seek = new CommandSeek + { + MessageId = messageId.Data + }; _ = await _executor.Execute(() => _channel.Send(seek, cancellationToken), cancellationToken).ConfigureAwait(false); } diff --git a/src/DotPulsar/Internal/ConsumerChannel.cs b/src/DotPulsar/Internal/ConsumerChannel.cs index 44cfd94..f1c3117 100644 --- a/src/DotPulsar/Internal/ConsumerChannel.cs +++ b/src/DotPulsar/Internal/ConsumerChannel.cs @@ -42,7 +42,13 @@ namespace DotPulsar.Internal _queue = queue; _connection = connection; _batchHandler = 
batchHandler; - _cachedCommandFlow = new CommandFlow { ConsumerId = id, MessagePermits = messagePrefetchCount }; + + _cachedCommandFlow = new CommandFlow + { + ConsumerId = id, + MessagePermits = messagePrefetchCount + }; + _sendWhenZero = 0; _firstFlow = true; } @@ -128,7 +134,11 @@ namespace DotPulsar.Internal try { _queue.Dispose(); - await _connection.Send(new CommandCloseConsumer { ConsumerId = _id }, CancellationToken.None).ConfigureAwait(false); + + await _connection.Send(new CommandCloseConsumer + { + ConsumerId = _id + }, CancellationToken.None).ConfigureAwait(false); } catch { @@ -152,7 +162,11 @@ namespace DotPulsar.Internal private async Task RejectPackage(MessagePackage messagePackage, CancellationToken cancellationToken) { - var ack = new CommandAck { Type = CommandAck.AckType.Individual, validation_error = CommandAck.ValidationError.ChecksumMismatch }; + var ack = new CommandAck + { + Type = CommandAck.AckType.Individual, + validation_error = CommandAck.ValidationError.ChecksumMismatch + }; ack.MessageIds.Add(messagePackage.MessageId); diff --git a/src/DotPulsar/Internal/DefaultExceptionHandler.cs b/src/DotPulsar/Internal/DefaultExceptionHandler.cs index 855657c..b0a44c3 100644 --- a/src/DotPulsar/Internal/DefaultExceptionHandler.cs +++ b/src/DotPulsar/Internal/DefaultExceptionHandler.cs @@ -43,34 +43,23 @@ namespace DotPulsar.Internal { switch (exception) { - case TooManyRequestsException _: - return FaultAction.Retry; - case ChannelNotReadyException _: - return FaultAction.Retry; - case ServiceNotReadyException _: - return FaultAction.Retry; - case ConnectionDisposedException _: - return FaultAction.Retry; - case AsyncLockDisposedException _: - return FaultAction.Retry; - case PulsarStreamDisposedException _: - return FaultAction.Retry; - case AsyncQueueDisposedException _: - return FaultAction.Retry; - case OperationCanceledException _: - return cancellationToken.IsCancellationRequested ? 
FaultAction.Rethrow : FaultAction.Retry; - case DotPulsarException _: - return FaultAction.Rethrow; + case TooManyRequestsException _: return FaultAction.Retry; + case ChannelNotReadyException _: return FaultAction.Retry; + case ServiceNotReadyException _: return FaultAction.Retry; + case ConnectionDisposedException _: return FaultAction.Retry; + case AsyncLockDisposedException _: return FaultAction.Retry; + case PulsarStreamDisposedException _: return FaultAction.Retry; + case AsyncQueueDisposedException _: return FaultAction.Retry; + case OperationCanceledException _: return cancellationToken.IsCancellationRequested ? FaultAction.Rethrow : FaultAction.Retry; + case DotPulsarException _: return FaultAction.Rethrow; case SocketException socketException: - switch (socketException.SocketErrorCode) + return socketException.SocketErrorCode switch { - case SocketError.HostNotFound: - case SocketError.HostUnreachable: - case SocketError.NetworkUnreachable: - return FaultAction.Rethrow; - } - - return FaultAction.Retry; + SocketError.HostNotFound => FaultAction.Rethrow, + SocketError.HostUnreachable => FaultAction.Rethrow, + SocketError.NetworkUnreachable => FaultAction.Rethrow, + _ => FaultAction.Retry + }; } return FaultAction.Rethrow; diff --git a/src/DotPulsar/Internal/Extensions/ReadOnlySequenceExtensions.cs b/src/DotPulsar/Internal/Extensions/ReadOnlySequenceExtensions.cs index 5312166..86cb043 100644 --- a/src/DotPulsar/Internal/Extensions/ReadOnlySequenceExtensions.cs +++ b/src/DotPulsar/Internal/Extensions/ReadOnlySequenceExtensions.cs @@ -98,7 +98,7 @@ namespace DotPulsar.Internal.Extensions if (read == 3) break; - + start = 0; } diff --git a/src/DotPulsar/Internal/Process.cs b/src/DotPulsar/Internal/Process.cs index bd4f22f..e442740 100644 --- a/src/DotPulsar/Internal/Process.cs +++ b/src/DotPulsar/Internal/Process.cs @@ -70,7 +70,7 @@ namespace DotPulsar.Internal case ChannelUnsubscribed _: ChannelState = ChannelState.Unsubscribed; break; - }; + } 
CalculateState(); } diff --git a/src/DotPulsar/Internal/ProcessManager.cs b/src/DotPulsar/Internal/ProcessManager.cs index c6eb088..9be1653 100644 --- a/src/DotPulsar/Internal/ProcessManager.cs +++ b/src/DotPulsar/Internal/ProcessManager.cs @@ -36,8 +36,8 @@ namespace DotPulsar.Internal { var processes = _processes.Values.ToArray(); - for (var i = 0; i < processes.Length; ++i) - await processes[i].DisposeAsync().ConfigureAwait(false); + foreach (var proc in processes) + await proc.DisposeAsync().ConfigureAwait(false); await _connectionPool.DisposeAsync().ConfigureAwait(false); } @@ -81,8 +81,6 @@ namespace DotPulsar.Internal process.Handle(e); break; } - - ; } } } diff --git a/src/DotPulsar/Internal/Producer.cs b/src/DotPulsar/Internal/Producer.cs index e993e85..9a67d64 100644 --- a/src/DotPulsar/Internal/Producer.cs +++ b/src/DotPulsar/Internal/Producer.cs @@ -49,11 +49,11 @@ namespace DotPulsar.Internal _eventRegister.Register(new ProducerCreated(_correlationId, this)); } - public async ValueTask<ProducerState> StateChangedTo(ProducerState state, CancellationToken cancellationToken) - => await _state.StateChangedTo(state, cancellationToken).ConfigureAwait(false); + public ValueTask<ProducerState> StateChangedTo(ProducerState state, CancellationToken cancellationToken) + => _state.StateChangedTo(state, cancellationToken); - public async ValueTask<ProducerState> StateChangedFrom(ProducerState state, CancellationToken cancellationToken) - => await _state.StateChangedFrom(state, cancellationToken).ConfigureAwait(false); + public ValueTask<ProducerState> StateChangedFrom(ProducerState state, CancellationToken cancellationToken) + => _state.StateChangedFrom(state, cancellationToken); public bool IsFinalState() => _state.IsFinalState(); @@ -67,27 +67,29 @@ namespace DotPulsar.Internal return; _eventRegister.Register(new ProducerDisposed(_correlationId, this)); + await _channel.DisposeAsync().ConfigureAwait(false); } - public async ValueTask<MessageId> Send(byte[] data, 
CancellationToken cancellationToken) - => await Send(new ReadOnlySequence<byte>(data), cancellationToken).ConfigureAwait(false); + public ValueTask<MessageId> Send(byte[] data, CancellationToken cancellationToken) + => Send(new ReadOnlySequence<byte>(data), cancellationToken); - public async ValueTask<MessageId> Send(ReadOnlyMemory<byte> data, CancellationToken cancellationToken) - => await Send(new ReadOnlySequence<byte>(data), cancellationToken).ConfigureAwait(false); + public ValueTask<MessageId> Send(ReadOnlyMemory<byte> data, CancellationToken cancellationToken) + => Send(new ReadOnlySequence<byte>(data), cancellationToken); public async ValueTask<MessageId> Send(ReadOnlySequence<byte> data, CancellationToken cancellationToken) { ThrowIfDisposed(); var response = await _executor.Execute(() => _channel.Send(data, cancellationToken), cancellationToken).ConfigureAwait(false); + return new MessageId(response.MessageId); } - public async ValueTask<MessageId> Send(MessageMetadata metadata, byte[] data, CancellationToken cancellationToken) - => await Send(metadata, new ReadOnlySequence<byte>(data), cancellationToken).ConfigureAwait(false); + public ValueTask<MessageId> Send(MessageMetadata metadata, byte[] data, CancellationToken cancellationToken) + => Send(metadata, new ReadOnlySequence<byte>(data), cancellationToken); - public async ValueTask<MessageId> Send(MessageMetadata metadata, ReadOnlyMemory<byte> data, CancellationToken cancellationToken) - => await Send(metadata, new ReadOnlySequence<byte>(data), cancellationToken).ConfigureAwait(false); + public ValueTask<MessageId> Send(MessageMetadata metadata, ReadOnlyMemory<byte> data, CancellationToken cancellationToken) + => Send(metadata, new ReadOnlySequence<byte>(data), cancellationToken); public async ValueTask<MessageId> Send(MessageMetadata metadata, ReadOnlySequence<byte> data, CancellationToken cancellationToken) { diff --git a/src/DotPulsar/Internal/ProducerBuilder.cs 
b/src/DotPulsar/Internal/ProducerBuilder.cs index 72a7c4f..8916cd0 100644 --- a/src/DotPulsar/Internal/ProducerBuilder.cs +++ b/src/DotPulsar/Internal/ProducerBuilder.cs @@ -53,7 +53,11 @@ namespace DotPulsar.Internal if (string.IsNullOrEmpty(_topic)) throw new ConfigurationException("ProducerOptions.Topic may not be null or empty"); - var options = new ProducerOptions(_topic!) { InitialSequenceId = _initialSequenceId, ProducerName = _producerName }; + var options = new ProducerOptions(_topic!) + { + InitialSequenceId = _initialSequenceId, + ProducerName = _producerName + }; return _pulsarClient.CreateProducer(options); } diff --git a/src/DotPulsar/Internal/ProducerChannel.cs b/src/DotPulsar/Internal/ProducerChannel.cs index e281154..537104c 100644 --- a/src/DotPulsar/Internal/ProducerChannel.cs +++ b/src/DotPulsar/Internal/ProducerChannel.cs @@ -32,9 +32,16 @@ namespace DotPulsar.Internal public ProducerChannel(ulong id, string name, SequenceId sequenceId, IConnection connection) { - _cachedMetadata = new MessageMetadata { ProducerName = name }; + _cachedMetadata = new MessageMetadata + { + ProducerName = name + }; - var commandSend = new CommandSend { ProducerId = id, NumMessages = 1 }; + var commandSend = new CommandSend + { + ProducerId = id, + NumMessages = 1 + }; _cachedSendPackage = new SendPackage(commandSend, _cachedMetadata); @@ -47,7 +54,10 @@ namespace DotPulsar.Internal { try { - await _connection.Send(new CommandCloseProducer { ProducerId = _id }, CancellationToken.None).ConfigureAwait(false); + await _connection.Send(new CommandCloseProducer + { + ProducerId = _id + }, CancellationToken.None).ConfigureAwait(false); } catch { diff --git a/src/DotPulsar/Internal/ProducerChannelFactory.cs b/src/DotPulsar/Internal/ProducerChannelFactory.cs index 4cbd01a..1b291fc 100644 --- a/src/DotPulsar/Internal/ProducerChannelFactory.cs +++ b/src/DotPulsar/Internal/ProducerChannelFactory.cs @@ -42,7 +42,11 @@ namespace DotPulsar.Internal _executor = executor; 
_sequenceId = new SequenceId(options.InitialSequenceId); - _commandProducer = new CommandProducer { ProducerName = options.ProducerName, Topic = options.Topic }; + _commandProducer = new CommandProducer + { + ProducerName = options.ProducerName, + Topic = options.Topic + }; } public async Task<IProducerChannel> Create(CancellationToken cancellationToken) diff --git a/src/DotPulsar/Internal/PulsarClientBuilder.cs b/src/DotPulsar/Internal/PulsarClientBuilder.cs index fd6870a..95827e6 100644 --- a/src/DotPulsar/Internal/PulsarClientBuilder.cs +++ b/src/DotPulsar/Internal/PulsarClientBuilder.cs @@ -38,11 +38,15 @@ namespace DotPulsar.Internal public PulsarClientBuilder() { - _commandConnect = new CommandConnect { ProtocolVersion = Constants.ProtocolVersion, ClientVersion = Constants.ClientVersion }; + _commandConnect = new CommandConnect + { + ProtocolVersion = Constants.ProtocolVersion, + ClientVersion = Constants.ClientVersion + }; _exceptionHandlers = new List<IHandleException>(); _retryInterval = TimeSpan.FromSeconds(3); - _serviceUrl = new Uri(Constants.PulsarScheme + "://localhost:" + Constants.DefaultPulsarPort); + _serviceUrl = new Uri($"{Constants.PulsarScheme}://localhost:{Constants.DefaultPulsarPort}"); _clientCertificates = new X509Certificate2Collection(); _verifyCertificateAuthority = true; _verifyCertificateName = false; @@ -123,8 +127,7 @@ namespace DotPulsar.Internal if (scheme == Constants.PulsarScheme) { - if (!_encryptionPolicy.HasValue) - _encryptionPolicy = EncryptionPolicy.EnforceUnencrypted; + _encryptionPolicy ??= EncryptionPolicy.EnforceUnencrypted; if (_encryptionPolicy.Value == EncryptionPolicy.EnforceEncrypted) throw new ConnectionSecurityException( @@ -132,8 +135,7 @@ namespace DotPulsar.Internal } else if (scheme == Constants.PulsarSslScheme) { - if (!_encryptionPolicy.HasValue) - _encryptionPolicy = EncryptionPolicy.EnforceEncrypted; + _encryptionPolicy ??= EncryptionPolicy.EnforceEncrypted; if (_encryptionPolicy.Value == 
EncryptionPolicy.EnforceUnencrypted) throw new ConnectionSecurityException( @@ -145,10 +147,18 @@ namespace DotPulsar.Internal } var connector = new Connector(_clientCertificates, _trustedCertificateAuthority, _verifyCertificateAuthority, _verifyCertificateName); + var connectionPool = new ConnectionPool(_commandConnect, _serviceUrl, connector, _encryptionPolicy.Value, _closeInactiveConnectionsInterval); - var exceptionHandlers = new List<IHandleException>(_exceptionHandlers) { new DefaultExceptionHandler(_retryInterval) }; + + var exceptionHandlers = new List<IHandleException>(_exceptionHandlers) + { + new DefaultExceptionHandler(_retryInterval) + }; + var exceptionHandlerPipeline = new ExceptionHandlerPipeline(exceptionHandlers); + var processManager = new ProcessManager(connectionPool); + return new PulsarClient(connectionPool, processManager, exceptionHandlerPipeline); } } diff --git a/src/DotPulsar/Internal/PulsarStream.cs b/src/DotPulsar/Internal/PulsarStream.cs index d4c3f97..46e1cec 100644 --- a/src/DotPulsar/Internal/PulsarStream.cs +++ b/src/DotPulsar/Internal/PulsarStream.cs @@ -106,10 +106,13 @@ namespace DotPulsar.Internal break; } } - catch { } + catch + { + // ignored + } finally { - _writer.Complete(); + await _writer.CompleteAsync(); } } @@ -150,7 +153,7 @@ namespace DotPulsar.Internal } finally { - _reader.Complete(); + await _reader.CompleteAsync(); } } diff --git a/src/DotPulsar/Internal/ReaderBuilder.cs b/src/DotPulsar/Internal/ReaderBuilder.cs index 16d99ab..41ff928 100644 --- a/src/DotPulsar/Internal/ReaderBuilder.cs +++ b/src/DotPulsar/Internal/ReaderBuilder.cs @@ -71,7 +71,12 @@ namespace DotPulsar.Internal if (string.IsNullOrEmpty(_topic)) throw new ConfigurationException("Topic may not be null or empty"); - var options = new ReaderOptions(_startMessageId, _topic!) { MessagePrefetchCount = _messagePrefetchCount, ReadCompacted = _readCompacted, ReaderName = _readerName }; + var options = new ReaderOptions(_startMessageId, _topic!) 
+ { + MessagePrefetchCount = _messagePrefetchCount, + ReadCompacted = _readCompacted, + ReaderName = _readerName + }; return _pulsarClient.CreateReader(options); } diff --git a/src/DotPulsar/Internal/ReaderChannelFactory.cs b/src/DotPulsar/Internal/ReaderChannelFactory.cs index 4458b7a..b22aa76 100644 --- a/src/DotPulsar/Internal/ReaderChannelFactory.cs +++ b/src/DotPulsar/Internal/ReaderChannelFactory.cs @@ -49,7 +49,7 @@ namespace DotPulsar.Internal Durable = false, ReadCompacted = options.ReadCompacted, StartMessageId = options.StartMessageId.Data, - Subscription = "Reader-" + Guid.NewGuid().ToString("N"), + Subscription = $"Reader-{Guid.NewGuid():N}", Topic = options.Topic }; diff --git a/src/DotPulsar/Internal/RequestResponseHandler.cs b/src/DotPulsar/Internal/RequestResponseHandler.cs index 79c8533..5b7ce56 100644 --- a/src/DotPulsar/Internal/RequestResponseHandler.cs +++ b/src/DotPulsar/Internal/RequestResponseHandler.cs @@ -83,47 +83,27 @@ namespace DotPulsar.Internal } private string GetResponseIdentifier(BaseCommand cmd) - { - switch (cmd.CommandType) + => cmd.CommandType switch { - case BaseCommand.Type.Connect: - case BaseCommand.Type.Connected: - return ConnectResponseIdentifier; - case BaseCommand.Type.Send: - return cmd.Send.ProducerId.ToString() + '-' + cmd.Send.SequenceId; - case BaseCommand.Type.SendError: - return cmd.SendError.ProducerId.ToString() + '-' + cmd.SendError.SequenceId; - case BaseCommand.Type.SendReceipt: - return cmd.SendReceipt.ProducerId.ToString() + '-' + cmd.SendReceipt.SequenceId; - case BaseCommand.Type.Error: - return _requestId == 1 ? 
ConnectResponseIdentifier : cmd.Error.RequestId.ToString(); - case BaseCommand.Type.Producer: - return cmd.Producer.RequestId.ToString(); - case BaseCommand.Type.ProducerSuccess: - return cmd.ProducerSuccess.RequestId.ToString(); - case BaseCommand.Type.CloseProducer: - return cmd.CloseProducer.RequestId.ToString(); - case BaseCommand.Type.Lookup: - return cmd.LookupTopic.RequestId.ToString(); - case BaseCommand.Type.LookupResponse: - return cmd.LookupTopicResponse.RequestId.ToString(); - case BaseCommand.Type.Unsubscribe: - return cmd.Unsubscribe.RequestId.ToString(); - case BaseCommand.Type.Subscribe: - return cmd.Subscribe.RequestId.ToString(); - case BaseCommand.Type.Success: - return cmd.Success.RequestId.ToString(); - case BaseCommand.Type.Seek: - return cmd.Seek.RequestId.ToString(); - case BaseCommand.Type.CloseConsumer: - return cmd.CloseConsumer.RequestId.ToString(); - case BaseCommand.Type.GetLastMessageId: - return cmd.GetLastMessageId.RequestId.ToString(); - case BaseCommand.Type.GetLastMessageIdResponse: - return cmd.GetLastMessageIdResponse.RequestId.ToString(); - default: - throw new ArgumentOutOfRangeException("CommandType", cmd.CommandType, "CommandType not supported as request/response type"); - } - } + BaseCommand.Type.Connect => ConnectResponseIdentifier, + BaseCommand.Type.Connected => ConnectResponseIdentifier, + BaseCommand.Type.Send => $"{cmd.Send.ProducerId}-{cmd.Send.SequenceId}", + BaseCommand.Type.SendError => $"{cmd.SendError.ProducerId}-{cmd.SendError.SequenceId}", + BaseCommand.Type.SendReceipt => $"{cmd.SendReceipt.ProducerId}-{cmd.SendReceipt.SequenceId}", + BaseCommand.Type.Error => _requestId == 1 ? 
ConnectResponseIdentifier : cmd.Error.RequestId.ToString(), + BaseCommand.Type.Producer => cmd.Producer.RequestId.ToString(), + BaseCommand.Type.ProducerSuccess => cmd.ProducerSuccess.RequestId.ToString(), + BaseCommand.Type.CloseProducer => cmd.CloseProducer.RequestId.ToString(), + BaseCommand.Type.Lookup => cmd.LookupTopic.RequestId.ToString(), + BaseCommand.Type.LookupResponse => cmd.LookupTopicResponse.RequestId.ToString(), + BaseCommand.Type.Unsubscribe => cmd.Unsubscribe.RequestId.ToString(), + BaseCommand.Type.Subscribe => cmd.Subscribe.RequestId.ToString(), + BaseCommand.Type.Success => cmd.Success.RequestId.ToString(), + BaseCommand.Type.Seek => cmd.Seek.RequestId.ToString(), + BaseCommand.Type.CloseConsumer => cmd.CloseConsumer.RequestId.ToString(), + BaseCommand.Type.GetLastMessageId => cmd.GetLastMessageId.RequestId.ToString(), + BaseCommand.Type.GetLastMessageIdResponse => cmd.GetLastMessageIdResponse.RequestId.ToString(), + _ => throw new ArgumentOutOfRangeException(nameof(cmd.CommandType), cmd.CommandType, "CommandType not supported as request/response type") + }; } } diff --git a/src/DotPulsar/Internal/Serializer.cs b/src/DotPulsar/Internal/Serializer.cs index ca5f108..a7fc650 100644 --- a/src/DotPulsar/Internal/Serializer.cs +++ b/src/DotPulsar/Internal/Serializer.cs @@ -65,8 +65,14 @@ namespace DotPulsar.Internal var union = new UIntUnion(integer); return BitConverter.IsLittleEndian - ? new[] { union.B3, union.B2, union.B1, union.B0 } - : new[] { union.B0, union.B1, union.B2, union.B3 }; + ? 
new[] + { + union.B3, union.B2, union.B1, union.B0 + } + : new[] + { + union.B0, union.B1, union.B2, union.B3 + }; } private static byte[] Serialize<T>(T item) diff --git a/src/DotPulsar/MessageId.cs b/src/DotPulsar/MessageId.cs index 9e90967..4118eb9 100644 --- a/src/DotPulsar/MessageId.cs +++ b/src/DotPulsar/MessageId.cs @@ -32,7 +32,13 @@ namespace DotPulsar => Data = messageIdData; public MessageId(ulong ledgerId, ulong entryId, int partition, int batchIndex) - => Data = new MessageIdData { LedgerId = ledgerId, EntryId = entryId, Partition = partition, BatchIndex = batchIndex }; + => Data = new MessageIdData + { + LedgerId = ledgerId, + EntryId = entryId, + Partition = partition, + BatchIndex = batchIndex + }; internal MessageIdData Data { get; } @@ -42,7 +48,7 @@ namespace DotPulsar public int BatchIndex => Data.BatchIndex; public override bool Equals(object o) - => o is MessageId ? Equals((MessageId) o) : false; + => o is MessageId id && Equals(id); public bool Equals(MessageId other) => LedgerId == other.LedgerId && EntryId == other.EntryId && Partition == other.Partition && BatchIndex == other.BatchIndex; diff --git a/src/DotPulsar/MessageMetadata.cs b/src/DotPulsar/MessageMetadata.cs index 2af466e..66b10d9 100644 --- a/src/DotPulsar/MessageMetadata.cs +++ b/src/DotPulsar/MessageMetadata.cs @@ -23,7 +23,7 @@ namespace DotPulsar public MessageMetadata() => Metadata = new Internal.PulsarApi.MessageMetadata(); - internal Internal.PulsarApi.MessageMetadata Metadata; + internal readonly Internal.PulsarApi.MessageMetadata Metadata; public long DeliverAtTime { @@ -71,31 +71,31 @@ namespace DotPulsar { get { - for (var i = 0; i < Metadata.Properties.Count; ++i) + foreach (var prop in Metadata.Properties) { - var keyValye = Metadata.Properties[i]; - - if (keyValye.Key == key) - return keyValye.Value; + if (prop.Key == key) + return prop.Value; } return null; } set { - for (var i = 0; i < Metadata.Properties.Count; ++i) + foreach (var prop in Metadata.Properties) { - 
var keyValye = Metadata.Properties[i]; - - if (keyValye.Key != key) + if (prop.Key != key) continue; - keyValye.Value = value; + prop.Value = value; return; } - Metadata.Properties.Add(new KeyValue { Key = key, Value = value }); + Metadata.Properties.Add(new KeyValue + { + Key = key, + Value = value + }); } } diff --git a/tests/DotPulsar.StressTests/ConnectionTests.cs b/tests/DotPulsar.StressTests/ConnectionTests.cs index cafc5b6..97aee00 100644 --- a/tests/DotPulsar.StressTests/ConnectionTests.cs +++ b/tests/DotPulsar.StressTests/ConnectionTests.cs @@ -55,7 +55,7 @@ namespace DotPulsar.StressTests var cts = new CancellationTokenSource(TimeSpan.FromSeconds(60)); - //Act // Assert + //Act & Assert await producer.StateChangedTo(ProducerState.Connected, cts.Token); } } diff --git a/tests/DotPulsar.StressTests/Fixtures/StandaloneClusterFixture.cs b/tests/DotPulsar.StressTests/Fixtures/StandaloneClusterFixture.cs index 2874bfd..c7618cf 100644 --- a/tests/DotPulsar.StressTests/Fixtures/StandaloneClusterFixture.cs +++ b/tests/DotPulsar.StressTests/Fixtures/StandaloneClusterFixture.cs @@ -30,7 +30,10 @@ namespace DotPulsar.StressTests.Fixtures var waitTries = 10; - using var handler = new HttpClientHandler { AllowAutoRedirect = true }; + using var handler = new HttpClientHandler + { + AllowAutoRedirect = true + }; using var client = new HttpClient(handler); @@ -62,7 +65,11 @@ namespace DotPulsar.StressTests.Fixtures private static void RunProcess(string name, string arguments) { - var processStartInfo = new ProcessStartInfo { FileName = name, Arguments = arguments }; + var processStartInfo = new ProcessStartInfo + { + FileName = name, + Arguments = arguments + }; processStartInfo.Environment["TAG"] = "test"; processStartInfo.Environment["CONFIGURATION"] = "Debug"; diff --git a/tests/DotPulsar.Tests/Internal/Crc32CTests.cs b/tests/DotPulsar.Tests/Internal/Crc32CTests.cs index 2c3b27e..e1a62dd 100644 --- a/tests/DotPulsar.Tests/Internal/Crc32CTests.cs +++ 
b/tests/DotPulsar.Tests/Internal/Crc32CTests.cs @@ -23,7 +23,13 @@ namespace DotPulsar.Tests.Internal public void Calculate_GivenSequenceWithSingleSegment_ShouldReturnExpectedChecksum() { //Arrange - var segment = new byte[] { 0x10, 0x01, 0x18, 0xc9, 0xf8, 0x86, 0x94, 0xeb, 0x2c }; + var segment = new byte[] + { + 0x10, 0x01, 0x18, 0xc9, + 0xf8, 0x86, 0x94, 0xeb, + 0x2c + }; + var sequence = new SequenceBuilder<byte>().Append(segment).Build(); //Act @@ -40,10 +46,22 @@ namespace DotPulsar.Tests.Internal //Arrange var s1 = new byte[] { - 0x0a, 0x0f, 0x73, 0x74, 0x61, 0x6e, 0x64, 0x61, 0x6c, 0x6f, 0x6e, 0x65, 0x2d, 0x33, 0x30, 0x2d, 0x35, 0x10, 0x00, 0x18, 0xc7, 0xee, 0xa3, 0x93, 0xeb, 0x2c, - 0x58, 0x01 + 0x0a, 0x0f, 0x73, 0x74, + 0x61, 0x6e, 0x64, 0x61, + 0x6c, 0x6f, 0x6e, 0x65, + 0x2d, 0x33, 0x30, 0x2d, + 0x35, 0x10, 0x00, 0x18, + 0xc7, 0xee, 0xa3, 0x93, + 0xeb, 0x2c, 0x58, 0x01 + }; + + var s2 = new byte[] + { + 0x10, 0x01, 0x18, 0xc9, + 0xf8, 0x86, 0x94, 0xeb, + 0x2c }; - var s2 = new byte[] { 0x10, 0x01, 0x18, 0xc9, 0xf8, 0x86, 0x94, 0xeb, 0x2c }; + var sequence = new SequenceBuilder<byte>().Append(s1).Append(s2).Build(); //Act @@ -51,6 +69,7 @@ namespace DotPulsar.Tests.Internal //Assert const uint expected = 1079987866; + Assert.Equal(expected, actual); } } diff --git a/tests/DotPulsar.Tests/Internal/Extensions/ReadOnlySequenceExtensionsTests.cs b/tests/DotPulsar.Tests/Internal/Extensions/ReadOnlySequenceExtensionsTests.cs index 91f2c7b..fbdeebb 100644 --- a/tests/DotPulsar.Tests/Internal/Extensions/ReadOnlySequenceExtensionsTests.cs +++ b/tests/DotPulsar.Tests/Internal/Extensions/ReadOnlySequenceExtensionsTests.cs @@ -24,10 +24,16 @@ namespace DotPulsar.Tests.Internal.Extensions public void StartsWith_GivenToShortSequenceWithSingleSegment_ShouldReturnFalse() { //Arrange - var sequence = new SequenceBuilder<byte>().Append(new byte[] { 0x00 }).Build(); + var sequence = new SequenceBuilder<byte>().Append(new byte[] + { + 0x00 + }).Build(); //Act - var actual = 
sequence.StartsWith(new byte[] { 0x00, 0x01 }); + var actual = sequence.StartsWith(new byte[] + { + 0x00, 0x01 + }); //Assert Assert.False(actual); @@ -37,10 +43,16 @@ namespace DotPulsar.Tests.Internal.Extensions public void StartsWith_GivenSequenceWithSingleSegment_ShouldReturnFalse() { //Arrange - var sequence = new SequenceBuilder<byte>().Append(new byte[] { 0x00, 0x02, 0x01 }).Build(); + var sequence = new SequenceBuilder<byte>().Append(new byte[] + { + 0x00, 0x02, 0x01 + }).Build(); //Act - var actual = sequence.StartsWith(new byte[] { 0x00, 0x01 }); + var actual = sequence.StartsWith(new byte[] + { + 0x00, 0x01 + }); //Assert Assert.False(actual); @@ -50,10 +62,16 @@ namespace DotPulsar.Tests.Internal.Extensions public void StartsWith_GivenSequenceWithSingleSegment_ShouldReturnTrue() { //Arrange - var sequence = new SequenceBuilder<byte>().Append(new byte[] { 0x00, 0x01, 0x02 }).Build(); + var sequence = new SequenceBuilder<byte>().Append(new byte[] + { + 0x00, 0x01, 0x02 + }).Build(); //Act - var actual = sequence.StartsWith(new byte[] { 0x00, 0x01 }); + var actual = sequence.StartsWith(new byte[] + { + 0x00, 0x01 + }); //Assert Assert.True(actual); @@ -63,10 +81,19 @@ namespace DotPulsar.Tests.Internal.Extensions public void StartsWith_GivenToShortSequenceWithMultipleSegments_ShouldReturnFalse() { //Arrange - var sequence = new SequenceBuilder<byte>().Append(new byte[] { 0x00, 0x01 }).Append(new byte[] { 0x02 }).Build(); + var sequence = new SequenceBuilder<byte>().Append(new byte[] + { + 0x00, 0x01 + }).Append(new byte[] + { + 0x02 + }).Build(); //Act - var actual = sequence.StartsWith(new byte[] { 0x00, 0x01, 0x02, 0x03 }); + var actual = sequence.StartsWith(new byte[] + { + 0x00, 0x01, 0x02, 0x03 + }); //Assert Assert.False(actual); @@ -76,10 +103,19 @@ namespace DotPulsar.Tests.Internal.Extensions public void StartsWith_GivenSequenceWithMultipleSegments_ShouldReturnFalse() { //Arrange - var sequence = new SequenceBuilder<byte>().Append(new byte[] { 
0x00, 0x02 }).Append(new byte[] { 0x01, 0x03 }).Build(); + var sequence = new SequenceBuilder<byte>().Append(new byte[] + { + 0x00, 0x02 + }).Append(new byte[] + { + 0x01, 0x03 + }).Build(); //Act - var actual = sequence.StartsWith(new byte[] { 0x00, 0x01, 0x02 }); + var actual = sequence.StartsWith(new byte[] + { + 0x00, 0x01, 0x02 + }); //Assert Assert.False(actual); @@ -89,10 +125,19 @@ namespace DotPulsar.Tests.Internal.Extensions public void StartsWith_GivenSequenceWithMultipleSegments_ShouldReturnTrue() { //Arrange - var sequence = new SequenceBuilder<byte>().Append(new byte[] { 0x00, 0x01 }).Append(new byte[] { 0x02, 0x03 }).Build(); + var sequence = new SequenceBuilder<byte>().Append(new byte[] + { + 0x00, 0x01 + }).Append(new byte[] + { + 0x02, 0x03 + }).Build(); //Act - var actual = sequence.StartsWith(new byte[] { 0x00, 0x01, 0x02 }); + var actual = sequence.StartsWith(new byte[] + { + 0x00, 0x01, 0x02 + }); //Assert Assert.True(actual); @@ -102,7 +147,10 @@ namespace DotPulsar.Tests.Internal.Extensions public void ReadUInt32_GivenSequenceWithSingleSegment_ShouldGiveExceptedResult() { //Arrange - var sequence = new SequenceBuilder<byte>().Append(new byte[] { 0x00, 0x01, 0x02, 0x03 }).Build(); + var sequence = new SequenceBuilder<byte>().Append(new byte[] + { + 0x00, 0x01, 0x02, 0x03 + }).Build(); //Act var actual = sequence.ReadUInt32(0, true); @@ -116,7 +164,11 @@ namespace DotPulsar.Tests.Internal.Extensions public void ReadUInt32_GivenSequenceWithSingleSegmentAndNonZeroStart_ShouldGiveExceptedResult() { //Arrange - var sequence = new SequenceBuilder<byte>().Append(new byte[] { 0x09, 0x00, 0x01, 0x02, 0x03 }).Build(); + var sequence = new SequenceBuilder<byte>().Append(new byte[] + { + 0x09, 0x00, 0x01, 0x02, + 0x03 + }).Build(); //Act var actual = sequence.ReadUInt32(1, true); @@ -130,7 +182,13 @@ namespace DotPulsar.Tests.Internal.Extensions public void ReadUInt32_GivenSequenceWithMultipleSegments_ShouldGiveExceptedResult() { //Arrange - var sequence 
= new SequenceBuilder<byte>().Append(new byte[] { 0x00, 0x01 }).Append(new byte[] { 0x02, 0x03 }).Build(); + var sequence = new SequenceBuilder<byte>().Append(new byte[] + { + 0x00, 0x01 + }).Append(new byte[] + { + 0x02, 0x03 + }).Build(); //Act var actual = sequence.ReadUInt32(0, true); @@ -145,9 +203,18 @@ namespace DotPulsar.Tests.Internal.Extensions { //Arrange var sequence = new SequenceBuilder<byte>() - .Append(new byte[] { 0x09, 0x09, 0x09 }) - .Append(new byte[] { 0x09, 0x00, 0x01 }) - .Append(new byte[] { 0x02, 0x03 }).Build(); + .Append(new byte[] + { + 0x09, 0x09, 0x09 + }) + .Append(new byte[] + { + 0x09, 0x00, 0x01 + }) + .Append(new byte[] + { + 0x02, 0x03 + }).Build(); //Act var actual = sequence.ReadUInt32(4, true); diff --git a/tests/DotPulsar.Tests/Internal/SequenceBuilderTests.cs b/tests/DotPulsar.Tests/Internal/SequenceBuilderTests.cs index 1a2ae51..878f241 100644 --- a/tests/DotPulsar.Tests/Internal/SequenceBuilderTests.cs +++ b/tests/DotPulsar.Tests/Internal/SequenceBuilderTests.cs @@ -27,10 +27,9 @@ namespace DotPulsar.Tests.Internal var a = new byte[] { 0x00, 0x01, 0x02, 0x03 }; var b = new byte[] { 0x04, 0x05, 0x06, 0x07, 0x08 }; var c = new byte[] { 0x09 }; - var builder = new SequenceBuilder<byte>().Append(a).Append(b).Append(c); //Act - var sequence = builder.Build(); + var sequence = new SequenceBuilder<byte>().Append(a).Append(b).Append(c).Build(); //Assert var array = sequence.ToArray(); @@ -48,11 +47,11 @@ namespace DotPulsar.Tests.Internal var c = new byte[] { 0x04, 0x05 }; var d = new byte[] { 0x06, 0x07 }; var e = new byte[] { 0x08, 0x09 }; + var seq = new SequenceBuilder<byte>().Append(b).Append(c).Append(d).Build(); - var builder = new SequenceBuilder<byte>().Append(a).Append(seq).Append(e); //Act - var sequence = builder.Build(); + var sequence = new SequenceBuilder<byte>().Append(a).Append(seq).Append(e).Build(); //Assert var array = sequence.ToArray(); @@ -68,10 +67,9 @@ namespace DotPulsar.Tests.Internal var a = new byte[] 
{ 0x00, 0x01, 0x02, 0x03 }; var b = new byte[] { 0x04, 0x05, 0x06, 0x07, 0x08 }; var c = new byte[] { 0x09 }; - var builder = new SequenceBuilder<byte>().Prepend(c).Prepend(b).Prepend(a); //Act - var sequence = builder.Build(); + var sequence = new SequenceBuilder<byte>().Prepend(c).Prepend(b).Prepend(a).Build(); //Assert var array = sequence.ToArray(); @@ -89,11 +87,11 @@ namespace DotPulsar.Tests.Internal var c = new byte[] { 0x04, 0x05 }; var d = new byte[] { 0x06, 0x07 }; var e = new byte[] { 0x08, 0x09 }; + var seq = new SequenceBuilder<byte>().Prepend(d).Prepend(c).Prepend(b).Build(); - var builder = new SequenceBuilder<byte>().Prepend(e).Prepend(seq).Prepend(a); //Act - var sequence = builder.Build(); + var sequence = new SequenceBuilder<byte>().Prepend(e).Prepend(seq).Prepend(a).Build(); //Assert var array = sequence.ToArray(); diff --git a/tests/DotPulsar.Tests/Internal/SerializerTests.cs b/tests/DotPulsar.Tests/Internal/SerializerTests.cs index 87df2b3..ac157f3 100644 --- a/tests/DotPulsar.Tests/Internal/SerializerTests.cs +++ b/tests/DotPulsar.Tests/Internal/SerializerTests.cs @@ -30,6 +30,7 @@ namespace DotPulsar.Tests.Internal //Assert var expected = new byte[] { 0x00, 0x01, 0x02, 0x03 }; + Assert.Equal(expected, actual); } }
