This is an automated email from the ASF dual-hosted git repository.
blankensteiner pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-dotpulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 5f01ede Fixed multiple issues with the producer hanging after reconnect
5f01ede is described below
commit 5f01ede84ee5c68f814e7c84d87992268e174be9
Author: Daniel Blankensteiner <[email protected]>
AuthorDate: Tue Jan 23 18:04:45 2024 +0100
Fixed multiple issues with the producer hanging after reconnect
---
src/DotPulsar/Internal/Abstractions/IChannel.cs | 1 -
.../Abstractions/IContainsProducerChannel.cs | 20 ---------------
.../Abstractions/IProducerChannelFactory.cs | 2 +-
src/DotPulsar/Internal/Abstractions/Process.cs | 5 ----
src/DotPulsar/Internal/AsyncQueue.cs | 11 +++++---
src/DotPulsar/Internal/AsyncQueueWithCursor.cs | 2 +-
src/DotPulsar/Internal/Awaiter.cs | 6 ++---
src/DotPulsar/Internal/Channel.cs | 3 ---
src/DotPulsar/Internal/ChannelManager.cs | 10 ++++----
.../Internal/Events/ProducerChannelConnected.cs | 30 ----------------------
src/DotPulsar/Internal/ProducerChannelFactory.cs | 8 +++---
src/DotPulsar/Internal/ProducerProcess.cs | 8 +++---
src/DotPulsar/Internal/ProducerResponse.cs | 6 +++--
src/DotPulsar/Internal/SubProducer.cs | 14 +++-------
14 files changed, 34 insertions(+), 92 deletions(-)
diff --git a/src/DotPulsar/Internal/Abstractions/IChannel.cs
b/src/DotPulsar/Internal/Abstractions/IChannel.cs
index d822f19..f604ea8 100644
--- a/src/DotPulsar/Internal/Abstractions/IChannel.cs
+++ b/src/DotPulsar/Internal/Abstractions/IChannel.cs
@@ -21,7 +21,6 @@ public interface IChannel
void ClosedByServer();
void WaitingForExclusive();
void Connected();
- void ProducerConnected(ulong topicEpoch);
void Deactivated();
void Disconnected();
void ReachedEndOfTopic();
diff --git a/src/DotPulsar/Internal/Abstractions/IContainsProducerChannel.cs
b/src/DotPulsar/Internal/Abstractions/IContainsProducerChannel.cs
deleted file mode 100644
index d75ebb7..0000000
--- a/src/DotPulsar/Internal/Abstractions/IContainsProducerChannel.cs
+++ /dev/null
@@ -1,20 +0,0 @@
-/*
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-namespace DotPulsar.Internal.Abstractions;
-
-public interface IContainsProducerChannel : IContainsChannel
-{
- Task ActivateChannel(ulong? topicEpoch, CancellationToken
cancellationToken);
-}
diff --git a/src/DotPulsar/Internal/Abstractions/IProducerChannelFactory.cs
b/src/DotPulsar/Internal/Abstractions/IProducerChannelFactory.cs
index efe6af5..3c32674 100644
--- a/src/DotPulsar/Internal/Abstractions/IProducerChannelFactory.cs
+++ b/src/DotPulsar/Internal/Abstractions/IProducerChannelFactory.cs
@@ -16,5 +16,5 @@ namespace DotPulsar.Internal.Abstractions;
public interface IProducerChannelFactory
{
- Task<IProducerChannel> Create(ulong? topicEpoch, CancellationToken
cancellationToken = default);
+ Task<IProducerChannel> Create(CancellationToken cancellationToken =
default);
}
diff --git a/src/DotPulsar/Internal/Abstractions/Process.cs
b/src/DotPulsar/Internal/Abstractions/Process.cs
index cf05c6b..0f6ed44 100644
--- a/src/DotPulsar/Internal/Abstractions/Process.cs
+++ b/src/DotPulsar/Internal/Abstractions/Process.cs
@@ -35,7 +35,6 @@ public abstract class Process : IProcess
}
public Guid CorrelationId { get; }
- protected ulong? TopicEpoch { get; private set; }
public void Start()
{
@@ -67,10 +66,6 @@ public abstract class Process : IProcess
case ChannelConnected _:
ChannelState = ChannelState.Connected;
break;
- case ProducerChannelConnected producerChannelConnected:
- TopicEpoch = producerChannelConnected.TopicEpoch;
- ChannelState = ChannelState.Connected;
- break;
case ChannelDeactivated _:
ChannelState = ChannelState.Inactive;
break;
diff --git a/src/DotPulsar/Internal/AsyncQueue.cs
b/src/DotPulsar/Internal/AsyncQueue.cs
index 82116de..e0560c0 100644
--- a/src/DotPulsar/Internal/AsyncQueue.cs
+++ b/src/DotPulsar/Internal/AsyncQueue.cs
@@ -72,11 +72,14 @@ public sealed class AsyncQueue<T> : IEnqueue<T>,
IDequeue<T>, IDisposable
if (Interlocked.Exchange(ref _isDisposed, 1) != 0)
return;
- foreach (var pendingDequeue in _pendingDequeues)
- pendingDequeue.Dispose();
+ lock (_lock)
+ {
+ foreach (var pendingDequeue in _pendingDequeues)
+ pendingDequeue.Dispose();
- _pendingDequeues.Clear();
- _queue.Clear();
+ _pendingDequeues.Clear();
+ _queue.Clear();
+ }
}
private void Cancel(LinkedListNode<CancelableCompletionSource<T>> node)
diff --git a/src/DotPulsar/Internal/AsyncQueueWithCursor.cs
b/src/DotPulsar/Internal/AsyncQueueWithCursor.cs
index 35f3acd..8476d49 100644
--- a/src/DotPulsar/Internal/AsyncQueueWithCursor.cs
+++ b/src/DotPulsar/Internal/AsyncQueueWithCursor.cs
@@ -279,7 +279,7 @@ public sealed class AsyncQueueWithCursor<T> :
IAsyncDisposable where T : IDispos
{
foreach (var tcs in _queueEmptyTcs)
{
- tcs.SetResult(0);
+ tcs.TrySetResult(0);
}
_queueEmptyTcs.Clear();
diff --git a/src/DotPulsar/Internal/Awaiter.cs
b/src/DotPulsar/Internal/Awaiter.cs
index b434507..50c246d 100644
--- a/src/DotPulsar/Internal/Awaiter.cs
+++ b/src/DotPulsar/Internal/Awaiter.cs
@@ -38,13 +38,13 @@ public sealed class Awaiter<T, TResult> : IDisposable where
T : notnull
public void SetResult(T item, TResult result)
{
if (_items.TryRemove(item, out var tcs))
- tcs.SetResult(result);
+ tcs.TrySetResult(result);
}
public void Cancel(T item)
{
if (_items.TryRemove(item, out var tcs))
- tcs.SetCanceled();
+ tcs.TrySetCanceled();
}
public IEnumerable<T> Keys => _items.Keys;
@@ -52,7 +52,7 @@ public sealed class Awaiter<T, TResult> : IDisposable where T
: notnull
public void Dispose()
{
foreach (var item in _items.Values)
- item.SetCanceled();
+ item.TrySetCanceled();
_items.Clear();
}
diff --git a/src/DotPulsar/Internal/Channel.cs
b/src/DotPulsar/Internal/Channel.cs
index 8679398..60e2ef0 100644
--- a/src/DotPulsar/Internal/Channel.cs
+++ b/src/DotPulsar/Internal/Channel.cs
@@ -59,9 +59,6 @@ public sealed class Channel : IChannel
public void Connected()
=> _eventRegister.Register(new ChannelConnected(_correlationId));
- public void ProducerConnected(ulong topicEpoch)
- => _eventRegister.Register(new
ProducerChannelConnected(_correlationId, topicEpoch));
-
public void Deactivated()
=> _eventRegister.Register(new ChannelDeactivated(_correlationId));
diff --git a/src/DotPulsar/Internal/ChannelManager.cs
b/src/DotPulsar/Internal/ChannelManager.cs
index 0de84f8..7d4a1e8 100644
--- a/src/DotPulsar/Internal/ChannelManager.cs
+++ b/src/DotPulsar/Internal/ChannelManager.cs
@@ -57,15 +57,15 @@ public sealed class ChannelManager :
IState<ChannelManagerState>, IDisposable
if (response.Result.ProducerSuccess.ProducerReady)
{
-
channel.ProducerConnected(response.Result.ProducerSuccess.TopicEpoch);
+ channel.Connected();
}
else
{
channel.WaitingForExclusive();
- HandleAdditionalProducerSuccess(command,
channel.ProducerConnected);
+ HandleAdditionalProducerSuccess(command, channel.Connected);
}
- return new ProducerResponse(producerId,
response.Result.ProducerSuccess.ProducerName);
+ return new ProducerResponse(producerId,
response.Result.ProducerSuccess.ProducerName,
response.Result.ProducerSuccess.TopicEpoch);
}, TaskContinuationOptions.OnlyOnRanToCompletion);
}
@@ -257,7 +257,7 @@ public sealed class ChannelManager :
IState<ChannelManagerState>, IDisposable
return channel.SenderLock();
}
- private void HandleAdditionalProducerSuccess(CommandProducer command,
Action<ulong> successAction)
+ private void HandleAdditionalProducerSuccess(CommandProducer command,
Action successAction)
{
_ =
_requestResponseHandler.ExpectAdditionalResponse(command).ContinueWith(response
=>
{
@@ -271,7 +271,7 @@ public sealed class ChannelManager :
IState<ChannelManagerState>, IDisposable
HandleAdditionalProducerSuccess(command, successAction);
return;
}
- successAction.Invoke(response.Result.ProducerSuccess.TopicEpoch);
+ successAction.Invoke();
});
}
diff --git a/src/DotPulsar/Internal/Events/ProducerChannelConnected.cs
b/src/DotPulsar/Internal/Events/ProducerChannelConnected.cs
deleted file mode 100644
index 469b92c..0000000
--- a/src/DotPulsar/Internal/Events/ProducerChannelConnected.cs
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-namespace DotPulsar.Internal.Events;
-
-using DotPulsar.Internal.Abstractions;
-
-public sealed class ProducerChannelConnected : IEvent
-{
- public ProducerChannelConnected(Guid correlationId, ulong topicEpoch)
- {
- CorrelationId = correlationId;
- TopicEpoch = topicEpoch;
- }
-
- public Guid CorrelationId { get; }
-
- public ulong TopicEpoch { get; }
-}
diff --git a/src/DotPulsar/Internal/ProducerChannelFactory.cs
b/src/DotPulsar/Internal/ProducerChannelFactory.cs
index 585ba9b..7dab27c 100644
--- a/src/DotPulsar/Internal/ProducerChannelFactory.cs
+++ b/src/DotPulsar/Internal/ProducerChannelFactory.cs
@@ -26,6 +26,7 @@ public sealed class ProducerChannelFactory :
IProducerChannelFactory
private readonly CommandProducer _commandProducer;
private readonly ICompressorFactory? _compressorFactory;
private readonly Schema? _schema;
+ private ulong? _topicEpoch;
public ProducerChannelFactory(
Guid correlationId,
@@ -52,13 +53,13 @@ public sealed class ProducerChannelFactory :
IProducerChannelFactory
_schema = schemaInfo.PulsarSchema;
}
- public async Task<IProducerChannel> Create(ulong? topicEpoch,
CancellationToken cancellationToken)
+ public async Task<IProducerChannel> Create(CancellationToken
cancellationToken)
{
- if (topicEpoch.HasValue)
+ if (_topicEpoch.HasValue)
{
if (_commandProducer.ProducerAccessMode !=
ProducerAccessMode.Shared)
_commandProducer.ProducerAccessMode =
ProducerAccessMode.Exclusive;
- _commandProducer.TopicEpoch = topicEpoch.Value;
+ _commandProducer.TopicEpoch = _topicEpoch.Value;
}
else
_commandProducer.ResetTopicEpoch();
@@ -66,6 +67,7 @@ public sealed class ProducerChannelFactory :
IProducerChannelFactory
var connection = await
_connectionPool.FindConnectionForTopic(_commandProducer.Topic,
cancellationToken).ConfigureAwait(false);
var channel = new Channel(_correlationId, _eventRegister, new
AsyncQueue<MessagePackage>());
var response = await connection.Send(_commandProducer, channel,
cancellationToken).ConfigureAwait(false);
+ _topicEpoch = response.TopicEpoch;
var schemaVersion = await GetSchemaVersion(connection,
cancellationToken).ConfigureAwait(false);
return new ProducerChannel(response.ProducerId, response.ProducerName,
connection, _compressorFactory, schemaVersion);
}
diff --git a/src/DotPulsar/Internal/ProducerProcess.cs
b/src/DotPulsar/Internal/ProducerProcess.cs
index 0996d18..d6f4072 100644
--- a/src/DotPulsar/Internal/ProducerProcess.cs
+++ b/src/DotPulsar/Internal/ProducerProcess.cs
@@ -20,12 +20,12 @@ using DotPulsar.Internal.Abstractions;
public sealed class ProducerProcess : Process
{
private readonly IStateManager<ProducerState> _stateManager;
- private readonly IContainsProducerChannel _subProducer;
+ private readonly IContainsChannel _subProducer;
public ProducerProcess(
Guid correlationId,
IStateManager<ProducerState> stateManager,
- IContainsProducerChannel producer) : base(correlationId)
+ IContainsChannel producer) : base(correlationId)
{
_stateManager = stateManager;
_subProducer = producer;
@@ -63,10 +63,10 @@ public sealed class ProducerProcess : Process
});
return;
case ChannelState.Connected:
- ActionQueue.Enqueue(async x =>
+ ActionQueue.Enqueue(x =>
{
- await _subProducer.ActivateChannel(TopicEpoch,
x).ConfigureAwait(false);
_stateManager.SetState(ProducerState.Connected);
+ return Task.CompletedTask;
});
return;
case ChannelState.WaitingForExclusive:
diff --git a/src/DotPulsar/Internal/ProducerResponse.cs
b/src/DotPulsar/Internal/ProducerResponse.cs
index 474dc5f..6aa71fb 100644
--- a/src/DotPulsar/Internal/ProducerResponse.cs
+++ b/src/DotPulsar/Internal/ProducerResponse.cs
@@ -1,4 +1,4 @@
-/*
+/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
@@ -16,12 +16,14 @@ namespace DotPulsar.Internal;
public sealed class ProducerResponse
{
- public ProducerResponse(ulong producerId, string producerName)
+ public ProducerResponse(ulong producerId, string producerName, ulong
topicEpoch)
{
ProducerId = producerId;
ProducerName = producerName;
+ TopicEpoch = topicEpoch;
}
public ulong ProducerId { get; }
public string ProducerName { get; }
+ public ulong TopicEpoch { get; }
}
diff --git a/src/DotPulsar/Internal/SubProducer.cs
b/src/DotPulsar/Internal/SubProducer.cs
index 6924a4e..c778a97 100644
--- a/src/DotPulsar/Internal/SubProducer.cs
+++ b/src/DotPulsar/Internal/SubProducer.cs
@@ -22,7 +22,7 @@ using DotPulsar.Internal.Exceptions;
using DotPulsar.Internal.Extensions;
using DotPulsar.Internal.PulsarApi;
-public sealed class SubProducer : IContainsProducerChannel,
IState<ProducerState>
+public sealed class SubProducer : IContainsChannel, IState<ProducerState>
{
private readonly AsyncQueueWithCursor<SendOp> _sendQueue;
private CancellationTokenSource? _dispatcherCts;
@@ -35,7 +35,6 @@ public sealed class SubProducer : IContainsProducerChannel,
IState<ProducerState
private readonly IProducerChannelFactory _factory;
private readonly int _partition;
private int _isDisposed;
- private ulong? _topicEpoch;
private Exception? _faultException;
public string Topic { get; }
@@ -151,7 +150,8 @@ public sealed class SubProducer : IContainsProducerChannel,
IState<ProducerState
if (!success)
{
- _eventRegister.Register(new
ChannelDisconnected(_correlationId));
+ if (!cancellationToken.IsCancellationRequested)
+ _eventRegister.Register(new
ChannelDisconnected(_correlationId));
break;
}
}
@@ -229,15 +229,9 @@ public sealed class SubProducer :
IContainsProducerChannel, IState<ProducerState
// Ignored
}
- _channel = await _executor.Execute(() => _factory.Create(_topicEpoch,
cancellationToken), cancellationToken).ConfigureAwait(false);
- }
-
- public Task ActivateChannel(ulong? topicEpoch, CancellationToken
cancellationToken)
- {
- _topicEpoch ??= topicEpoch;
+ _channel = await _executor.Execute(() =>
_factory.Create(cancellationToken), cancellationToken).ConfigureAwait(false);
_dispatcherCts = new CancellationTokenSource();
_dispatcherTask = Task.Run(async () => await
MessageDispatcher(_channel, _dispatcherCts.Token));
- return Task.CompletedTask;
}
public async ValueTask CloseChannel(CancellationToken cancellationToken)