This is an automated email from the ASF dual-hosted git repository.

blankensteiner pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-dotpulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 34ed930  Code cleanup. Tracing working.
34ed930 is described below

commit 34ed9301171a42779d2fba6f3f00ddf67a08aad0
Author: Daniel Blankensteiner <[email protected]>
AuthorDate: Fri Nov 12 12:30:15 2021 +0100

    Code cleanup. Tracing working.
---
 CHANGELOG.md                                       |  1 +
 src/DotPulsar/DotPulsar.csproj                     |  6 +++++-
 src/DotPulsar/Extensions/ConsumerExtensions.cs     | 15 +++++++-------
 src/DotPulsar/Internal/ChunkingPipeline.cs         |  4 ++--
 src/DotPulsar/Internal/Connector.cs                |  8 ++++----
 src/DotPulsar/Internal/DotPulsarActivitySource.cs  | 24 +++++++++-------------
 .../Internal/Extensions/ActivityExtensions.cs      | 14 +++++--------
 src/DotPulsar/Internal/PingPongHandler.cs          |  8 ++++----
 src/DotPulsar/Internal/Producer.cs                 |  3 ++-
 src/DotPulsar/Internal/PulsarStream.cs             | 12 +++++------
 src/DotPulsar/Internal/Requests/ConnectRequest.cs  |  2 +-
 src/DotPulsar/Internal/Requests/SendRequest.cs     |  2 +-
 src/DotPulsar/Internal/Requests/StandardRequest.cs |  2 +-
 tests/DotPulsar.Tests/Internal/Crc32CTests.cs      |  6 +++---
 14 files changed, 53 insertions(+), 54 deletions(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 2bc17e2..b48d335 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -8,6 +8,7 @@ The format is based on [Keep a 
Changelog](https://keepachangelog.com/en/1.0.0/)
 
 ## Added
 
+- .NET 6 added as a target framework
 - [Tracing](https://github.com/apache/pulsar-dotpulsar/wiki/Tracing) support following the [guidelines](https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/trace/semantic_conventions/messaging.md) from the [OpenTelemetry](https://opentelemetry.io/) project
     - Sending a message will create a producer trace and add tracing metadata to the message
     - The 'Process' extension method for IConsumer\<TMessage\> is no longer experimental and will create a consumer trace
diff --git a/src/DotPulsar/DotPulsar.csproj b/src/DotPulsar/DotPulsar.csproj
index 706c096..bedfefc 100644
--- a/src/DotPulsar/DotPulsar.csproj
+++ b/src/DotPulsar/DotPulsar.csproj
@@ -24,7 +24,7 @@
   <ItemGroup>    
     <PackageReference Include="HashDepot" Version="2.0.3" />
     <PackageReference Include="Microsoft.Extensions.ObjectPool" 
Version="6.0.0" />
-    <PackageReference Include="Microsoft.SourceLink.GitHub" Version="1.1.0" 
PrivateAssets="All" />
+    <PackageReference Include="Microsoft.SourceLink.GitHub" Version="1.1.1" 
PrivateAssets="All" />
     <PackageReference Include="protobuf-net" Version="3.0.101" />
     <PackageReference Include="System.IO.Pipelines" Version="6.0.0" />
   </ItemGroup>
@@ -43,6 +43,10 @@
     <PackageReference Include="System.Diagnostics.DiagnosticSource" 
Version="6.0.0" />
   </ItemGroup>
 
+  <ItemGroup Condition="'$(TargetFramework)' == 'net5.0'">
+       <PackageReference Include="System.Diagnostics.DiagnosticSource" 
Version="6.0.0" />
+  </ItemGroup>
+
   <ItemGroup>
     <None Include="PackageIcon.png" Pack="true" PackagePath="/" 
Visible="False" />
   </ItemGroup>
diff --git a/src/DotPulsar/Extensions/ConsumerExtensions.cs 
b/src/DotPulsar/Extensions/ConsumerExtensions.cs
index 7d38591..3692c28 100644
--- a/src/DotPulsar/Extensions/ConsumerExtensions.cs
+++ b/src/DotPulsar/Extensions/ConsumerExtensions.cs
@@ -19,6 +19,7 @@ using DotPulsar.Internal;
 using DotPulsar.Internal.Extensions;
 using System;
 using System.Collections.Generic;
+using System.Diagnostics;
 using System.Threading;
 using System.Threading.Tasks;
 
@@ -52,12 +53,12 @@ public static class ConsumerExtensions
 
         var tags = new KeyValuePair<string, object?>[]
         {
-                new KeyValuePair<string, object?>("messaging.destination", 
consumer.Topic),
-                new KeyValuePair<string, 
object?>("messaging.destination_kind", "topic"),
-                new KeyValuePair<string, object?>("messaging.operation", 
operation),
-                new KeyValuePair<string, object?>("messaging.system", 
"pulsar"),
-                new KeyValuePair<string, object?>("messaging.url", 
consumer.ServiceUrl),
-                new KeyValuePair<string, 
object?>("messaging.pulsar.subscription", consumer.SubscriptionName)
+            new KeyValuePair<string, object?>("messaging.destination", 
consumer.Topic),
+            new KeyValuePair<string, object?>("messaging.destination_kind", 
"topic"),
+            new KeyValuePair<string, object?>("messaging.operation", 
operation),
+            new KeyValuePair<string, object?>("messaging.system", "pulsar"),
+            new KeyValuePair<string, object?>("messaging.url", 
consumer.ServiceUrl),
+            new KeyValuePair<string, object?>("messaging.pulsar.subscription", 
consumer.SubscriptionName)
         };
 
         while (!cancellationToken.IsCancellationRequested)
@@ -70,7 +71,7 @@ public static class ConsumerExtensions
             {
                 activity.SetMessageId(message.MessageId);
                 activity.SetPayloadSize(message.Data.Length);
-                activity.SetStatusCode("OK");
+                activity.SetStatus(ActivityStatusCode.Ok);
             }
 
             try
diff --git a/src/DotPulsar/Internal/ChunkingPipeline.cs 
b/src/DotPulsar/Internal/ChunkingPipeline.cs
index 3528964..7de80b8 100644
--- a/src/DotPulsar/Internal/ChunkingPipeline.cs
+++ b/src/DotPulsar/Internal/ChunkingPipeline.cs
@@ -89,7 +89,7 @@ public sealed class ChunkingPipeline
 #if NETSTANDARD2_0
             await _stream.WriteAsync(_buffer, 0, 
_bufferCount).ConfigureAwait(false);
 #else
-                await _stream.WriteAsync(_buffer.AsMemory(0, 
_bufferCount)).ConfigureAwait(false);
+            await _stream.WriteAsync(_buffer.AsMemory(0, 
_bufferCount)).ConfigureAwait(false);
 #endif
             _bufferCount = 0;
         }
@@ -103,7 +103,7 @@ public sealed class ChunkingPipeline
         var data = memory.ToArray();
         await _stream.WriteAsync(data, 0, data.Length).ConfigureAwait(false);
 #else
-            await _stream.WriteAsync(memory).ConfigureAwait(false);
+        await _stream.WriteAsync(memory).ConfigureAwait(false);
 #endif
     }
 }
