This is an automated email from the ASF dual-hosted git repository.

blankensteiner pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-dotpulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new ee57499  Fixed connection issues and make ready for release 2.5.2
ee57499 is described below

commit ee5749936a17ba43ca44e573a7effeeb23a5b80f
Author: Daniel Blankensteiner <[email protected]>
AuthorDate: Wed Nov 2 13:23:52 2022 +0100

    Fixed connection issues and make ready for release 2.5.2
---
 CHANGELOG.md                                      | 12 ++++++++++++
 src/DotPulsar/DotPulsar.csproj                    |  2 +-
 src/DotPulsar/Internal/Consumer.cs                |  4 ++--
 src/DotPulsar/Internal/ConsumerProcess.cs         | 10 +++++++++-
 src/DotPulsar/Internal/DefaultExceptionHandler.cs |  2 ++
 src/DotPulsar/Internal/ProducerProcess.cs         | 10 +++++++++-
 src/DotPulsar/Internal/Reader.cs                  |  4 ++--
 src/DotPulsar/Internal/ReaderProcess.cs           | 10 +++++++++-
 src/DotPulsar/Internal/SubProducer.cs             |  4 ++--
 9 files changed, 48 insertions(+), 10 deletions(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index fdbb5f5..fddc921 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -4,6 +4,18 @@ All notable changes to this project will be documented in this file.
 
 The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/) and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
 
+## [2.5.2] - 2022-11-02
+
+### Changed
+
+- A temporarily broken connection could cause DotPulsar to mark consumers, readers, and producers as disconnected while the broker was unaware of the problem. When reconnecting the broker would claim that the consumer, reader or producer is still active. Therefore the default exception handling will now retry (instead of rethrow/fault) on these two exceptions (waiting for the broker to realize that the existing connections are dead)
+    - ConsumerBusyException
+    - ProducerBusyException
+
+### Fixed
+
+- Under certain circumstances a double connection attempt could be initiated, resulting in one good connection and a looping connection calling the exception pipeline
+
 ## [2.5.1] - 2022-11-01
 
 ### Fixed
diff --git a/src/DotPulsar/DotPulsar.csproj b/src/DotPulsar/DotPulsar.csproj
index c8ef371..3cf32fd 100644
--- a/src/DotPulsar/DotPulsar.csproj
+++ b/src/DotPulsar/DotPulsar.csproj
@@ -2,7 +2,7 @@
 
   <PropertyGroup>
     
<TargetFrameworks>netstandard2.0;netstandard2.1;netcoreapp3.1;net5.0;net6.0</TargetFrameworks>
-    <Version>2.5.1</Version>
+    <Version>2.5.2</Version>
     <AssemblyVersion>$(Version)</AssemblyVersion>
     <FileVersion>$(Version)</FileVersion>
     <Authors>ApachePulsar,DanskeCommodities,dblank</Authors>
diff --git a/src/DotPulsar/Internal/Consumer.cs b/src/DotPulsar/Internal/Consumer.cs
index a81b3ee..f3d1815 100644
--- a/src/DotPulsar/Internal/Consumer.cs
+++ b/src/DotPulsar/Internal/Consumer.cs
@@ -197,9 +197,9 @@ public sealed class Consumer<TMessage> : IEstablishNewChannel, IConsumer<TMessag
         var channel = await _executor.Execute(() => _factory.Create(cancellationToken), cancellationToken).ConfigureAwait(false);
 
         var oldChannel = _channel;
-        _channel = channel;
-
         if (oldChannel is not null)
             await oldChannel.DisposeAsync().ConfigureAwait(false);
+
+        _channel = channel;
     }
 }
diff --git a/src/DotPulsar/Internal/ConsumerProcess.cs b/src/DotPulsar/Internal/ConsumerProcess.cs
index af2ef22..059822f 100644
--- a/src/DotPulsar/Internal/ConsumerProcess.cs
+++ b/src/DotPulsar/Internal/ConsumerProcess.cs
@@ -23,6 +23,7 @@ public sealed class ConsumerProcess : Process
     private readonly IStateManager<ConsumerState> _stateManager;
     private readonly IEstablishNewChannel _consumer;
     private readonly bool _isFailoverSubscription;
+    private Task? _establishNewChannelTask;
 
     public ConsumerProcess(
         Guid correlationId,
@@ -64,7 +65,7 @@ public sealed class ConsumerProcess : Process
             case ChannelState.ClosedByServer:
             case ChannelState.Disconnected:
                 _stateManager.SetState(ConsumerState.Disconnected);
-                _ = _consumer.EstablishNewChannel(CancellationTokenSource.Token);
+                EstablishNewChannel();
                 return;
             case ChannelState.Connected:
                 if (!_isFailoverSubscription)
@@ -78,4 +79,11 @@ public sealed class ConsumerProcess : Process
                 return;
         }
     }
+
+    private void EstablishNewChannel()
+    {
+        var token = CancellationTokenSource.Token;
+        if (_establishNewChannelTask is null || _establishNewChannelTask.IsCompleted)
+            _establishNewChannelTask = Task.Run(() => _consumer.EstablishNewChannel(token).ConfigureAwait(false), token);
+    }
 }
