This is an automated email from the ASF dual-hosted git repository.
blankensteiner pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-dotpulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 34ed930 Code cleanup. Tracing working.
34ed930 is described below
commit 34ed9301171a42779d2fba6f3f00ddf67a08aad0
Author: Daniel Blankensteiner <[email protected]>
AuthorDate: Fri Nov 12 12:30:15 2021 +0100
Code cleanup. Tracing working.
---
CHANGELOG.md | 1 +
src/DotPulsar/DotPulsar.csproj | 6 +++++-
src/DotPulsar/Extensions/ConsumerExtensions.cs | 15 +++++++-------
src/DotPulsar/Internal/ChunkingPipeline.cs | 4 ++--
src/DotPulsar/Internal/Connector.cs | 8 ++++----
src/DotPulsar/Internal/DotPulsarActivitySource.cs | 24 +++++++++-------------
.../Internal/Extensions/ActivityExtensions.cs | 14 +++++--------
src/DotPulsar/Internal/PingPongHandler.cs | 8 ++++----
src/DotPulsar/Internal/Producer.cs | 3 ++-
src/DotPulsar/Internal/PulsarStream.cs | 12 +++++------
src/DotPulsar/Internal/Requests/ConnectRequest.cs | 2 +-
src/DotPulsar/Internal/Requests/SendRequest.cs | 2 +-
src/DotPulsar/Internal/Requests/StandardRequest.cs | 2 +-
tests/DotPulsar.Tests/Internal/Crc32CTests.cs | 6 +++---
14 files changed, 53 insertions(+), 54 deletions(-)
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 2bc17e2..b48d335 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -8,6 +8,7 @@ The format is based on [Keep a
Changelog](https://keepachangelog.com/en/1.0.0/)
## Added
+- .NET 6 added as a target framework
- [Tracing](https://github.com/apache/pulsar-dotpulsar/wiki/Tracing) support
following the
[guidelines](https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/trace/semantic_conventions/messaging.md)
from the [OpenTelemetry](https://opentelemetry.io/) project
- Sending a message will create a producer trace and add tracing metadata
to the message
- The 'Process' extension method for IConsumer\<TMessage\> is no longer
experimental and will create a consumer trace
diff --git a/src/DotPulsar/DotPulsar.csproj b/src/DotPulsar/DotPulsar.csproj
index 706c096..bedfefc 100644
--- a/src/DotPulsar/DotPulsar.csproj
+++ b/src/DotPulsar/DotPulsar.csproj
@@ -24,7 +24,7 @@
<ItemGroup>
<PackageReference Include="HashDepot" Version="2.0.3" />
<PackageReference Include="Microsoft.Extensions.ObjectPool"
Version="6.0.0" />
- <PackageReference Include="Microsoft.SourceLink.GitHub" Version="1.1.0"
PrivateAssets="All" />
+ <PackageReference Include="Microsoft.SourceLink.GitHub" Version="1.1.1"
PrivateAssets="All" />
<PackageReference Include="protobuf-net" Version="3.0.101" />
<PackageReference Include="System.IO.Pipelines" Version="6.0.0" />
</ItemGroup>
@@ -43,6 +43,10 @@
<PackageReference Include="System.Diagnostics.DiagnosticSource"
Version="6.0.0" />
</ItemGroup>
+ <ItemGroup Condition="'$(TargetFramework)' == 'net5.0'">
+ <PackageReference Include="System.Diagnostics.DiagnosticSource"
Version="6.0.0" />
+ </ItemGroup>
+
<ItemGroup>
<None Include="PackageIcon.png" Pack="true" PackagePath="/"
Visible="False" />
</ItemGroup>
diff --git a/src/DotPulsar/Extensions/ConsumerExtensions.cs
b/src/DotPulsar/Extensions/ConsumerExtensions.cs
index 7d38591..3692c28 100644
--- a/src/DotPulsar/Extensions/ConsumerExtensions.cs
+++ b/src/DotPulsar/Extensions/ConsumerExtensions.cs
@@ -19,6 +19,7 @@ using DotPulsar.Internal;
using DotPulsar.Internal.Extensions;
using System;
using System.Collections.Generic;
+using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;
@@ -52,12 +53,12 @@ public static class ConsumerExtensions
var tags = new KeyValuePair<string, object?>[]
{
- new KeyValuePair<string, object?>("messaging.destination",
consumer.Topic),
- new KeyValuePair<string,
object?>("messaging.destination_kind", "topic"),
- new KeyValuePair<string, object?>("messaging.operation",
operation),
- new KeyValuePair<string, object?>("messaging.system",
"pulsar"),
- new KeyValuePair<string, object?>("messaging.url",
consumer.ServiceUrl),
- new KeyValuePair<string,
object?>("messaging.pulsar.subscription", consumer.SubscriptionName)
+ new KeyValuePair<string, object?>("messaging.destination",
consumer.Topic),
+ new KeyValuePair<string, object?>("messaging.destination_kind",
"topic"),
+ new KeyValuePair<string, object?>("messaging.operation",
operation),
+ new KeyValuePair<string, object?>("messaging.system", "pulsar"),
+ new KeyValuePair<string, object?>("messaging.url",
consumer.ServiceUrl),
+ new KeyValuePair<string, object?>("messaging.pulsar.subscription",
consumer.SubscriptionName)
};
while (!cancellationToken.IsCancellationRequested)
@@ -70,7 +71,7 @@ public static class ConsumerExtensions
{
activity.SetMessageId(message.MessageId);
activity.SetPayloadSize(message.Data.Length);
- activity.SetStatusCode("OK");
+ activity.SetStatus(ActivityStatusCode.Ok);
}
try
diff --git a/src/DotPulsar/Internal/ChunkingPipeline.cs
b/src/DotPulsar/Internal/ChunkingPipeline.cs
index 3528964..7de80b8 100644
--- a/src/DotPulsar/Internal/ChunkingPipeline.cs
+++ b/src/DotPulsar/Internal/ChunkingPipeline.cs
@@ -89,7 +89,7 @@ public sealed class ChunkingPipeline
#if NETSTANDARD2_0
await _stream.WriteAsync(_buffer, 0,
_bufferCount).ConfigureAwait(false);
#else
- await _stream.WriteAsync(_buffer.AsMemory(0,
_bufferCount)).ConfigureAwait(false);
+ await _stream.WriteAsync(_buffer.AsMemory(0,
_bufferCount)).ConfigureAwait(false);
#endif
_bufferCount = 0;
}
@@ -103,7 +103,7 @@ public sealed class ChunkingPipeline
var data = memory.ToArray();
await _stream.WriteAsync(data, 0, data.Length).ConfigureAwait(false);
#else
- await _stream.WriteAsync(memory).ConfigureAwait(false);
+ await _stream.WriteAsync(memory).ConfigureAwait(false);
#endif
}
}
diff --git a/src/DotPulsar/Internal/Connector.cs
b/src/DotPulsar/Internal/Connector.cs
index 99f761f..4130fb6 100644
--- a/src/DotPulsar/Internal/Connector.cs
+++ b/src/DotPulsar/Internal/Connector.cs
@@ -100,10 +100,10 @@ public sealed class Connector
else
sslStream.Dispose();
#else
- if (sslStream is null)
- await stream.DisposeAsync().ConfigureAwait(false);
- else
- await sslStream.DisposeAsync().ConfigureAwait(false);
+ if (sslStream is null)
+ await stream.DisposeAsync().ConfigureAwait(false);
+ else
+ await sslStream.DisposeAsync().ConfigureAwait(false);
#endif
throw;
}
diff --git a/src/DotPulsar/Internal/DotPulsarActivitySource.cs
b/src/DotPulsar/Internal/DotPulsarActivitySource.cs
index 4f4c163..641d927 100644
--- a/src/DotPulsar/Internal/DotPulsarActivitySource.cs
+++ b/src/DotPulsar/Internal/DotPulsarActivitySource.cs
@@ -20,8 +20,7 @@ using System.Diagnostics;
public static class DotPulsarActivitySource
{
- private const string _traceParent = "traceparent";
- private const string _traceState = "tracestate";
+ private const string _conversationId = "conversation_id";
static DotPulsarActivitySource()
{
@@ -35,15 +34,6 @@ public static class DotPulsarActivitySource
if (!ActivitySource.HasListeners())
return null;
- var properties = message.Properties;
-
- if (properties.TryGetValue(_traceParent, out var traceparent))
- {
- var tracestate = properties.ContainsKey(_traceState) ?
properties[_traceState] : null;
- if (ActivityContext.TryParse(traceparent, tracestate, out var
activityContext))
- return ActivitySource.StartActivity(operationName,
ActivityKind.Consumer, activityContext, tags);
- }
-
var activity = ActivitySource.StartActivity(operationName,
ActivityKind.Consumer);
if (activity is not null && activity.IsAllDataRequested)
@@ -53,6 +43,11 @@ public static class DotPulsarActivitySource
var tag = tags[i];
activity.SetTag(tag.Key, tag.Value);
}
+
+
+ var properties = message.Properties;
+ if (properties.TryGetValue(_conversationId, out var
conversationId))
+ activity.SetTag(_conversationId, conversationId);
}
return activity;
@@ -67,14 +62,15 @@ public static class DotPulsarActivitySource
if (activity is not null && activity.IsAllDataRequested)
{
- metadata[_traceParent] = activity.TraceId.ToHexString();
- metadata[_traceState] = activity.TraceStateString;
-
for (var i = 0; i < tags.Length; ++i)
{
var tag = tags[i];
activity.SetTag(tag.Key, tag.Value);
}
+
+ var conversationId = metadata[_conversationId];
+ if (conversationId is not null)
+ activity.SetTag(_conversationId, conversationId);
}
return activity;
diff --git a/src/DotPulsar/Internal/Extensions/ActivityExtensions.cs
b/src/DotPulsar/Internal/Extensions/ActivityExtensions.cs
index 11c3149..bb8e16a 100644
--- a/src/DotPulsar/Internal/Extensions/ActivityExtensions.cs
+++ b/src/DotPulsar/Internal/Extensions/ActivityExtensions.cs
@@ -25,17 +25,16 @@ public static class ActivityExtensions
private const string _exceptionMessage = "exception.message";
private const string _messageId = "messaging.message_id";
private const string _payloadSize = "messaging.message_payload_size_bytes";
- private const string _statusCode = "otel.status_code";
public static void AddException(this Activity activity, Exception
exception)
{
- activity.SetStatusCode("ERROR");
+ activity.SetStatus(ActivityStatusCode.Error);
var exceptionTags = new ActivityTagsCollection
- {
- { _exceptionType, exception.GetType().FullName },
- { _exceptionStackTrace, exception.ToString() }
- };
+ {
+ { _exceptionType, exception.GetType().FullName },
+ { _exceptionStackTrace, exception.ToString() }
+ };
if (!string.IsNullOrWhiteSpace(exception.Message))
exceptionTags.Add(_exceptionMessage, exception.Message);
@@ -47,9 +46,6 @@ public static class ActivityExtensions
public static void SetMessageId(this Activity activity, MessageId
messageId)
=> activity.SetTag(_messageId, messageId.ToString());
- public static void SetStatusCode(this Activity activity, string statusCode)
- => activity.SetTag(_statusCode, statusCode);
-
public static void SetPayloadSize(this Activity activity, long payloadSize)
=> activity.SetTag(_payloadSize, payloadSize);
}
diff --git a/src/DotPulsar/Internal/PingPongHandler.cs
b/src/DotPulsar/Internal/PingPongHandler.cs
index 13edaa3..4391095 100644
--- a/src/DotPulsar/Internal/PingPongHandler.cs
+++ b/src/DotPulsar/Internal/PingPongHandler.cs
@@ -100,9 +100,9 @@ public sealed class PingPongHandler : IAsyncDisposable
return new ValueTask();
}
#else
- public async ValueTask DisposeAsync()
- {
- await _timer.DisposeAsync().ConfigureAwait(false);
- }
+ public async ValueTask DisposeAsync()
+ {
+ await _timer.DisposeAsync().ConfigureAwait(false);
+ }
#endif
}
diff --git a/src/DotPulsar/Internal/Producer.cs
b/src/DotPulsar/Internal/Producer.cs
index a5808a4..9f4b1e7 100644
--- a/src/DotPulsar/Internal/Producer.cs
+++ b/src/DotPulsar/Internal/Producer.cs
@@ -22,6 +22,7 @@ using DotPulsar.Internal.Extensions;
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
+using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;
@@ -250,7 +251,7 @@ public sealed class Producer<TMessage> :
IProducer<TMessage>, IRegisterEvent
{
activity.SetMessageId(messageId);
activity.SetPayloadSize(data.Length);
- activity.SetStatusCode("OK");
+ activity.SetStatus(ActivityStatusCode.Ok);
}
return messageId;
diff --git a/src/DotPulsar/Internal/PulsarStream.cs
b/src/DotPulsar/Internal/PulsarStream.cs
index 2bec00b..cbf3104 100644
--- a/src/DotPulsar/Internal/PulsarStream.cs
+++ b/src/DotPulsar/Internal/PulsarStream.cs
@@ -63,11 +63,11 @@ public sealed class PulsarStream : IPulsarStream
return new ValueTask();
}
#else
- public async ValueTask DisposeAsync()
- {
- if (Interlocked.Exchange(ref _isDisposed, 1) == 0)
- await _stream.DisposeAsync().ConfigureAwait(false);
- }
+ public async ValueTask DisposeAsync()
+ {
+ if (Interlocked.Exchange(ref _isDisposed, 1) == 0)
+ await _stream.DisposeAsync().ConfigureAwait(false);
+ }
#endif
private async Task FillPipe(CancellationToken cancellationToken)
@@ -86,7 +86,7 @@ public sealed class PulsarStream : IPulsarStream
var bytesRead = await _stream.ReadAsync(buffer, 0,
buffer.Length, cancellationToken).ConfigureAwait(false);
new Memory<byte>(buffer, 0, bytesRead).CopyTo(memory);
#else
- var bytesRead = await _stream.ReadAsync(memory,
cancellationToken).ConfigureAwait(false);
+ var bytesRead = await _stream.ReadAsync(memory,
cancellationToken).ConfigureAwait(false);
#endif
if (bytesRead == 0)
break;
diff --git a/src/DotPulsar/Internal/Requests/ConnectRequest.cs
b/src/DotPulsar/Internal/Requests/ConnectRequest.cs
index f4e23a3..091ff6c 100644
--- a/src/DotPulsar/Internal/Requests/ConnectRequest.cs
+++ b/src/DotPulsar/Internal/Requests/ConnectRequest.cs
@@ -32,7 +32,7 @@ public struct ConnectRequest : IRequest
#if NETSTANDARD2_0
public bool Equals(IRequest other)
#else
- public bool Equals([AllowNull] IRequest other)
+ public bool Equals([AllowNull] IRequest other)
#endif
=> other is ConnectRequest;
diff --git a/src/DotPulsar/Internal/Requests/SendRequest.cs
b/src/DotPulsar/Internal/Requests/SendRequest.cs
index f0ec219..336116d 100644
--- a/src/DotPulsar/Internal/Requests/SendRequest.cs
+++ b/src/DotPulsar/Internal/Requests/SendRequest.cs
@@ -42,7 +42,7 @@ public struct SendRequest : IRequest
#if NETSTANDARD2_0
public bool Equals(IRequest other)
#else
- public bool Equals([AllowNull] IRequest other)
+ public bool Equals([AllowNull] IRequest other)
#endif
{
if (other is SendRequest request)
diff --git a/src/DotPulsar/Internal/Requests/StandardRequest.cs
b/src/DotPulsar/Internal/Requests/StandardRequest.cs
index 755107a..2032a22 100644
--- a/src/DotPulsar/Internal/Requests/StandardRequest.cs
+++ b/src/DotPulsar/Internal/Requests/StandardRequest.cs
@@ -55,7 +55,7 @@ public struct StandardRequest : IRequest
#if NETSTANDARD2_0
public bool Equals(IRequest other)
#else
- public bool Equals([AllowNull] IRequest other)
+ public bool Equals([AllowNull] IRequest other)
#endif
{
if (other is StandardRequest request)
diff --git a/tests/DotPulsar.Tests/Internal/Crc32CTests.cs
b/tests/DotPulsar.Tests/Internal/Crc32CTests.cs
index a426a35..3744121 100644
--- a/tests/DotPulsar.Tests/Internal/Crc32CTests.cs
+++ b/tests/DotPulsar.Tests/Internal/Crc32CTests.cs
@@ -42,9 +42,9 @@ public class Crc32CTests
//Arrange
var s1 = new byte[]
{
- 0x0a, 0x0f, 0x73, 0x74, 0x61, 0x6e, 0x64, 0x61, 0x6c, 0x6f,
- 0x6e, 0x65, 0x2d, 0x33, 0x30, 0x2d, 0x35, 0x10, 0x00, 0x18,
- 0xc7, 0xee, 0xa3, 0x93, 0xeb, 0x2c, 0x58, 0x01
+ 0x0a, 0x0f, 0x73, 0x74, 0x61, 0x6e, 0x64, 0x61, 0x6c, 0x6f,
+ 0x6e, 0x65, 0x2d, 0x33, 0x30, 0x2d, 0x35, 0x10, 0x00, 0x18,
+ 0xc7, 0xee, 0xa3, 0x93, 0xeb, 0x2c, 0x58, 0x01
};
var s2 = new byte[] { 0x10, 0x01, 0x18, 0xc9, 0xf8, 0x86, 0x94, 0xeb,
0x2c };