This is an automated email from the ASF dual-hosted git repository.

ptupitsyn pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 6f53312518 IGNITE-21413 .NET: Add tags to metrics (#3490)
6f53312518 is described below

commit 6f533125183f566db416b2d0221a93ee6c0ed371
Author: Pavel Tupitsyn <[email protected]>
AuthorDate: Wed Mar 27 11:02:10 2024 +0200

    IGNITE-21413 .NET: Add tags to metrics (#3490)
    
    Add tags (which map to attributes in OpenTelemetry and labels in Prometheus) 
to all `counter`-type metrics in the .NET client:
    * `client.id` identifies the client instance (random guid as string)
    * `node.addr` identifies the connection endpoint (one client can have 
multiple connections)
    
    This will allow the user to distinguish metrics between different client 
instances and different connections, and identify potential issues more 
precisely.
    
    **Also**
    * Expose tag names in public API via `MetricTags` static class (similar to 
existing `MetricNames`)
    * Change metrics that use `ObservableCounter` to `ObservableGauge`: those 
values go up and down, so a counter is not the right choice
      * Gauges can only have a static predefined set of tags, so we don't add 
any tags there
    
    **Design Considerations**
    * A large number of tag combinations can affect performance negatively, so 
we don't add tags like `streamer.id`, whose set of values can grow indefinitely
    * We expect the client to be long-lived, so the number of distinct 
`client.id`+`node.addr` combinations should be limited
    * We cache `MetricsContext` with all tags per endpoint to avoid allocations
---
 .../Apache.Ignite.Tests/ClientSocketTests.cs       |   2 +-
 .../dotnet/Apache.Ignite.Tests/MetricsTests.cs     | 203 ++++++++++++++-------
 .../Apache.Ignite/Internal/ClientFailoverSocket.cs |  34 +++-
 ...onnectionContext.cs => ClientSocket.Metrics.cs} |  34 ++--
 .../dotnet/Apache.Ignite/Internal/ClientSocket.cs  |  73 +++++---
 .../Apache.Ignite/Internal/ConnectionContext.cs    |   3 +-
 .../Apache.Ignite/Internal/IgniteClientInternal.cs |   6 +-
 .../dotnet/Apache.Ignite/Internal/Metrics.cs       |  10 +-
 .../{Network/ClusterNode.cs => MetricsContext.cs}  |  19 +-
 .../Apache.Ignite/Internal/Network/ClusterNode.cs  |  53 +++++-
 .../Apache.Ignite/Internal/SocketEndpoint.cs       |  24 ++-
 .../Apache.Ignite/Internal/Table/DataStreamer.cs   |   7 +-
 .../Apache.Ignite/Internal/Table/RecordView.cs     |   9 +-
 .../Network/ClusterNode.cs => MetricTags.cs}       |  20 +-
 14 files changed, 343 insertions(+), 154 deletions(-)

diff --git a/modules/platforms/dotnet/Apache.Ignite.Tests/ClientSocketTests.cs 
b/modules/platforms/dotnet/Apache.Ignite.Tests/ClientSocketTests.cs
index 58ed2ec36d..e70800fc22 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Tests/ClientSocketTests.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Tests/ClientSocketTests.cs
@@ -84,7 +84,7 @@ namespace Apache.Ignite.Tests
         }
 
         private static SocketEndpoint GetEndPoint(int? serverPort = null) =>
-            new(new(IPAddress.Loopback, serverPort ?? ServerPort), 
string.Empty);
+            new(new(IPAddress.Loopback, serverPort ?? ServerPort), 
string.Empty, string.Empty);
 
         private class NoOpListener : IClientSocketEventListener
         {
diff --git a/modules/platforms/dotnet/Apache.Ignite.Tests/MetricsTests.cs 
b/modules/platforms/dotnet/Apache.Ignite.Tests/MetricsTests.cs
index 9eb61a9499..68b9caef6d 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Tests/MetricsTests.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Tests/MetricsTests.cs
@@ -27,6 +27,7 @@ using System.Runtime.CompilerServices;
 using System.Threading;
 using System.Threading.Tasks;
 using Ignite.Table;
+using Internal;
 using NUnit.Framework;
 
 /// <summary>
@@ -42,8 +43,8 @@ public class MetricsTests
     [TearDown]
     public void TearDown()
     {
-        AssertMetric("requests-active", 0);
-        AssertMetric("connections-active", 0);
+        AssertMetric(MetricNames.RequestsActive, 0);
+        AssertMetric(MetricNames.ConnectionsActive, 0);
 
         _listener.Dispose();
 
@@ -58,20 +59,26 @@ public class MetricsTests
     {
         using var server = new FakeServer();
 
-        AssertMetric("connections-established", 0);
-        AssertMetric("connections-active", 0);
+        AssertMetric(MetricNames.ConnectionsEstablished, 0);
+        AssertMetric(MetricNames.ConnectionsActive, 0);
 
-        using (await server.ConnectClientAsync())
+        var client1 = await server.ConnectClientAsync();
+        using (client1)
         {
-            AssertMetric("connections-established", 1);
-            AssertMetric("connections-active", 1);
+            AssertMetric(MetricNames.ConnectionsEstablished, 1);
+            AssertMetric(MetricNames.ConnectionsActive, 1);
         }
 
-        AssertMetric("connections-active", 0);
+        AssertMetric(MetricNames.ConnectionsActive, 0);
 
-        (await server.ConnectClientAsync()).Dispose();
-        AssertMetric("connections-established", 2);
-        AssertMetric("connections-active", 0);
+        var client2 = await server.ConnectClientAsync();
+        client2.Dispose();
+
+        AssertMetric(MetricNames.ConnectionsEstablished, 2);
+        AssertMetric(MetricNames.ConnectionsActive, 0);
+
+        AssertTaggedMetric(MetricNames.ConnectionsEstablished, 1, server, 
client1);
+        AssertTaggedMetric(MetricNames.ConnectionsEstablished, 1, server, 
client2);
     }
 
     [Test]
@@ -79,18 +86,21 @@ public class MetricsTests
     {
         using var server = new FakeServer();
 
-        AssertMetric("bytes-sent", 0);
-        AssertMetric("bytes-received", 0);
+        AssertMetric(MetricNames.BytesSent, 0);
+        AssertMetric(MetricNames.BytesReceived, 0);
 
         using var client = await server.ConnectClientAsync();
 
-        AssertMetric("bytes-sent", 11);
-        AssertMetric("bytes-received", 63);
+        AssertMetric(MetricNames.BytesSent, 15);
+        AssertMetric(MetricNames.BytesReceived, 63);
 
         await client.Tables.GetTablesAsync();
 
-        AssertMetric("bytes-sent", 17);
-        AssertMetric("bytes-received", 72);
+        AssertMetric(MetricNames.BytesSent, 21);
+        AssertMetric(MetricNames.BytesReceived, 72);
+
+        AssertTaggedMetric(MetricNames.BytesSent, 21, server, client);
+        AssertTaggedMetric(MetricNames.BytesReceived, 72, server, client);
     }
 
     [Test]
@@ -100,13 +110,15 @@ public class MetricsTests
         using var server = new FakeServer();
         using var client = await server.ConnectClientAsync(GetConfig());
 
-        AssertMetric("connections-lost", 0);
-        AssertMetric("connections-lost-timeout", 0);
+        AssertMetric(MetricNames.ConnectionsLost, 0);
+        AssertMetric(MetricNames.ConnectionsLostTimeout, 0);
 
         server.Dispose();
 
-        AssertMetric("connections-lost", 1);
-        AssertMetric("connections-lost-timeout", 0);
+        AssertMetric(MetricNames.ConnectionsLost, 1);
+        AssertMetric(MetricNames.ConnectionsLostTimeout, 0);
+
+        AssertTaggedMetric(MetricNames.ConnectionsLost, 1, server, client);
     }
 
     [Test]
@@ -115,8 +127,10 @@ public class MetricsTests
         using var server = new FakeServer { HeartbeatDelay = 
TimeSpan.FromSeconds(3) };
         using var client = await 
server.ConnectClientAsync(GetConfigWithDelay());
 
-        AssertMetric("connections-lost-timeout", 0);
-        AssertMetric("connections-lost-timeout", 1, timeoutMs: 10_000);
+        AssertMetric(MetricNames.ConnectionsLostTimeout, 0);
+        AssertMetric(MetricNames.ConnectionsLostTimeout, 1, timeoutMs: 10_000);
+
+        AssertTaggedMetric(MetricNames.ConnectionsLostTimeout, 1, server, 
client);
     }
 
     [Test]
