This is an automated email from the ASF dual-hosted git repository.

blankensteiner pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-dotpulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new acd723d  Fix single partition router test (#78)
acd723d is described below

commit acd723da01c8ae438c853dbf48c1839529819644
Author: Zike Yang <[email protected]>
AuthorDate: Tue Jun 29 16:56:10 2021 +0800

    Fix single partition router test (#78)
    
    * Fix single partition router test.
    
    * small fix.
---
 src/DotPulsar/Abstractions/IProducerBuilder.cs    |  5 +++++
 tests/DotPulsar.IntegrationTests/ProducerTests.cs | 25 ++++++++++++++++-------
 2 files changed, 23 insertions(+), 7 deletions(-)

diff --git a/src/DotPulsar/Abstractions/IProducerBuilder.cs b/src/DotPulsar/Abstractions/IProducerBuilder.cs
index 17788a5..9650efa 100644
--- a/src/DotPulsar/Abstractions/IProducerBuilder.cs
+++ b/src/DotPulsar/Abstractions/IProducerBuilder.cs
@@ -45,6 +45,11 @@ namespace DotPulsar.Abstractions
         IProducerBuilder<TMessage> Topic(string topic);
 
         /// <summary>
+        /// Set the message router for this producer. The default is RoundRobinPartitionRouter.
+        /// </summary>
+        IProducerBuilder<TMessage> MessageRouter(IMessageRouter messageRouter);
+
+        /// <summary>
         /// Create the producer.
         /// </summary>
         IProducer<TMessage> Create();
diff --git a/tests/DotPulsar.IntegrationTests/ProducerTests.cs b/tests/DotPulsar.IntegrationTests/ProducerTests.cs
index bcbaf69..742454c 100644
--- a/tests/DotPulsar.IntegrationTests/ProducerTests.cs
+++ b/tests/DotPulsar.IntegrationTests/ProducerTests.cs
@@ -22,7 +22,6 @@ namespace DotPulsar.IntegrationTests
     using System;
     using System.Collections.Generic;
     using System.Diagnostics;
-    using System.Linq;
     using System.Threading.Tasks;
     using Xunit;
     using Xunit.Abstractions;
@@ -74,14 +73,25 @@ namespace DotPulsar.IntegrationTests
             string topicName = $"single-partitioned-{Guid.NewGuid():N}";
             const string content = "test-message";
             const int partitions = 3;
+            const int msgCount = 3;
             var consumers = new List<IConsumer<string>>();
 
             await _pulsarService.CreatePartitionedTopic($"persistent/public/default/{topicName}", partitions);
 
             //Act
-            await using var producer = client.NewProducer(Schema.String)
-                .Topic(topicName)
-                .Create();
+            for (var i = 0; i < partitions; ++i)
+            {
+                await using var producer = client.NewProducer(Schema.String)
+                    .Topic(topicName)
+                    .MessageRouter(new SinglePartitionRouter(i))
+                    .Create();
+
+                for (var msgIndex = 0; msgIndex < msgCount; ++msgIndex)
+                {
+                    await producer.Send($"{content}-{i}-{msgIndex}");
+                    _testOutputHelper.WriteLine($"Sent a message: {content}-{i}-{msgIndex}");
+                }
+            }
 
             for (var i = 0; i < partitions; ++i)
             {
@@ -92,11 +102,12 @@ namespace DotPulsar.IntegrationTests
                     .Create());
             }
 
-            await producer.Send(content);
-            _testOutputHelper.WriteLine($"Sent a message: {content}");
+            var msg = await consumers[1].GetLastMessageId();
 
             //Assert
-            (await Task.WhenAny(consumers.Select(c => c.Receive().AsTask()).ToList())).Result.Value().Should().Be(content);
+            for (var i = 0; i < partitions; ++i)
+            for (var msgIndex = 0; msgIndex < msgCount; ++msgIndex)
+                (await consumers[i].Receive()).Value().Should().Be($"{content}-{i}-{msgIndex}");
         }
 
         [Fact]

Reply via email to