This is an automated email from the ASF dual-hosted git repository.

ptupitsyn pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 81fe252ee7 IGNITE-19610 .NET: Add data streamer metrics (#2262)
81fe252ee7 is described below

commit 81fe252ee7847da92ff770035af00cbb930764c3
Author: Pavel Tupitsyn <[email protected]>
AuthorDate: Tue Jun 27 17:28:21 2023 +0300

    IGNITE-19610 .NET: Add data streamer metrics (#2262)
    
    Add data streamer metrics to .NET client:
    * `streamer-batches-sent`
    * `streamer-items-sent`
    * `streamer-batches-active`
    * `streamer-items-queued`
---
 .../dotnet/Apache.Ignite.Tests/MetricsTests.cs     | 87 ++++++++++++++++++++++
 .../Apache.Ignite/Internal/Compute/Compute.cs      |  6 +-
 .../dotnet/Apache.Ignite/Internal/Metrics.cs       | 59 +++++++++++++++
 .../Apache.Ignite/Internal/Table/DataStreamer.cs   | 24 +++++-
 4 files changed, 170 insertions(+), 6 deletions(-)

diff --git a/modules/platforms/dotnet/Apache.Ignite.Tests/MetricsTests.cs 
b/modules/platforms/dotnet/Apache.Ignite.Tests/MetricsTests.cs
index 2f71b2d421..b819e47fb2 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Tests/MetricsTests.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Tests/MetricsTests.cs
@@ -22,7 +22,11 @@ using System.Collections.Concurrent;
 using System.Collections.Generic;
 using System.Diagnostics.CodeAnalysis;
 using System.Diagnostics.Metrics;
+using System.Linq;
+using System.Runtime.CompilerServices;
+using System.Threading;
 using System.Threading.Tasks;
+using Ignite.Table;
 using NUnit.Framework;
 
 /// <summary>
@@ -199,6 +203,89 @@ public class MetricsTests
         Assert.AreEqual(3, _listener.GetMetric("requests-retried"));
     }
 
+    [Test]
+    public async Task TestDataStreamerMetrics()
+    {
+        using var server = new FakeServer();
+        using var client = await server.ConnectClientAsync();
+
+        Assert.AreEqual(0, _listener.GetMetric("streamer-batches-sent"), 
"streamer-batches-sent");
+        Assert.AreEqual(0, _listener.GetMetric("streamer-items-sent"), 
"streamer-items-sent");
+        Assert.AreEqual(0, _listener.GetMetric("streamer-batches-active"), 
"streamer-batches-active");
+        Assert.AreEqual(0, _listener.GetMetric("streamer-items-queued"), 
"streamer-items-queued");
+
+        var table = await 
client.Tables.GetTableAsync(FakeServer.ExistingTableName);
+        var view = table!.RecordBinaryView;
+
+        await view.StreamDataAsync(GetTuples().ToAsyncEnumerable(), 
DataStreamerOptions.Default with { BatchSize = 2 });
+
+        Assert.AreEqual(1, _listener.GetMetric("streamer-batches-sent"), 
"streamer-batches-sent");
+        Assert.AreEqual(2, _listener.GetMetric("streamer-items-sent"), 
"streamer-items-sent");
+        Assert.AreEqual(0, _listener.GetMetric("streamer-batches-active"), 
"streamer-batches-active");
+        Assert.AreEqual(0, _listener.GetMetric("streamer-items-queued"), 
"streamer-items-queued");
+
+        IEnumerable<IIgniteTuple> GetTuples()
+        {
+            Assert.AreEqual(0, _listener.GetMetric("streamer-batches-active"), 
"streamer-batches-active");
+            Assert.AreEqual(0, _listener.GetMetric("streamer-items-queued"), 
"streamer-items-queued");
+
+            yield return new IgniteTuple { ["ID"] = 1 };
+
+            Assert.AreEqual(1, _listener.GetMetric("streamer-batches-active"), 
"streamer-batches-active");
+            Assert.AreEqual(1, _listener.GetMetric("streamer-items-queued"), 
"streamer-items-queued");
+
+            yield return new IgniteTuple { ["ID"] = 2 };
+
+            Assert.AreEqual(2, _listener.GetMetric("streamer-batches-active"), 
"streamer-batches-active");
+            Assert.AreEqual(2, _listener.GetMetric("streamer-items-queued"), 
"streamer-items-queued");
+
+            TestUtils.WaitForCondition(() => 
_listener.GetMetric("streamer-batches-sent") == 1);
+
+            Assert.AreEqual(1, _listener.GetMetric("streamer-batches-active"), 
"streamer-batches-active");
+            Assert.AreEqual(0, _listener.GetMetric("streamer-items-queued"), 
"streamer-items-queued");
+            Assert.AreEqual(2, _listener.GetMetric("streamer-items-sent"), 
"streamer-items-sent");
+        }
+    }
+
+    [Test]
+    public async Task TestDataStreamerMetricsWithCancellation()
+    {
+        using var server = new FakeServer();
+        using var client = await server.ConnectClientAsync();
+
+        Assert.AreEqual(0, _listener.GetMetric("streamer-batches-sent"), 
"streamer-batches-sent");
+        Assert.AreEqual(0, _listener.GetMetric("streamer-items-sent"), 
"streamer-items-sent");
+        Assert.AreEqual(0, _listener.GetMetric("streamer-batches-active"), 
"streamer-batches-active");
+        Assert.AreEqual(0, _listener.GetMetric("streamer-items-queued"), 
"streamer-items-queued");
+
+        var table = await 
client.Tables.GetTableAsync(FakeServer.ExistingTableName);
+        var view = table!.RecordBinaryView;
+        using var cts = new CancellationTokenSource();
+
+        var task = view.StreamDataAsync(GetTuples(), 
DataStreamerOptions.Default with { BatchSize = 10 }, cts.Token);
+
+        TestUtils.WaitForCondition(() => 
_listener.GetMetric("streamer-batches-sent") > 0);
+        cts.Cancel();
+        Assert.ThrowsAsync<TaskCanceledException>(async () => await task);
+
+        Assert.GreaterOrEqual(_listener.GetMetric("streamer-batches-sent"), 1, 
"streamer-batches-sent");
+        Assert.AreEqual(0, _listener.GetMetric("streamer-batches-active"), 
"streamer-batches-active");
+        Assert.AreEqual(0, _listener.GetMetric("streamer-items-queued"), 
"streamer-items-queued");
+
+        static async IAsyncEnumerable<IIgniteTuple> 
GetTuples([EnumeratorCancellation] CancellationToken ct = default)
+        {
+            for (int i = 0; i < 50; i++)
+            {
+                yield return new IgniteTuple { ["ID"] = i };
+
+                if (i == 40)
+                {
+                    await Task.Delay(1000, ct);
+                }
+            }
+        }
+    }
+
     private static IgniteClientConfiguration GetConfig() =>
         new()
         {
diff --git a/modules/platforms/dotnet/Apache.Ignite/Internal/Compute/Compute.cs 
b/modules/platforms/dotnet/Apache.Ignite/Internal/Compute/Compute.cs
index f974d186cb..0aec31777e 100644
--- a/modules/platforms/dotnet/Apache.Ignite/Internal/Compute/Compute.cs
+++ b/modules/platforms/dotnet/Apache.Ignite/Internal/Compute/Compute.cs
@@ -151,9 +151,9 @@ namespace Apache.Ignite.Internal.Compute
         {
             var w = buf.MessageWriter;
 
-            if (units is ICollection<DeploymentUnit> unitsCol)
+            if (units.TryGetNonEnumeratedCount(out var count))
             {
-                w.WriteArrayHeader(unitsCol.Count);
+                w.WriteArrayHeader(count);
                 foreach (var unit in units)
                 {
                     if (string.IsNullOrEmpty(unit.Name))
@@ -174,7 +174,7 @@ namespace Apache.Ignite.Internal.Compute
             }
 
             // Enumerable without known count - enumerate first, write count 
later.
-            var count = 0;
+            count = 0;
             var countSpan = buf.GetSpan(5);
             buf.Advance(5);
 
diff --git a/modules/platforms/dotnet/Apache.Ignite/Internal/Metrics.cs 
b/modules/platforms/dotnet/Apache.Ignite/Internal/Metrics.cs
index 001033bc45..59921e0add 100644
--- a/modules/platforms/dotnet/Apache.Ignite/Internal/Metrics.cs
+++ b/modules/platforms/dotnet/Apache.Ignite/Internal/Metrics.cs
@@ -36,6 +36,10 @@ internal static class Metrics
 
     private static int _requestsActive;
 
+    private static int _streamerBatchesActive;
+
+    private static int _streamerItemsQueued;
+
     /// <summary>
     /// Currently active connections.
     /// </summary>
@@ -142,6 +146,40 @@ internal static class Metrics
         unit: "bytes",
         description: "Total number of bytes received");
 
+    /// <summary>
+    /// Data streamer batches sent.
+    /// </summary>
+    public static readonly Counter<long> StreamerBatchesSent = 
Meter.CreateCounter<long>(
+        name: "streamer-batches-sent",
+        unit: "batches",
+        description: "Total number of data streamer batches sent.");
+
+    /// <summary>
+    /// Data streamer items sent.
+    /// </summary>
+    public static readonly Counter<long> StreamerItemsSent = 
Meter.CreateCounter<long>(
+        name: "streamer-items-sent",
+        unit: "items",
+        description: "Total number of data streamer items sent.");
+
+    /// <summary>
+    /// Data streamer batches active.
+    /// </summary>
+    public static readonly ObservableCounter<int> StreamerBatchesActive = 
Meter.CreateObservableCounter(
+        name: "streamer-batches-active",
+        observeValue: () => Interlocked.CompareExchange(ref 
_streamerBatchesActive, 0, 0),
+        unit: "batches",
+        description: "Total number of existing data streamer batches.");
+
+    /// <summary>
+    /// Data streamer items (rows) queued.
+    /// </summary>
+    public static readonly ObservableCounter<int> StreamerItemsQueued = 
Meter.CreateObservableCounter(
+        name: "streamer-items-queued",
+        observeValue: () => Interlocked.CompareExchange(ref 
_streamerItemsQueued, 0, 0),
+        unit: "items",
+        description: "Total number of queued data streamer items (rows).");
+
     /// <summary>
     /// Increments active connections.
     /// </summary>
@@ -161,4 +199,25 @@ internal static class Metrics
     /// Decrements active requests.
     /// </summary>
     public static void RequestsActiveDecrement() => Interlocked.Decrement(ref 
_requestsActive);
+
+    /// <summary>
+    /// Increments active streamer batches.
+    /// </summary>
+    public static void StreamerBatchesActiveIncrement() => 
Interlocked.Increment(ref _streamerBatchesActive);
+
+    /// <summary>
+    /// Decrements active streamer batches.
+    /// </summary>
+    public static void StreamerBatchesActiveDecrement() => 
Interlocked.Decrement(ref _streamerBatchesActive);
+
+    /// <summary>
+    /// Increments streamer items queued.
+    /// </summary>
+    public static void StreamerItemsQueuedIncrement() => 
Interlocked.Increment(ref _streamerItemsQueued);
+
+    /// <summary>
+    /// Decrements streamer items queued.
+    /// </summary>
+    /// <param name="count">The count.</param>
+    public static void StreamerItemsQueuedDecrement(int count) => 
Interlocked.Add(ref _streamerItemsQueued, -count);
 }
diff --git 
a/modules/platforms/dotnet/Apache.Ignite/Internal/Table/DataStreamer.cs 
b/modules/platforms/dotnet/Apache.Ignite/Internal/Table/DataStreamer.cs
index 9497885965..13b43ddb36 100644
--- a/modules/platforms/dotnet/Apache.Ignite/Internal/Table/DataStreamer.cs
+++ b/modules/platforms/dotnet/Apache.Ignite/Internal/Table/DataStreamer.cs
@@ -91,8 +91,11 @@ internal static class DataStreamer
 
             await foreach (var item in 
data.WithCancellation(cancellationToken))
             {
-                var (batch, partition) = Add(item);
+                // WithCancellation passes the token to the producer.
+                // However, not all producers support cancellation, so we need 
to check it here as well.
+                cancellationToken.ThrowIfCancellationRequested();
 
+                var (batch, partition) = Add(item);
                 if (batch.Count >= options.BatchSize)
                 {
                     await SendAsync(batch, partition).ConfigureAwait(false);
@@ -121,6 +124,9 @@ internal static class DataStreamer
             foreach (var batch in batches.Values)
             {
                 batch.Buffer.Dispose();
+
+                Metrics.StreamerItemsQueuedDecrement(batch.Count);
+                Metrics.StreamerBatchesActiveDecrement();
             }
         }
 
@@ -163,6 +169,8 @@ internal static class DataStreamer
                 batch.Buffer.MessageWriter.Write(tupleBuilder.Build().Span);
             }
 
+            Metrics.StreamerItemsQueuedIncrement();
+
             return (batch, partition);
         }
 
@@ -174,6 +182,8 @@ internal static class DataStreamer
             {
                 batchRef = new Batch();
                 InitBuffer(batchRef);
+
+                Metrics.StreamerBatchesActiveIncrement();
             }
 
             return batchRef;
@@ -200,26 +210,34 @@ internal static class DataStreamer
                 buf.WriteByte(MsgPackCode.Int32, batch.CountPos);
                 buf.WriteIntBigEndian(batch.Count, batch.CountPos + 1);
 
-                batch.Task = SendAndDisposeBufAsync(buf, partition, 
batch.Task);
+                batch.Task = SendAndDisposeBufAsync(buf, partition, 
batch.Task, batch.Count);
 
                 batch.Count = 0;
                 batch.Buffer = ProtoCommon.GetMessageWriter(); // Prev buf 
will be disposed in SendAndDisposeBufAsync.
                 InitBuffer(batch);
                 batch.LastFlush = Stopwatch.GetTimestamp();
+
+                Metrics.StreamerBatchesActiveIncrement();
             }
         }
 
-        async Task SendAndDisposeBufAsync(PooledArrayBuffer buf, string 
partition, Task oldTask)
+        async Task SendAndDisposeBufAsync(PooledArrayBuffer buf, string 
partition, Task oldTask, int count)
         {
             try
             {
                 // Wait for the previous batch for this node to preserve item 
order.
                 await oldTask.ConfigureAwait(false);
                 await sender(buf, partition, 
retryPolicy).ConfigureAwait(false);
+
+                Metrics.StreamerBatchesSent.Add(1);
+                Metrics.StreamerItemsSent.Add(count);
             }
             finally
             {
                 buf.Dispose();
+
+                Metrics.StreamerItemsQueuedDecrement(count);
+                Metrics.StreamerBatchesActiveDecrement();
             }
         }
 

Reply via email to