This is an automated email from the ASF dual-hosted git repository.

blankensteiner pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-dotpulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 5f01ede  Fixed multiple issues with the producer hanging after reconnect
5f01ede is described below

commit 5f01ede84ee5c68f814e7c84d87992268e174be9
Author: Daniel Blankensteiner <[email protected]>
AuthorDate: Tue Jan 23 18:04:45 2024 +0100

    Fixed multiple issues with the producer hanging after reconnect
---
 src/DotPulsar/Internal/Abstractions/IChannel.cs    |  1 -
 .../Abstractions/IContainsProducerChannel.cs       | 20 ---------------
 .../Abstractions/IProducerChannelFactory.cs        |  2 +-
 src/DotPulsar/Internal/Abstractions/Process.cs     |  5 ----
 src/DotPulsar/Internal/AsyncQueue.cs               | 11 +++++---
 src/DotPulsar/Internal/AsyncQueueWithCursor.cs     |  2 +-
 src/DotPulsar/Internal/Awaiter.cs                  |  6 ++---
 src/DotPulsar/Internal/Channel.cs                  |  3 ---
 src/DotPulsar/Internal/ChannelManager.cs           | 10 ++++----
 .../Internal/Events/ProducerChannelConnected.cs    | 30 ----------------------
 src/DotPulsar/Internal/ProducerChannelFactory.cs   |  8 +++---
 src/DotPulsar/Internal/ProducerProcess.cs          |  8 +++---
 src/DotPulsar/Internal/ProducerResponse.cs         |  6 +++--
 src/DotPulsar/Internal/SubProducer.cs              | 14 +++-------
 14 files changed, 34 insertions(+), 92 deletions(-)

diff --git a/src/DotPulsar/Internal/Abstractions/IChannel.cs 
b/src/DotPulsar/Internal/Abstractions/IChannel.cs
index d822f19..f604ea8 100644
--- a/src/DotPulsar/Internal/Abstractions/IChannel.cs
+++ b/src/DotPulsar/Internal/Abstractions/IChannel.cs
@@ -21,7 +21,6 @@ public interface IChannel
     void ClosedByServer();
     void WaitingForExclusive();
     void Connected();
-    void ProducerConnected(ulong topicEpoch);
     void Deactivated();
     void Disconnected();
     void ReachedEndOfTopic();
