This is an automated email from the ASF dual-hosted git repository.

blankensteiner pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-dotpulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 8b2c6c4  Add support for AllowOutOfOrderDelivery on key_shared subscriptions (#268)
8b2c6c4 is described below

commit 8b2c6c4c32176d2aadc5dff66451f1c83e663b58
Author: Shaun Becker <[email protected]>
AuthorDate: Mon May 26 03:54:33 2025 -0400

    Add support for AllowOutOfOrderDelivery on key_shared subscriptions (#268)
---
 src/DotPulsar/Abstractions/IConsumerBuilder.cs |  5 +++++
 src/DotPulsar/ConsumerOptions.cs               |  8 ++++++++
 src/DotPulsar/Internal/Consumer.cs             |  8 ++++++++
 src/DotPulsar/Internal/ConsumerBuilder.cs      | 10 +++++++++-
 4 files changed, 30 insertions(+), 1 deletion(-)

diff --git a/src/DotPulsar/Abstractions/IConsumerBuilder.cs b/src/DotPulsar/Abstractions/IConsumerBuilder.cs
index a314dc9..8044b26 100644
--- a/src/DotPulsar/Abstractions/IConsumerBuilder.cs
+++ b/src/DotPulsar/Abstractions/IConsumerBuilder.cs
@@ -91,6 +91,11 @@ public interface IConsumerBuilder<TMessage>
     /// </summary>
     IConsumerBuilder<TMessage> TopicsPattern(Regex topicsPattern);
 
+    /// <summary>
+    /// Whether to allow out-of-order delivery on key_shared subscriptions. The default is 'false'.
+    /// </summary>
+    IConsumerBuilder<TMessage> AllowOutOfOrderDeliver(bool allowOutOfOrderDeliver);
+
     /// <summary>
     /// Create the consumer.
     /// </summary>
diff --git a/src/DotPulsar/ConsumerOptions.cs b/src/DotPulsar/ConsumerOptions.cs
index f523ce8..a0ee1ae 100644
--- a/src/DotPulsar/ConsumerOptions.cs
+++ b/src/DotPulsar/ConsumerOptions.cs
@@ -184,4 +184,12 @@ public sealed class ConsumerOptions<TMessage>
     /// Specify a pattern for topics that this consumer subscribes to. This, or setting a single topic or multiple topics, is required.
     /// </summary>
     public Regex? TopicsPattern { get; set; }
+
+    /// <summary>
+    /// Allow out-of-order delivery on key_shared subscriptions. The default is 'false'.
+    /// </summary>
+    /// <remarks>
+    /// https://pulsar.apache.org/docs/3.3.x/concepts-messaging/#preserving-order-of-processing
+    /// </remarks>
+    public bool AllowOutOfOrderDeliver { get; set; }
 }
diff --git a/src/DotPulsar/Internal/Consumer.cs b/src/DotPulsar/Internal/Consumer.cs
index 70a1ee7..a89f997 100644
--- a/src/DotPulsar/Internal/Consumer.cs
+++ b/src/DotPulsar/Internal/Consumer.cs
@@ -450,6 +450,14 @@ public sealed class Consumer<TMessage> : IConsumer<TMessage>
             subscribe.SubscriptionProperties.Add(keyValue);
         }
 
+        if (_consumerOptions is { SubscriptionType: SubscriptionType.KeyShared, AllowOutOfOrderDeliver: true })
+        {
+            subscribe.keySharedMeta = new KeySharedMeta
+            {
+                allowOutOfOrderDelivery = true
+            };
+        }
+
         var messagePrefetchCount = _consumerOptions.MessagePrefetchCount;
         var messageFactory = new MessageFactory<TMessage>(_consumerOptions.Schema);
         var batchHandler = new BatchHandler<TMessage>(true, messageFactory);
diff --git a/src/DotPulsar/Internal/ConsumerBuilder.cs b/src/DotPulsar/Internal/ConsumerBuilder.cs
index 90b2880..7bdc59c 100644
--- a/src/DotPulsar/Internal/ConsumerBuilder.cs
+++ b/src/DotPulsar/Internal/ConsumerBuilder.cs
@@ -36,6 +36,7 @@ public sealed class ConsumerBuilder<TMessage> : IConsumerBuilder<TMessage>
     private readonly HashSet<string> _topics;
     private Regex? _topicsPattern;
     private IHandleStateChanged<ConsumerStateChanged>? _stateChangedHandler;
+    private bool _allowOutOfOrderDeliver;
 
     public ConsumerBuilder(IPulsarClient pulsarClient, ISchema<TMessage> schema)
     {
@@ -143,6 +144,12 @@ public sealed class ConsumerBuilder<TMessage> : IConsumerBuilder<TMessage>
         return this;
     }
 
+    public IConsumerBuilder<TMessage> AllowOutOfOrderDeliver(bool allowOutOfOrderDeliver)
+    {
+        _allowOutOfOrderDeliver = allowOutOfOrderDeliver;
+        return this;
+    }
+
     public IConsumer<TMessage> Create()
     {
         if (string.IsNullOrEmpty(_subscriptionName))
@@ -164,7 +171,8 @@ public sealed class ConsumerBuilder<TMessage> : IConsumerBuilder<TMessage>
             SubscriptionProperties = _subscriptionProperties,
             SubscriptionType = _subscriptionType,
             Topics = _topics,
-            TopicsPattern = _topicsPattern
+            TopicsPattern = _topicsPattern,
+            AllowOutOfOrderDeliver = _allowOutOfOrderDeliver
         };
 
         return _pulsarClient.CreateConsumer(options);

Reply via email to