This is an automated email from the ASF dual-hosted git repository. blankensteiner pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar-dotpulsar.git
The following commit(s) were added to refs/heads/master by this push: new c1d45d5 Adding constructors to the message routers c1d45d5 is described below commit c1d45d56e4fa1f062a63a0cae5d39b3f68f18416 Author: Daniel Blankensteiner <d...@vmail.dk> AuthorDate: Mon Jun 28 23:14:47 2021 +0200 Adding constructors to the message routers --- src/DotPulsar/Internal/Producer.cs | 3 +++ src/DotPulsar/Internal/ProducerBuilder.cs | 2 +- src/DotPulsar/RoundRobinPartitionRouter.cs | 18 ++++++++++++------ src/DotPulsar/SinglePartitionRouter.cs | 27 ++++++++++++++++++++------- 4 files changed, 36 insertions(+), 14 deletions(-) diff --git a/src/DotPulsar/Internal/Producer.cs b/src/DotPulsar/Internal/Producer.cs index 6079994..10c4fd5 100644 --- a/src/DotPulsar/Internal/Producer.cs +++ b/src/DotPulsar/Internal/Producer.cs @@ -214,6 +214,9 @@ namespace DotPulsar.Internal throw _throw; } + if (_producerCount == 1) + return 0; + return _messageRouter.ChoosePartition(metadata, _producerCount); } diff --git a/src/DotPulsar/Internal/ProducerBuilder.cs b/src/DotPulsar/Internal/ProducerBuilder.cs index 7470fe9..84345e7 100644 --- a/src/DotPulsar/Internal/ProducerBuilder.cs +++ b/src/DotPulsar/Internal/ProducerBuilder.cs @@ -88,7 +88,7 @@ namespace DotPulsar.Internal if (_messageRouter is not null) options.MessageRouter = _messageRouter; - return _pulsarClient.CreateProducer<TMessage>(options); + return _pulsarClient.CreateProducer(options); } } } diff --git a/src/DotPulsar/RoundRobinPartitionRouter.cs b/src/DotPulsar/RoundRobinPartitionRouter.cs index a7da420..d6a1738 100644 --- a/src/DotPulsar/RoundRobinPartitionRouter.cs +++ b/src/DotPulsar/RoundRobinPartitionRouter.cs @@ -19,15 +19,21 @@ namespace DotPulsar using System.Threading; /// <summary> - /// Round robin partition messages router - /// If no key is provided, the producer will publish messages across all partitions in round-robin fashion - /// to achieve maximum throughput. While if a key is specified on the message, the partitioned producer will - /// hash the key and assign message to a particular partition. - /// This is the default mode. + /// The round robin partition messages router, which is the default router. + /// If a key is provided, the producer will hash the key and publish the message to a particular partition. + /// If a key is not provided, the producer will publish messages across all partitions in a round-robin fashion to achieve maximum throughput. /// </summary> public sealed class RoundRobinPartitionRouter : IMessageRouter { - private int _partitionIndex = -1; + private int _partitionIndex; + + /// <summary> + /// Initializes a new instance of the round robin partition router + /// </summary> + public RoundRobinPartitionRouter() + { + _partitionIndex = -1; + } /// <summary> /// Choose a partition in round robin routing mode diff --git a/src/DotPulsar/SinglePartitionRouter.cs b/src/DotPulsar/SinglePartitionRouter.cs index 69a82f5..61c241e 100644 --- a/src/DotPulsar/SinglePartitionRouter.cs +++ b/src/DotPulsar/SinglePartitionRouter.cs @@ -19,15 +19,26 @@ namespace DotPulsar using System; /// <summary> - /// If no key is provided, the producer will randomly pick one single partition and publish all the messages - /// into that partition. While if a key is specified on the message, the partitioned producer will hash the - /// key and assign message to a particular partition. + /// The single partition messages router. + /// If a key is provided, the producer will hash the key and publish the message to a particular partition. + /// If a key is not provided, the producer will randomly pick one single partition and publish all messages to that partition. /// </summary> public sealed class SinglePartitionRouter : IMessageRouter { - private int? _partitionIndex; + private int _partitionIndex; - internal SinglePartitionRouter(int? partitionIndex = null) + /// <summary> + /// Initializes a new instance of the single partition router that will randomly select a partition + /// </summary> + public SinglePartitionRouter() + { + _partitionIndex = -1; + } + + /// <summary> + /// Initializes a new instance of the single partition router that will publish all messages to the given partition + /// </summary> + public SinglePartitionRouter(int partitionIndex) { _partitionIndex = partitionIndex; } @@ -41,8 +52,10 @@ namespace DotPulsar if (keyBytes is not null) return (int) MurmurHash3.Hash32(keyBytes, 0) % partitionsCount; - _partitionIndex ??= new Random().Next(0, partitionsCount); - return _partitionIndex.Value; + if (_partitionIndex == -1) + _partitionIndex = new Random().Next(0, partitionsCount); + + return _partitionIndex; } } }