This is an automated email from the ASF dual-hosted git repository.

blankensteiner pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-dotpulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new ed51103  The `Process` extension method will hang when called with EnsureOrderedAcknowledgment set to true, a shared subscription and MaxDegreeOfParallelism above 1. It now throws a `ProcessingException` when EnsureOrderedAcknowledgment is set to true and with a shared subscription type.
ed51103 is described below

commit ed511035178167c6b8b904130939273cd56587b6
Author: Daniel Blankensteiner <[email protected]>
AuthorDate: Mon Jun 17 17:10:53 2024 +0200

    The `Process` extension method will hang when called with EnsureOrderedAcknowledgment set to true, a shared subscription and MaxDegreeOfParallelism above 1.
    It now throws a `ProcessingException` when EnsureOrderedAcknowledgment is set to true and with a shared subscription type.
---
 CHANGELOG.md                                    |  6 ++++--
 src/DotPulsar/Abstractions/IConsumer.cs         |  5 +++++
 src/DotPulsar/Exceptions/ProcessingException.cs | 23 +++++++++++++++++++++++
 src/DotPulsar/Internal/Consumer.cs              |  4 +++-
 src/DotPulsar/Internal/MessageProcessor.cs      | 11 ++++++++++-
 src/DotPulsar/Internal/SubConsumer.cs           |  3 +++
 6 files changed, 48 insertions(+), 4 deletions(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 957ba44..03fe981 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -8,8 +8,10 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/)
 
 ### Fixed
 
-- Fixed race condition in `Producer` between `Send(...)` and `DisposeAsync()` dispose causing an unintended
-  `DivideByZeroException`. It now correctly throws a `ProducerClosedException`
+- Fixed race condition in `Producer` between `Send(...)` and `DisposeAsync()` dispose causing an unintended `DivideByZeroException`.
+  It now throws a `ProducerClosedException`.
+- The `Process` extension method will hang when called with EnsureOrderedAcknowledgment set to true, a shared subscription and MaxDegreeOfParallelism above 1.
+  It now throws a `ProcessingException` when EnsureOrderedAcknowledgment is set to true and with a shared subscription type.
 
 ## [3.3.0] - 2024-06-10
 
diff --git a/src/DotPulsar/Abstractions/IConsumer.cs b/src/DotPulsar/Abstractions/IConsumer.cs
index a7cf266..55ef779 100644
--- a/src/DotPulsar/Abstractions/IConsumer.cs
+++ b/src/DotPulsar/Abstractions/IConsumer.cs
@@ -39,6 +39,11 @@ public interface IConsumer : IGetLastMessageIds, ISeek, IState<ConsumerState>, I
     /// </summary>
     public string SubscriptionName { get; }
 
+    /// <summary>
+    /// The consumer's subscription type.
+    /// </summary>
+    public SubscriptionType SubscriptionType { get; }
+
     /// <summary>
     /// The consumer's topic.
     /// </summary>
diff --git a/src/DotPulsar/Exceptions/ProcessingException.cs b/src/DotPulsar/Exceptions/ProcessingException.cs
new file mode 100644
index 0000000..f8f60fd
--- /dev/null
+++ b/src/DotPulsar/Exceptions/ProcessingException.cs
@@ -0,0 +1,23 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace DotPulsar.Exceptions;
+
+/// <summary>
+/// There was an issue while setting up for processing
+/// </summary>
+public sealed class ProcessingException : DotPulsarException
+{
+    public ProcessingException(string message) : base(message) { }
+}
diff --git a/src/DotPulsar/Internal/Consumer.cs b/src/DotPulsar/Internal/Consumer.cs
index 55da102..18e326d 100644
--- a/src/DotPulsar/Internal/Consumer.cs
+++ b/src/DotPulsar/Internal/Consumer.cs
@@ -43,6 +43,7 @@ public sealed class Consumer<TMessage> : IConsumer<TMessage>
 
     public Uri ServiceUrl { get; }
     public string SubscriptionName { get; }