diff --git a/src/DotPulsar/Internal/Connector.cs 
b/src/DotPulsar/Internal/Connector.cs
index 99f761f..4130fb6 100644
--- a/src/DotPulsar/Internal/Connector.cs
+++ b/src/DotPulsar/Internal/Connector.cs
@@ -100,10 +100,10 @@ public sealed class Connector
             else
                 sslStream.Dispose();
 #else
-                if (sslStream is null)
-                    await stream.DisposeAsync().ConfigureAwait(false);
-                else
-                    await sslStream.DisposeAsync().ConfigureAwait(false);
+            if (sslStream is null)
+                await stream.DisposeAsync().ConfigureAwait(false);
+            else
+                await sslStream.DisposeAsync().ConfigureAwait(false);
 #endif
             throw;
         }
diff --git a/src/DotPulsar/Internal/DotPulsarActivitySource.cs 
b/src/DotPulsar/Internal/DotPulsarActivitySource.cs
index 4f4c163..641d927 100644
--- a/src/DotPulsar/Internal/DotPulsarActivitySource.cs
+++ b/src/DotPulsar/Internal/DotPulsarActivitySource.cs
@@ -20,8 +20,7 @@ using System.Diagnostics;
 
 public static class DotPulsarActivitySource
 {
-    private const string _traceParent = "traceparent";
-    private const string _traceState = "tracestate";
+    private const string _conversationId = "conversation_id";
 
     static DotPulsarActivitySource()
     {
@@ -35,15 +34,6 @@ public static class DotPulsarActivitySource
         if (!ActivitySource.HasListeners())
             return null;
 
-        var properties = message.Properties;
-
-        if (properties.TryGetValue(_traceParent, out var traceparent))
-        {
-            var tracestate = properties.ContainsKey(_traceState) ? 
properties[_traceState] : null;
-            if (ActivityContext.TryParse(traceparent, tracestate, out var 
activityContext))
-                return ActivitySource.StartActivity(operationName, 
ActivityKind.Consumer, activityContext, tags);
-        }
-
         var activity = ActivitySource.StartActivity(operationName, 
ActivityKind.Consumer);
 
         if (activity is not null && activity.IsAllDataRequested)
@@ -53,6 +43,11 @@ public static class DotPulsarActivitySource
                 var tag = tags[i];
                 activity.SetTag(tag.Key, tag.Value);
             }
+
+
+            var properties = message.Properties;
+            if (properties.TryGetValue(_conversationId, out var 
conversationId))
+                activity.SetTag(_conversationId, conversationId);
         }
 
         return activity;
