This is an automated email from the ASF dual-hosted git repository.

ptupitsyn pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 99457caa18 IGNITE-22642 Add internal heartbeat API to Java and .NET clients (#4146)
99457caa18 is described below

commit 99457caa1896cf78f28c903880eb2988646b954c
Author: Pavel Tupitsyn <[email protected]>
AuthorDate: Mon Jul 29 13:06:21 2024 +0300

    IGNITE-22642 Add internal heartbeat API to Java and .NET clients (#4146)
    
    Add internal heartbeat API with optional payload for benchmarking to Java and .NET clients.
---
 .../ignite/internal/client/ClientChannel.java      | 12 ++++++
 .../ignite/internal/client/ReliableChannel.java    | 23 ++++++++++
 .../ignite/internal/client/TcpIgniteClient.java    |  9 ++++
 .../client/ObservableTimestampPropagationTest.java |  4 +-
 .../ignite/client/PartitionAwarenessTest.java      |  3 +-
 .../org/apache/ignite/client/RetryPolicyTest.java  |  6 +--
 .../Apache.Ignite.Tests/IgniteClientTests.cs       | 20 +++++++++
 .../dotnet/Apache.Ignite.Tests/MetricsTests.cs     |  2 +-
 .../dotnet/Apache.Ignite.Tests/TestUtils.cs        |  2 +-
 .../Transactions/TransactionsTests.cs              |  4 +-
 .../Apache.Ignite/Internal/ClientFailoverSocket.cs | 19 ++++++++
 .../dotnet/Apache.Ignite/Internal/ClientSocket.cs  | 50 ++++++++++++----------
 .../Apache.Ignite/Internal/IgniteClientInternal.cs | 23 +++++-----
 .../app/client/ItThinClientConnectionTest.java     | 20 +++++++++
 14 files changed, 151 insertions(+), 46 deletions(-)

diff --git 
a/modules/client/src/main/java/org/apache/ignite/internal/client/ClientChannel.java
 
b/modules/client/src/main/java/org/apache/ignite/internal/client/ClientChannel.java
index 67350024b3..4e93d9c90a 100644
--- 
a/modules/client/src/main/java/org/apache/ignite/internal/client/ClientChannel.java
+++ 
b/modules/client/src/main/java/org/apache/ignite/internal/client/ClientChannel.java
@@ -18,6 +18,7 @@
 package org.apache.ignite.internal.client;
 
 import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.internal.client.proto.ClientOp;
 import org.jetbrains.annotations.Nullable;
 
 /**
@@ -71,4 +72,15 @@ public interface ClientChannel extends AutoCloseable {
      * @return Protocol context.
      */
     ProtocolContext protocolContext();
+
+    /**
+     * Send heartbeat request.
+     *
+     * @param payloadWriter Payload writer or {@code null} for no payload.
+     *     Heartbeat request payload is ignored by the server, but can be used for benchmarking.
+     * @return Future for the operation.
+     */
+    default CompletableFuture<Void> heartbeatAsync(@Nullable PayloadWriter 
payloadWriter) {
+        return serviceAsync(ClientOp.HEARTBEAT, payloadWriter, null, false);
+    }
 }
diff --git 
a/modules/client/src/main/java/org/apache/ignite/internal/client/ReliableChannel.java
 
b/modules/client/src/main/java/org/apache/ignite/internal/client/ReliableChannel.java
index e648d33889..906dd15405 100644
--- 
a/modules/client/src/main/java/org/apache/ignite/internal/client/ReliableChannel.java
+++ 
b/modules/client/src/main/java/org/apache/ignite/internal/client/ReliableChannel.java
@@ -200,6 +200,29 @@ public final class ReliableChannel implements 
AutoCloseable {
         return observableTimestamp.get();
     }
 
+    /**
+     * Gets active client channels.
+     *
+     * @return List of connected channels.
+     */
+    public List<ClientChannel> channels() {
+        List<ClientChannel> res = new ArrayList<>(channels.size());
+
+        for (var holder : nodeChannelsByName.values()) {
+            var chFut = holder.chFut;
+
+            if (chFut != null) {
+                ClientChannel ch = ClientFutureUtils.getNowSafe(chFut);
+
+                if (ch != null && !ch.closed()) {
+                    res.add(ch);
+                }
+            }
+        }
+
+        return res;
+    }
+
     /**
      * Sends request and handles response asynchronously.
      *
diff --git 
a/modules/client/src/main/java/org/apache/ignite/internal/client/TcpIgniteClient.java
 
b/modules/client/src/main/java/org/apache/ignite/internal/client/TcpIgniteClient.java
index 7a7f508820..22e4062e4c 100644
--- 
a/modules/client/src/main/java/org/apache/ignite/internal/client/TcpIgniteClient.java
+++ 
b/modules/client/src/main/java/org/apache/ignite/internal/client/TcpIgniteClient.java
@@ -252,6 +252,15 @@ public class TcpIgniteClient implements IgniteClient {
         return metrics;
     }
 
+    /**
+     * Returns the underlying channel.
+     *
+     * @return Channel.
+     */
+    public ReliableChannel channel() {
+        return ch;
+    }
+
     /**
      * Sends ClientMessage request to server side asynchronously and returns result future.
      *
diff --git 
a/modules/client/src/test/java/org/apache/ignite/client/ObservableTimestampPropagationTest.java
 
b/modules/client/src/test/java/org/apache/ignite/client/ObservableTimestampPropagationTest.java
index 5ac9bdd4c3..1f7637b4d6 100644
--- 
a/modules/client/src/test/java/org/apache/ignite/client/ObservableTimestampPropagationTest.java
+++ 
b/modules/client/src/test/java/org/apache/ignite/client/ObservableTimestampPropagationTest.java
@@ -29,10 +29,10 @@ import java.util.concurrent.atomic.AtomicLong;
 import org.apache.ignite.client.fakes.FakeIgnite;
 import org.apache.ignite.internal.TestHybridClock;
 import org.apache.ignite.internal.client.ReliableChannel;
+import org.apache.ignite.internal.client.TcpIgniteClient;
 import org.apache.ignite.internal.client.tx.ClientLazyTransaction;
 import org.apache.ignite.internal.hlc.HybridTimestamp;
 import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
-import org.apache.ignite.internal.testframework.IgniteTestUtils;
 import org.apache.ignite.sql.Statement;
 import org.apache.ignite.sql.async.AsyncResultSet;
 import org.apache.ignite.tx.TransactionOptions;
@@ -71,7 +71,7 @@ public class ObservableTimestampPropagationTest extends 
BaseIgniteAbstractTest {
     @Test
     @SuppressWarnings("resource")
     public void testClientPropagatesLatestKnownHybridTimestamp() {
-        ReliableChannel ch = IgniteTestUtils.getFieldValue(client, "ch");
+        ReliableChannel ch = ((TcpIgniteClient) client).channel();
         TransactionOptions roOpts = new TransactionOptions().readOnly(true);
 
         // +2 because logical time is incremented on every call to nowLong - for replica tracker and for handshake.
diff --git 
a/modules/client/src/test/java/org/apache/ignite/client/PartitionAwarenessTest.java
 
b/modules/client/src/test/java/org/apache/ignite/client/PartitionAwarenessTest.java
index 633a1eb717..5a1120fe32 100644
--- 
a/modules/client/src/test/java/org/apache/ignite/client/PartitionAwarenessTest.java
+++ 
b/modules/client/src/test/java/org/apache/ignite/client/PartitionAwarenessTest.java
@@ -42,6 +42,7 @@ import org.apache.ignite.compute.IgniteCompute;
 import org.apache.ignite.compute.JobDescriptor;
 import org.apache.ignite.compute.JobTarget;
 import org.apache.ignite.internal.client.ReliableChannel;
+import org.apache.ignite.internal.client.TcpIgniteClient;
 import org.apache.ignite.internal.client.tx.ClientLazyTransaction;
 import org.apache.ignite.internal.hlc.HybridClockImpl;
 import org.apache.ignite.internal.streamer.SimplePublisher;
@@ -179,7 +180,7 @@ public class PartitionAwarenessTest extends 
AbstractClientTest {
     @ParameterizedTest
     @ValueSource(booleans = {true, false})
     public void testClientReceivesPartitionAssignmentUpdates(boolean 
useHeartbeat) throws InterruptedException {
-        ReliableChannel ch = IgniteTestUtils.getFieldValue(client2, "ch");
+        ReliableChannel ch = ((TcpIgniteClient) client2).channel();
 
         // Check default assignment.
         RecordView<Tuple> recordView = defaultTable().recordView();
diff --git 
a/modules/client/src/test/java/org/apache/ignite/client/RetryPolicyTest.java 
b/modules/client/src/test/java/org/apache/ignite/client/RetryPolicyTest.java
index 4203dcd60b..aaee044ba9 100644
--- a/modules/client/src/test/java/org/apache/ignite/client/RetryPolicyTest.java
+++ b/modules/client/src/test/java/org/apache/ignite/client/RetryPolicyTest.java
@@ -33,10 +33,10 @@ import org.apache.ignite.client.fakes.FakeIgniteTables;
 import org.apache.ignite.internal.client.ClientUtils;
 import org.apache.ignite.internal.client.IgniteClientConfigurationImpl;
 import org.apache.ignite.internal.client.RetryPolicyContextImpl;
+import org.apache.ignite.internal.client.TcpIgniteClient;
 import org.apache.ignite.internal.client.proto.ClientOp;
 import org.apache.ignite.internal.client.tx.ClientLazyTransaction;
 import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
-import org.apache.ignite.internal.testframework.IgniteTestUtils;
 import org.apache.ignite.lang.IgniteException;
 import org.apache.ignite.lang.LoggerFactory;
 import org.apache.ignite.table.RecordView;
@@ -104,7 +104,7 @@ public class RetryPolicyTest extends BaseIgniteAbstractTest 
{
 
         try (var client = getClient(plc)) {
             Transaction tx = client.transactions().begin();
-            ClientLazyTransaction.ensureStarted(tx, 
IgniteTestUtils.getFieldValue(client, "ch"), null).join();
+            ClientLazyTransaction.ensureStarted(tx, ((TcpIgniteClient) 
client).channel(), null).join();
 
             assertThrows(IgniteClientConnectionException.class, tx::commit);
             assertEquals(0, plc.invocations.size());
@@ -167,7 +167,7 @@ public class RetryPolicyTest extends BaseIgniteAbstractTest 
{
         try (var client = getClient(plc)) {
             RecordView<Tuple> recView = 
client.tables().table("t").recordView();
             Transaction tx = client.transactions().begin();
-            ClientLazyTransaction.ensureStarted(tx, 
IgniteTestUtils.getFieldValue(client, "ch"), null).join();
+            ClientLazyTransaction.ensureStarted(tx, ((TcpIgniteClient) 
client).channel(), null).join();
 
             var ex = assertThrows(IgniteException.class, () -> recView.get(tx, 
Tuple.create().set("id", 1)));
             assertThat(ex.getMessage(), containsString("Transaction context 
has been lost due to connection errors."));
diff --git a/modules/platforms/dotnet/Apache.Ignite.Tests/IgniteClientTests.cs 
b/modules/platforms/dotnet/Apache.Ignite.Tests/IgniteClientTests.cs
index ef8780e4d4..e637370107 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Tests/IgniteClientTests.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Tests/IgniteClientTests.cs
@@ -19,6 +19,8 @@ namespace Apache.Ignite.Tests
 {
     using System.Linq;
     using System.Threading.Tasks;
+    using Internal;
+    using Internal.Buffers;
     using NUnit.Framework;
 
     /// <summary>
@@ -62,5 +64,23 @@ namespace Apache.Ignite.Tests
 
             Assert.AreEqual(expected, client.ToString());
         }
+
+        [Test]
+        public async Task TestHeartbeat()
+        {
+            using var client = await IgniteClient.StartAsync(GetConfig());
+            IgniteClientInternal clientInternal = (IgniteClientInternal)client;
+
+            var sockets = clientInternal.Socket.GetSockets().ToList();
+
+            using var payload = new PooledArrayBuffer();
+            payload.MessageWriter.Write("foo bar baz");
+
+            foreach (var socket in sockets)
+            {
+                await socket.HeartbeatAsync();
+                await socket.HeartbeatAsync(payload);
+            }
+        }
     }
 }
diff --git a/modules/platforms/dotnet/Apache.Ignite.Tests/MetricsTests.cs 
b/modules/platforms/dotnet/Apache.Ignite.Tests/MetricsTests.cs
index c0cb4eb618..d006347aaa 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Tests/MetricsTests.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Tests/MetricsTests.cs
@@ -338,7 +338,7 @@ public class MetricsTests
             RetryPolicy = new RetryNonePolicy()
         };
 
-    private static Guid? GetClientId(IIgniteClient? client) => 
client?.GetFieldValue<ClientFailoverSocket>("_socket").ClientId;
+    private static Guid? GetClientId(IIgniteClient? client) => 
((IgniteClientInternal?)client)?.Socket.ClientId;
 
     private void AssertMetric(string name, int value, int timeoutMs = 1000) =>
         _listener.AssertMetric(name, value, timeoutMs);
diff --git a/modules/platforms/dotnet/Apache.Ignite.Tests/TestUtils.cs 
b/modules/platforms/dotnet/Apache.Ignite.Tests/TestUtils.cs
index 8dbdedc814..15fc63f8bb 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Tests/TestUtils.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Tests/TestUtils.cs
@@ -106,7 +106,7 @@ namespace Apache.Ignite.Tests
         internal static async Task ForceLazyTxStart(ITransaction tx, IIgnite 
client, PreferredNode preferredNode = default) =>
             await LazyTransaction.EnsureStartedAsync(
                 tx,
-                client.GetFieldValue<ClientFailoverSocket>("_socket"),
+                ((IgniteClientInternal)client).Socket,
                 preferredNode);
 
         private static FieldInfo GetNonPublicField(object obj, string 
fieldName)
diff --git 
a/modules/platforms/dotnet/Apache.Ignite.Tests/Transactions/TransactionsTests.cs
 
b/modules/platforms/dotnet/Apache.Ignite.Tests/Transactions/TransactionsTests.cs
index 30d7445154..4f8e4ac965 100644
--- 
a/modules/platforms/dotnet/Apache.Ignite.Tests/Transactions/TransactionsTests.cs
+++ 
b/modules/platforms/dotnet/Apache.Ignite.Tests/Transactions/TransactionsTests.cs
@@ -296,7 +296,7 @@ namespace Apache.Ignite.Tests.Transactions
         public async Task TestObservableTimestampIsInitializedFromHandshake()
         {
             using var client = await IgniteClient.StartAsync(new() { Endpoints 
= { "127.0.0.1:" + ServerPort } });
-            var observableTimestamp = 
client.GetFieldValue<ClientFailoverSocket>("_socket").ObservableTimestamp;
+            var observableTimestamp = 
((IgniteClientInternal)client).Socket.ObservableTimestamp;
 
             Assert.Greater(observableTimestamp, 0);
         }
@@ -313,7 +313,7 @@ namespace Apache.Ignite.Tests.Transactions
 
             Assert.AreEqual(
                 server.ObservableTimestamp,
-                
client.GetFieldValue<ClientFailoverSocket>("_socket").ObservableTimestamp,
+                ((IgniteClientInternal)client).Socket.ObservableTimestamp,
                 "Handshake should initialize observable timestamp");
 
             server.ObservableTimestamp = 123;
diff --git 
a/modules/platforms/dotnet/Apache.Ignite/Internal/ClientFailoverSocket.cs 
b/modules/platforms/dotnet/Apache.Ignite/Internal/ClientFailoverSocket.cs
index 39a7c23872..497e7d74c5 100644
--- a/modules/platforms/dotnet/Apache.Ignite/Internal/ClientFailoverSocket.cs
+++ b/modules/platforms/dotnet/Apache.Ignite/Internal/ClientFailoverSocket.cs
@@ -315,6 +315,25 @@ namespace Apache.Ignite.Internal
             }
         }
 
+        /// <summary>
+        /// Gets active sockets.
+        /// </summary>
+        /// <returns>Active sockets.</returns>
+        internal IEnumerable<ClientSocket> GetSockets()
+        {
+            var res = new List<ClientSocket>(_endpoints.Count);
+
+            foreach (var endpoint in _endpoints)
+            {
+                if (endpoint.Socket is { IsDisposed: false })
+                {
+                    res.Add(endpoint.Socket);
+                }
+            }
+
+            return res;
+        }
+
         /// <summary>
         /// Gets a socket. Reconnects if necessary.
         /// </summary>
diff --git a/modules/platforms/dotnet/Apache.Ignite/Internal/ClientSocket.cs 
b/modules/platforms/dotnet/Apache.Ignite/Internal/ClientSocket.cs
index 471923d59b..dec8eb3b0c 100644
--- a/modules/platforms/dotnet/Apache.Ignite/Internal/ClientSocket.cs
+++ b/modules/platforms/dotnet/Apache.Ignite/Internal/ClientSocket.cs
@@ -138,7 +138,7 @@ namespace Apache.Ignite.Internal
 
             // ReSharper disable once AsyncVoidLambda (timer callback)
             _heartbeatTimer = new Timer(
-                callback: async _ => await 
SendHeartbeatAsync().ConfigureAwait(false),
+                callback: async _ => await 
HeartbeatAsync().ConfigureAwait(false),
                 state: null,
                 dueTime: _heartbeatInterval,
                 period: TimeSpan.FromMilliseconds(-1));
@@ -281,6 +281,32 @@ namespace Apache.Ignite.Internal
             Dispose(null);
         }
 
+        /// <summary>
+        /// Sends heartbeat message.
+        /// </summary>
+        /// <param name="payload">Optional payload. Ignored by the server, can be used for benchmarking.</param>
+        /// <returns>A <see cref="Task"/> representing the asynchronous operation.</returns>
+        [SuppressMessage(
+            "Microsoft.Design",
+            "CA1031:DoNotCatchGeneralExceptionTypes",
+            Justification = "Any heartbeat exception should cause this 
instance to be disposed with an error.")]
+        internal async Task HeartbeatAsync(PooledArrayBuffer? payload = null)
+        {
+            try
+            {
+                using var buf = await DoOutInOpAsync(ClientOp.Heartbeat, 
payload)
+                    .WaitAsync(_socketTimeout)
+                    .ConfigureAwait(false);
+            }
+            catch (Exception e)
+            {
+                var message = "Heartbeat failed: " + e.Message;
+                _logger.LogHeartbeatError(e, message);
+
+                Dispose(new 
IgniteClientConnectionException(ErrorGroups.Client.Connection, message, e));
+            }
+        }
+
         /// <summary>
         /// Performs the handshake exchange.
         /// </summary>
@@ -866,28 +892,6 @@ namespace Apache.Ignite.Internal
             }
         }
 
-        /// <summary>
-        /// Sends heartbeat message.
-        /// </summary>
-        [SuppressMessage(
-            "Microsoft.Design",
-            "CA1031:DoNotCatchGeneralExceptionTypes",
-            Justification = "Any heartbeat exception should cause this 
instance to be disposed with an error.")]
-        private async Task SendHeartbeatAsync()
-        {
-            try
-            {
-                using var buf = await 
DoOutInOpAsync(ClientOp.Heartbeat).WaitAsync(_socketTimeout).ConfigureAwait(false);
-            }
-            catch (Exception e)
-            {
-                var message = "Heartbeat failed: " + e.Message;
-                _logger.LogHeartbeatError(e, message);
-
-                Dispose(new 
IgniteClientConnectionException(ErrorGroups.Client.Connection, message, e));
-            }
-        }
-
         /// <summary>
         /// Disposes this socket and completes active requests with the specified exception.
         /// </summary>
diff --git 
a/modules/platforms/dotnet/Apache.Ignite/Internal/IgniteClientInternal.cs 
b/modules/platforms/dotnet/Apache.Ignite/Internal/IgniteClientInternal.cs
index a0c77eb9fc..9fd3a8f6f7 100644
--- a/modules/platforms/dotnet/Apache.Ignite/Internal/IgniteClientInternal.cs
+++ b/modules/platforms/dotnet/Apache.Ignite/Internal/IgniteClientInternal.cs
@@ -18,9 +18,7 @@
 namespace Apache.Ignite.Internal
 {
     using System.Collections.Generic;
-    using System.Diagnostics;
     using System.Linq;
-    using System.Net;
     using System.Threading.Tasks;
     using Common;
     using Ignite.Compute;
@@ -37,16 +35,13 @@ namespace Apache.Ignite.Internal
     /// </summary>
     internal sealed class IgniteClientInternal : IIgniteClient
     {
-        /** Underlying connection. */
-        private readonly ClientFailoverSocket _socket;
-
         /// <summary>
         /// Initializes a new instance of the <see cref="IgniteClientInternal"/> class.
         /// </summary>
         /// <param name="socket">Socket.</param>
         public IgniteClientInternal(ClientFailoverSocket socket)
         {
-            _socket = socket;
+            Socket = socket;
 
             var sql = new Sql.Sql(socket);
             var tables = new Tables(socket, sql);
@@ -59,7 +54,7 @@ namespace Apache.Ignite.Internal
 
         /// <inheritdoc/>
         public IgniteClientConfiguration Configuration =>
-            new(_socket.Configuration); // Defensive copy.
+            new(Socket.Configuration); // Defensive copy.
 
         /// <inheritdoc/>
         public ITables Tables { get; }
@@ -73,10 +68,15 @@ namespace Apache.Ignite.Internal
         /// <inheritdoc/>
         public ISql Sql { get; }
 
+        /// <summary>
+        /// Gets the underlying socket.
+        /// </summary>
+        internal ClientFailoverSocket Socket { get; }
+
         /// <inheritdoc/>
         public async Task<IList<IClusterNode>> GetClusterNodesAsync()
         {
-            using var resBuf = await 
_socket.DoOutInOpAsync(ClientOp.ClusterGetNodes).ConfigureAwait(false);
+            using var resBuf = await 
Socket.DoOutInOpAsync(ClientOp.ClusterGetNodes).ConfigureAwait(false);
 
             return Read();
 
@@ -96,13 +96,10 @@ namespace Apache.Ignite.Internal
         }
 
         /// <inheritdoc/>
-        public IList<IConnectionInfo> GetConnections() => 
_socket.GetConnections();
+        public IList<IConnectionInfo> GetConnections() => 
Socket.GetConnections();
 
         /// <inheritdoc/>
-        public void Dispose()
-        {
-            _socket.Dispose();
-        }
+        public void Dispose() => Socket.Dispose();
 
         /// <inheritdoc/>
         public override string ToString() =>
diff --git 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItThinClientConnectionTest.java
 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItThinClientConnectionTest.java
index e2d1e3ed56..0c51641d59 100644
--- 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItThinClientConnectionTest.java
+++ 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItThinClientConnectionTest.java
@@ -22,11 +22,13 @@ import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.is;
 import static org.hamcrest.Matchers.startsWith;
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
 import java.util.List;
 import org.apache.ignite.client.IgniteClient;
+import org.apache.ignite.internal.client.ClientChannel;
 import org.apache.ignite.internal.client.TcpIgniteClient;
 import org.apache.ignite.internal.testframework.WorkDirectoryExtension;
 import org.apache.ignite.lang.IgniteException;
@@ -41,6 +43,7 @@ import org.junit.jupiter.api.extension.ExtendWith;
 /**
  * Tests thin client connecting to a real server node.
  */
+@SuppressWarnings("resource")
 @ExtendWith(WorkDirectoryExtension.class)
 public class ItThinClientConnectionTest extends ItAbstractThinClientTest {
     /**
@@ -96,4 +99,21 @@ public class ItThinClientConnectionTest extends 
ItAbstractThinClientTest {
     void clusterName() {
         assertThat(((TcpIgniteClient) client()).clusterName(), is("cluster"));
     }
+
+    @Test
+    void testHeartbeat() {
+        var client = (TcpIgniteClient) client();
+
+        List<ClientChannel> channels = client.channel().channels();
+
+        assertEquals(2, channels.size());
+
+        for (var channel : channels) {
+            assertFalse(channel.closed());
+
+            channel.heartbeatAsync(null).join();
+            channel.heartbeatAsync(w -> w.out().packString("foo-bar")).join();
+            channel.heartbeatAsync(w -> w.out().writePayload(new byte[]{1, 2, 
3})).join();
+        }
+    }
 }

Reply via email to