@@ -125,8 +139,11 @@ public class MetricsTests
         using var server = new FakeServer { SendInvalidMagic = true };
 
         Assert.ThrowsAsync<IgniteClientConnectionException>(async () => await 
server.ConnectClientAsync(GetConfig()));
-        AssertMetric("handshakes-failed", 1);
-        AssertMetric("handshakes-failed-timeout", 0);
+        AssertMetric(MetricNames.HandshakesFailed, 1);
+        AssertMetric(MetricNames.HandshakesFailedTimeout, 0);
+        AssertMetric(MetricNames.ConnectionsActive, 0);
+
+        AssertTaggedMetric(MetricNames.HandshakesFailed, 1, server, null);
     }
 
     [Test]
@@ -135,8 +152,10 @@ public class MetricsTests
         using var server = new FakeServer { HandshakeDelay = 
TimeSpan.FromSeconds(1) };
 
         Assert.ThrowsAsync<IgniteClientConnectionException>(async () => await 
server.ConnectClientAsync(GetConfigWithDelay()));
-        AssertMetric("handshakes-failed", 0);
-        AssertMetric("handshakes-failed-timeout", 1);
+        AssertMetric(MetricNames.HandshakesFailed, 0);
+        AssertMetric(MetricNames.HandshakesFailedTimeout, 1);
+
+        AssertTaggedMetric(MetricNames.HandshakesFailedTimeout, 1, server, 
null);
     }
 
     [Test]
@@ -145,21 +164,25 @@ public class MetricsTests
         using var server = new FakeServer();
         using var client = await server.ConnectClientAsync();
 
-        AssertMetric("requests-sent", 0);
-        AssertMetric("requests-failed", 0);
-        AssertMetric("requests-completed", 0);
+        AssertMetric(MetricNames.RequestsSent, 0);
+        AssertMetric(MetricNames.RequestsFailed, 0);
+        AssertMetric(MetricNames.RequestsCompleted, 0);
 
         await client.Tables.GetTablesAsync();
 
-        AssertMetric("requests-sent", 1);
-        AssertMetric("requests-failed", 0);
-        AssertMetric("requests-completed", 1);
+        AssertMetric(MetricNames.RequestsSent, 1);
+        AssertMetric(MetricNames.RequestsFailed, 0);
+        AssertMetric(MetricNames.RequestsCompleted, 1);
 
         Assert.ThrowsAsync<IgniteException>(async () => await 
client.Tables.GetTableAsync("bad-table"));
 
-        AssertMetric("requests-sent", 2);
-        AssertMetric("requests-failed", 1);
-        AssertMetric("requests-completed", 1);
+        AssertMetric(MetricNames.RequestsSent, 2);
+        AssertMetric(MetricNames.RequestsFailed, 1);
+        AssertMetric(MetricNames.RequestsCompleted, 1);
+
+        AssertTaggedMetric(MetricNames.RequestsSent, 2, server, client);
+        AssertTaggedMetric(MetricNames.RequestsFailed, 1, server, client);
+        AssertTaggedMetric(MetricNames.RequestsCompleted, 1, server, client);
     }
 
     [Test]
@@ -168,14 +191,16 @@ public class MetricsTests
         using var server = new FakeServer { OperationDelay = 
TimeSpan.FromSeconds(1) };
         using var client = await server.ConnectClientAsync();
 
-        AssertMetric("requests-active", 0);
+        AssertMetric(MetricNames.RequestsActive, 0);
 
         _ = client.Tables.GetTablesAsync();
 
-        AssertMetric("requests-active", 1);
-        AssertMetric("requests-sent", 1);
-        AssertMetric("requests-completed", 0);
-        AssertMetric("requests-failed", 0);
+        AssertMetric(MetricNames.RequestsActive, 1);
+        AssertMetric(MetricNames.RequestsSent, 1);
+        AssertMetric(MetricNames.RequestsCompleted, 0);
+        AssertMetric(MetricNames.RequestsFailed, 0);
+
+        AssertTaggedMetric(MetricNames.RequestsSent, 1, server, client);
     }
 
     [Test]
@@ -185,10 +210,12 @@ public class MetricsTests
         using var client = await server.ConnectClientAsync();
 
         await client.Tables.GetTablesAsync();
-        AssertMetric("requests-retried", 0);
+        AssertMetric(MetricNames.RequestsRetried, 0);
 
         await client.Tables.GetTablesAsync();
-        AssertMetric("requests-retried", 3);
+        AssertMetric(MetricNames.RequestsRetried, 3);
+
+        AssertTaggedMetric(MetricNames.RequestsRetried, 3, server, client);
     }
 
     [Test]
@@ -197,40 +224,43 @@ public class MetricsTests
         using var server = new FakeServer();
         using var client = await server.ConnectClientAsync();
 
-        AssertMetric("streamer-batches-sent", 0);
-        AssertMetric("streamer-items-sent", 0);
-        AssertMetric("streamer-batches-active", 0);
-        AssertMetric("streamer-items-queued", 0);
+        AssertMetric(MetricNames.StreamerBatchesSent, 0);
+        AssertMetric(MetricNames.StreamerItemsSent, 0);
+        AssertMetric(MetricNames.StreamerBatchesActive, 0);
+        AssertMetric(MetricNames.StreamerItemsQueued, 0);
 
         var table = await 
client.Tables.GetTableAsync(FakeServer.ExistingTableName);
         var view = table!.RecordBinaryView;
 
         await view.StreamDataAsync(GetTuples().ToAsyncEnumerable(), 
DataStreamerOptions.Default with { PageSize = 2 });
 
