This is an automated email from the ASF dual-hosted git repository.

blankensteiner pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-dotpulsar.git

commit 53c5b1bc17bc868858cdde3b5f337c3eb301831b
Author: Daniel Blankensteiner <[email protected]>
AuthorDate: Thu Oct 10 12:52:34 2024 +0200

    Added error details on exceptions from the connector and fixed issue with 
deadlocked DisposeAsync on consumers, readers and producers
---
 CHANGELOG.md                              |  9 +++++--
 benchmarks/Compression/Compression.csproj |  2 +-
 samples/Processing/Processing.csproj      |  2 +-
 src/DotPulsar/DotPulsar.csproj            |  2 +-
 src/DotPulsar/Internal/AsyncLock.cs       | 45 ++++++++++++++++++++-----------
 src/DotPulsar/Internal/AsyncQueue.cs      | 28 ++++++++++++++-----
 src/DotPulsar/Internal/Connector.cs       | 14 ++++++++--
 7 files changed, 74 insertions(+), 28 deletions(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index a98ab70..328db4d 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -9,11 +9,16 @@ The format is based on [Keep a 
Changelog](https://keepachangelog.com/en/1.1.0/)
 ### Added
 
 - Multiple messages can now be acknowledged with 
Acknowledge(IEnumerable\<MessageId> messageIds, CancellationToken 
cancellationToken)
-- ProcessingOptions has a new ShutdownGracePeriod property for doing a 
graceful shutdown by allowing active tasks to finish 
+- ProcessingOptions has a new ShutdownGracePeriod property for doing a 
graceful shutdown by allowing active tasks to finish
 
 ### Changed
 
-- Updated the Microsoft.Extensions.ObjectPool dependency from version 8.0.7 to 
8.0.8
+- Updated the Microsoft.Extensions.ObjectPool dependency from version 8.0.7 to 
8.0.10
+- 'SslPolicyErrors' are added to the 'Data' property of the exception thrown 
when failing to connect
+
+- ### Fixed
+
+- When disposing producers, consumers, or readers 'DisposeAsync' would 
sometimes hang
 
 ## [3.3.2] - 2024-08-07
 
diff --git a/benchmarks/Compression/Compression.csproj 
b/benchmarks/Compression/Compression.csproj
index 41f8afb..36a5f6b 100644
--- a/benchmarks/Compression/Compression.csproj
+++ b/benchmarks/Compression/Compression.csproj
@@ -11,7 +11,7 @@
     <PackageReference Include="BenchmarkDotNet" Version="0.14.0" />
     <PackageReference Include="DotNetZip" Version="1.16.0" />
     <PackageReference Include="Google.Protobuf" Version="3.28.2" />
-    <PackageReference Include="Grpc.Tools" Version="2.66.0">
+    <PackageReference Include="Grpc.Tools" Version="2.67.0">
       <PrivateAssets>all</PrivateAssets>
       <IncludeAssets>runtime; build; native; contentfiles; analyzers; 
buildtransitive</IncludeAssets>
     </PackageReference>
diff --git a/samples/Processing/Processing.csproj 
b/samples/Processing/Processing.csproj
index fbb25bd..f2d243d 100644
--- a/samples/Processing/Processing.csproj
+++ b/samples/Processing/Processing.csproj
@@ -8,7 +8,7 @@
   </PropertyGroup>
 
   <ItemGroup>
-    <PackageReference Include="Microsoft.Extensions.Hosting" Version="8.0.0" />
+    <PackageReference Include="Microsoft.Extensions.Hosting" Version="8.0.1" />
   </ItemGroup>
 
   <ItemGroup>
diff --git a/src/DotPulsar/DotPulsar.csproj b/src/DotPulsar/DotPulsar.csproj
index ab79788..e072ab2 100644
--- a/src/DotPulsar/DotPulsar.csproj
+++ b/src/DotPulsar/DotPulsar.csproj
@@ -24,7 +24,7 @@
   
   <ItemGroup>
     <PackageReference Include="HashDepot" Version="2.0.3" />
-    <PackageReference Include="Microsoft.Extensions.ObjectPool" 
Version="8.0.8" />
+    <PackageReference Include="Microsoft.Extensions.ObjectPool" 
Version="8.0.10" />
     <PackageReference Include="Microsoft.SourceLink.GitHub" Version="8.0.0" 
PrivateAssets="all" IncludeAssets="runtime; build; native; contentfiles; 
analyzers; buildtransitive" />
     <PackageReference Include="protobuf-net" Version="3.2.30" />
     <PackageReference Include="System.IO.Pipelines" Version="8.0.0" />
diff --git a/src/DotPulsar/Internal/AsyncLock.cs 
b/src/DotPulsar/Internal/AsyncLock.cs
index 2a488ac..1134d7f 100644
--- a/src/DotPulsar/Internal/AsyncLock.cs
+++ b/src/DotPulsar/Internal/AsyncLock.cs
@@ -58,17 +58,22 @@ public sealed class AsyncLock : IAsyncDisposable
 
     public async ValueTask DisposeAsync()
     {
-        lock (_pending)
-        {
-            if (Interlocked.Exchange(ref _isDisposed, 1) != 0)
-                return;
+        if (Interlocked.Exchange(ref _isDisposed, 1) != 0)
+            return;
 
-            foreach (var pending in _pending)
-                pending.Dispose();
+        IEnumerable<CancelableCompletionSource<IDisposable>> pending;
 
+        lock (_pending)
+        {
+            pending = _pending.ToArray();
             _pending.Clear();
         }
 
+        foreach (var ccs in pending)
+        {
+            ccs.Dispose();
+        }
+
         await _semaphoreSlim.WaitAsync().ConfigureAwait(false); //Wait for 
possible lock-holder to finish
 
         _semaphoreSlim.Release();
@@ -82,31 +87,41 @@ public sealed class AsyncLock : IAsyncDisposable
             try
             {
                 _pending.Remove(node);
-                node.Value.Dispose();
             }
             catch
             {
                 // Ignore
             }
         }
+
+        try
+        {
+            node.Value.Dispose();
+        }
+        catch
+        {
+            // Ignore
+        }
     }
 
     private void Release()
     {
+        LinkedListNode<CancelableCompletionSource<IDisposable>>? node;
+
         lock (_pending)
         {
-            var node = _pending.First;
+            node = _pending.First;
             if (node is not null)
-            {
-                node.Value.SetResult(_releaser);
-                node.Value.Dispose();
                 _pending.RemoveFirst();
-                return;
-            }
-
-            if (_semaphoreSlim.CurrentCount == 0)
+            else if (_semaphoreSlim.CurrentCount == 0)
                 _semaphoreSlim.Release();
         }
+
+        if (node is not null)
+        {
+            node.Value.SetResult(_releaser);
+            node.Value.Dispose();
+        }
     }
 
     private void ThrowIfDisposed()
diff --git a/src/DotPulsar/Internal/AsyncQueue.cs 
b/src/DotPulsar/Internal/AsyncQueue.cs
index e0560c0..0675542 100644
--- a/src/DotPulsar/Internal/AsyncQueue.cs
+++ b/src/DotPulsar/Internal/AsyncQueue.cs
@@ -33,20 +33,23 @@ public sealed class AsyncQueue<T> : IEnqueue<T>, 
IDequeue<T>, IDisposable
 
     public void Enqueue(T item)
     {
+        LinkedListNode<CancelableCompletionSource<T>>? node;
+
         lock (_lock)
         {
             ThrowIfDisposed();
 
-            var node = _pendingDequeues.First;
+            node = _pendingDequeues.First;
             if (node is not null)
             {
                 node.Value.SetResult(item);
-                node.Value.Dispose();
                 _pendingDequeues.RemoveFirst();
             }
             else
                 _queue.Enqueue(item);
         }
+
+        node?.Value.Dispose();
     }
 
     public ValueTask<T> Dequeue(CancellationToken cancellationToken = default)
@@ -72,14 +75,19 @@ public sealed class AsyncQueue<T> : IEnqueue<T>, 
IDequeue<T>, IDisposable
         if (Interlocked.Exchange(ref _isDisposed, 1) != 0)
             return;
 
+        IEnumerable<CancelableCompletionSource<T>> pendingDequeues;
+
         lock (_lock)
         {
-            foreach (var pendingDequeue in _pendingDequeues)
-                pendingDequeue.Dispose();
-
+            pendingDequeues = _pendingDequeues.ToArray();
             _pendingDequeues.Clear();
             _queue.Clear();
         }
+
+        foreach (var ccs in pendingDequeues)
+        {
+            ccs.Dispose();
+        }
     }
 
     private void Cancel(LinkedListNode<CancelableCompletionSource<T>> node)
@@ -88,7 +96,6 @@ public sealed class AsyncQueue<T> : IEnqueue<T>, IDequeue<T>, 
IDisposable
         {
             try
             {
-                node.Value.Dispose();
                 _pendingDequeues.Remove(node);
             }
             catch
@@ -96,6 +103,15 @@ public sealed class AsyncQueue<T> : IEnqueue<T>, 
IDequeue<T>, IDisposable
                 // ignored
             }
         }
