This is an automated email from the ASF dual-hosted git repository.
ptupitsyn pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 99457caa18 IGNITE-22642 Add internal heartbeat API to Java and .NET clients (#4146)
99457caa18 is described below
commit 99457caa1896cf78f28c903880eb2988646b954c
Author: Pavel Tupitsyn <[email protected]>
AuthorDate: Mon Jul 29 13:06:21 2024 +0300
IGNITE-22642 Add internal heartbeat API to Java and .NET clients (#4146)
Add internal heartbeat API with optional payload for benchmarking to Java
and .NET clients.
---
.../ignite/internal/client/ClientChannel.java | 12 ++++++
.../ignite/internal/client/ReliableChannel.java | 23 ++++++++++
.../ignite/internal/client/TcpIgniteClient.java | 9 ++++
.../client/ObservableTimestampPropagationTest.java | 4 +-
.../ignite/client/PartitionAwarenessTest.java | 3 +-
.../org/apache/ignite/client/RetryPolicyTest.java | 6 +--
.../Apache.Ignite.Tests/IgniteClientTests.cs | 20 +++++++++
.../dotnet/Apache.Ignite.Tests/MetricsTests.cs | 2 +-
.../dotnet/Apache.Ignite.Tests/TestUtils.cs | 2 +-
.../Transactions/TransactionsTests.cs | 4 +-
.../Apache.Ignite/Internal/ClientFailoverSocket.cs | 19 ++++++++
.../dotnet/Apache.Ignite/Internal/ClientSocket.cs | 50 ++++++++++++----------
.../Apache.Ignite/Internal/IgniteClientInternal.cs | 23 +++++-----
.../app/client/ItThinClientConnectionTest.java | 20 +++++++++
14 files changed, 151 insertions(+), 46 deletions(-)
diff --git
a/modules/client/src/main/java/org/apache/ignite/internal/client/ClientChannel.java
b/modules/client/src/main/java/org/apache/ignite/internal/client/ClientChannel.java
index 67350024b3..4e93d9c90a 100644
---
a/modules/client/src/main/java/org/apache/ignite/internal/client/ClientChannel.java
+++
b/modules/client/src/main/java/org/apache/ignite/internal/client/ClientChannel.java
@@ -18,6 +18,7 @@
package org.apache.ignite.internal.client;
import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.internal.client.proto.ClientOp;
import org.jetbrains.annotations.Nullable;
/**
@@ -71,4 +72,15 @@ public interface ClientChannel extends AutoCloseable {
* @return Protocol context.
*/
ProtocolContext protocolContext();
+
+ /**
+ * Send heartbeat request.
+ *
+ * @param payloadWriter Payload writer or {@code null} for no payload.
+ * Heartbeat request payload is ignored by the server, but can be used for benchmarking.
+ * @return Future for the operation.
+ */
+ default CompletableFuture<Void> heartbeatAsync(@Nullable PayloadWriter
payloadWriter) {
+ return serviceAsync(ClientOp.HEARTBEAT, payloadWriter, null, false);
+ }
}
diff --git
a/modules/client/src/main/java/org/apache/ignite/internal/client/ReliableChannel.java
b/modules/client/src/main/java/org/apache/ignite/internal/client/ReliableChannel.java
index e648d33889..906dd15405 100644
---
a/modules/client/src/main/java/org/apache/ignite/internal/client/ReliableChannel.java
+++
b/modules/client/src/main/java/org/apache/ignite/internal/client/ReliableChannel.java
@@ -200,6 +200,29 @@ public final class ReliableChannel implements
AutoCloseable {
return observableTimestamp.get();
}
+ /**
+ * Gets active client channels.
+ *
+ * @return List of connected channels.
+ */
+ public List<ClientChannel> channels() {
+ List<ClientChannel> res = new ArrayList<>(channels.size());
+
+ for (var holder : nodeChannelsByName.values()) {
+ var chFut = holder.chFut;
+
+ if (chFut != null) {
+ ClientChannel ch = ClientFutureUtils.getNowSafe(chFut);
+
+ if (ch != null && !ch.closed()) {
+ res.add(ch);
+ }
+ }
+ }
+
+ return res;
+ }
+
/**
* Sends request and handles response asynchronously.
*
diff --git
a/modules/client/src/main/java/org/apache/ignite/internal/client/TcpIgniteClient.java
b/modules/client/src/main/java/org/apache/ignite/internal/client/TcpIgniteClient.java
index 7a7f508820..22e4062e4c 100644
---
a/modules/client/src/main/java/org/apache/ignite/internal/client/TcpIgniteClient.java
+++
b/modules/client/src/main/java/org/apache/ignite/internal/client/TcpIgniteClient.java
@@ -252,6 +252,15 @@ public class TcpIgniteClient implements IgniteClient {
return metrics;
}
+ /**
+ * Returns the underlying channel.
+ *
+ * @return Channel.
+ */
+ public ReliableChannel channel() {
+ return ch;
+ }
+
/**
* Sends ClientMessage request to server side asynchronously and returns result future.
*
diff --git
a/modules/client/src/test/java/org/apache/ignite/client/ObservableTimestampPropagationTest.java
b/modules/client/src/test/java/org/apache/ignite/client/ObservableTimestampPropagationTest.java
index 5ac9bdd4c3..1f7637b4d6 100644
---
a/modules/client/src/test/java/org/apache/ignite/client/ObservableTimestampPropagationTest.java
+++
b/modules/client/src/test/java/org/apache/ignite/client/ObservableTimestampPropagationTest.java
@@ -29,10 +29,10 @@ import java.util.concurrent.atomic.AtomicLong;
import org.apache.ignite.client.fakes.FakeIgnite;
import org.apache.ignite.internal.TestHybridClock;
import org.apache.ignite.internal.client.ReliableChannel;
+import org.apache.ignite.internal.client.TcpIgniteClient;
import org.apache.ignite.internal.client.tx.ClientLazyTransaction;
import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
-import org.apache.ignite.internal.testframework.IgniteTestUtils;
import org.apache.ignite.sql.Statement;
import org.apache.ignite.sql.async.AsyncResultSet;
import org.apache.ignite.tx.TransactionOptions;
@@ -71,7 +71,7 @@ public class ObservableTimestampPropagationTest extends
BaseIgniteAbstractTest {
@Test
@SuppressWarnings("resource")
public void testClientPropagatesLatestKnownHybridTimestamp() {
- ReliableChannel ch = IgniteTestUtils.getFieldValue(client, "ch");
+ ReliableChannel ch = ((TcpIgniteClient) client).channel();
TransactionOptions roOpts = new TransactionOptions().readOnly(true);
// +2 because logical time is incremented on every call to nowLong - for replica tracker and for handshake.
diff --git
a/modules/client/src/test/java/org/apache/ignite/client/PartitionAwarenessTest.java
b/modules/client/src/test/java/org/apache/ignite/client/PartitionAwarenessTest.java
index 633a1eb717..5a1120fe32 100644
---
a/modules/client/src/test/java/org/apache/ignite/client/PartitionAwarenessTest.java
+++
b/modules/client/src/test/java/org/apache/ignite/client/PartitionAwarenessTest.java
@@ -42,6 +42,7 @@ import org.apache.ignite.compute.IgniteCompute;
import org.apache.ignite.compute.JobDescriptor;
import org.apache.ignite.compute.JobTarget;
import org.apache.ignite.internal.client.ReliableChannel;
+import org.apache.ignite.internal.client.TcpIgniteClient;
import org.apache.ignite.internal.client.tx.ClientLazyTransaction;
import org.apache.ignite.internal.hlc.HybridClockImpl;
import org.apache.ignite.internal.streamer.SimplePublisher;
@@ -179,7 +180,7 @@ public class PartitionAwarenessTest extends
AbstractClientTest {
@ParameterizedTest
@ValueSource(booleans = {true, false})
public void testClientReceivesPartitionAssignmentUpdates(boolean
useHeartbeat) throws InterruptedException {
- ReliableChannel ch = IgniteTestUtils.getFieldValue(client2, "ch");
+ ReliableChannel ch = ((TcpIgniteClient) client2).channel();
// Check default assignment.
RecordView<Tuple> recordView = defaultTable().recordView();
diff --git
a/modules/client/src/test/java/org/apache/ignite/client/RetryPolicyTest.java
b/modules/client/src/test/java/org/apache/ignite/client/RetryPolicyTest.java
index 4203dcd60b..aaee044ba9 100644
--- a/modules/client/src/test/java/org/apache/ignite/client/RetryPolicyTest.java
+++ b/modules/client/src/test/java/org/apache/ignite/client/RetryPolicyTest.java
@@ -33,10 +33,10 @@ import org.apache.ignite.client.fakes.FakeIgniteTables;
import org.apache.ignite.internal.client.ClientUtils;
import org.apache.ignite.internal.client.IgniteClientConfigurationImpl;
import org.apache.ignite.internal.client.RetryPolicyContextImpl;
+import org.apache.ignite.internal.client.TcpIgniteClient;
import org.apache.ignite.internal.client.proto.ClientOp;
import org.apache.ignite.internal.client.tx.ClientLazyTransaction;
import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
-import org.apache.ignite.internal.testframework.IgniteTestUtils;
import org.apache.ignite.lang.IgniteException;
import org.apache.ignite.lang.LoggerFactory;
import org.apache.ignite.table.RecordView;
@@ -104,7 +104,7 @@ public class RetryPolicyTest extends BaseIgniteAbstractTest
{
try (var client = getClient(plc)) {
Transaction tx = client.transactions().begin();
- ClientLazyTransaction.ensureStarted(tx,
IgniteTestUtils.getFieldValue(client, "ch"), null).join();
+ ClientLazyTransaction.ensureStarted(tx, ((TcpIgniteClient)
client).channel(), null).join();
assertThrows(IgniteClientConnectionException.class, tx::commit);
assertEquals(0, plc.invocations.size());
@@ -167,7 +167,7 @@ public class RetryPolicyTest extends BaseIgniteAbstractTest
{
try (var client = getClient(plc)) {
RecordView<Tuple> recView =
client.tables().table("t").recordView();
Transaction tx = client.transactions().begin();
- ClientLazyTransaction.ensureStarted(tx,
IgniteTestUtils.getFieldValue(client, "ch"), null).join();
+ ClientLazyTransaction.ensureStarted(tx, ((TcpIgniteClient)
client).channel(), null).join();
var ex = assertThrows(IgniteException.class, () -> recView.get(tx,
Tuple.create().set("id", 1)));
assertThat(ex.getMessage(), containsString("Transaction context
has been lost due to connection errors."));
diff --git a/modules/platforms/dotnet/Apache.Ignite.Tests/IgniteClientTests.cs
b/modules/platforms/dotnet/Apache.Ignite.Tests/IgniteClientTests.cs
index ef8780e4d4..e637370107 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Tests/IgniteClientTests.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Tests/IgniteClientTests.cs
@@ -19,6 +19,8 @@ namespace Apache.Ignite.Tests
{
using System.Linq;
using System.Threading.Tasks;
+ using Internal;
+ using Internal.Buffers;
using NUnit.Framework;
/// <summary>
@@ -62,5 +64,23 @@ namespace Apache.Ignite.Tests
Assert.AreEqual(expected, client.ToString());
}
+
+ [Test]
+ public async Task TestHeartbeat()
+ {
+ using var client = await IgniteClient.StartAsync(GetConfig());
+ IgniteClientInternal clientInternal = (IgniteClientInternal)client;
+
+ var sockets = clientInternal.Socket.GetSockets().ToList();
+
+ using var payload = new PooledArrayBuffer();
+ payload.MessageWriter.Write("foo bar baz");
+
+ foreach (var socket in sockets)
+ {
+ await socket.HeartbeatAsync();
+ await socket.HeartbeatAsync(payload);
+ }
+ }
}
}
diff --git a/modules/platforms/dotnet/Apache.Ignite.Tests/MetricsTests.cs
b/modules/platforms/dotnet/Apache.Ignite.Tests/MetricsTests.cs
index c0cb4eb618..d006347aaa 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Tests/MetricsTests.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Tests/MetricsTests.cs
@@ -338,7 +338,7 @@ public class MetricsTests
RetryPolicy = new RetryNonePolicy()
};
- private static Guid? GetClientId(IIgniteClient? client) =>
client?.GetFieldValue<ClientFailoverSocket>("_socket").ClientId;
+ private static Guid? GetClientId(IIgniteClient? client) =>
((IgniteClientInternal?)client)?.Socket.ClientId;
private void AssertMetric(string name, int value, int timeoutMs = 1000) =>
_listener.AssertMetric(name, value, timeoutMs);
diff --git a/modules/platforms/dotnet/Apache.Ignite.Tests/TestUtils.cs
b/modules/platforms/dotnet/Apache.Ignite.Tests/TestUtils.cs
index 8dbdedc814..15fc63f8bb 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Tests/TestUtils.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Tests/TestUtils.cs
@@ -106,7 +106,7 @@ namespace Apache.Ignite.Tests
internal static async Task ForceLazyTxStart(ITransaction tx, IIgnite
client, PreferredNode preferredNode = default) =>
await LazyTransaction.EnsureStartedAsync(
tx,
- client.GetFieldValue<ClientFailoverSocket>("_socket"),
+ ((IgniteClientInternal)client).Socket,
preferredNode);
private static FieldInfo GetNonPublicField(object obj, string
fieldName)
diff --git
a/modules/platforms/dotnet/Apache.Ignite.Tests/Transactions/TransactionsTests.cs
b/modules/platforms/dotnet/Apache.Ignite.Tests/Transactions/TransactionsTests.cs
index 30d7445154..4f8e4ac965 100644
---
a/modules/platforms/dotnet/Apache.Ignite.Tests/Transactions/TransactionsTests.cs
+++
b/modules/platforms/dotnet/Apache.Ignite.Tests/Transactions/TransactionsTests.cs
@@ -296,7 +296,7 @@ namespace Apache.Ignite.Tests.Transactions
public async Task TestObservableTimestampIsInitializedFromHandshake()
{
using var client = await IgniteClient.StartAsync(new() { Endpoints
= { "127.0.0.1:" + ServerPort } });
- var observableTimestamp =
client.GetFieldValue<ClientFailoverSocket>("_socket").ObservableTimestamp;
+ var observableTimestamp =
((IgniteClientInternal)client).Socket.ObservableTimestamp;
Assert.Greater(observableTimestamp, 0);
}
@@ -313,7 +313,7 @@ namespace Apache.Ignite.Tests.Transactions
Assert.AreEqual(
server.ObservableTimestamp,
-
client.GetFieldValue<ClientFailoverSocket>("_socket").ObservableTimestamp,
+ ((IgniteClientInternal)client).Socket.ObservableTimestamp,
"Handshake should initialize observable timestamp");
server.ObservableTimestamp = 123;
diff --git
a/modules/platforms/dotnet/Apache.Ignite/Internal/ClientFailoverSocket.cs
b/modules/platforms/dotnet/Apache.Ignite/Internal/ClientFailoverSocket.cs
index 39a7c23872..497e7d74c5 100644
--- a/modules/platforms/dotnet/Apache.Ignite/Internal/ClientFailoverSocket.cs
+++ b/modules/platforms/dotnet/Apache.Ignite/Internal/ClientFailoverSocket.cs
@@ -315,6 +315,25 @@ namespace Apache.Ignite.Internal
}
}
+ /// <summary>
+ /// Gets active sockets.
+ /// </summary>
+ /// <returns>Active sockets.</returns>
+ internal IEnumerable<ClientSocket> GetSockets()
+ {
+ var res = new List<ClientSocket>(_endpoints.Count);
+
+ foreach (var endpoint in _endpoints)
+ {
+ if (endpoint.Socket is { IsDisposed: false })
+ {
+ res.Add(endpoint.Socket);
+ }
+ }
+
+ return res;
+ }
+
/// <summary>
/// Gets a socket. Reconnects if necessary.
/// </summary>
diff --git a/modules/platforms/dotnet/Apache.Ignite/Internal/ClientSocket.cs
b/modules/platforms/dotnet/Apache.Ignite/Internal/ClientSocket.cs
index 471923d59b..dec8eb3b0c 100644
--- a/modules/platforms/dotnet/Apache.Ignite/Internal/ClientSocket.cs
+++ b/modules/platforms/dotnet/Apache.Ignite/Internal/ClientSocket.cs
@@ -138,7 +138,7 @@ namespace Apache.Ignite.Internal
// ReSharper disable once AsyncVoidLambda (timer callback)
_heartbeatTimer = new Timer(
- callback: async _ => await
SendHeartbeatAsync().ConfigureAwait(false),
+ callback: async _ => await
HeartbeatAsync().ConfigureAwait(false),
state: null,
dueTime: _heartbeatInterval,
period: TimeSpan.FromMilliseconds(-1));
@@ -281,6 +281,32 @@ namespace Apache.Ignite.Internal
Dispose(null);
}
+ /// <summary>
+ /// Sends heartbeat message.
+ /// </summary>
+ /// <param name="payload">Optional payload. Ignored by the server, can be used for benchmarking.</param>
+ /// <returns>A <see cref="Task"/> representing the asynchronous operation.</returns>
+ [SuppressMessage(
+ "Microsoft.Design",
+ "CA1031:DoNotCatchGeneralExceptionTypes",
+ Justification = "Any heartbeat exception should cause this
instance to be disposed with an error.")]
+ internal async Task HeartbeatAsync(PooledArrayBuffer? payload = null)
+ {
+ try
+ {
+ using var buf = await DoOutInOpAsync(ClientOp.Heartbeat,
payload)
+ .WaitAsync(_socketTimeout)
+ .ConfigureAwait(false);
+ }
+ catch (Exception e)
+ {
+ var message = "Heartbeat failed: " + e.Message;
+ _logger.LogHeartbeatError(e, message);
+
+ Dispose(new
IgniteClientConnectionException(ErrorGroups.Client.Connection, message, e));
+ }
+ }
+
/// <summary>
/// Performs the handshake exchange.
/// </summary>
@@ -866,28 +892,6 @@ namespace Apache.Ignite.Internal
}
}
- /// <summary>
- /// Sends heartbeat message.
- /// </summary>
- [SuppressMessage(
- "Microsoft.Design",
- "CA1031:DoNotCatchGeneralExceptionTypes",
- Justification = "Any heartbeat exception should cause this
instance to be disposed with an error.")]
- private async Task SendHeartbeatAsync()
- {
- try
- {
- using var buf = await
DoOutInOpAsync(ClientOp.Heartbeat).WaitAsync(_socketTimeout).ConfigureAwait(false);
- }
- catch (Exception e)
- {
- var message = "Heartbeat failed: " + e.Message;
- _logger.LogHeartbeatError(e, message);
-
- Dispose(new
IgniteClientConnectionException(ErrorGroups.Client.Connection, message, e));
- }
- }
-
/// <summary>
/// Disposes this socket and completes active requests with the specified exception.
/// </summary>
diff --git
a/modules/platforms/dotnet/Apache.Ignite/Internal/IgniteClientInternal.cs
b/modules/platforms/dotnet/Apache.Ignite/Internal/IgniteClientInternal.cs
index a0c77eb9fc..9fd3a8f6f7 100644
--- a/modules/platforms/dotnet/Apache.Ignite/Internal/IgniteClientInternal.cs
+++ b/modules/platforms/dotnet/Apache.Ignite/Internal/IgniteClientInternal.cs
@@ -18,9 +18,7 @@
namespace Apache.Ignite.Internal
{
using System.Collections.Generic;
- using System.Diagnostics;
using System.Linq;
- using System.Net;
using System.Threading.Tasks;
using Common;
using Ignite.Compute;
@@ -37,16 +35,13 @@ namespace Apache.Ignite.Internal
/// </summary>
internal sealed class IgniteClientInternal : IIgniteClient
{
- /** Underlying connection. */
- private readonly ClientFailoverSocket _socket;
-
/// <summary>
/// Initializes a new instance of the <see
cref="IgniteClientInternal"/> class.
/// </summary>
/// <param name="socket">Socket.</param>
public IgniteClientInternal(ClientFailoverSocket socket)
{
- _socket = socket;
+ Socket = socket;
var sql = new Sql.Sql(socket);
var tables = new Tables(socket, sql);
@@ -59,7 +54,7 @@ namespace Apache.Ignite.Internal
/// <inheritdoc/>
public IgniteClientConfiguration Configuration =>
- new(_socket.Configuration); // Defensive copy.
+ new(Socket.Configuration); // Defensive copy.
/// <inheritdoc/>
public ITables Tables { get; }
@@ -73,10 +68,15 @@ namespace Apache.Ignite.Internal
/// <inheritdoc/>
public ISql Sql { get; }
+ /// <summary>
+ /// Gets the underlying socket.
+ /// </summary>
+ internal ClientFailoverSocket Socket { get; }
+
/// <inheritdoc/>
public async Task<IList<IClusterNode>> GetClusterNodesAsync()
{
- using var resBuf = await
_socket.DoOutInOpAsync(ClientOp.ClusterGetNodes).ConfigureAwait(false);
+ using var resBuf = await
Socket.DoOutInOpAsync(ClientOp.ClusterGetNodes).ConfigureAwait(false);
return Read();
@@ -96,13 +96,10 @@ namespace Apache.Ignite.Internal
}
/// <inheritdoc/>
- public IList<IConnectionInfo> GetConnections() =>
_socket.GetConnections();
+ public IList<IConnectionInfo> GetConnections() =>
Socket.GetConnections();
/// <inheritdoc/>
- public void Dispose()
- {
- _socket.Dispose();
- }
+ public void Dispose() => Socket.Dispose();
/// <inheritdoc/>
public override string ToString() =>
diff --git
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItThinClientConnectionTest.java
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItThinClientConnectionTest.java
index e2d1e3ed56..0c51641d59 100644
---
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItThinClientConnectionTest.java
+++
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItThinClientConnectionTest.java
@@ -22,11 +22,13 @@ import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.startsWith;
import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import java.util.List;
import org.apache.ignite.client.IgniteClient;
+import org.apache.ignite.internal.client.ClientChannel;
import org.apache.ignite.internal.client.TcpIgniteClient;
import org.apache.ignite.internal.testframework.WorkDirectoryExtension;
import org.apache.ignite.lang.IgniteException;
@@ -41,6 +43,7 @@ import org.junit.jupiter.api.extension.ExtendWith;
/**
* Tests thin client connecting to a real server node.
*/
+@SuppressWarnings("resource")
@ExtendWith(WorkDirectoryExtension.class)
public class ItThinClientConnectionTest extends ItAbstractThinClientTest {
/**
@@ -96,4 +99,21 @@ public class ItThinClientConnectionTest extends
ItAbstractThinClientTest {
void clusterName() {
assertThat(((TcpIgniteClient) client()).clusterName(), is("cluster"));
}
+
+ @Test
+ void testHeartbeat() {
+ var client = (TcpIgniteClient) client();
+
+ List<ClientChannel> channels = client.channel().channels();
+
+ assertEquals(2, channels.size());
+
+ for (var channel : channels) {
+ assertFalse(channel.closed());
+
+ channel.heartbeatAsync(null).join();
+ channel.heartbeatAsync(w -> w.out().packString("foo-bar")).join();
+ channel.heartbeatAsync(w -> w.out().writePayload(new byte[]{1, 2,
3})).join();
+ }
+ }
}