-        AssertMetric("streamer-batches-sent", 1);
-        AssertMetric("streamer-items-sent", 2);
-        AssertMetric("streamer-batches-active", 0);
-        AssertMetric("streamer-items-queued", 0);
+        AssertMetric(MetricNames.StreamerBatchesSent, 1);
+        AssertMetric(MetricNames.StreamerItemsSent, 2);
+        AssertMetric(MetricNames.StreamerBatchesActive, 0);
+        AssertMetric(MetricNames.StreamerItemsQueued, 0);
+
+        AssertTaggedMetric(MetricNames.StreamerBatchesSent, 1, server, client);
+        AssertTaggedMetric(MetricNames.StreamerItemsSent, 2, server, client);
 
         IEnumerable<IIgniteTuple> GetTuples()
         {
-            AssertMetric("streamer-batches-active", 0);
-            AssertMetric("streamer-items-queued", 0);
+            AssertMetric(MetricNames.StreamerBatchesActive, 0);
+            AssertMetric(MetricNames.StreamerItemsQueued, 0);
 
             yield return new IgniteTuple { ["ID"] = 1 };
 
-            AssertMetric("streamer-batches-active", 1);
-            AssertMetric("streamer-items-queued", 1);
+            AssertMetric(MetricNames.StreamerBatchesActive, 1);
+            AssertMetric(MetricNames.StreamerItemsQueued, 1);
 
             yield return new IgniteTuple { ["ID"] = 2 };
 
-            AssertMetric("streamer-batches-active", 2);
-            AssertMetric("streamer-items-queued", 2);
+            AssertMetric(MetricNames.StreamerBatchesActive, 2);
+            AssertMetric(MetricNames.StreamerItemsQueued, 2);
 
-            AssertMetric("streamer-batches-sent", 1);
-            AssertMetric("streamer-batches-active", 1);
-            AssertMetric("streamer-items-queued", 0);
-            AssertMetric("streamer-items-sent", 2);
+            AssertMetric(MetricNames.StreamerBatchesSent, 1);
+            AssertMetric(MetricNames.StreamerBatchesActive, 1);
+            AssertMetric(MetricNames.StreamerItemsQueued, 0);
+            AssertMetric(MetricNames.StreamerItemsSent, 2);
         }
     }
 
@@ -240,10 +270,10 @@ public class MetricsTests
         using var server = new FakeServer();
         using var client = await server.ConnectClientAsync();
 
-        AssertMetric("streamer-batches-sent", 0);
-        AssertMetric("streamer-items-sent", 0);
-        AssertMetric("streamer-batches-active", 0);
-        AssertMetric("streamer-items-queued", 0);
+        AssertMetric(MetricNames.StreamerBatchesSent, 0);
+        AssertMetric(MetricNames.StreamerItemsSent, 0);
+        AssertMetric(MetricNames.StreamerBatchesActive, 0);
+        AssertMetric(MetricNames.StreamerItemsQueued, 0);
 
         var table = await 
client.Tables.GetTableAsync(FakeServer.ExistingTableName);
         var view = table!.RecordBinaryView;
@@ -251,13 +281,13 @@ public class MetricsTests
 
         var task = view.StreamDataAsync(GetTuples(), 
DataStreamerOptions.Default with { PageSize = 10 }, cts.Token);
 
-        AssertMetricGreaterOrEqual("streamer-batches-sent", 1);
+        AssertMetricGreaterOrEqual(MetricNames.StreamerBatchesSent, 1);
         cts.Cancel();
         Assert.CatchAsync<OperationCanceledException>(async () => await task);
 
-        AssertMetricGreaterOrEqual("streamer-batches-sent", 1);
-        AssertMetric("streamer-batches-active", 0);
-        AssertMetric("streamer-items-queued", 0);
+        AssertMetricGreaterOrEqual(MetricNames.StreamerBatchesSent, 1);
+        AssertMetric(MetricNames.StreamerBatchesActive, 0);
+        AssertMetric(MetricNames.StreamerItemsQueued, 0);
 
         static async IAsyncEnumerable<IIgniteTuple> 
GetTuples([EnumeratorCancellation] CancellationToken ct = default)
         {
@@ -308,9 +338,17 @@ public class MetricsTests
             RetryPolicy = new RetryNonePolicy()
         };
 
+    private static Guid? GetClientId(IIgniteClient? client) => 
client?.GetFieldValue<ClientFailoverSocket>("_socket").ClientId;
+
     private void AssertMetric(string name, int value, int timeoutMs = 1000) =>
         _listener.AssertMetric(name, value, timeoutMs);
 
+    private void AssertTaggedMetric(string name, int value, FakeServer server, 
IIgniteClient? client) =>
+        AssertTaggedMetric(name, value, server.Node.Address.ToString(), 
GetClientId(client));
+
+    private void AssertTaggedMetric(string name, int value, string nodeAddr, 
Guid? clientId) =>
+        _listener.AssertTaggedMetric(name, value, nodeAddr, clientId);
+
     private void AssertMetricGreaterOrEqual(string name, int value, int 
timeoutMs = 1000) =>
         _listener.AssertMetricGreaterOrEqual(name, value, timeoutMs);
 
@@ -320,6 +358,8 @@ public class MetricsTests
 
         private readonly ConcurrentDictionary<string, long> _metrics = new();
 
+        private readonly ConcurrentDictionary<string, long> _metricsWithTags = 
new();
+
         public Listener()
         {
             _listener.InstrumentPublished = (instrument, listener) =>
@@ -344,11 +384,31 @@ public class MetricsTests
             return _metrics.TryGetValue(name, out var val) ? (int)val : 0;
         }
 
-        public void AssertMetric(string name, int value, int timeoutMs = 1000) 
=>
+        public void AssertMetric(string name, int value, int timeoutMs = 1000)
+        {
             TestUtils.WaitForCondition(
                 condition: () => GetMetric(name) == value,
                 timeoutMs: timeoutMs,
                 messageFactory: () => $"{name}: expected '{value}', but was 
'{GetMetric(name)}'");
+        }
+
+        public void AssertTaggedMetric(string name, int value, string 
nodeAddr, Guid? clientId)
+        {
+            if (clientId == null)
+            {
+                // Client id is not known, find by name and node address.
+                var val = _metricsWithTags.Single(x =>
+                    x.Key.StartsWith($"{name}_{MetricTags.ClientId}=", 
StringComparison.Ordinal) &&
+                    x.Key.EndsWith($",{MetricTags.NodeAddress}={nodeAddr}", 
StringComparison.Ordinal));
+
+                Assert.AreEqual(value, val.Value);
+            }
+            else
+            {
+                var taggedName = 
$"{name}_{MetricTags.ClientId}={clientId},{MetricTags.NodeAddress}={nodeAddr}";
+                Assert.AreEqual(value, _metricsWithTags[taggedName]);
+            }
+        }
 
         public void AssertMetricGreaterOrEqual(string name, int value, int 
timeoutMs = 1000) =>
             TestUtils.WaitForCondition(
@@ -372,6 +432,9 @@ public class MetricsTests
             else
             {
                 _metrics.AddOrUpdate(instrument.Name, newVal, (_, val) => val 
+ newVal);
+
+                var taggedName = $"{instrument.Name}_{string.Join(",", 
tags.ToArray().Select(x => $"{x.Key}={x.Value}"))}";
+                _metricsWithTags.AddOrUpdate(taggedName, newVal, (_, val) => 
val + newVal);
             }
         }
     }