diff --git a/src/DotPulsar/Internal/DefaultExceptionHandler.cs b/src/DotPulsar/Internal/DefaultExceptionHandler.cs
index a42eeef..dca9913 100644
--- a/src/DotPulsar/Internal/DefaultExceptionHandler.cs
+++ b/src/DotPulsar/Internal/DefaultExceptionHandler.cs
@@ -48,6 +48,8 @@ public sealed class DefaultExceptionHandler : IHandleException
             ServiceNotReadyException _ => FaultAction.Retry,
             MetadataException _ => FaultAction.Rethrow,
             ConsumerNotFoundException _ => FaultAction.Retry,
+            ConsumerBusyException _ => FaultAction.Retry,
+            ProducerBusyException _ => FaultAction.Retry,
             ConnectionDisposedException _ => FaultAction.Retry,
             AsyncLockDisposedException _ => FaultAction.Retry,
             PulsarStreamDisposedException _ => FaultAction.Retry,
diff --git a/src/DotPulsar/Internal/ProducerProcess.cs b/src/DotPulsar/Internal/ProducerProcess.cs
index 9a39bdc..2986d97 100644
--- a/src/DotPulsar/Internal/ProducerProcess.cs
+++ b/src/DotPulsar/Internal/ProducerProcess.cs
@@ -22,6 +22,7 @@ public sealed class ProducerProcess : Process
 {
     private readonly IStateManager<ProducerState> _stateManager;
     private readonly IEstablishNewChannel _producer;
+    private Task? _establishNewChannelTask;
 
     public ProducerProcess(
         Guid correlationId,
@@ -55,11 +56,18 @@ public sealed class ProducerProcess : Process
             case ChannelState.ClosedByServer:
             case ChannelState.Disconnected:
                 _stateManager.SetState(ProducerState.Disconnected);
-                _ = _producer.EstablishNewChannel(CancellationTokenSource.Token);
+                EstablishNewChannel();
                 return;
             case ChannelState.Connected:
                 _stateManager.SetState(ProducerState.Connected);
                 return;
         }
     }
+
+    private void EstablishNewChannel()
+    {
+        var token = CancellationTokenSource.Token;
+        if (_establishNewChannelTask is null || _establishNewChannelTask.IsCompleted)
+            _establishNewChannelTask = Task.Run(() => _producer.EstablishNewChannel(token).ConfigureAwait(false), token);
+    }
 }
diff --git a/src/DotPulsar/Internal/Reader.cs b/src/DotPulsar/Internal/Reader.cs
index 2bd3f73..6f9a959 100644
--- a/src/DotPulsar/Internal/Reader.cs
+++ b/src/DotPulsar/Internal/Reader.cs
@@ -126,10 +126,10 @@ public sealed class Reader<TMessage> : IEstablishNewChannel, IReader<TMessage>
         var channel = await _executor.Execute(() => _factory.Create(cancellationToken), cancellationToken).ConfigureAwait(false);
 
         var oldChannel = _channel;
-        _channel = channel;
-
         if (oldChannel is not null)
             await oldChannel.DisposeAsync().ConfigureAwait(false);
+
+        _channel = channel;
     }
 
     private void ThrowIfDisposed()
diff --git a/src/DotPulsar/Internal/ReaderProcess.cs b/src/DotPulsar/Internal/ReaderProcess.cs
index 6f4aacf..3c66c48 100644
--- a/src/DotPulsar/Internal/ReaderProcess.cs
+++ b/src/DotPulsar/Internal/ReaderProcess.cs
@@ -22,6 +22,7 @@ public sealed class ReaderProcess : Process
 {
     private readonly IStateManager<ReaderState> _stateManager;
     private readonly IEstablishNewChannel _reader;
+    private Task? _establishNewChannelTask;
 
     public ReaderProcess(
         Guid correlationId,
@@ -55,7 +56,7 @@ public sealed class ReaderProcess : Process
             case ChannelState.ClosedByServer:
             case ChannelState.Disconnected:
                 _stateManager.SetState(ReaderState.Disconnected);
-                _ = _reader.EstablishNewChannel(CancellationTokenSource.Token);
+                EstablishNewChannel();
                 return;
             case ChannelState.Connected:
                 _stateManager.SetState(ReaderState.Connected);
@@ -65,4 +66,11 @@ public sealed class ReaderProcess : Process
                 return;
         }
     }
+
+    private void EstablishNewChannel()
+    {
+        var token = CancellationTokenSource.Token;
+        if (_establishNewChannelTask is null || _establishNewChannelTask.IsCompleted)
+            _establishNewChannelTask = Task.Run(() => _reader.EstablishNewChannel(token).ConfigureAwait(false), token);
+    }
 }
diff --git a/src/DotPulsar/Internal/SubProducer.cs b/src/DotPulsar/Internal/SubProducer.cs
index a09b3e2..592ea5e 100644
--- a/src/DotPulsar/Internal/SubProducer.cs
+++ b/src/DotPulsar/Internal/SubProducer.cs
@@ -101,9 +101,9 @@ public sealed class SubProducer<TMessage> : IEstablishNewChannel, IProducer<TMes
         var channel = await _executor.Execute(() => _factory.Create(cancellationToken), cancellationToken).ConfigureAwait(false);
 
         var oldChannel = _channel;
-        _channel = channel;
-
         if (oldChannel is not null)
             await oldChannel.DisposeAsync().ConfigureAwait(false);
+
+        _channel = channel;
     }
 }

Reply via email to