This is an automated email from the ASF dual-hosted git repository.
blankensteiner pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-dotpulsar.git
The following commit(s) were added to refs/heads/master by this push:
new ed51103 The `Process` extension method would hang when called with
EnsureOrderedAcknowledgment set to true, a shared subscription and
MaxDegreeOfParallelism above 1. It now throws a `ProcessingException` when
EnsureOrderedAcknowledgment is set to true on a shared subscription type (Shared or KeyShared).
ed51103 is described below
commit ed511035178167c6b8b904130939273cd56587b6
Author: Daniel Blankensteiner <[email protected]>
AuthorDate: Mon Jun 17 17:10:53 2024 +0200
The `Process` extension method would hang when called with
EnsureOrderedAcknowledgment set to true, a shared subscription and
MaxDegreeOfParallelism above 1.
It now throws a `ProcessingException` when EnsureOrderedAcknowledgment is
set to true on a shared subscription type (Shared or KeyShared).
---
CHANGELOG.md | 6 ++++--
src/DotPulsar/Abstractions/IConsumer.cs | 5 +++++
src/DotPulsar/Exceptions/ProcessingException.cs | 23 +++++++++++++++++++++++
src/DotPulsar/Internal/Consumer.cs | 4 +++-
src/DotPulsar/Internal/MessageProcessor.cs | 11 ++++++++++-
src/DotPulsar/Internal/SubConsumer.cs | 3 +++
6 files changed, 48 insertions(+), 4 deletions(-)
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 957ba44..03fe981 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -8,8 +8,10 @@ The format is based on [Keep a
Changelog](https://keepachangelog.com/en/1.1.0/)
### Fixed
-- Fixed race condition in `Producer` between `Send(...)` and `DisposeAsync()`
dispose causing an unintended
- `DivideByZeroException`. It now correctly throws a `ProducerClosedException`
+- Fixed race condition in `Producer` between `Send(...)` and `DisposeAsync()`
causing an unintended `DivideByZeroException`.
+ It now throws a `ProducerClosedException`.
+- The `Process` extension method would hang when called with
EnsureOrderedAcknowledgment set to true, a shared subscription and
MaxDegreeOfParallelism above 1.
+ It now throws a `ProcessingException` when EnsureOrderedAcknowledgment is
set to true on a shared subscription type (`Shared` or `KeyShared`).
## [3.3.0] - 2024-06-10
diff --git a/src/DotPulsar/Abstractions/IConsumer.cs
b/src/DotPulsar/Abstractions/IConsumer.cs
index a7cf266..55ef779 100644
--- a/src/DotPulsar/Abstractions/IConsumer.cs
+++ b/src/DotPulsar/Abstractions/IConsumer.cs
@@ -39,6 +39,11 @@ public interface IConsumer : IGetLastMessageIds, ISeek,
IState<ConsumerState>, I
/// </summary>
public string SubscriptionName { get; }
+ /// <summary>
+ /// The consumer's subscription type.
+ /// </summary>
+ public SubscriptionType SubscriptionType { get; }
+
/// <summary>
/// The consumer's topic.
/// </summary>
diff --git a/src/DotPulsar/Exceptions/ProcessingException.cs
b/src/DotPulsar/Exceptions/ProcessingException.cs
new file mode 100644
index 0000000..f8f60fd
--- /dev/null
+++ b/src/DotPulsar/Exceptions/ProcessingException.cs
@@ -0,0 +1,23 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace DotPulsar.Exceptions;
+
+/// <summary>
+/// There was an issue while setting up for processing
+/// </summary>
+public sealed class ProcessingException : DotPulsarException
+{
+ public ProcessingException(string message) : base(message) { }
+}
diff --git a/src/DotPulsar/Internal/Consumer.cs
b/src/DotPulsar/Internal/Consumer.cs
index 55da102..18e326d 100644
--- a/src/DotPulsar/Internal/Consumer.cs
+++ b/src/DotPulsar/Internal/Consumer.cs
@@ -43,6 +43,7 @@ public sealed class Consumer<TMessage> : IConsumer<TMessage>
public Uri ServiceUrl { get; }
public string SubscriptionName { get; }
+ public SubscriptionType SubscriptionType { get; }
public string Topic { get; }
public Consumer(
@@ -55,6 +56,7 @@ public sealed class Consumer<TMessage> : IConsumer<TMessage>
_state = CreateStateManager();
ServiceUrl = serviceUrl;
SubscriptionName = consumerOptions.SubscriptionName;
+ SubscriptionType = consumerOptions.SubscriptionType;
Topic = consumerOptions.Topic;
_receiveTasks = Array.Empty<Task<IMessage<TMessage>>>();
_cts = new CancellationTokenSource();
@@ -411,7 +413,7 @@ public sealed class Consumer<TMessage> : IConsumer<TMessage>
var stateManager = CreateStateManager();
var initialChannel = new NotReadyChannel<TMessage>();
var executor = new Executor(correlationId, _processManager,
_exceptionHandler);
- var subConsumer = new SubConsumer<TMessage>(correlationId, ServiceUrl,
_consumerOptions.SubscriptionName, topic, _processManager, initialChannel,
executor, stateManager, consumerChannelFactory);
+ var subConsumer = new SubConsumer<TMessage>(correlationId, ServiceUrl,
_consumerOptions.SubscriptionName, _consumerOptions.SubscriptionType, topic,
_processManager, initialChannel, executor, stateManager,
consumerChannelFactory);
var process = new ConsumerProcess(correlationId, stateManager,
subConsumer, _consumerOptions.SubscriptionType == SubscriptionType.Failover);
_processManager.Add(process);
process.Start();
diff --git a/src/DotPulsar/Internal/MessageProcessor.cs
b/src/DotPulsar/Internal/MessageProcessor.cs
index 510b66f..caf7020 100644
--- a/src/DotPulsar/Internal/MessageProcessor.cs
+++ b/src/DotPulsar/Internal/MessageProcessor.cs
@@ -15,6 +15,7 @@
namespace DotPulsar.Internal;
using DotPulsar.Abstractions;
+using DotPulsar.Exceptions;
using DotPulsar.Internal.Extensions;
using Microsoft.Extensions.ObjectPool;
using System.Collections.Concurrent;
@@ -39,8 +40,16 @@ public sealed class MessageProcessor<TMessage> : IDisposable
private readonly int _maxMessagesPerTask;
private readonly TaskScheduler _taskScheduler;
- public MessageProcessor(IConsumer<TMessage> consumer,
Func<IMessage<TMessage>, CancellationToken, ValueTask> processor,
ProcessingOptions options)
+ public MessageProcessor(
+ IConsumer<TMessage> consumer,
+ Func<IMessage<TMessage>, CancellationToken, ValueTask> processor,
+ ProcessingOptions options)
{
+ if (options.EnsureOrderedAcknowledgment &&
+ (consumer.SubscriptionType == SubscriptionType.Shared ||
+ consumer.SubscriptionType == SubscriptionType.KeyShared))
+ throw new ProcessingException("Ordered acknowledgment can not be
ensuring with shared subscription types");
+
const string operation = "process";
_operationName = $"{consumer.Topic} {operation}";
diff --git a/src/DotPulsar/Internal/SubConsumer.cs
b/src/DotPulsar/Internal/SubConsumer.cs
index 64cba29..c9eb3e5 100644
--- a/src/DotPulsar/Internal/SubConsumer.cs
+++ b/src/DotPulsar/Internal/SubConsumer.cs
@@ -36,12 +36,14 @@ public sealed class SubConsumer<TMessage> :
IConsumer<TMessage>, IContainsChanne
public Uri ServiceUrl { get; }
public string SubscriptionName { get; }
+ public SubscriptionType SubscriptionType { get; }
public string Topic { get; }
public SubConsumer(
Guid correlationId,
Uri serviceUrl,
string subscriptionName,
+ SubscriptionType subscriptionType,
string topic,
IRegisterEvent eventRegister,
IConsumerChannel<TMessage> initialChannel,
@@ -52,6 +54,7 @@ public sealed class SubConsumer<TMessage> :
IConsumer<TMessage>, IContainsChanne
_correlationId = correlationId;
ServiceUrl = serviceUrl;
SubscriptionName = subscriptionName;
+ SubscriptionType = subscriptionType;
Topic = topic;
_eventRegister = eventRegister;
_channel = initialChannel;