This is an automated email from the ASF dual-hosted git repository.
ptupitsyn pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 81fe252ee7 IGNITE-19610 .NET: Add data streamer metrics (#2262)
81fe252ee7 is described below
commit 81fe252ee7847da92ff770035af00cbb930764c3
Author: Pavel Tupitsyn <[email protected]>
AuthorDate: Tue Jun 27 17:28:21 2023 +0300
IGNITE-19610 .NET: Add data streamer metrics (#2262)
Add data streamer metrics to .NET client:
* `streamer-batches-sent`
* `streamer-items-sent`
* `streamer-batches-active`
* `streamer-items-queued`
---
.../dotnet/Apache.Ignite.Tests/MetricsTests.cs | 87 ++++++++++++++++++++++
.../Apache.Ignite/Internal/Compute/Compute.cs | 6 +-
.../dotnet/Apache.Ignite/Internal/Metrics.cs | 59 +++++++++++++++
.../Apache.Ignite/Internal/Table/DataStreamer.cs | 24 +++++-
4 files changed, 170 insertions(+), 6 deletions(-)
diff --git a/modules/platforms/dotnet/Apache.Ignite.Tests/MetricsTests.cs
b/modules/platforms/dotnet/Apache.Ignite.Tests/MetricsTests.cs
index 2f71b2d421..b819e47fb2 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Tests/MetricsTests.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Tests/MetricsTests.cs
@@ -22,7 +22,11 @@ using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics.CodeAnalysis;
using System.Diagnostics.Metrics;
+using System.Linq;
+using System.Runtime.CompilerServices;
+using System.Threading;
using System.Threading.Tasks;
+using Ignite.Table;
using NUnit.Framework;
/// <summary>
@@ -199,6 +203,89 @@ public class MetricsTests
Assert.AreEqual(3, _listener.GetMetric("requests-retried"));
}
+ [Test]
+ public async Task TestDataStreamerMetrics()
+ {
+ using var server = new FakeServer();
+ using var client = await server.ConnectClientAsync();
+
+ Assert.AreEqual(0, _listener.GetMetric("streamer-batches-sent"),
"streamer-batches-sent");
+ Assert.AreEqual(0, _listener.GetMetric("streamer-items-sent"),
"streamer-items-sent");
+ Assert.AreEqual(0, _listener.GetMetric("streamer-batches-active"),
"streamer-batches-active");
+ Assert.AreEqual(0, _listener.GetMetric("streamer-items-queued"),
"streamer-items-queued");
+
+ var table = await
client.Tables.GetTableAsync(FakeServer.ExistingTableName);
+ var view = table!.RecordBinaryView;
+
+ await view.StreamDataAsync(GetTuples().ToAsyncEnumerable(),
DataStreamerOptions.Default with { BatchSize = 2 });
+
+ Assert.AreEqual(1, _listener.GetMetric("streamer-batches-sent"),
"streamer-batches-sent");
+ Assert.AreEqual(2, _listener.GetMetric("streamer-items-sent"),
"streamer-items-sent");
+ Assert.AreEqual(0, _listener.GetMetric("streamer-batches-active"),
"streamer-batches-active");
+ Assert.AreEqual(0, _listener.GetMetric("streamer-items-queued"),
"streamer-items-queued");
+
+ IEnumerable<IIgniteTuple> GetTuples()
+ {
+ Assert.AreEqual(0, _listener.GetMetric("streamer-batches-active"),
"streamer-batches-active");
+ Assert.AreEqual(0, _listener.GetMetric("streamer-items-queued"),
"streamer-items-queued");
+
+ yield return new IgniteTuple { ["ID"] = 1 };
+
+ Assert.AreEqual(1, _listener.GetMetric("streamer-batches-active"),
"streamer-batches-active");
+ Assert.AreEqual(1, _listener.GetMetric("streamer-items-queued"),
"streamer-items-queued");
+
+ yield return new IgniteTuple { ["ID"] = 2 };
+
+ Assert.AreEqual(2, _listener.GetMetric("streamer-batches-active"),
"streamer-batches-active");
+ Assert.AreEqual(2, _listener.GetMetric("streamer-items-queued"),
"streamer-items-queued");
+
+ TestUtils.WaitForCondition(() =>
_listener.GetMetric("streamer-batches-sent") == 1);
+
+ Assert.AreEqual(1, _listener.GetMetric("streamer-batches-active"),
"streamer-batches-active");
+ Assert.AreEqual(0, _listener.GetMetric("streamer-items-queued"),
"streamer-items-queued");
+ Assert.AreEqual(2, _listener.GetMetric("streamer-items-sent"),
"streamer-items-sent");
+ }
+ }
+
+ [Test]
+ public async Task TestDataStreamerMetricsWithCancellation()
+ {
+ using var server = new FakeServer();
+ using var client = await server.ConnectClientAsync();
+
+ Assert.AreEqual(0, _listener.GetMetric("streamer-batches-sent"),
"streamer-batches-sent");
+ Assert.AreEqual(0, _listener.GetMetric("streamer-items-sent"),
"streamer-items-sent");
+ Assert.AreEqual(0, _listener.GetMetric("streamer-batches-active"),
"streamer-batches-active");
+ Assert.AreEqual(0, _listener.GetMetric("streamer-items-queued"),
"streamer-items-queued");
+
+ var table = await
client.Tables.GetTableAsync(FakeServer.ExistingTableName);
+ var view = table!.RecordBinaryView;
+ var cts = new CancellationTokenSource();
+
+ var task = view.StreamDataAsync(GetTuples(),
DataStreamerOptions.Default with { BatchSize = 10 }, cts.Token);
+
+ TestUtils.WaitForCondition(() =>
_listener.GetMetric("streamer-batches-sent") > 0);
+ cts.Cancel();
+ Assert.ThrowsAsync<TaskCanceledException>(async () => await task);
+
+ Assert.GreaterOrEqual(_listener.GetMetric("streamer-batches-sent"), 1,
"streamer-batches-sent");
+ Assert.AreEqual(0, _listener.GetMetric("streamer-batches-active"),
"streamer-batches-active");
+ Assert.AreEqual(0, _listener.GetMetric("streamer-items-queued"),
"streamer-items-queued");
+
+ static async IAsyncEnumerable<IIgniteTuple>
GetTuples([EnumeratorCancellation] CancellationToken ct = default)
+ {
+ for (int i = 0; i < 50; i++)
+ {
+ yield return new IgniteTuple { ["ID"] = i };
+
+ if (i == 40)
+ {
+ await Task.Delay(1000, ct);
+ }
+ }
+ }
+ }
+
private static IgniteClientConfiguration GetConfig() =>
new()
{
diff --git a/modules/platforms/dotnet/Apache.Ignite/Internal/Compute/Compute.cs
b/modules/platforms/dotnet/Apache.Ignite/Internal/Compute/Compute.cs
index f974d186cb..0aec31777e 100644
--- a/modules/platforms/dotnet/Apache.Ignite/Internal/Compute/Compute.cs
+++ b/modules/platforms/dotnet/Apache.Ignite/Internal/Compute/Compute.cs
@@ -151,9 +151,9 @@ namespace Apache.Ignite.Internal.Compute
{
var w = buf.MessageWriter;
- if (units is ICollection<DeploymentUnit> unitsCol)
+ if (units.TryGetNonEnumeratedCount(out var count))
{
- w.WriteArrayHeader(unitsCol.Count);
+ w.WriteArrayHeader(count);
foreach (var unit in units)
{
if (string.IsNullOrEmpty(unit.Name))
@@ -174,7 +174,7 @@ namespace Apache.Ignite.Internal.Compute
}
// Enumerable without known count - enumerate first, write count
later.
- var count = 0;
+ count = 0;
var countSpan = buf.GetSpan(5);
buf.Advance(5);
diff --git a/modules/platforms/dotnet/Apache.Ignite/Internal/Metrics.cs
b/modules/platforms/dotnet/Apache.Ignite/Internal/Metrics.cs
index 001033bc45..59921e0add 100644
--- a/modules/platforms/dotnet/Apache.Ignite/Internal/Metrics.cs
+++ b/modules/platforms/dotnet/Apache.Ignite/Internal/Metrics.cs
@@ -36,6 +36,10 @@ internal static class Metrics
private static int _requestsActive;
+ private static int _streamerBatchesActive;
+
+ private static int _streamerItemsQueued;
+
/// <summary>
/// Currently active connections.
/// </summary>
@@ -142,6 +146,40 @@ internal static class Metrics
unit: "bytes",
description: "Total number of bytes received");
+ /// <summary>
+ /// Data streamer batches sent.
+ /// </summary>
+ public static readonly Counter<long> StreamerBatchesSent =
Meter.CreateCounter<long>(
+ name: "streamer-batches-sent",
+ unit: "batches",
+ description: "Total number of data streamer batches sent.");
+
+ /// <summary>
+ /// Data streamer items sent.
+ /// </summary>
+ public static readonly Counter<long> StreamerItemsSent =
Meter.CreateCounter<long>(
+ name: "streamer-items-sent",
+ unit: "batches",
+ description: "Total number of data streamer items sent.");
+
+ /// <summary>
+ /// Data streamer batches active.
+ /// </summary>
+ public static readonly ObservableCounter<int> StreamerBatchesActive =
Meter.CreateObservableCounter(
+ name: "streamer-batches-active",
+ observeValue: () => Interlocked.CompareExchange(ref
_streamerBatchesActive, 0, 0),
+ unit: "batches",
+ description: "Total number of existing data streamer batches.");
+
+ /// <summary>
+ /// Data streamer items (rows) queued.
+ /// </summary>
+ public static readonly ObservableCounter<int> StreamerItemsQueued =
Meter.CreateObservableCounter(
+ name: "streamer-items-queued",
+ observeValue: () => Interlocked.CompareExchange(ref
_streamerItemsQueued, 0, 0),
+ unit: "items",
+ description: "Total number of queued data streamer items (rows).");
+
/// <summary>
/// Increments active connections.
/// </summary>
@@ -161,4 +199,25 @@ internal static class Metrics
/// Decrements active requests.
/// </summary>
public static void RequestsActiveDecrement() => Interlocked.Decrement(ref
_requestsActive);
+
+ /// <summary>
+ /// Increments active streamer batches.
+ /// </summary>
+ public static void StreamerBatchesActiveIncrement() =>
Interlocked.Increment(ref _streamerBatchesActive);
+
+ /// <summary>
+ /// Decrements active streamer batches.
+ /// </summary>
+ public static void StreamerBatchesActiveDecrement() =>
Interlocked.Decrement(ref _streamerBatchesActive);
+
+ /// <summary>
+ /// Increments streamer items queued.
+ /// </summary>
+ public static void StreamerItemsQueuedIncrement() =>
Interlocked.Increment(ref _streamerItemsQueued);
+
+ /// <summary>
+ /// Decrements streamer items queued.
+ /// </summary>
+ /// <param name="count">The count.</param>
+ public static void StreamerItemsQueuedDecrement(int count) =>
Interlocked.Add(ref _streamerItemsQueued, -count);
}
diff --git
a/modules/platforms/dotnet/Apache.Ignite/Internal/Table/DataStreamer.cs
b/modules/platforms/dotnet/Apache.Ignite/Internal/Table/DataStreamer.cs
index 9497885965..13b43ddb36 100644
--- a/modules/platforms/dotnet/Apache.Ignite/Internal/Table/DataStreamer.cs
+++ b/modules/platforms/dotnet/Apache.Ignite/Internal/Table/DataStreamer.cs
@@ -91,8 +91,11 @@ internal static class DataStreamer
await foreach (var item in
data.WithCancellation(cancellationToken))
{
- var (batch, partition) = Add(item);
+ // WithCancellation passes the token to the producer.
+ // However, not all producers support cancellation, so we need
to check it here as well.
+ cancellationToken.ThrowIfCancellationRequested();
+ var (batch, partition) = Add(item);
if (batch.Count >= options.BatchSize)
{
await SendAsync(batch, partition).ConfigureAwait(false);
@@ -121,6 +124,9 @@ internal static class DataStreamer
foreach (var batch in batches.Values)
{
batch.Buffer.Dispose();
+
+ Metrics.StreamerItemsQueuedDecrement(batch.Count);
+ Metrics.StreamerBatchesActiveDecrement();
}
}
@@ -163,6 +169,8 @@ internal static class DataStreamer
batch.Buffer.MessageWriter.Write(tupleBuilder.Build().Span);
}
+ Metrics.StreamerItemsQueuedIncrement();
+
return (batch, partition);
}
@@ -174,6 +182,8 @@ internal static class DataStreamer
{
batchRef = new Batch();
InitBuffer(batchRef);
+
+ Metrics.StreamerBatchesActiveIncrement();
}
return batchRef;
@@ -200,26 +210,34 @@ internal static class DataStreamer
buf.WriteByte(MsgPackCode.Int32, batch.CountPos);
buf.WriteIntBigEndian(batch.Count, batch.CountPos + 1);
- batch.Task = SendAndDisposeBufAsync(buf, partition,
batch.Task);
+ batch.Task = SendAndDisposeBufAsync(buf, partition,
batch.Task, batch.Count);
batch.Count = 0;
batch.Buffer = ProtoCommon.GetMessageWriter(); // Prev buf
will be disposed in SendAndDisposeBufAsync.
InitBuffer(batch);
batch.LastFlush = Stopwatch.GetTimestamp();
+
+ Metrics.StreamerBatchesActiveIncrement();
}
}
- async Task SendAndDisposeBufAsync(PooledArrayBuffer buf, string
partition, Task oldTask)
+ async Task SendAndDisposeBufAsync(PooledArrayBuffer buf, string
partition, Task oldTask, int count)
{
try
{
// Wait for the previous batch for this node to preserve item
order.
await oldTask.ConfigureAwait(false);
await sender(buf, partition,
retryPolicy).ConfigureAwait(false);
+
+ Metrics.StreamerBatchesSent.Add(1);
+ Metrics.StreamerItemsSent.Add(count);
}
finally
{
buf.Dispose();
+
+ Metrics.StreamerItemsQueuedDecrement(count);
+ Metrics.StreamerBatchesActiveDecrement();
}
}