@@ -67,14 +62,15 @@ public static class DotPulsarActivitySource
 
         if (activity is not null && activity.IsAllDataRequested)
         {
-            metadata[_traceParent] = activity.TraceId.ToHexString();
-            metadata[_traceState] = activity.TraceStateString;
-
             for (var i = 0; i < tags.Length; ++i)
             {
                 var tag = tags[i];
                 activity.SetTag(tag.Key, tag.Value);
             }
+
+            var conversationId = metadata[_conversationId];
+            if (conversationId is not null)
+                activity.SetTag(_conversationId, conversationId);
         }
 
         return activity;
diff --git a/src/DotPulsar/Internal/Extensions/ActivityExtensions.cs 
b/src/DotPulsar/Internal/Extensions/ActivityExtensions.cs
index 11c3149..bb8e16a 100644
--- a/src/DotPulsar/Internal/Extensions/ActivityExtensions.cs
+++ b/src/DotPulsar/Internal/Extensions/ActivityExtensions.cs
@@ -25,17 +25,16 @@ public static class ActivityExtensions
     private const string _exceptionMessage = "exception.message";
     private const string _messageId = "messaging.message_id";
     private const string _payloadSize = "messaging.message_payload_size_bytes";
-    private const string _statusCode = "otel.status_code";
 
     public static void AddException(this Activity activity, Exception 
exception)
     {
-        activity.SetStatusCode("ERROR");
+        activity.SetStatus(ActivityStatusCode.Error);
 
         var exceptionTags = new ActivityTagsCollection
-            {
-                { _exceptionType, exception.GetType().FullName },
-                { _exceptionStackTrace, exception.ToString() }
-            };
+        {
+            { _exceptionType, exception.GetType().FullName },
+            { _exceptionStackTrace, exception.ToString() }
+        };
 
         if (!string.IsNullOrWhiteSpace(exception.Message))
             exceptionTags.Add(_exceptionMessage, exception.Message);
@@ -47,9 +46,6 @@ public static class ActivityExtensions
     public static void SetMessageId(this Activity activity, MessageId 
messageId)
         => activity.SetTag(_messageId, messageId.ToString());
 
-    public static void SetStatusCode(this Activity activity, string statusCode)
-        => activity.SetTag(_statusCode, statusCode);
-
     public static void SetPayloadSize(this Activity activity, long payloadSize)
         => activity.SetTag(_payloadSize, payloadSize);
 }
diff --git a/src/DotPulsar/Internal/PingPongHandler.cs 
b/src/DotPulsar/Internal/PingPongHandler.cs
index 13edaa3..4391095 100644
--- a/src/DotPulsar/Internal/PingPongHandler.cs
+++ b/src/DotPulsar/Internal/PingPongHandler.cs
@@ -100,9 +100,9 @@ public sealed class PingPongHandler : IAsyncDisposable
         return new ValueTask();
     }
 #else
-        public async ValueTask DisposeAsync()
-        {
-            await _timer.DisposeAsync().ConfigureAwait(false);
-        }
+    public async ValueTask DisposeAsync()
+    {
+        await _timer.DisposeAsync().ConfigureAwait(false);
+    }
 #endif
 }
diff --git a/src/DotPulsar/Internal/Producer.cs 
b/src/DotPulsar/Internal/Producer.cs
index a5808a4..9f4b1e7 100644
--- a/src/DotPulsar/Internal/Producer.cs
+++ b/src/DotPulsar/Internal/Producer.cs
@@ -22,6 +22,7 @@ using DotPulsar.Internal.Extensions;
 using System;
 using System.Collections.Concurrent;
 using System.Collections.Generic;
+using System.Diagnostics;
 using System.Threading;
 using System.Threading.Tasks;
 
@@ -250,7 +251,7 @@ public sealed class Producer<TMessage> : 
IProducer<TMessage>, IRegisterEvent
             {
                 activity.SetMessageId(messageId);
                 activity.SetPayloadSize(data.Length);
-                activity.SetStatusCode("OK");
+                activity.SetStatus(ActivityStatusCode.Ok);
             }
 
             return messageId;
diff --git a/src/DotPulsar/Internal/PulsarStream.cs 
b/src/DotPulsar/Internal/PulsarStream.cs
index 2bec00b..cbf3104 100644
--- a/src/DotPulsar/Internal/PulsarStream.cs
+++ b/src/DotPulsar/Internal/PulsarStream.cs
@@ -63,11 +63,11 @@ public sealed class PulsarStream : IPulsarStream
         return new ValueTask();
     }
 #else
