This is an automated email from the ASF dual-hosted git repository.
blankensteiner pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-dotpulsar.git
The following commit(s) were added to refs/heads/master by this push:
new ab7b003 Centralize object pool for MessageMetadata (#165)
ab7b003 is described below
commit ab7b00315ea04da7175db102d542ee039b6e738a
Author: Kristian Andersen <[email protected]>
AuthorDate: Fri Jul 7 09:47:42 2023 +0200
Centralize object pool for MessageMetadata (#165)
Ensures cleanup is consistent before returning objects to pool
---
src/DotPulsar/Extensions/SendChannelExtensions.cs | 16 ++-------
src/DotPulsar/Extensions/SendExtensions.cs | 16 ++-------
.../Internal/MessageMetadataObjectPool.cs | 40 ++++++++++++++++++++++
3 files changed, 46 insertions(+), 26 deletions(-)
diff --git a/src/DotPulsar/Extensions/SendChannelExtensions.cs b/src/DotPulsar/Extensions/SendChannelExtensions.cs
index 046e18d..39b9053 100644
--- a/src/DotPulsar/Extensions/SendChannelExtensions.cs
+++ b/src/DotPulsar/Extensions/SendChannelExtensions.cs
@@ -15,7 +15,7 @@
namespace DotPulsar.Extensions;
using DotPulsar.Abstractions;
-using Microsoft.Extensions.ObjectPool;
+using DotPulsar.Internal;
using System;
using System.Buffers;
using System.Threading;
@@ -26,14 +26,6 @@ using System.Threading.Tasks;
/// </summary>
public static class SendChannelExtensions
{
- private static readonly ObjectPool<MessageMetadata> _messageMetadataPool;
-
- static SendChannelExtensions()
- {
- var messageMetadataPolicy = new
DefaultPooledObjectPolicy<MessageMetadata>();
- _messageMetadataPool = new
DefaultObjectPool<MessageMetadata>(messageMetadataPolicy);
- }
-
/// <summary>
/// Sends a message.
/// </summary>
@@ -63,13 +55,11 @@ public static class SendChannelExtensions
/// </summary>
public static async ValueTask Send<TMessage>(this ISendChannel<TMessage>
sender, TMessage message, Func<MessageId, ValueTask>? onMessageSent = default,
CancellationToken cancellationToken = default)
{
- var metadata = _messageMetadataPool.Get();
+ var metadata = MessageMetadataObjectPool.Get();
async ValueTask ReleaseMetadataAndCallCallback(MessageId id)
{
- metadata.Metadata.SequenceId = 0;
- metadata.Metadata.Properties.Clear();
- _messageMetadataPool.Return(metadata);
+ MessageMetadataObjectPool.Return(metadata);
if (onMessageSent is not null)
await onMessageSent(id).ConfigureAwait(false);
diff --git a/src/DotPulsar/Extensions/SendExtensions.cs b/src/DotPulsar/Extensions/SendExtensions.cs
index 055b5fa..fa18466 100644
--- a/src/DotPulsar/Extensions/SendExtensions.cs
+++ b/src/DotPulsar/Extensions/SendExtensions.cs
@@ -15,7 +15,7 @@
namespace DotPulsar.Extensions;
using DotPulsar.Abstractions;
-using Microsoft.Extensions.ObjectPool;
+using DotPulsar.Internal;
using System;
using System.Buffers;
using System.Threading;
@@ -26,14 +26,6 @@ using System.Threading.Tasks;
/// </summary>
public static class SendExtensions
{
- private static readonly ObjectPool<MessageMetadata> _messageMetadataPool;
-
- static SendExtensions()
- {
- var messageMetadataPolicy = new
DefaultPooledObjectPolicy<MessageMetadata>();
- _messageMetadataPool = new
DefaultObjectPool<MessageMetadata>(messageMetadataPolicy);
- }
-
/// <summary>
/// Sends a message.
/// </summary>
@@ -63,7 +55,7 @@ public static class SendExtensions
/// </summary>
public static async ValueTask<MessageId> Send<TMessage>(this
ISend<TMessage> sender, TMessage message, CancellationToken cancellationToken =
default)
{
- var metadata = _messageMetadataPool.Get();
+ var metadata = MessageMetadataObjectPool.Get();
try
{
@@ -71,9 +63,7 @@ public static class SendExtensions
}
finally
{
- metadata.Metadata.SequenceId = 0;
- metadata.Metadata.Properties.Clear();
- _messageMetadataPool.Return(metadata);
+ MessageMetadataObjectPool.Return(metadata);
}
}
}
diff --git a/src/DotPulsar/Internal/MessageMetadataObjectPool.cs b/src/DotPulsar/Internal/MessageMetadataObjectPool.cs
new file mode 100644
index 0000000..ab9ed5c
--- /dev/null
+++ b/src/DotPulsar/Internal/MessageMetadataObjectPool.cs
@@ -0,0 +1,40 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace DotPulsar.Internal;
+
+using Microsoft.Extensions.ObjectPool;
+
+/// <summary>
+/// Shared pool of <see cref="MessageMetadata"/> instances; obtain one with Get and hand it back with Return.
+/// </summary>
+public static class MessageMetadataObjectPool
+{
+    private static readonly ObjectPool<MessageMetadata> _pool;
+
+    static MessageMetadataObjectPool()
+        => _pool = new DefaultObjectPool<MessageMetadata>(new DefaultPooledObjectPolicy<MessageMetadata>());
+
+    /// <summary>Takes an instance from the shared pool.</summary>
+    public static MessageMetadata Get() => _pool.Get();
+
+    /// <summary>Resets the sequence id and the properties, then gives the instance back to the pool.</summary>
+    public static void Return(MessageMetadata metadata)
+    {
+        // Clean up here, in one place, so every caller returns instances in the same state.
+        metadata.SequenceId = 0;
+        metadata.Metadata.Properties.Clear();
+        _pool.Return(metadata);
+    }
+}