This is an automated email from the ASF dual-hosted git repository.

blankensteiner pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-dotpulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 8580679  Implementing (OpenTelemetry) tracing
8580679 is described below

commit 858067959a396e9f38b487d61b63001a932d92bc
Author: Daniel Blankensteiner <[email protected]>
AuthorDate: Thu Jul 8 11:09:19 2021 +0200

    Implementing (OpenTelemetry) tracing
---
 CHANGELOG.md                                       | 15 ++++-
 src/DotPulsar/Abstractions/IMessageRouter.cs       |  2 +-
 src/DotPulsar/Abstractions/ISend.cs                |  5 --
 src/DotPulsar/Extensions/ConsumerExtensions.cs     | 55 +++-------------
 .../{Internal => Extensions}/SendExtensions.cs     | 31 ++++++++-
 src/DotPulsar/Internal/DotPulsarActivitySource.cs  | 55 ++++++++++++++++
 .../Internal/Extensions/ActivityExtensions.cs      | 56 ++++++++++++++++
 src/DotPulsar/Internal/Producer.cs                 | 77 ++++++++++++++++------
 src/DotPulsar/Internal/SubProducer.cs              | 56 ++--------------
 src/DotPulsar/RoundRobinPartitionRouter.cs         |  4 +-
 src/DotPulsar/SinglePartitionRouter.cs             |  4 +-
 11 files changed, 229 insertions(+), 131 deletions(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index e90c41d..2964b04 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -2,7 +2,20 @@
 
 All notable changes to this project will be documented in this file.
 
-The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
+The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/) and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
+
+## [Unreleased]
+
+### Added
+
+- [Tracing](https://github.com/apache/pulsar-dotpulsar/wiki/Tracing) support following the [guidelines](https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/trace/semantic_conventions/messaging.md) from the [OpenTelemetry](https://opentelemetry.io/) project
+    - Sending a message will create a producer trace and add tracing metadata to the message
+    - The 'Process' extension method for IConsumer\<TMessage\> is no longer experimental and will create a consumer trace
+
+### Changed
+
+- **Breaking**: Sending a message without metadata is now an extension method and therefore no longer part of the ISend\<TMessage\> (and thereby IProducer\<TMessage\>) interface
+- IMessageRouter: ChoosePartition(MessageMetadata? messageMetadata, int numberOfPartitions) -> ChoosePartition(MessageMetadata messageMetadata, int numberOfPartitions)
 
 ## [1.1.2] - 2021-07-05
 
diff --git a/src/DotPulsar/Abstractions/IMessageRouter.cs 
b/src/DotPulsar/Abstractions/IMessageRouter.cs
index b6ad294..d0d873c 100644
--- a/src/DotPulsar/Abstractions/IMessageRouter.cs
+++ b/src/DotPulsar/Abstractions/IMessageRouter.cs
@@ -22,6 +22,6 @@ namespace DotPulsar.Abstractions
         /// <summary>
         /// Choose a partition.
         /// </summary>
-        int ChoosePartition(MessageMetadata? messageMetadata, int 
numberOfPartitions);
+        int ChoosePartition(MessageMetadata messageMetadata, int 
numberOfPartitions);
     }
 }