-        public async ValueTask DisposeAsync()
-        {
-            if (Interlocked.Exchange(ref _isDisposed, 1) == 0)
-                await _stream.DisposeAsync().ConfigureAwait(false);
-        }
+    public async ValueTask DisposeAsync()
+    {
+        if (Interlocked.Exchange(ref _isDisposed, 1) == 0)
+            await _stream.DisposeAsync().ConfigureAwait(false);
+    }
 #endif
 
     private async Task FillPipe(CancellationToken cancellationToken)
@@ -86,7 +86,7 @@ public sealed class PulsarStream : IPulsarStream
                 var bytesRead = await _stream.ReadAsync(buffer, 0, 
buffer.Length, cancellationToken).ConfigureAwait(false);
                 new Memory<byte>(buffer, 0, bytesRead).CopyTo(memory);
 #else
-                    var bytesRead = await _stream.ReadAsync(memory, 
cancellationToken).ConfigureAwait(false);
+                var bytesRead = await _stream.ReadAsync(memory, 
cancellationToken).ConfigureAwait(false);
 #endif
                 if (bytesRead == 0)
                     break;
diff --git a/src/DotPulsar/Internal/Requests/ConnectRequest.cs 
b/src/DotPulsar/Internal/Requests/ConnectRequest.cs
index f4e23a3..091ff6c 100644
--- a/src/DotPulsar/Internal/Requests/ConnectRequest.cs
+++ b/src/DotPulsar/Internal/Requests/ConnectRequest.cs
@@ -32,7 +32,7 @@ public struct ConnectRequest : IRequest
 #if NETSTANDARD2_0
     public bool Equals(IRequest other)
 #else
-        public bool Equals([AllowNull] IRequest other)
+    public bool Equals([AllowNull] IRequest other)
 #endif
             => other is ConnectRequest;
 
diff --git a/src/DotPulsar/Internal/Requests/SendRequest.cs 
b/src/DotPulsar/Internal/Requests/SendRequest.cs
index f0ec219..336116d 100644
--- a/src/DotPulsar/Internal/Requests/SendRequest.cs
+++ b/src/DotPulsar/Internal/Requests/SendRequest.cs
@@ -42,7 +42,7 @@ public struct SendRequest : IRequest
 #if NETSTANDARD2_0
     public bool Equals(IRequest other)
 #else
-        public bool Equals([AllowNull] IRequest other)
+    public bool Equals([AllowNull] IRequest other)
 #endif
     {
         if (other is SendRequest request)
diff --git a/src/DotPulsar/Internal/Requests/StandardRequest.cs 
b/src/DotPulsar/Internal/Requests/StandardRequest.cs
index 755107a..2032a22 100644
--- a/src/DotPulsar/Internal/Requests/StandardRequest.cs
+++ b/src/DotPulsar/Internal/Requests/StandardRequest.cs
@@ -55,7 +55,7 @@ public struct StandardRequest : IRequest
 #if NETSTANDARD2_0
     public bool Equals(IRequest other)
 #else
-        public bool Equals([AllowNull] IRequest other)
+    public bool Equals([AllowNull] IRequest other)
 #endif
     {
         if (other is StandardRequest request)
diff --git a/tests/DotPulsar.Tests/Internal/Crc32CTests.cs 
b/tests/DotPulsar.Tests/Internal/Crc32CTests.cs
index a426a35..3744121 100644
--- a/tests/DotPulsar.Tests/Internal/Crc32CTests.cs
+++ b/tests/DotPulsar.Tests/Internal/Crc32CTests.cs
@@ -42,9 +42,9 @@ public class Crc32CTests
         //Arrange
         var s1 = new byte[]
         {
-                0x0a, 0x0f, 0x73, 0x74, 0x61, 0x6e, 0x64, 0x61, 0x6c, 0x6f,
-                0x6e, 0x65, 0x2d, 0x33, 0x30, 0x2d, 0x35, 0x10, 0x00, 0x18,
-                0xc7, 0xee, 0xa3, 0x93, 0xeb, 0x2c, 0x58, 0x01
+            0x0a, 0x0f, 0x73, 0x74, 0x61, 0x6e, 0x64, 0x61, 0x6c, 0x6f,
+            0x6e, 0x65, 0x2d, 0x33, 0x30, 0x2d, 0x35, 0x10, 0x00, 0x18,
+            0xc7, 0xee, 0xa3, 0x93, 0xeb, 0x2c, 0x58, 0x01
         };
 
         var s2 = new byte[] { 0x10, 0x01, 0x18, 0xc9, 0xf8, 0x86, 0x94, 0xeb, 
0x2c };

Reply via email to