blankensteiner commented on code in PR #167:
URL: https://github.com/apache/pulsar-dotpulsar/pull/167#discussion_r1293305237
##########
src/DotPulsar/Internal/Abstractions/IConnection.cs:
##########
@@ -42,4 +42,6 @@ public interface IConnection : IAsyncDisposable
Task Send(SendPackage command, TaskCompletionSource<BaseCommand>
responseTcs, CancellationToken cancellationToken);
Task<BaseCommand> Send(CommandGetOrCreateSchema command, CancellationToken
cancellationToken);
Task<BaseCommand> Send(CommandPartitionedTopicMetadata command,
CancellationToken cancellationToken);
+
+ void MarkInactive();
Review Comment:
Currently, IConnection has a "HasChannels" method; we should just rename
it to "IsActive". That way, it is still the ConnectionPool's
"CloseInactiveConnections" method that does the checking and disposing.
##########
src/DotPulsar/Internal/Connection.cs:
##########
@@ -336,6 +338,14 @@ public async ValueTask DisposeAsync()
await _stream.DisposeAsync().ConfigureAwait(false);
}
+ public async void MarkInactive()
Review Comment:
Instead of a callback, the connection already has a reference to the
PingPongHandler and should use that. The "IsActive" method should just check if
the PingPongHandler thinks the connection is still good (and if the connection
has channels). The PingPongHandler could have a "HasTimedOut" boolean property
or something like that.
##########
tests/DotPulsar.Tests/Internal/PingPongHandlerTest.cs:
##########
@@ -0,0 +1,98 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace DotPulsar.Tests.Internal;
+
+using DotPulsar.Internal;
+using DotPulsar.Internal.Abstractions;
+using DotPulsar.Internal.PulsarApi;
+using System;
+using System.Threading;
+using System.Threading.Tasks;
+using Xunit;
+
+[Trait("Category", "Unit")]
+public class PingPongHandlerTest
+{
+ [Fact]
+ public async Task Watch_GivenConnectionNotAlive_ShouldDisposeConnection()
+ {
+ var countDown = new CountdownEvent(1);
+ var connection = new MockConnection();
+ var keepAliveInterval = TimeSpan.FromSeconds(1);
+ connection.PingCallback = () => countDown.Signal();
+ var pingPongHandler = new PingPongHandler(connection,
keepAliveInterval);
+
+ // Wait for the first Ping
+ // The ping arrive time should be at (keepAliveInterval,
2*keepAliveInterval)
+ countDown.Wait(2 * keepAliveInterval);
+ pingPongHandler.Incoming(BaseCommand.Type.Pong);
+ Assert.False(connection.IsDispose);
+
+ // Wait for the second Ping, but we don't reply with Pong.
+ // The connection disposed time should be at (2*keepAliveInterval,
3*keepAliveInterval)
+ await Task.Delay(3 * keepAliveInterval);
+ Assert.True(connection.IsDispose);
Review Comment:
These assertions should use FluentAssertions instead of xUnit's Assert.
##########
tests/DotPulsar.Tests/Internal/PingPongHandlerTest.cs:
##########
@@ -0,0 +1,98 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace DotPulsar.Tests.Internal;
+
+using DotPulsar.Internal;
+using DotPulsar.Internal.Abstractions;
+using DotPulsar.Internal.PulsarApi;
+using System;
+using System.Threading;
+using System.Threading.Tasks;
+using Xunit;
+
+[Trait("Category", "Unit")]
+public class PingPongHandlerTest
+{
+ [Fact]
+ public async Task Watch_GivenConnectionNotAlive_ShouldDisposeConnection()
+ {
+ var countDown = new CountdownEvent(1);
+ var connection = new MockConnection();
+ var keepAliveInterval = TimeSpan.FromSeconds(1);
+ connection.PingCallback = () => countDown.Signal();
+ var pingPongHandler = new PingPongHandler(connection,
keepAliveInterval);
+
+ // Wait for the first Ping
+ // The ping arrive time should be at (keepAliveInterval,
2*keepAliveInterval)
+ countDown.Wait(2 * keepAliveInterval);
+ pingPongHandler.Incoming(BaseCommand.Type.Pong);
+ Assert.False(connection.IsDispose);
+
+ // Wait for the second Ping, but we don't reply with Pong.
+ // The connection disposed time should be at (2*keepAliveInterval,
3*keepAliveInterval)
+ await Task.Delay(3 * keepAliveInterval);
+ Assert.True(connection.IsDispose);
+ }
+
+ private class MockConnection : IConnection
Review Comment:
Can't NSubstitute mock this?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]