diff --git a/src/DotPulsar/Abstractions/ISend.cs 
b/src/DotPulsar/Abstractions/ISend.cs
index 642f61d..affe029 100644
--- a/src/DotPulsar/Abstractions/ISend.cs
+++ b/src/DotPulsar/Abstractions/ISend.cs
@@ -23,11 +23,6 @@ namespace DotPulsar.Abstractions
     public interface ISend<TMessage>
     {
         /// <summary>
-        /// Sends a message.
-        /// </summary>
-        ValueTask<MessageId> Send(TMessage message, CancellationToken 
cancellationToken = default);
-
-        /// <summary>
         /// Sends a message with metadata.
         /// </summary>
         ValueTask<MessageId> Send(MessageMetadata metadata, TMessage message, 
CancellationToken cancellationToken = default);
diff --git a/src/DotPulsar/Extensions/ConsumerExtensions.cs 
b/src/DotPulsar/Extensions/ConsumerExtensions.cs
index f9a78a0..88f12cd 100644
--- a/src/DotPulsar/Extensions/ConsumerExtensions.cs
+++ b/src/DotPulsar/Extensions/ConsumerExtensions.cs
@@ -16,9 +16,9 @@ namespace DotPulsar.Extensions
 {
     using DotPulsar.Abstractions;
     using DotPulsar.Internal;
+    using DotPulsar.Internal.Extensions;
     using System;
     using System.Collections.Generic;
-    using System.Diagnostics;
     using System.Threading;
     using System.Threading.Tasks;
 
@@ -40,7 +40,7 @@ namespace DotPulsar.Extensions
             => await consumer.AcknowledgeCumulative(message.MessageId, 
cancellationToken).ConfigureAwait(false);
 
         /// <summary>
-        /// Process and auto-acknowledge a message. This is experimental.
+        /// Process and auto-acknowledge a message.
         /// </summary>
         public static async ValueTask Process<TMessage>(
             this IConsumer<TMessage> consumer,
@@ -50,7 +50,7 @@ namespace DotPulsar.Extensions
             const string operation = "process";
             var operationName = $"{consumer.Topic} {operation}";
 
-            var tags = new List<KeyValuePair<string, object?>>
+            var tags = new KeyValuePair<string, object?>[]
             {
                 new KeyValuePair<string, object?>("messaging.destination", 
consumer.Topic),
                 new KeyValuePair<string, 
object?>("messaging.destination_kind", "topic"),
@@ -64,13 +64,13 @@ namespace DotPulsar.Extensions
             {
                 var message = await 
consumer.Receive(cancellationToken).ConfigureAwait(false);
 
-                var activity = StartActivity(message, operationName, tags);
+                var activity = 
DotPulsarActivitySource.StartConsumerActivity(message, operationName, tags);
 
                 if (activity is not null && activity.IsAllDataRequested)
                 {
-                    activity.SetTag("messaging.message_id", 
message.MessageId.ToString());
-                    activity.SetTag("messaging.message_payload_size_bytes", 
message.Data.Length);
-                    activity.SetTag("otel.status_code", "OK");
+                    activity.SetMessageId(message.MessageId);
+                    activity.SetPayloadSize(message.Data.Length);
+                    activity.SetStatusCode("OK");
                 }
 
                 try
@@ -80,21 +80,7 @@ namespace DotPulsar.Extensions
                 catch (Exception exception)
                 {
                     if (activity is not null && activity.IsAllDataRequested)
-                    {
-                        activity.SetTag("otel.status_code", "ERROR");
-
-                        var exceptionTags = new ActivityTagsCollection
-                        {
-                            { "exception.type", exception.GetType().FullName },
-                            { "exception.stacktrace", exception.ToString() }
-                        };
-
-                        if (!string.IsNullOrWhiteSpace(exception.Message))
-                            exceptionTags.Add("exception.message", 
exception.Message);
-
-                        var activityEvent = new ActivityEvent("exception", 
default, exceptionTags);
-                        activity.AddEvent(activityEvent);
-                    }
+                        activity.AddException(exception);
                 }
 
                 activity?.Dispose();
@@ -103,31 +89,6 @@ namespace DotPulsar.Extensions
             }
         }
 
-        private static Activity? StartActivity(IMessage message, string 
operationName, IEnumerable<KeyValuePair<string, object?>> tags)
-        {
-            if (!DotPulsarActivitySource.ActivitySource.HasListeners())
-                return null;
-
-            var properties = message.Properties;
-
-            if (properties.TryGetValue("traceparent", out var traceparent))  
// TODO Allow the user to overwrite the keys 'traceparent' and 'tracestate'
-            {
-                var tracestate = properties.ContainsKey("tracestate") ? 
properties["tracestrate"] : null;
-                if (ActivityContext.TryParse(traceparent, tracestate, out var 
activityContext))
-                    return 
DotPulsarActivitySource.ActivitySource.StartActivity(operationName, 
ActivityKind.Consumer, activityContext, tags);
-            }
-
-            var activity = 
DotPulsarActivitySource.ActivitySource.StartActivity(operationName, 
ActivityKind.Consumer);
-
-            if (activity is not null && activity.IsAllDataRequested)
-            {
-                foreach (var tag in tags)
-                    activity.SetTag(tag.Key, tag.Value);
-            }
-
-            return activity;
-        }
-
         /// <summary>
         /// Wait for the state to change to a specific state.
         /// </summary>
diff --git a/src/DotPulsar/Internal/SendExtensions.cs 
b/src/DotPulsar/Extensions/SendExtensions.cs
similarity index 63%
rename from src/DotPulsar/Internal/SendExtensions.cs
rename to src/DotPulsar/Extensions/SendExtensions.cs
index 535408b..479fe69 100644
--- a/src/DotPulsar/Internal/SendExtensions.cs
+++ b/src/DotPulsar/Extensions/SendExtensions.cs
@@ -15,6 +15,7 @@
 namespace DotPulsar.Extensions
 {
     using Abstractions;
+    using Microsoft.Extensions.ObjectPool;
     using System;
     using System.Buffers;
     using System.Threading;
@@ -25,17 +26,25 @@ namespace DotPulsar.Extensions
     /// </summary>
     public static class SendExtensions
     {
+        private static readonly ObjectPool<MessageMetadata> 
_messageMetadataPool;
+
+        static SendExtensions()
+        {
+            var messageMetadataPolicy = new 
DefaultPooledObjectPolicy<MessageMetadata>();
+            _messageMetadataPool = new 
DefaultObjectPool<MessageMetadata>(messageMetadataPolicy);
+        }
+
         /// <summary>
         /// Sends a message.
         /// </summary>
         public static async ValueTask<MessageId> Send(this 
ISend<ReadOnlySequence<byte>> sender, byte[] data, CancellationToken 
cancellationToken = default)
-            => await sender.Send(new ReadOnlySequence<byte>(data), 
cancellationToken).ConfigureAwait(false);
+            => await Send(sender, new ReadOnlySequence<byte>(data), 
cancellationToken).ConfigureAwait(false);
 
         /// <summary>
         /// Sends a message.
         /// </summary>
         public static async ValueTask<MessageId> Send(this 
ISend<ReadOnlySequence<byte>> sender, ReadOnlyMemory<byte> data, 
CancellationToken cancellationToken = default)
-            => await sender.Send(new ReadOnlySequence<byte>(data), 
cancellationToken).ConfigureAwait(false);
+            => await Send(sender, new ReadOnlySequence<byte>(data), 
cancellationToken).ConfigureAwait(false);
 
         /// <summary>
         /// Sends a message with metadata.
@@ -48,5 +57,23 @@ namespace DotPulsar.Extensions
         /// </summary>
         public static async ValueTask<MessageId> Send(this 
ISend<ReadOnlySequence<byte>> sender, MessageMetadata metadata, 
ReadOnlyMemory<byte> data, CancellationToken cancellationToken = default)
             => await sender.Send(metadata, new ReadOnlySequence<byte>(data), 
cancellationToken).ConfigureAwait(false);
+
+        /// <summary>
+        /// Sends a message without metadata.
+        /// </summary>
+        public static async ValueTask<MessageId> Send<TMessage>(this 
ISend<TMessage> sender, TMessage message, CancellationToken cancellationToken = 
default)
+        {
+            var metadata = _messageMetadataPool.Get();
+
+            try
+            {
+                return await sender.Send(metadata, message, 
cancellationToken).ConfigureAwait(false);
+            }
+            finally
+            {
+                metadata.Metadata.Properties.Clear();
+                _messageMetadataPool.Return(metadata);
+            }
+        }
     }
 }
diff --git a/src/DotPulsar/Internal/DotPulsarActivitySource.cs 
b/src/DotPulsar/Internal/DotPulsarActivitySource.cs
index 09c8464..87e2c3e 100644
--- a/src/DotPulsar/Internal/DotPulsarActivitySource.cs
+++ b/src/DotPulsar/Internal/DotPulsarActivitySource.cs
@@ -14,15 +14,70 @@
 
 namespace DotPulsar.Internal
 {
+    using DotPulsar.Abstractions;
+    using System.Collections.Generic;
     using System.Diagnostics;
 
     public static class DotPulsarActivitySource
     {
+        private const string _traceParent = "traceparent";
+        private const string _traceState = "tracestate";
+
         static DotPulsarActivitySource()
         {
             ActivitySource = new ActivitySource(Constants.ClientName, 
Constants.ClientVersion);
         }
 
         public static ActivitySource ActivitySource { get; }
+
+        public static Activity? StartConsumerActivity(IMessage message, string 
operationName, KeyValuePair<string, object?>[] tags)
+        {
+            if (!ActivitySource.HasListeners())
+                return null;
+
+            var properties = message.Properties;
+
+            if (properties.TryGetValue(_traceParent, out var traceparent))
+            {
+                var tracestate = properties.ContainsKey(_traceState) ? 
properties[_traceState] : null;
+                if (ActivityContext.TryParse(traceparent, tracestate, out var 
activityContext))
+                    return ActivitySource.StartActivity(operationName, 
ActivityKind.Consumer, activityContext, tags);
+            }
+
+            var activity = ActivitySource.StartActivity(operationName, 
ActivityKind.Consumer);
+
+            if (activity is not null && activity.IsAllDataRequested)
+            {
+                for (var i = 0; i < tags.Length; ++i)
+                {
+                    var tag = tags[i];
+                    activity.SetTag(tag.Key, tag.Value);
+                }
+            }
+
+            return activity;
+        }
+
+        public static Activity? StartProducerActivity(MessageMetadata 
metadata, string operationName, KeyValuePair<string, object?>[] tags)
+        {
+            if (!ActivitySource.HasListeners())
+                return null;
+
+            var activity = ActivitySource.StartActivity(operationName, 
ActivityKind.Producer);
+
+            if (activity is not null && activity.IsAllDataRequested)
+            {
+                metadata[_traceParent] = activity.TraceId.ToHexString();
+                metadata[_traceState] = activity.TraceStateString;
+
+                for (var i = 0; i < tags.Length; ++i)
+                {
+                    var tag = tags[i];
+                    activity.SetTag(tag.Key, tag.Value);
+                }
+            }
+
+            return activity;
+        }
     }
 }
diff --git a/src/DotPulsar/Internal/Extensions/ActivityExtensions.cs 
b/src/DotPulsar/Internal/Extensions/ActivityExtensions.cs
new file mode 100644
index 0000000..850e93e
--- /dev/null
+++ b/src/DotPulsar/Internal/Extensions/ActivityExtensions.cs
@@ -0,0 +1,56 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace DotPulsar.Internal.Extensions
+{
+    using System;
+    using System.Diagnostics;
+
+    public static class ActivityExtensions
+    {
+        private const string _exceptionEventName = "exception";
+        private const string _exceptionStackTrace = "exception.stacktrace";
+        private const string _exceptionType = "exception.type";
+        private const string _exceptionMessage = "exception.message";
+        private const string _messageId = "messaging.message_id";
+        private const string _payloadSize = 
"messaging.message_payload_size_bytes";
+        private const string _statusCode = "otel.status_code";
+
+        public static void AddException(this Activity activity, Exception 
exception)
+        {
+            activity.SetStatusCode("ERROR");
+
+            var exceptionTags = new ActivityTagsCollection
+            {
+                { _exceptionType, exception.GetType().FullName },
+                { _exceptionStackTrace, exception.ToString() }
+            };
+
+            if (!string.IsNullOrWhiteSpace(exception.Message))
+                exceptionTags.Add(_exceptionMessage, exception.Message);
+
+            var activityEvent = new ActivityEvent(_exceptionEventName, 
default, exceptionTags);
+            activity.AddEvent(activityEvent);
+        }
+
+        public static void SetMessageId(this Activity activity, MessageId 
messageId)
+            => activity.SetTag(_messageId, messageId.ToString());
+
+        public static void SetStatusCode(this Activity activity, string 
statusCode)
+            => activity.SetTag(_statusCode, statusCode);
+
+        public static void SetPayloadSize(this Activity activity, long 
payloadSize)
+            => activity.SetTag(_payloadSize, payloadSize);
+    }
+}
diff --git a/src/DotPulsar/Internal/Producer.cs 
b/src/DotPulsar/Internal/Producer.cs
index a84d7c9..425d02d 100644
--- a/src/DotPulsar/Internal/Producer.cs
+++ b/src/DotPulsar/Internal/Producer.cs
@@ -19,21 +19,24 @@ namespace DotPulsar.Internal
     using DotPulsar.Exceptions;
     using DotPulsar.Extensions;
     using DotPulsar.Internal.Extensions;
-    using DotPulsar.Internal.PulsarApi;
     using System;
     using System.Collections.Concurrent;
+    using System.Collections.Generic;
     using System.Threading;
     using System.Threading.Tasks;
 
     public sealed class Producer<TMessage> : IProducer<TMessage>, 
IRegisterEvent
     {
+        private readonly string _operationName;
+        private readonly KeyValuePair<string, object?>[] _tags;
+        private readonly SequenceId _sequenceId;
         private readonly StateManager<ProducerState> _state;
         private readonly IConnectionPool _connectionPool;
         private readonly IHandleException _exceptionHandler;
         private readonly ICompressorFactory? _compressorFactory;
         private readonly ProducerOptions<TMessage> _options;
         private readonly ProcessManager _processManager;
-        private readonly ConcurrentDictionary<int, IProducer<TMessage>> 
_producers;
+        private readonly ConcurrentDictionary<int, SubProducer<TMessage>> 
_producers;
         private readonly IMessageRouter _messageRouter;
         private readonly CancellationTokenSource _cts;
         private readonly IExecute _executor;
@@ -52,6 +55,15 @@ namespace DotPulsar.Internal
             IConnectionPool connectionPool,
             ICompressorFactory? compressorFactory)
         {
+            _operationName = $"{options.Topic} send";
+            _tags = new KeyValuePair<string, object?>[]
+            {
+                new KeyValuePair<string, object?>("messaging.destination", 
options.Topic),
+                new KeyValuePair<string, 
object?>("messaging.destination_kind", "topic"),
+                new KeyValuePair<string, object?>("messaging.system", 
"pulsar"),
+                new KeyValuePair<string, object?>("messaging.url", serviceUrl),
+            };
+            _sequenceId = new SequenceId(options.InitialSequenceId);
             _state = new 
StateManager<ProducerState>(ProducerState.Disconnected, ProducerState.Closed, 
ProducerState.Faulted);
             ServiceUrl = serviceUrl;
             Topic = options.Topic;
@@ -64,7 +76,7 @@ namespace DotPulsar.Internal
             _messageRouter = options.MessageRouter;
             _cts = new CancellationTokenSource();
             _executor = new Executor(Guid.Empty, this, _exceptionHandler);
-            _producers = new ConcurrentDictionary<int, IProducer<TMessage>>();
+            _producers = new ConcurrentDictionary<int, 
SubProducer<TMessage>>();
             _ = Setup();
         }
 
@@ -149,12 +161,11 @@ namespace DotPulsar.Internal
             var correlationId = Guid.NewGuid();
             var producerName = _options.ProducerName;
             var schema = _options.Schema;
-            var initialSequenceId = _options.InitialSequenceId;
             var factory = new ProducerChannelFactory(correlationId, 
_processManager, _connectionPool, topic, producerName, schema.SchemaInfo, 
_compressorFactory);
             var stateManager = new 
StateManager<ProducerState>(ProducerState.Disconnected, ProducerState.Closed, 
ProducerState.Faulted);
             var initialChannel = new NotReadyChannel<TMessage>();
             var executor = new Executor(correlationId, _processManager, 
_exceptionHandler);
-            var producer = new SubProducer<TMessage>(correlationId, 
ServiceUrl, topic, initialSequenceId, _processManager, initialChannel, 
executor, stateManager, factory, schema);
+            var producer = new SubProducer<TMessage>(correlationId, 
ServiceUrl, topic, _processManager, initialChannel, executor, stateManager, 
factory, schema);
             var process = new ProducerProcess(correlationId, stateManager, 
producer);
             _processManager.Add(process);
             process.Start();
@@ -164,12 +175,12 @@ namespace DotPulsar.Internal
         private async Task<uint> GetNumberOfPartitions(string topic, 
CancellationToken cancellationToken)
         {
             var connection = await 
_connectionPool.FindConnectionForTopic(topic, 
cancellationToken).ConfigureAwait(false);
-            var commandPartitionedMetadata = new 
CommandPartitionedTopicMetadata { Topic = topic };
+            var commandPartitionedMetadata = new 
PulsarApi.CommandPartitionedTopicMetadata { Topic = topic };
             var response = await connection.Send(commandPartitionedMetadata, 
cancellationToken).ConfigureAwait(false);
 
-            response.Expect(BaseCommand.Type.PartitionedMetadataResponse);
+            
response.Expect(PulsarApi.BaseCommand.Type.PartitionedMetadataResponse);
 
-            if (response.PartitionMetadataResponse.Response == 
CommandPartitionedTopicMetadataResponse.LookupType.Failed)
+            if (response.PartitionMetadataResponse.Response == 
PulsarApi.CommandPartitionedTopicMetadataResponse.LookupType.Failed)
                 response.PartitionMetadataResponse.Throw();
 
             return response.PartitionMetadataResponse.Partitions;
@@ -203,10 +214,8 @@ namespace DotPulsar.Internal
             }
         }
 
-        private async ValueTask<int> 
ChoosePartitions(DotPulsar.MessageMetadata? metadata, CancellationToken 
cancellationToken)
+        private async ValueTask<int> ChoosePartitions(MessageMetadata 
metadata, CancellationToken cancellationToken)
         {
-            ThrowIfDisposed();
-
             if (_producerCount == 0)
             {
                 _ = await _state.StateChangedFrom(ProducerState.Disconnected, 
cancellationToken).ConfigureAwait(false);
@@ -220,16 +229,46 @@ namespace DotPulsar.Internal
             return _messageRouter.ChoosePartition(metadata, _producerCount);
         }
 
-        public async ValueTask<MessageId> Send(TMessage message, 
CancellationToken cancellationToken)
+        public async ValueTask<MessageId> Send(MessageMetadata metadata, 
TMessage message, CancellationToken cancellationToken)
         {
-            var partition = await ChoosePartitions(null, 
cancellationToken).ConfigureAwait(false);
-            return await _producers[partition].Send(message, 
cancellationToken).ConfigureAwait(false);
-        }
+            ThrowIfDisposed();
 
-        public async ValueTask<MessageId> Send(DotPulsar.MessageMetadata 
metadata, TMessage message, CancellationToken cancellationToken)
-        {
-            var partition = await ChoosePartitions(metadata, 
cancellationToken).ConfigureAwait(false);
-            return await _producers[partition].Send(metadata, message, 
cancellationToken).ConfigureAwait(false);
+            var autoAssignSequenceId = metadata.SequenceId == 0;
+            if (autoAssignSequenceId)
+                metadata.SequenceId = _sequenceId.FetchNext();
+
+            var activity = 
DotPulsarActivitySource.StartProducerActivity(metadata, _operationName, _tags);
+
+            try
+            {
+                var partition = await ChoosePartitions(metadata, 
cancellationToken).ConfigureAwait(false);
+                var producer = _producers[partition];
+                var data = _options.Schema.Encode(message);
+                var messageId = await producer.Send(metadata.Metadata, data, 
cancellationToken).ConfigureAwait(false);
+
+                if (activity is not null && activity.IsAllDataRequested)
+                {
+                    activity.SetMessageId(messageId);
+                    activity.SetPayloadSize(data.Length);
+                    activity.SetStatusCode("OK");
+                }
+
+                return messageId;
+            }
+            catch (Exception exception)
+            {
+                if (activity is not null && activity.IsAllDataRequested)
+                    activity.AddException(exception);
+
+                throw;
+            }
+            finally
+            {
+                activity?.Dispose();
+
+                if (autoAssignSequenceId)
+                    metadata.SequenceId = 0;
+            }
         }
 
         private void ThrowIfDisposed()
diff --git a/src/DotPulsar/Internal/SubProducer.cs 
b/src/DotPulsar/Internal/SubProducer.cs
index 78022d0..95fa217 100644
--- a/src/DotPulsar/Internal/SubProducer.cs
+++ b/src/DotPulsar/Internal/SubProducer.cs
@@ -16,10 +16,8 @@ namespace DotPulsar.Internal
 {
     using Abstractions;
     using DotPulsar.Abstractions;
-    using DotPulsar.Exceptions;
     using DotPulsar.Internal.Extensions;
     using Events;
-    using Microsoft.Extensions.ObjectPool;
     using System;
     using System.Buffers;
     using System.Threading;
@@ -27,7 +25,6 @@ namespace DotPulsar.Internal
 
     public sealed class SubProducer<TMessage> : IEstablishNewChannel, 
IProducer<TMessage>
     {
-        private readonly ObjectPool<PulsarApi.MessageMetadata> 
_messageMetadataPool;
         private readonly Guid _correlationId;
         private readonly IRegisterEvent _eventRegister;
         private IProducerChannel _channel;
@@ -35,7 +32,6 @@ namespace DotPulsar.Internal
         private readonly IStateChanged<ProducerState> _state;
         private readonly IProducerChannelFactory _factory;
         private readonly ISchema<TMessage> _schema;
-        private readonly SequenceId _sequenceId;
         private int _isDisposed;
 
         public Uri ServiceUrl { get; }
@@ -45,7 +41,6 @@ namespace DotPulsar.Internal
             Guid correlationId,
             Uri serviceUrl,
             string topic,
-            ulong initialSequenceId,
             IRegisterEvent registerEvent,
             IProducerChannel initialChannel,
             IExecute executor,
@@ -53,12 +48,9 @@ namespace DotPulsar.Internal
             IProducerChannelFactory factory,
             ISchema<TMessage> schema)
         {
-            var messageMetadataPolicy = new 
DefaultPooledObjectPolicy<PulsarApi.MessageMetadata>();
-            _messageMetadataPool = new 
DefaultObjectPool<PulsarApi.MessageMetadata>(messageMetadataPolicy);
             _correlationId = correlationId;
             ServiceUrl = serviceUrl;
             Topic = topic;
-            _sequenceId = new SequenceId(initialSequenceId);
             _eventRegister = registerEvent;
             _channel = initialChannel;
             _executor = executor;
@@ -91,48 +83,14 @@ namespace DotPulsar.Internal
             await 
_channel.ClosedByClient(CancellationToken.None).ConfigureAwait(false);
             await _channel.DisposeAsync().ConfigureAwait(false);
         }
-        public async ValueTask<MessageId> Send(TMessage message, 
CancellationToken cancellationToken)
-            => await Send(_schema.Encode(message), 
cancellationToken).ConfigureAwait(false);
 
         public async ValueTask<MessageId> Send(MessageMetadata metadata, 
TMessage message, CancellationToken cancellationToken)
-            => await Send(metadata, _schema.Encode(message), 
cancellationToken).ConfigureAwait(false);
+            => await _executor.Execute(() => InternalSend(metadata.Metadata, 
_schema.Encode(message), cancellationToken), 
cancellationToken).ConfigureAwait(false);
 
-        public async ValueTask<MessageId> Send(ReadOnlySequence<byte> data, 
CancellationToken cancellationToken)
-        {
-            ThrowIfDisposed();
-
-            var metadata = _messageMetadataPool.Get();
-            try
-            {
-                metadata.SequenceId = _sequenceId.FetchNext();
-                return await _executor.Execute(() => Send(metadata, data, 
cancellationToken), cancellationToken).ConfigureAwait(false);
-            }
-            finally
-            {
-                _messageMetadataPool.Return(metadata);
-            }
-        }
-
-        public async ValueTask<MessageId> Send(MessageMetadata metadata, 
ReadOnlySequence<byte> data, CancellationToken cancellationToken)
-        {
-            ThrowIfDisposed();
-
-            var autoAssignSequenceId = metadata.SequenceId == 0;
-            if (autoAssignSequenceId)
-                metadata.SequenceId = _sequenceId.FetchNext();
-
-            try
-            {
-                return await _executor.Execute(() => Send(metadata.Metadata, 
data, cancellationToken), cancellationToken).ConfigureAwait(false);
-            }
-            finally
-            {
-                if (autoAssignSequenceId)
-                    metadata.SequenceId = 0;
-            }
-        }
+        public async ValueTask<MessageId> Send(PulsarApi.MessageMetadata 
metadata, ReadOnlySequence<byte> data, CancellationToken cancellationToken)
+            => await _executor.Execute(() => InternalSend(metadata, data, 
cancellationToken), cancellationToken).ConfigureAwait(false);
 
-        private async ValueTask<MessageId> Send(PulsarApi.MessageMetadata 
metadata, ReadOnlySequence<byte> data, CancellationToken cancellationToken)
+        private async ValueTask<MessageId> 
InternalSend(PulsarApi.MessageMetadata metadata, ReadOnlySequence<byte> data, 
CancellationToken cancellationToken)
         {
             var response = await _channel.Send(metadata, data, 
cancellationToken).ConfigureAwait(false);
             return response.MessageId.ToMessageId();
@@ -148,11 +106,5 @@ namespace DotPulsar.Internal
             if (oldChannel is not null)
                 await oldChannel.DisposeAsync().ConfigureAwait(false);
         }
-
-        private void ThrowIfDisposed()
-        {
-            if (_isDisposed != 0)
-                throw new 
ProducerDisposedException(typeof(Producer<TMessage>).FullName!);
-        }
     }
 }
diff --git a/src/DotPulsar/RoundRobinPartitionRouter.cs 
b/src/DotPulsar/RoundRobinPartitionRouter.cs
index 0a972bb..6176d40 100644
--- a/src/DotPulsar/RoundRobinPartitionRouter.cs
+++ b/src/DotPulsar/RoundRobinPartitionRouter.cs
@@ -38,9 +38,9 @@ namespace DotPulsar
         /// <summary>
         /// Choose a partition in round robin routing mode
         /// </summary>
-        public int ChoosePartition(MessageMetadata? messageMetadata, int 
numberOfPartitions)
+        public int ChoosePartition(MessageMetadata messageMetadata, int 
numberOfPartitions)
         {
-            var keyBytes = messageMetadata?.KeyBytes;
+            var keyBytes = messageMetadata.KeyBytes;
             if (keyBytes is not null)
                 return (int) MurmurHash3.Hash32(keyBytes, 0) % 
numberOfPartitions;
 
diff --git a/src/DotPulsar/SinglePartitionRouter.cs 
b/src/DotPulsar/SinglePartitionRouter.cs
index e9ecfbc..cc2562b 100644
--- a/src/DotPulsar/SinglePartitionRouter.cs
+++ b/src/DotPulsar/SinglePartitionRouter.cs
@@ -46,9 +46,9 @@ namespace DotPulsar
         /// <summary>
         /// Choose a partition in single partition routing mode
         /// </summary>
-        public int ChoosePartition(MessageMetadata? messageMetadata, int 
numberOfPartitions)
+        public int ChoosePartition(MessageMetadata messageMetadata, int 
numberOfPartitions)
         {
-            var keyBytes = messageMetadata?.KeyBytes;
+            var keyBytes = messageMetadata.KeyBytes;
             if (keyBytes is not null)
                 return (int) MurmurHash3.Hash32(keyBytes, 0) % 
numberOfPartitions;
             

Reply via email to