diff --git 
a/modules/platforms/dotnet/Apache.Ignite/Internal/ClientFailoverSocket.cs 
b/modules/platforms/dotnet/Apache.Ignite/Internal/ClientFailoverSocket.cs
index 6314c260b4..39a7c23872 100644
--- a/modules/platforms/dotnet/Apache.Ignite/Internal/ClientFailoverSocket.cs
+++ b/modules/platforms/dotnet/Apache.Ignite/Internal/ClientFailoverSocket.cs
@@ -40,6 +40,8 @@ namespace Apache.Ignite.Internal
     /// </summary>
     internal sealed class ClientFailoverSocket : IDisposable, 
IClientSocketEventListener
     {
+        private const string ExceptionDataEndpoint = "Endpoint";
+
         /** Current global endpoint index for Round-robin. */
         private static long _globalEndPointIndex;
 
@@ -110,6 +112,11 @@ namespace Apache.Ignite.Internal
         /// </summary>
         public long ObservableTimestamp => Interlocked.Read(ref 
_observableTimestamp);
 
+        /// <summary>
+        /// Gets the client ID.
+        /// </summary>
+        public Guid ClientId { get; } = Guid.NewGuid();
+
         /// <summary>
         /// Connects the socket.
         /// </summary>
@@ -197,9 +204,11 @@ namespace Apache.Ignite.Internal
 
             while (true)
             {
+                ClientSocket? socket = null;
+
                 try
                 {
-                    var socket = await 
GetSocketAsync(preferredNode).ConfigureAwait(false);
+                    socket = await 
GetSocketAsync(preferredNode).ConfigureAwait(false);
 
                     var buffer = await socket.DoOutInOpAsync(clientOp, 
request, expectNotifications).ConfigureAwait(false);
 
@@ -210,7 +219,14 @@ namespace Apache.Ignite.Internal
                     // Preferred node connection may not be available, do not 
use it after first failure.
                     preferredNode = default;
 
-                    if (!HandleOpError(e, clientOp, ref attempt, ref errors, 
retryPolicyOverride ?? Configuration.RetryPolicy))
+                    MetricsContext? metricsContext =
+                        socket?.MetricsContext
+                        ?? (e.Data[ExceptionDataEndpoint] as 
SocketEndpoint)?.MetricsContext
+                        ?? (e.InnerException?.Data[ExceptionDataEndpoint] as 
SocketEndpoint)?.MetricsContext;
+
+                    IRetryPolicy retryPolicy = retryPolicyOverride ?? 
Configuration.RetryPolicy;
+
+                    if (!HandleOpError(e, clientOp, ref attempt, ref errors, 
retryPolicy, metricsContext))
                     {
                         throw;
                     }
@@ -438,6 +454,8 @@ namespace Apache.Ignite.Internal
                 {
                     errors ??= new List<Exception>();
 
+                    e.Data[ExceptionDataEndpoint] = endPoint;
+
                     errors.Add(e);
                 }
             }
@@ -518,6 +536,9 @@ namespace Apache.Ignite.Internal
         /// </summary>
         private IEnumerable<SocketEndpoint> 
GetIpEndPoints(IgniteClientConfiguration cfg)
         {
+            // Metric collection tools expect numbers and strings, don't pass 
Guid.
+            var clientId = ClientId.ToString();
+
             foreach (var e in Endpoint.GetEndpoints(cfg))
             {
                 var host = e.Host;
@@ -525,7 +546,7 @@ namespace Apache.Ignite.Internal
 
                 foreach (var ip in GetIps(host))
                 {
-                    yield return new SocketEndpoint(new IPEndPoint(ip, 
e.Port), host);
+                    yield return new SocketEndpoint(new IPEndPoint(ip, 
e.Port), host, clientId);
                 }
             }
         }
@@ -610,6 +631,7 @@ namespace Apache.Ignite.Internal
         /// <param name="attempt">Current attempt.</param>
         /// <param name="errors">Previous errors.</param>
         /// <param name="retryPolicy">Retry policy.</param>
+        /// <param name="metricsContext">Metrics context.</param>
         /// <returns>True if the error was handled, false otherwise.</returns>
         [SuppressMessage("Microsoft.Design", "CA1002:DoNotExposeGenericLists", 
Justification = "Private.")]
         private bool HandleOpError(
@@ -617,7 +639,8 @@ namespace Apache.Ignite.Internal
             ClientOp op,
             ref int attempt,
             ref List<Exception>? errors,
-            IRetryPolicy? retryPolicy)
+            IRetryPolicy? retryPolicy,
+            MetricsContext? metricsContext)
         {
             if (!ShouldRetry(exception, op, attempt, retryPolicy))
             {
@@ -645,7 +668,8 @@ namespace Apache.Ignite.Internal
                 _logger.LogRetryingOperationDebug("Retrying", (int)op, op, 
attempt, exception.Message);
             }
 
-            Metrics.RequestsRetried.Add(1);
+            Metrics.RequestsRetried.Add(1, metricsContext?.Tags ?? 
Array.Empty<KeyValuePair<string, object?>>());
+            Debug.Assert(metricsContext != null, "metricsContext != null");
 
             if (errors == null)
             {
diff --git 
a/modules/platforms/dotnet/Apache.Ignite/Internal/ConnectionContext.cs 
b/modules/platforms/dotnet/Apache.Ignite/Internal/ClientSocket.Metrics.cs
similarity index 54%
copy from modules/platforms/dotnet/Apache.Ignite/Internal/ConnectionContext.cs
copy to modules/platforms/dotnet/Apache.Ignite/Internal/ClientSocket.Metrics.cs
index c785dacb77..e900c213c2 100644
--- a/modules/platforms/dotnet/Apache.Ignite/Internal/ConnectionContext.cs
+++ b/modules/platforms/dotnet/Apache.Ignite/Internal/ClientSocket.Metrics.cs
@@ -15,23 +15,25 @@
  * limitations under the License.
  */
 
-namespace Apache.Ignite.Internal
-{
-    using System;
-    using Ignite.Network;
+namespace Apache.Ignite.Internal;
 
+/// <summary>
+/// Metrics-related functionality.
+/// </summary>
+internal sealed partial class ClientSocket
+{
     /// <summary>
-    /// Socket connection context.
+    /// Gets the metrics context.
     /// </summary>
-    /// <param name="Version">Protocol version.</param>
-    /// <param name="IdleTimeout">Server idle timeout.</param>
-    /// <param name="ClusterNode">Cluster node.</param>
-    /// <param name="ClusterId">Cluster id.</param>
-    /// <param name="SslInfo">SSL info.</param>
-    internal record ConnectionContext(
-        ClientProtocolVersion Version,
-        TimeSpan IdleTimeout,
-        IClusterNode ClusterNode,
-        Guid ClusterId,
-        ISslInfo? SslInfo);
+    public MetricsContext MetricsContext { get; }
+
+    private static void AddBytesReceived(int bytes, MetricsContext 
metricsContext) =>
+        Metrics.BytesReceived.Add(bytes, metricsContext.Tags);
+
+    private static void AddBytesSent(int bytes, MetricsContext metricsContext) 
=>
+        Metrics.BytesSent.Add(bytes, metricsContext.Tags);
+
+    private void AddBytesSent(int bytes) => AddBytesSent(bytes, 
MetricsContext);
+
+    private void AddFailedRequest() => Metrics.RequestsFailed.Add(1, 
MetricsContext.Tags);
 }
diff --git a/modules/platforms/dotnet/Apache.Ignite/Internal/ClientSocket.cs 
b/modules/platforms/dotnet/Apache.Ignite/Internal/ClientSocket.cs
index bb8273d9a9..90d183f70e 100644
--- a/modules/platforms/dotnet/Apache.Ignite/Internal/ClientSocket.cs
+++ b/modules/platforms/dotnet/Apache.Ignite/Internal/ClientSocket.cs
@@ -24,7 +24,6 @@ namespace Apache.Ignite.Internal
     using System.Diagnostics.CodeAnalysis;
     using System.IO;
     using System.Linq;
-    using System.Net;
     using System.Net.Security;
     using System.Net.Sockets;
     using System.Threading;
@@ -40,7 +39,7 @@ namespace Apache.Ignite.Internal
     /// Wrapper over framework socket for Ignite thin client operations.
     /// </summary>
     // ReSharper disable SuggestBaseTypeForParameter (NetworkStream has more 
efficient read/write methods).
-    internal sealed class ClientSocket : IDisposable
+    internal sealed partial class ClientSocket : IDisposable
     {
         /** General-purpose client type code. */
         private const byte ClientType = 2;
@@ -132,6 +131,9 @@ namespace Apache.Ignite.Internal
             _socketTimeout = configuration.SocketTimeout;
             _operationTimeout = configuration.OperationTimeout;
 
+            MetricsContext = connectionContext.ClusterNode.MetricsContext ??
+                             throw new InvalidOperationException("Metrics 
context is missing.");
+
             _heartbeatInterval = 
GetHeartbeatInterval(configuration.HeartbeatInterval, 
connectionContext.IdleTimeout, _logger);
 
             // ReSharper disable once AsyncVoidLambda (timer callback)
@@ -194,11 +196,11 @@ namespace Apache.Ignite.Internal
                     .WaitAsync(configuration.SocketTimeout, cts.Token)
                     .ConfigureAwait(false);
 
-                connected = true;
                 logger.LogConnectionEstablishedDebug(socket.RemoteEndPoint);
 
-                Metrics.ConnectionsEstablished.Add(1);
+                Metrics.ConnectionsEstablished.Add(1, 
endPoint.MetricsContext.Tags);
                 Metrics.ConnectionsActiveIncrement();
+                connected = true;
 
                 stream = new NetworkStream(socket, ownsSocket: true);
 
@@ -211,7 +213,7 @@ namespace Apache.Ignite.Internal
                     
logger.LogSslConnectionEstablishedDebug(socket.RemoteEndPoint, 
sslStream.NegotiatedCipherSuite);
                 }
 
-                var context = await HandshakeAsync(stream, endPoint.EndPoint, 
configuration, cts.Token)
+                var context = await HandshakeAsync(stream, endPoint, 
configuration, cts.Token)
                     .WaitAsync(configuration.SocketTimeout, cts.Token)
                     .ConfigureAwait(false);
 
@@ -240,11 +242,11 @@ namespace Apache.Ignite.Internal
 
                 if (ex.GetBaseException() is TimeoutException)
                 {
-                    Metrics.HandshakesFailedTimeout.Add(1);
+                    Metrics.HandshakesFailedTimeout.Add(1, 
endPoint.MetricsContext.Tags);
                 }
                 else
                 {
-                    Metrics.HandshakesFailed.Add(1);
+                    Metrics.HandshakesFailed.Add(1, 
endPoint.MetricsContext.Tags);
                 }
 
                 if (connected)
@@ -288,28 +290,35 @@ namespace Apache.Ignite.Internal
         /// <param name="cancellationToken">Cancellation token.</param>
         private static async Task<ConnectionContext> HandshakeAsync(
             Stream stream,
-            IPEndPoint endPoint,
+            SocketEndpoint endPoint,
             IgniteClientConfiguration configuration,
             CancellationToken cancellationToken)
         {
             await stream.WriteAsync(ProtoCommon.MagicBytes, 
cancellationToken).ConfigureAwait(false);
-            await WriteHandshakeAsync(stream, CurrentProtocolVersion, 
configuration, cancellationToken).ConfigureAwait(false);
+            await WriteHandshakeAsync(stream, CurrentProtocolVersion, 
configuration, endPoint.MetricsContext, cancellationToken)
+                .ConfigureAwait(false);
 
             await stream.FlushAsync(cancellationToken).ConfigureAwait(false);
 
-            await CheckMagicBytesAsync(stream, 
cancellationToken).ConfigureAwait(false);
+            await CheckMagicBytesAsync(stream, endPoint.MetricsContext, 
cancellationToken).ConfigureAwait(false);
+
+            using var response = await ReadResponseAsync(stream, new byte[4], 
endPoint.MetricsContext, CancellationToken.None)
+                .ConfigureAwait(false);
 
-            using var response = await ReadResponseAsync(stream, new byte[4], 
CancellationToken.None).ConfigureAwait(false);
             return ReadHandshakeResponse(response.GetReader(), endPoint, 
GetSslInfo(stream));
         }
 
-        private static async ValueTask CheckMagicBytesAsync(Stream stream, 
CancellationToken cancellationToken)
+        private static async ValueTask CheckMagicBytesAsync(
+            Stream stream,
+            MetricsContext metricsContext,
+            CancellationToken cancellationToken)
         {
             var responseMagic = 
ByteArrayPool.Rent(ProtoCommon.MagicBytes.Length);
 
             try
             {
-                await ReceiveBytesAsync(stream, responseMagic, 
ProtoCommon.MagicBytes.Length, cancellationToken).ConfigureAwait(false);
+                await ReceiveBytesAsync(stream, responseMagic, 
ProtoCommon.MagicBytes.Length, metricsContext, cancellationToken)
+                    .ConfigureAwait(false);
 
                 for (var i = 0; i < ProtoCommon.MagicBytes.Length; i++)
                 {
@@ -327,7 +336,7 @@ namespace Apache.Ignite.Internal
             }
         }
 
-        private static ConnectionContext ReadHandshakeResponse(MsgPackReader 
reader, IPEndPoint endPoint, ISslInfo? sslInfo)
+        private static ConnectionContext ReadHandshakeResponse(MsgPackReader 
reader, SocketEndpoint endPoint, ISslInfo? sslInfo)
         {
             var serverVer = new ClientProtocolVersion(reader.ReadInt16(), 
reader.ReadInt16(), reader.ReadInt16());
 
@@ -352,7 +361,7 @@ namespace Apache.Ignite.Internal
             return new ConnectionContext(
                 serverVer,
                 TimeSpan.FromMilliseconds(idleTimeoutMs),
-                new ClusterNode(clusterNodeId, clusterNodeName, endPoint),
+                new ClusterNode(clusterNodeId, clusterNodeName, 
endPoint.EndPoint, endPoint.MetricsContext),
                 clusterId,
                 sslInfo);
         }
@@ -386,15 +395,16 @@ namespace Apache.Ignite.Internal
         private static async ValueTask<PooledBuffer> ReadResponseAsync(
             Stream stream,
             byte[] messageSizeBytes,
+            MetricsContext metricsContext,
             CancellationToken cancellationToken)
         {
-            var size = await ReadMessageSizeAsync(stream, messageSizeBytes, 
cancellationToken).ConfigureAwait(false);
+            var size = await ReadMessageSizeAsync(stream, messageSizeBytes, 
metricsContext, cancellationToken).ConfigureAwait(false);
 
             var bytes = ByteArrayPool.Rent(size);
 
             try
             {
-                await ReceiveBytesAsync(stream, bytes, size, 
cancellationToken).ConfigureAwait(false);
+                await ReceiveBytesAsync(stream, bytes, size, metricsContext, 
cancellationToken).ConfigureAwait(false);
 
                 return new PooledBuffer(bytes, 0, size);
             }
@@ -409,12 +419,13 @@ namespace Apache.Ignite.Internal
         private static async Task<int> ReadMessageSizeAsync(
             Stream stream,
             byte[] buffer,
+            MetricsContext metricsContext,
             CancellationToken cancellationToken)
         {
             const int messageSizeByteCount = 4;
             Debug.Assert(buffer.Length >= messageSizeByteCount, "buffer.Length 
>= messageSizeByteCount");
 
-            await ReceiveBytesAsync(stream, buffer, messageSizeByteCount, 
cancellationToken).ConfigureAwait(false);
+            await ReceiveBytesAsync(stream, buffer, messageSizeByteCount, 
metricsContext, cancellationToken).ConfigureAwait(false);
 
             return ReadMessageSize(buffer);
         }
@@ -423,6 +434,7 @@ namespace Apache.Ignite.Internal
             Stream stream,
             byte[] buffer,
             int size,
+            MetricsContext metricsContext,
             CancellationToken cancellationToken)
         {
             int received = 0;
@@ -442,7 +454,7 @@ namespace Apache.Ignite.Internal
 
                 received += res;
 
-                Metrics.BytesReceived.Add(res);
+                AddBytesReceived(res, metricsContext);
             }
         }
 
@@ -450,6 +462,7 @@ namespace Apache.Ignite.Internal
             Stream stream,
             ClientProtocolVersion version,
             IgniteClientConfiguration configuration,
+            MetricsContext metricsContext,
             CancellationToken token)
         {
             using var bufferWriter = new PooledArrayBuffer(prefixSize: 
ProtoCommon.MessagePrefixSize);
@@ -462,7 +475,8 @@ namespace Apache.Ignite.Internal
             WriteMessageSize(resBuf, size);
 
             await stream.WriteAsync(resBuf, token).ConfigureAwait(false);
-            Metrics.BytesSent.Add(resBuf.Length);
+
+            AddBytesSent(resBuf.Length + ProtoCommon.MagicBytes.Length, 
metricsContext);
         }
 
         private static void WriteHandshake(MsgPackWriter w, 
ClientProtocolVersion version, IgniteClientConfiguration configuration)
@@ -595,7 +609,7 @@ namespace Apache.Ignite.Internal
             {
                 if (_requests.TryRemove(requestId, out _))
                 {
-                    Metrics.RequestsFailed.Add(1);
+                    AddFailedRequest();
                     Metrics.RequestsActiveDecrement();
                 }
 
@@ -645,7 +659,7 @@ namespace Apache.Ignite.Internal
 
                     await _stream.WriteAsync(requestBufWithPrefix, 
_disposeTokenSource.Token).ConfigureAwait(false);
 
-                    Metrics.BytesSent.Add(requestBufWithPrefix.Length);
+                    AddBytesSent(requestBufWithPrefix.Length);
                 }
                 else
                 {
@@ -654,10 +668,10 @@ namespace Apache.Ignite.Internal
                     var prefixBytes = _prefixBuffer.AsMemory()[..(prefixSize + 
4)];
                     await _stream.WriteAsync(prefixBytes, 
_disposeTokenSource.Token).ConfigureAwait(false);
 
-                    Metrics.BytesSent.Add(prefixBytes.Length);
+                    AddBytesSent(prefixBytes.Length);
                 }
 
-                Metrics.RequestsSent.Add(1);
+                Metrics.RequestsSent.Add(1, MetricsContext.Tags);
             }
             catch (Exception e)
             {
@@ -688,7 +702,8 @@ namespace Apache.Ignite.Internal
             {
                 while (!cancellationToken.IsCancellationRequested)
                 {
-                    PooledBuffer response = await ReadResponseAsync(_stream, 
messageSizeBytes, cancellationToken).ConfigureAwait(false);
+                    PooledBuffer response = await ReadResponseAsync(
+                        _stream, messageSizeBytes, MetricsContext, 
cancellationToken).ConfigureAwait(false);
 
                     // Invoke response handler in another thread to continue 
the receive loop.
                     // Response buffer should be disposed by the task handler.
@@ -774,13 +789,13 @@ namespace Apache.Ignite.Internal
 
             if (exception != null)
             {
-                Metrics.RequestsFailed.Add(1);
+                AddFailedRequest();
 
                 taskCompletionSource.TrySetException(exception);
                 return false;
             }
 
-            Metrics.RequestsCompleted.Add(1);
+            Metrics.RequestsCompleted.Add(1, MetricsContext.Tags);
 
             return taskCompletionSource.TrySetResult(response);
         }
@@ -873,11 +888,11 @@ namespace Apache.Ignite.Internal
                 {
                     _logger.LogConnectionClosedWithErrorWarn(ex, 
ConnectionContext.ClusterNode.Address, ex.Message);
 
-                    Metrics.ConnectionsLost.Add(1);
+                    Metrics.ConnectionsLost.Add(1, MetricsContext.Tags);
 
                     if (ex.GetBaseException() is TimeoutException)
                     {
-                        Metrics.ConnectionsLostTimeout.Add(1);
+                        Metrics.ConnectionsLostTimeout.Add(1, 
MetricsContext.Tags);
                     }
                 }
                 else
diff --git 
a/modules/platforms/dotnet/Apache.Ignite/Internal/ConnectionContext.cs 
b/modules/platforms/dotnet/Apache.Ignite/Internal/ConnectionContext.cs
index c785dacb77..eb88008172 100644
--- a/modules/platforms/dotnet/Apache.Ignite/Internal/ConnectionContext.cs
+++ b/modules/platforms/dotnet/Apache.Ignite/Internal/ConnectionContext.cs
@@ -19,6 +19,7 @@ namespace Apache.Ignite.Internal
 {
     using System;
     using Ignite.Network;
+    using Network;
 
     /// <summary>
     /// Socket connection context.
@@ -31,7 +32,7 @@ namespace Apache.Ignite.Internal
     internal record ConnectionContext(
         ClientProtocolVersion Version,
         TimeSpan IdleTimeout,
-        IClusterNode ClusterNode,
+        ClusterNode ClusterNode,
         Guid ClusterId,
         ISslInfo? SslInfo);
 }
diff --git 
a/modules/platforms/dotnet/Apache.Ignite/Internal/IgniteClientInternal.cs 
b/modules/platforms/dotnet/Apache.Ignite/Internal/IgniteClientInternal.cs
index 94eec8fc3e..92dd3f89a7 100644
--- a/modules/platforms/dotnet/Apache.Ignite/Internal/IgniteClientInternal.cs
+++ b/modules/platforms/dotnet/Apache.Ignite/Internal/IgniteClientInternal.cs
@@ -92,9 +92,9 @@ namespace Apache.Ignite.Internal
                     Debug.Assert(fieldCount == 4, "fieldCount == 4");
 
                     res.Add(new ClusterNode(
-                        Id: r.ReadString(),
-                        Name: r.ReadString(),
-                        Address: new 
IPEndPoint(IPAddress.Parse(r.ReadString()), r.ReadInt32())));
+                        id: r.ReadString(),
+                        name: r.ReadString(),
+                        endpoint: new 
IPEndPoint(IPAddress.Parse(r.ReadString()), r.ReadInt32())));
                 }
 
                 return res;
diff --git a/modules/platforms/dotnet/Apache.Ignite/Internal/Metrics.cs 
b/modules/platforms/dotnet/Apache.Ignite/Internal/Metrics.cs
index 3115082d39..b8d723363e 100644
--- a/modules/platforms/dotnet/Apache.Ignite/Internal/Metrics.cs
+++ b/modules/platforms/dotnet/Apache.Ignite/Internal/Metrics.cs
@@ -43,7 +43,7 @@ internal static class Metrics
     /// <summary>
     /// Currently active connections.
     /// </summary>
-    public static readonly ObservableCounter<int> ConnectionsActive = 
Meter.CreateObservableCounter(
+    public static readonly ObservableGauge<int> ConnectionsActive = 
Meter.CreateObservableGauge(
         name: MetricNames.ConnectionsActive,
         observeValue: () => Interlocked.CompareExchange(ref 
_connectionsActive, 0, 0),
         unit: "connections",
@@ -92,7 +92,7 @@ internal static class Metrics
     /// <summary>
     /// Currently active requests (request sent, waiting for response).
     /// </summary>
-    public static readonly ObservableCounter<int> RequestsActive = 
Meter.CreateObservableCounter(
+    public static readonly ObservableGauge<int> RequestsActive = 
Meter.CreateObservableGauge(
         name: MetricNames.RequestsActive,
         observeValue: () => Interlocked.CompareExchange(ref _requestsActive, 
0, 0),
         unit: "requests",
@@ -165,7 +165,7 @@ internal static class Metrics
     /// <summary>
     /// Data streamer batches active.
     /// </summary>
-    public static readonly ObservableCounter<int> StreamerBatchesActive = 
Meter.CreateObservableCounter(
+    public static readonly ObservableGauge<int> StreamerBatchesActive = 
Meter.CreateObservableGauge(
         name: MetricNames.StreamerBatchesActive,
         observeValue: () => Interlocked.CompareExchange(ref 
_streamerBatchesActive, 0, 0),
         unit: "batches",
@@ -174,9 +174,9 @@ internal static class Metrics
     /// <summary>
     /// Data streamer items (rows) queued.
     /// </summary>
-    public static readonly ObservableCounter<int> StreamerItemsQueued = 
Meter.CreateObservableCounter(
+    public static readonly ObservableGauge<int> StreamerItemsQueued = 
Meter.CreateObservableGauge(
         name: MetricNames.StreamerItemsQueued,
-        observeValue: () => Interlocked.CompareExchange(ref 
_streamerItemsQueued, 0, 0),
+        observeValue: () => new 
Measurement<int>(Interlocked.CompareExchange(ref _streamerItemsQueued, 0, 0)),
         unit: "items",
         description: "Total number of queued data streamer items (rows).");
 
diff --git 
a/modules/platforms/dotnet/Apache.Ignite/Internal/Network/ClusterNode.cs 
b/modules/platforms/dotnet/Apache.Ignite/Internal/MetricsContext.cs
similarity index 67%
copy from modules/platforms/dotnet/Apache.Ignite/Internal/Network/ClusterNode.cs
copy to modules/platforms/dotnet/Apache.Ignite/Internal/MetricsContext.cs
index bddcebab01..5da678aefa 100644
--- a/modules/platforms/dotnet/Apache.Ignite/Internal/Network/ClusterNode.cs
+++ b/modules/platforms/dotnet/Apache.Ignite/Internal/MetricsContext.cs
@@ -15,13 +15,14 @@
  * limitations under the License.
  */
 
-namespace Apache.Ignite.Internal.Network
-{
-    using System.Net;
-    using Ignite.Network;
+namespace Apache.Ignite.Internal;
 
-    /// <summary>
-    /// Cluster node.
-    /// </summary>
-    internal sealed record ClusterNode(string Id, string Name, IPEndPoint 
Address) : IClusterNode;
-}
+using System.Collections.Generic;
+using System.Diagnostics.CodeAnalysis;
+
+/// <summary>
+/// Metrics context: carries the tag array that is passed along with counter measurements.
+/// </summary>
+/// <param name="Tags">Metric tags as key/value pairs.</param>
+[SuppressMessage("Performance", "CA1819:Properties should not return arrays", 
Justification = "Performance.")]
+internal sealed record MetricsContext(KeyValuePair<string, object?>[] Tags);
diff --git 
a/modules/platforms/dotnet/Apache.Ignite/Internal/Network/ClusterNode.cs 
b/modules/platforms/dotnet/Apache.Ignite/Internal/Network/ClusterNode.cs
index bddcebab01..8534350ae3 100644
--- a/modules/platforms/dotnet/Apache.Ignite/Internal/Network/ClusterNode.cs
+++ b/modules/platforms/dotnet/Apache.Ignite/Internal/Network/ClusterNode.cs
@@ -17,11 +17,62 @@
 
 namespace Apache.Ignite.Internal.Network
 {
+    using System;
     using System.Net;
     using Ignite.Network;
 
     /// <summary>
     /// Cluster node.
     /// </summary>
-    internal sealed record ClusterNode(string Id, string Name, IPEndPoint 
Address) : IClusterNode;
+    internal sealed record ClusterNode : IClusterNode
+    {
+        /// <summary>
+        /// Initializes a new instance of the <see cref="ClusterNode"/> class.
+        /// </summary>
+        /// <param name="id">Node id.</param>
+        /// <param name="name">Node name.</param>
+        /// <param name="endpoint">Node endpoint; exposed as <see cref="Address"/>.</param>
+        /// <param name="metricsContext">Metrics context; optional, null by default.</param>
+        internal ClusterNode(string id, string name, IPEndPoint endpoint, 
MetricsContext? metricsContext = null)
+        {
+            Id = id;
+            Name = name;
+            Address = endpoint;
+            MetricsContext = metricsContext;
+        }
+
+        /// <inheritdoc/>
+        public string Id { get; }
+
+        /// <inheritdoc/>
+        public string Name { get; }
+
+        /// <inheritdoc/>
+        public IPEndPoint Address { get; }
+
+        /// <summary>
+        /// Gets the metrics context; null when none was passed to the constructor.
+        /// </summary>
+        /// <value>Metrics context holding the metric tags for this node.</value>
+        internal MetricsContext? MetricsContext { get; }
+
+        /// <summary>Compares Id, Name and Address only; MetricsContext is not part of equality.</summary>
+        public bool Equals(ClusterNode? other)
+        {
+            if (ReferenceEquals(null, other))
+            {
+                return false;
+            }
+
+            if (ReferenceEquals(this, other))
+            {
+                return true;
+            }
+
+            return Id == other.Id && Name == other.Name && 
Address.Equals(other.Address);
+        }
+
+        /// <summary>Combines Id, Name and Address, the same members that Equals compares.</summary>
+        public override int GetHashCode() => HashCode.Combine(Id, Name, 
Address);
+    }
 }
diff --git a/modules/platforms/dotnet/Apache.Ignite/Internal/SocketEndpoint.cs 
b/modules/platforms/dotnet/Apache.Ignite/Internal/SocketEndpoint.cs
index 762fa3c890..a476ea123a 100644
--- a/modules/platforms/dotnet/Apache.Ignite/Internal/SocketEndpoint.cs
+++ b/modules/platforms/dotnet/Apache.Ignite/Internal/SocketEndpoint.cs
@@ -17,6 +17,7 @@
 
 namespace Apache.Ignite.Internal
 {
+    using System.Collections.Generic;
     using System.Diagnostics;
     using System.Net;
 
@@ -25,7 +26,6 @@ namespace Apache.Ignite.Internal
     /// </summary>
     internal sealed class SocketEndpoint
     {
-        /** */
         private volatile ClientSocket? _socket;
 
         /// <summary>
@@ -33,10 +33,20 @@ namespace Apache.Ignite.Internal
         /// </summary>
         /// <param name="endPoint">Endpoint.</param>
         /// <param name="host">Host name.</param>
-        public SocketEndpoint(IPEndPoint endPoint, string host)
+        /// <param name="clientId">Client id.</param>
+        public SocketEndpoint(IPEndPoint endPoint, string host, string 
clientId)
         {
             EndPoint = endPoint;
             Host = host;
+
+            // Cache endpoint string for metrics and logging.
+            EndPointString = endPoint.ToString();
+
+            MetricsContext = new MetricsContext(new[]
+            {
+                new KeyValuePair<string, object?>(MetricTags.ClientId, 
clientId),
+                new KeyValuePair<string, object?>(MetricTags.NodeAddress, 
EndPointString)
+            });
         }
 
         /// <summary>
@@ -66,5 +76,15 @@ namespace Apache.Ignite.Internal
         /// Gets the host.
         /// </summary>
         public string Host { get; }
+
+        /// <summary>
+        /// Gets the cached endpoint string.
+        /// </summary>
+        public string EndPointString { get; }
+
+        /// <summary>
+        /// Gets the metrics context.
+        /// </summary>
+        public MetricsContext MetricsContext { get; }
     }
 }
diff --git 
a/modules/platforms/dotnet/Apache.Ignite/Internal/Table/DataStreamer.cs 
b/modules/platforms/dotnet/Apache.Ignite/Internal/Table/DataStreamer.cs
index 63268259fb..c56e223077 100644
--- a/modules/platforms/dotnet/Apache.Ignite/Internal/Table/DataStreamer.cs
+++ b/modules/platforms/dotnet/Apache.Ignite/Internal/Table/DataStreamer.cs
@@ -64,7 +64,7 @@ internal static class DataStreamer
     /// <returns>A <see cref="Task"/> representing the asynchronous 
operation.</returns>
     internal static async Task StreamDataAsync<T>(
         IAsyncEnumerable<T> data,
-        Func<PooledArrayBuffer, string, IRetryPolicy, Task> sender,
+        Func<PooledArrayBuffer, int, string, IRetryPolicy, Task> sender,
         RecordSerializer<T> writer,
         Func<int?, Task<Schema>> schemaProvider, // Not a ValueTask because 
Tasks are cached.
         Func<ValueTask<string?[]?>> partitionAssignmentProvider,
@@ -308,10 +308,7 @@ internal static class DataStreamer
 
                         // Wait for the previous batch for this node to 
preserve item order.
                         await oldTask.ConfigureAwait(false);
-                        await sender(buf, partition, 
retryPolicy).ConfigureAwait(false);
-
-                        Metrics.StreamerBatchesSent.Add(1);
-                        Metrics.StreamerItemsSent.Add(count);
+                        await sender(buf, count, partition, 
retryPolicy).ConfigureAwait(false);
 
                         return;
                     }
diff --git 
a/modules/platforms/dotnet/Apache.Ignite/Internal/Table/RecordView.cs 
b/modules/platforms/dotnet/Apache.Ignite/Internal/Table/RecordView.cs
index b8f93f4bf9..9f60de14fc 100644
--- a/modules/platforms/dotnet/Apache.Ignite/Internal/Table/RecordView.cs
+++ b/modules/platforms/dotnet/Apache.Ignite/Internal/Table/RecordView.cs
@@ -299,15 +299,20 @@ namespace Apache.Ignite.Internal.Table
             CancellationToken cancellationToken = default) =>
             await DataStreamer.StreamDataAsync(
                 data,
-                sender: async (batch, preferredNode, retryPolicy) =>
+                sender: async (batch, count, preferredNode, retryPolicy) =>
                 {
-                    using var resBuf = await DoOutInOpAsync(
+                    var (resBuf, socket) = await 
_table.Socket.DoOutInOpAndGetSocketAsync(
                             ClientOp.TupleUpsertAll,
                             tx: null,
                             batch,
                             PreferredNode.FromName(preferredNode),
                             retryPolicy)
                         .ConfigureAwait(false);
+
+                    resBuf.Dispose();
+
+                    Metrics.StreamerBatchesSent.Add(1, 
socket.MetricsContext.Tags);
+                    Metrics.StreamerItemsSent.Add(count, 
socket.MetricsContext.Tags);
                 },
                 writer: _ser,
                 schemaProvider: _table.GetSchemaAsync,
diff --git 
a/modules/platforms/dotnet/Apache.Ignite/Internal/Network/ClusterNode.cs 
b/modules/platforms/dotnet/Apache.Ignite/MetricTags.cs
similarity index 60%
copy from modules/platforms/dotnet/Apache.Ignite/Internal/Network/ClusterNode.cs
copy to modules/platforms/dotnet/Apache.Ignite/MetricTags.cs
index bddcebab01..d372535aea 100644
--- a/modules/platforms/dotnet/Apache.Ignite/Internal/Network/ClusterNode.cs
+++ b/modules/platforms/dotnet/Apache.Ignite/MetricTags.cs
@@ -15,13 +15,23 @@
  * limitations under the License.
  */
 
-namespace Apache.Ignite.Internal.Network
+namespace Apache.Ignite;
+
+/// <summary>
+/// Metric tag names. See also <see cref="MetricNames"/>.
+/// </summary>
+public static class MetricTags
 {
-    using System.Net;
-    using Ignite.Network;
+    // Naming guidelines: 
https://github.com/open-telemetry/semantic-conventions/blob/main/docs/general/attribute-naming.md
+    // 
https://learn.microsoft.com/en-us/dotnet/core/diagnostics/metrics-instrumentation#best-practices-4
+
+    /// <summary>
+    /// Client id tag: identifies the client instance.
+    /// </summary>
+    public const string ClientId = "client.id";
 
     /// <summary>
-    /// Cluster node.
+    /// Node address tag: identifies the connection endpoint.
     /// </summary>
-    internal sealed record ClusterNode(string Id, string Name, IPEndPoint 
Address) : IClusterNode;
+    public const string NodeAddress = "node.addr";
 }

Reply via email to