+
+        try
+        {
+            node.Value.Dispose();
+        }
+        catch
+        {
+            // ignored
+        }
     }
 
     private void ThrowIfDisposed()
diff --git a/src/DotPulsar/Internal/Connector.cs 
b/src/DotPulsar/Internal/Connector.cs
index 6c8964b..f68aeb8 100644
--- a/src/DotPulsar/Internal/Connector.cs
+++ b/src/DotPulsar/Internal/Connector.cs
@@ -118,10 +118,17 @@ public sealed class Connector
     private async Task<Stream> EncryptStream(Stream stream, string host, 
CancellationToken cancellationToken)
     {
         SslStream? sslStream = null;
+        var policyErrors = SslPolicyErrors.None;
+
+        bool Validate(object sender, X509Certificate? certificate, X509Chain? 
chain, SslPolicyErrors sslPolicyErrors)
+        {
+            policyErrors = sslPolicyErrors;
+            return ValidateServerCertificate(sender, certificate, chain, 
sslPolicyErrors);
+        }
 
         try
         {
-            sslStream = new SslStream(stream, false, 
ValidateServerCertificate, null);
+            sslStream = new SslStream(stream, false, Validate, null);
             var options = new SslClientAuthenticationOptions
             {
                 TargetHost = host,
@@ -132,13 +139,16 @@ public sealed class Connector
             await sslStream.AuthenticateAsClientAsync(options, 
cancellationToken).ConfigureAwait(false);
             return sslStream;
         }
-        catch
+        catch (Exception exception)
         {
             if (sslStream is null)
                 await stream.DisposeAsync().ConfigureAwait(false);
             else
                 await sslStream.DisposeAsync().ConfigureAwait(false);
 
+            if (policyErrors != SslPolicyErrors.None)
+                exception.Data.Add("SslPolicyErrors", policyErrors);
+
             throw;
         }
     }

Reply via email to