+    public SubscriptionType SubscriptionType { get; }
     public string Topic { get; }
 
     public Consumer(
@@ -55,6 +56,7 @@ public sealed class Consumer<TMessage> : IConsumer<TMessage>
         _state = CreateStateManager();
         ServiceUrl = serviceUrl;
         SubscriptionName = consumerOptions.SubscriptionName;
+        SubscriptionType = consumerOptions.SubscriptionType;
         Topic = consumerOptions.Topic;
         _receiveTasks = Array.Empty<Task<IMessage<TMessage>>>();
         _cts = new CancellationTokenSource();
@@ -411,7 +413,7 @@ public sealed class Consumer<TMessage> : IConsumer<TMessage>
         var stateManager = CreateStateManager();
         var initialChannel = new NotReadyChannel<TMessage>();
         var executor = new Executor(correlationId, _processManager, _exceptionHandler);
-        var subConsumer = new SubConsumer<TMessage>(correlationId, ServiceUrl, _consumerOptions.SubscriptionName, topic, _processManager, initialChannel, executor, stateManager, consumerChannelFactory);
+        var subConsumer = new SubConsumer<TMessage>(correlationId, ServiceUrl, _consumerOptions.SubscriptionName, _consumerOptions.SubscriptionType, topic, _processManager, initialChannel, executor, stateManager, consumerChannelFactory);
         var process = new ConsumerProcess(correlationId, stateManager, subConsumer, _consumerOptions.SubscriptionType == SubscriptionType.Failover);
         _processManager.Add(process);
         process.Start();
diff --git a/src/DotPulsar/Internal/MessageProcessor.cs b/src/DotPulsar/Internal/MessageProcessor.cs
index 510b66f..caf7020 100644
--- a/src/DotPulsar/Internal/MessageProcessor.cs
+++ b/src/DotPulsar/Internal/MessageProcessor.cs
@@ -15,6 +15,7 @@
 namespace DotPulsar.Internal;
 
 using DotPulsar.Abstractions;
+using DotPulsar.Exceptions;
 using DotPulsar.Internal.Extensions;
 using Microsoft.Extensions.ObjectPool;
 using System.Collections.Concurrent;
@@ -39,8 +40,16 @@ public sealed class MessageProcessor<TMessage> : IDisposable
     private readonly int _maxMessagesPerTask;
     private readonly TaskScheduler _taskScheduler;
 
-    public MessageProcessor(IConsumer<TMessage> consumer, Func<IMessage<TMessage>, CancellationToken, ValueTask> processor, ProcessingOptions options)
+    public MessageProcessor(
+        IConsumer<TMessage> consumer,
+        Func<IMessage<TMessage>, CancellationToken, ValueTask> processor,
+        ProcessingOptions options)
     {
+        if (options.EnsureOrderedAcknowledgment &&
+            (consumer.SubscriptionType == SubscriptionType.Shared ||
+            consumer.SubscriptionType == SubscriptionType.KeyShared))
+            throw new ProcessingException("Ordered acknowledgment can not be ensuring with shared subscription types");
+
         const string operation = "process";
         _operationName = $"{consumer.Topic} {operation}";
 
diff --git a/src/DotPulsar/Internal/SubConsumer.cs b/src/DotPulsar/Internal/SubConsumer.cs
index 64cba29..c9eb3e5 100644
--- a/src/DotPulsar/Internal/SubConsumer.cs
+++ b/src/DotPulsar/Internal/SubConsumer.cs
@@ -36,12 +36,14 @@ public sealed class SubConsumer<TMessage> : IConsumer<TMessage>, IContainsChanne
 
     public Uri ServiceUrl { get; }
     public string SubscriptionName { get; }
+    public SubscriptionType SubscriptionType { get; }
     public string Topic { get; }
 
     public SubConsumer(
         Guid correlationId,
         Uri serviceUrl,
         string subscriptionName,
+        SubscriptionType subscriptionType,
         string topic,
         IRegisterEvent eventRegister,
         IConsumerChannel<TMessage> initialChannel,
@@ -52,6 +54,7 @@ public sealed class SubConsumer<TMessage> : IConsumer<TMessage>, IContainsChanne
         _correlationId = correlationId;
         ServiceUrl = serviceUrl;
         SubscriptionName = subscriptionName;
+        SubscriptionType = subscriptionType;
         Topic = topic;
         _eventRegister = eventRegister;
         _channel = initialChannel;

Reply via email to