This is an automated email from the ASF dual-hosted git repository.
blankensteiner pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-dotpulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 8b2c6c4 Add support for AllowOutOfOrderDelivery on key_shared subscriptions (#268)
8b2c6c4 is described below
commit 8b2c6c4c32176d2aadc5dff66451f1c83e663b58
Author: Shaun Becker <[email protected]>
AuthorDate: Mon May 26 03:54:33 2025 -0400
Add support for AllowOutOfOrderDelivery on key_shared subscriptions (#268)
---
src/DotPulsar/Abstractions/IConsumerBuilder.cs | 5 +++++
src/DotPulsar/ConsumerOptions.cs | 8 ++++++++
src/DotPulsar/Internal/Consumer.cs | 8 ++++++++
src/DotPulsar/Internal/ConsumerBuilder.cs | 10 +++++++++-
4 files changed, 30 insertions(+), 1 deletion(-)
diff --git a/src/DotPulsar/Abstractions/IConsumerBuilder.cs b/src/DotPulsar/Abstractions/IConsumerBuilder.cs
index a314dc9..8044b26 100644
--- a/src/DotPulsar/Abstractions/IConsumerBuilder.cs
+++ b/src/DotPulsar/Abstractions/IConsumerBuilder.cs
@@ -91,6 +91,11 @@ public interface IConsumerBuilder<TMessage>
/// </summary>
IConsumerBuilder<TMessage> TopicsPattern(Regex topicsPattern);
+ /// <summary>
+ /// Whether to allow out-of-order delivery on key_shared subscriptions. The default is 'false'.
+ /// </summary>
+ IConsumerBuilder<TMessage> AllowOutOfOrderDeliver(bool allowOutOfOrderDeliver);
+
/// <summary>
/// Create the consumer.
/// </summary>
diff --git a/src/DotPulsar/ConsumerOptions.cs b/src/DotPulsar/ConsumerOptions.cs
index f523ce8..a0ee1ae 100644
--- a/src/DotPulsar/ConsumerOptions.cs
+++ b/src/DotPulsar/ConsumerOptions.cs
@@ -184,4 +184,12 @@ public sealed class ConsumerOptions<TMessage>
/// Specify a pattern for topics that this consumer subscribes to. This, or setting a single topic or multiple topics, is required.
/// </summary>
public Regex? TopicsPattern { get; set; }
+
+ /// <summary>
+ /// Allow out-of-order delivery on key_shared subscriptions. The default is 'false'.
+ /// </summary>
+ /// <remarks>
+ /// https://pulsar.apache.org/docs/3.3.x/concepts-messaging/#preserving-order-of-processing
+ /// </remarks>
+ public bool AllowOutOfOrderDeliver { get; set; }
}
diff --git a/src/DotPulsar/Internal/Consumer.cs b/src/DotPulsar/Internal/Consumer.cs
index 70a1ee7..a89f997 100644
--- a/src/DotPulsar/Internal/Consumer.cs
+++ b/src/DotPulsar/Internal/Consumer.cs
@@ -450,6 +450,14 @@ public sealed class Consumer<TMessage> : IConsumer<TMessage>
subscribe.SubscriptionProperties.Add(keyValue);
}
+ if (_consumerOptions is { SubscriptionType: SubscriptionType.KeyShared, AllowOutOfOrderDeliver: true })
+ {
+ subscribe.keySharedMeta = new KeySharedMeta
+ {
+ allowOutOfOrderDelivery = true
+ };
+ }
+
var messagePrefetchCount = _consumerOptions.MessagePrefetchCount;
var messageFactory = new MessageFactory<TMessage>(_consumerOptions.Schema);
var batchHandler = new BatchHandler<TMessage>(true, messageFactory);
diff --git a/src/DotPulsar/Internal/ConsumerBuilder.cs b/src/DotPulsar/Internal/ConsumerBuilder.cs
index 90b2880..7bdc59c 100644
--- a/src/DotPulsar/Internal/ConsumerBuilder.cs
+++ b/src/DotPulsar/Internal/ConsumerBuilder.cs
@@ -36,6 +36,7 @@ public sealed class ConsumerBuilder<TMessage> : IConsumerBuilder<TMessage>
private readonly HashSet<string> _topics;
private Regex? _topicsPattern;
private IHandleStateChanged<ConsumerStateChanged>? _stateChangedHandler;
+ private bool _allowOutOfOrderDeliver;
public ConsumerBuilder(IPulsarClient pulsarClient, ISchema<TMessage> schema)
{
@@ -143,6 +144,12 @@ public sealed class ConsumerBuilder<TMessage> : IConsumerBuilder<TMessage>
return this;
}
+ public IConsumerBuilder<TMessage> AllowOutOfOrderDeliver(bool allowOutOfOrderDeliver)
+ {
+ _allowOutOfOrderDeliver = allowOutOfOrderDeliver;
+ return this;
+ }
+
public IConsumer<TMessage> Create()
{
if (string.IsNullOrEmpty(_subscriptionName))
@@ -164,7 +171,8 @@ public sealed class ConsumerBuilder<TMessage> : IConsumerBuilder<TMessage>
SubscriptionProperties = _subscriptionProperties,
SubscriptionType = _subscriptionType,
Topics = _topics,
- TopicsPattern = _topicsPattern
+ TopicsPattern = _topicsPattern,
+ AllowOutOfOrderDeliver = _allowOutOfOrderDeliver
};
return _pulsarClient.CreateConsumer(options);