diff --git a/src/DotPulsar/Internal/Abstractions/IContainsProducerChannel.cs 
b/src/DotPulsar/Internal/Abstractions/IContainsProducerChannel.cs
deleted file mode 100644
index d75ebb7..0000000
--- a/src/DotPulsar/Internal/Abstractions/IContainsProducerChannel.cs
+++ /dev/null
@@ -1,20 +0,0 @@
-/*
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-namespace DotPulsar.Internal.Abstractions;
-
-public interface IContainsProducerChannel : IContainsChannel
-{
-    Task ActivateChannel(ulong? topicEpoch, CancellationToken 
cancellationToken);
-}
diff --git a/src/DotPulsar/Internal/Abstractions/IProducerChannelFactory.cs 
b/src/DotPulsar/Internal/Abstractions/IProducerChannelFactory.cs
index efe6af5..3c32674 100644
--- a/src/DotPulsar/Internal/Abstractions/IProducerChannelFactory.cs
+++ b/src/DotPulsar/Internal/Abstractions/IProducerChannelFactory.cs
@@ -16,5 +16,5 @@ namespace DotPulsar.Internal.Abstractions;
 
 public interface IProducerChannelFactory
 {
-    Task<IProducerChannel> Create(ulong? topicEpoch, CancellationToken 
cancellationToken = default);
+    Task<IProducerChannel> Create(CancellationToken cancellationToken = 
default);
 }
diff --git a/src/DotPulsar/Internal/Abstractions/Process.cs 
b/src/DotPulsar/Internal/Abstractions/Process.cs
index cf05c6b..0f6ed44 100644
--- a/src/DotPulsar/Internal/Abstractions/Process.cs
+++ b/src/DotPulsar/Internal/Abstractions/Process.cs
@@ -35,7 +35,6 @@ public abstract class Process : IProcess
     }
 
     public Guid CorrelationId { get; }
-    protected ulong? TopicEpoch { get; private set; }
 
     public void Start()
     {
@@ -67,10 +66,6 @@ public abstract class Process : IProcess
             case ChannelConnected _:
                 ChannelState = ChannelState.Connected;
                 break;
-            case ProducerChannelConnected producerChannelConnected:
-                TopicEpoch = producerChannelConnected.TopicEpoch;
-                ChannelState = ChannelState.Connected;
-                break;
             case ChannelDeactivated _:
                 ChannelState = ChannelState.Inactive;
                 break;
diff --git a/src/DotPulsar/Internal/AsyncQueue.cs 
b/src/DotPulsar/Internal/AsyncQueue.cs
index 82116de..e0560c0 100644
--- a/src/DotPulsar/Internal/AsyncQueue.cs
+++ b/src/DotPulsar/Internal/AsyncQueue.cs
@@ -72,11 +72,14 @@ public sealed class AsyncQueue<T> : IEnqueue<T>, 
IDequeue<T>, IDisposable
         if (Interlocked.Exchange(ref _isDisposed, 1) != 0)
             return;
 
-        foreach (var pendingDequeue in _pendingDequeues)
-            pendingDequeue.Dispose();
+        lock (_lock)
+        {
+            foreach (var pendingDequeue in _pendingDequeues)
+                pendingDequeue.Dispose();
 
-        _pendingDequeues.Clear();
-        _queue.Clear();
+            _pendingDequeues.Clear();
+            _queue.Clear();
+        }
     }
 
     private void Cancel(LinkedListNode<CancelableCompletionSource<T>> node)
diff --git a/src/DotPulsar/Internal/AsyncQueueWithCursor.cs 
b/src/DotPulsar/Internal/AsyncQueueWithCursor.cs
index 35f3acd..8476d49 100644
--- a/src/DotPulsar/Internal/AsyncQueueWithCursor.cs
+++ b/src/DotPulsar/Internal/AsyncQueueWithCursor.cs
@@ -279,7 +279,7 @@ public sealed class AsyncQueueWithCursor<T> : 
IAsyncDisposable where T : IDispos
     {
         foreach (var tcs in _queueEmptyTcs)
         {
-            tcs.SetResult(0);
+            tcs.TrySetResult(0);
         }
 
         _queueEmptyTcs.Clear();
diff --git a/src/DotPulsar/Internal/Awaiter.cs 
b/src/DotPulsar/Internal/Awaiter.cs
index b434507..50c246d 100644
--- a/src/DotPulsar/Internal/Awaiter.cs
+++ b/src/DotPulsar/Internal/Awaiter.cs
@@ -38,13 +38,13 @@ public sealed class Awaiter<T, TResult> : IDisposable where 
T : notnull
     public void SetResult(T item, TResult result)
     {
         if (_items.TryRemove(item, out var tcs))
-            tcs.SetResult(result);
+            tcs.TrySetResult(result);
     }
 
     public void Cancel(T item)
     {
         if (_items.TryRemove(item, out var tcs))
-            tcs.SetCanceled();
+            tcs.TrySetCanceled();
     }
 
     public IEnumerable<T> Keys => _items.Keys;
@@ -52,7 +52,7 @@ public sealed class Awaiter<T, TResult> : IDisposable where T 
: notnull
     public void Dispose()
     {
         foreach (var item in _items.Values)
-            item.SetCanceled();
+            item.TrySetCanceled();
 
         _items.Clear();
     }
diff --git a/src/DotPulsar/Internal/Channel.cs 
b/src/DotPulsar/Internal/Channel.cs
index 8679398..60e2ef0 100644
--- a/src/DotPulsar/Internal/Channel.cs
+++ b/src/DotPulsar/Internal/Channel.cs
@@ -59,9 +59,6 @@ public sealed class Channel : IChannel
     public void Connected()
         => _eventRegister.Register(new ChannelConnected(_correlationId));
 
-    public void ProducerConnected(ulong topicEpoch)
-        => _eventRegister.Register(new 
ProducerChannelConnected(_correlationId, topicEpoch));
-
     public void Deactivated()
         => _eventRegister.Register(new ChannelDeactivated(_correlationId));
 
diff --git a/src/DotPulsar/Internal/ChannelManager.cs 
b/src/DotPulsar/Internal/ChannelManager.cs
index 0de84f8..7d4a1e8 100644
--- a/src/DotPulsar/Internal/ChannelManager.cs
+++ b/src/DotPulsar/Internal/ChannelManager.cs
@@ -57,15 +57,15 @@ public sealed class ChannelManager : 
IState<ChannelManagerState>, IDisposable
 
             if (response.Result.ProducerSuccess.ProducerReady)
             {
-                
channel.ProducerConnected(response.Result.ProducerSuccess.TopicEpoch);
+                channel.Connected();
             }
             else
             {
                 channel.WaitingForExclusive();
-                HandleAdditionalProducerSuccess(command, 
channel.ProducerConnected);
+                HandleAdditionalProducerSuccess(command, channel.Connected);
             }
 
-            return new ProducerResponse(producerId, 
response.Result.ProducerSuccess.ProducerName);
+            return new ProducerResponse(producerId, 
response.Result.ProducerSuccess.ProducerName, 
response.Result.ProducerSuccess.TopicEpoch);
         }, TaskContinuationOptions.OnlyOnRanToCompletion);
     }
 
@@ -257,7 +257,7 @@ public sealed class ChannelManager : 
IState<ChannelManagerState>, IDisposable
         return channel.SenderLock();
     }
 
-    private void HandleAdditionalProducerSuccess(CommandProducer command, 
Action<ulong> successAction)
+    private void HandleAdditionalProducerSuccess(CommandProducer command, 
Action successAction)
     {
         _ = 
_requestResponseHandler.ExpectAdditionalResponse(command).ContinueWith(response 
=>
         {
@@ -271,7 +271,7 @@ public sealed class ChannelManager : 
IState<ChannelManagerState>, IDisposable
                 HandleAdditionalProducerSuccess(command, successAction);
                 return;
             }
-            successAction.Invoke(response.Result.ProducerSuccess.TopicEpoch);
+            successAction.Invoke();
         });
     }
 
diff --git a/src/DotPulsar/Internal/Events/ProducerChannelConnected.cs 
b/src/DotPulsar/Internal/Events/ProducerChannelConnected.cs
deleted file mode 100644
index 469b92c..0000000
--- a/src/DotPulsar/Internal/Events/ProducerChannelConnected.cs
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-namespace DotPulsar.Internal.Events;
-
-using DotPulsar.Internal.Abstractions;
-
-public sealed class ProducerChannelConnected : IEvent
-{
-    public ProducerChannelConnected(Guid correlationId, ulong topicEpoch)
-    {
-        CorrelationId = correlationId;
-        TopicEpoch = topicEpoch;
-    }
-
-    public Guid CorrelationId { get; }
-
-    public ulong TopicEpoch { get; }
-}
diff --git a/src/DotPulsar/Internal/ProducerChannelFactory.cs 
b/src/DotPulsar/Internal/ProducerChannelFactory.cs
index 585ba9b..7dab27c 100644
--- a/src/DotPulsar/Internal/ProducerChannelFactory.cs
+++ b/src/DotPulsar/Internal/ProducerChannelFactory.cs
@@ -26,6 +26,7 @@ public sealed class ProducerChannelFactory : 
IProducerChannelFactory
     private readonly CommandProducer _commandProducer;
     private readonly ICompressorFactory? _compressorFactory;
     private readonly Schema? _schema;
+    private ulong? _topicEpoch;
 
     public ProducerChannelFactory(
         Guid correlationId,
@@ -52,13 +53,13 @@ public sealed class ProducerChannelFactory : 
IProducerChannelFactory
         _schema = schemaInfo.PulsarSchema;
     }
 
-    public async Task<IProducerChannel> Create(ulong? topicEpoch, 
CancellationToken cancellationToken)
+    public async Task<IProducerChannel> Create(CancellationToken 
cancellationToken)
     {
-        if (topicEpoch.HasValue)
+        if (_topicEpoch.HasValue)
         {
             if (_commandProducer.ProducerAccessMode != 
ProducerAccessMode.Shared)
                 _commandProducer.ProducerAccessMode = 
ProducerAccessMode.Exclusive;
-            _commandProducer.TopicEpoch = topicEpoch.Value;
+            _commandProducer.TopicEpoch = _topicEpoch.Value;
         }
         else
             _commandProducer.ResetTopicEpoch();
@@ -66,6 +67,7 @@ public sealed class ProducerChannelFactory : 
IProducerChannelFactory
         var connection = await 
_connectionPool.FindConnectionForTopic(_commandProducer.Topic, 
cancellationToken).ConfigureAwait(false);
         var channel = new Channel(_correlationId, _eventRegister, new 
AsyncQueue<MessagePackage>());
         var response = await connection.Send(_commandProducer, channel, 
cancellationToken).ConfigureAwait(false);
+        _topicEpoch = response.TopicEpoch;
         var schemaVersion = await GetSchemaVersion(connection, 
cancellationToken).ConfigureAwait(false);
         return new ProducerChannel(response.ProducerId, response.ProducerName, 
connection, _compressorFactory, schemaVersion);
     }
diff --git a/src/DotPulsar/Internal/ProducerProcess.cs 
b/src/DotPulsar/Internal/ProducerProcess.cs
index 0996d18..d6f4072 100644
--- a/src/DotPulsar/Internal/ProducerProcess.cs
+++ b/src/DotPulsar/Internal/ProducerProcess.cs
@@ -20,12 +20,12 @@ using DotPulsar.Internal.Abstractions;
 public sealed class ProducerProcess : Process
 {
     private readonly IStateManager<ProducerState> _stateManager;
-    private readonly IContainsProducerChannel _subProducer;
+    private readonly IContainsChannel _subProducer;
 
     public ProducerProcess(
         Guid correlationId,
         IStateManager<ProducerState> stateManager,
-        IContainsProducerChannel producer) : base(correlationId)
+        IContainsChannel producer) : base(correlationId)
     {
         _stateManager = stateManager;
         _subProducer = producer;
@@ -63,10 +63,10 @@ public sealed class ProducerProcess : Process
                 });
                 return;
             case ChannelState.Connected:
-                ActionQueue.Enqueue(async x =>
+                ActionQueue.Enqueue(x =>
                 {
-                    await _subProducer.ActivateChannel(TopicEpoch, 
x).ConfigureAwait(false);
                     _stateManager.SetState(ProducerState.Connected);
+                    return Task.CompletedTask;
                 });
                 return;
             case ChannelState.WaitingForExclusive:
diff --git a/src/DotPulsar/Internal/ProducerResponse.cs 
b/src/DotPulsar/Internal/ProducerResponse.cs
index 474dc5f..6aa71fb 100644
--- a/src/DotPulsar/Internal/ProducerResponse.cs
+++ b/src/DotPulsar/Internal/ProducerResponse.cs
@@ -1,4 +1,4 @@
-/*
+/*
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * You may obtain a copy of the License at
@@ -16,12 +16,14 @@ namespace DotPulsar.Internal;
 
 public sealed class ProducerResponse
 {
-    public ProducerResponse(ulong producerId, string producerName)
+    public ProducerResponse(ulong producerId, string producerName, ulong 
topicEpoch)
     {
         ProducerId = producerId;
         ProducerName = producerName;
+        TopicEpoch = topicEpoch;
     }
 
     public ulong ProducerId { get; }
     public string ProducerName { get; }
+    public ulong TopicEpoch { get; }
 }
diff --git a/src/DotPulsar/Internal/SubProducer.cs 
b/src/DotPulsar/Internal/SubProducer.cs
index 6924a4e..c778a97 100644
--- a/src/DotPulsar/Internal/SubProducer.cs
+++ b/src/DotPulsar/Internal/SubProducer.cs
@@ -22,7 +22,7 @@ using DotPulsar.Internal.Exceptions;
 using DotPulsar.Internal.Extensions;
 using DotPulsar.Internal.PulsarApi;
 
-public sealed class SubProducer : IContainsProducerChannel, 
IState<ProducerState>
+public sealed class SubProducer : IContainsChannel, IState<ProducerState>
 {
     private readonly AsyncQueueWithCursor<SendOp> _sendQueue;
     private CancellationTokenSource? _dispatcherCts;
@@ -35,7 +35,6 @@ public sealed class SubProducer : IContainsProducerChannel, 
IState<ProducerState
     private readonly IProducerChannelFactory _factory;
     private readonly int _partition;
     private int _isDisposed;
-    private ulong? _topicEpoch;
     private Exception? _faultException;
 
     public string Topic { get; }
@@ -151,7 +150,8 @@ public sealed class SubProducer : IContainsProducerChannel, 
IState<ProducerState
 
             if (!success)
             {
-                _eventRegister.Register(new 
ChannelDisconnected(_correlationId));
+                if (!cancellationToken.IsCancellationRequested)
+                    _eventRegister.Register(new 
ChannelDisconnected(_correlationId));
                 break;
             }
         }
@@ -229,15 +229,9 @@ public sealed class SubProducer : 
IContainsProducerChannel, IState<ProducerState
             // Ignored
         }
 
-        _channel = await _executor.Execute(() => _factory.Create(_topicEpoch, 
cancellationToken), cancellationToken).ConfigureAwait(false);
-    }
-
-    public Task ActivateChannel(ulong? topicEpoch, CancellationToken 
cancellationToken)
-    {
-        _topicEpoch ??= topicEpoch;
+        _channel = await _executor.Execute(() => 
_factory.Create(cancellationToken), cancellationToken).ConfigureAwait(false);
         _dispatcherCts = new CancellationTokenSource();
         _dispatcherTask = Task.Run(async () => await 
MessageDispatcher(_channel, _dispatcherCts.Token));
-        return Task.CompletedTask;
     }
 
     public async ValueTask CloseChannel(CancellationToken cancellationToken)

Reply via email to