blankensteiner commented on code in PR #104:
URL: https://github.com/apache/pulsar-dotpulsar/pull/104#discussion_r879466029
##########
tests/DotPulsar.Tests/ProducerTests.cs:
##########
@@ -152,6 +152,7 @@ private IPulsarClient CreateClient()
=> PulsarClient
.Builder()
.Authentication(AuthenticationFactory.Token(ct =>
ValueTask.FromResult(_fixture.CreateToken(Timeout.InfiniteTimeSpan))))
+ .KeepAliveInterval(TimeSpan.FromSeconds(5))
Review Comment:
Why is this added?
##########
tests/DotPulsar.Tests/xunit.runner.json:
##########
@@ -1,4 +1,5 @@
{
"$schema": "https://xunit.net/schema/current/xunit.runner.schema.json",
- "diagnosticMessages": true
+ "diagnosticMessages": true,
+ "parallelizeTestCollections": false
Review Comment:
This can be removed again if you go ahead with just one cluster for all
integration tests.
##########
DotPulsar.sln:
##########
@@ -26,6 +26,8 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Solution
Items", "Solution
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Processing",
"samples\Processing\Processing.csproj", "{CC1494FA-4EB5-4DB9-8BE9-0A6E8D0D963E}"
EndProject
+Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "DotPulsar.Consumer",
"tests\DotPulsar.Consumer\DotPulsar.Consumer.csproj",
"{36E6E6EF-A471-4AE4-B696-1C9DAAFA2770}"
Review Comment:
Let's get the new tests into DotPulsar.Tests instead of creating new test
projects.
##########
tests/DotPulsar.Tests/IntegrationCollection.cs:
##########
@@ -18,3 +18,6 @@ namespace DotPulsar.Tests;
[CollectionDefinition("Integration")]
public class IntegrationCollection : ICollectionFixture<IntegrationFixture> { }
+
+[CollectionDefinition("KeepAlive")]
+public class KeepAliveCollection : ICollectionFixture<KeepAliveFixture> { }
Review Comment:
Is a new (standalone) cluster needed?
Seems we could solve this with just one cluster/integration fixture.
##########
src/DotPulsar/Abstractions/IPulsarClientBuilder.cs:
##########
@@ -53,6 +53,16 @@ public interface IPulsarClientBuilder
/// </summary>
IPulsarClientBuilder KeepAliveInterval(TimeSpan interval);
+ /// <summary>
+ /// The maximum amount of time to wait without receiving any message from
the server at
+ /// which point the connection is assumed to be dead or the server is not
responding.
+ /// As we are sending pings the server should respond to those at a
minimum within this specified timeout period.
+ /// Once this happens the connection will be torn down and all
consumers/producers will enter
+ /// the disconnected state and attempt to reconnect
+ /// The default is 60 seconds.
+ /// </summary>
+ IPulsarClientBuilder ServerResponseTimeout(TimeSpan interval);
Review Comment:
Is this also a configurable setting for other clients? If not, we could just
hardcode it.
##########
src/DotPulsar/Internal/PingPongHandler.cs:
##########
@@ -25,29 +25,37 @@ public sealed class PingPongHandler : IAsyncDisposable
{
private readonly IConnection _connection;
private readonly TimeSpan _keepAliveInterval;
+ private readonly TimeSpan _serverResponseTimeout;
private readonly Timer _timer;
private readonly CommandPing _ping;
private readonly CommandPong _pong;
private long _lastCommand;
+ private readonly TaskCompletionSource<object> _serverNotRespondingTcs;
Review Comment:
The non-generic TaskCompletionSource is a better fit since the object is
never used.
##########
src/DotPulsar/Internal/PingPongHandler.cs:
##########
@@ -61,13 +69,25 @@ private void Watch(object? state)
var lastCommand = Interlocked.Read(ref _lastCommand);
var now = Stopwatch.GetTimestamp();
var elapsed = TimeSpan.FromSeconds((now - lastCommand) /
Stopwatch.Frequency);
- if (elapsed >= _keepAliveInterval)
+
+ if (elapsed > _serverResponseTimeout)
{
- Task.Factory.StartNew(() => SendPing());
- _timer.Change(_keepAliveInterval, TimeSpan.Zero);
+ DotPulsarMeter.ServerTimedout();
+ _serverNotRespondingTcs.SetResult(new object());
Review Comment:
You might as well just return here instead of wrapping the following code in
an else.
##########
src/DotPulsar/Internal/Connection.cs:
##########
@@ -294,6 +298,11 @@ private async Task Send(BaseCommand command,
CancellationToken cancellationToken
}
public async Task ProcessIncommingFrames(CancellationToken
cancellationToken)
+ {
+ await Task.WhenAny(ProcessIncommingFramesImpl(cancellationToken),
_pingPongHandler.ServerNotResponding);
Review Comment:
This needs a ConfigureAwait(false)
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]