This is an automated email from the ASF dual-hosted git repository.
ptupitsyn pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 185e568d45 IGNITE-22031 .NET: Remove
DataStreamer.PartitionAssignmentUpdateFrequency (#3749)
185e568d45 is described below
commit 185e568d4536b686794dc1011d3f9ba561e91d7b
Author: Pavel Tupitsyn <[email protected]>
AuthorDate: Tue May 14 15:19:59 2024 +0300
IGNITE-22031 .NET: Remove DataStreamer.PartitionAssignmentUpdateFrequency
(#3749)
1. Remove the timer-based partition assignment check - it is not necessary.
`Table.GetPartitionAssignmentAsync` is backed by a cache with double-checked
locking and returns a `ValueTask`, so there is no overhead when the
assignment is unchanged, and we can call it as often as needed.
2. Add the `TestDataStreamerReceivesPartitionAssignmentUpdatesWhileStreaming`
test to cover this scenario explicitly.
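3. Reorder operations in `DataStreamer`: check the partition assignment before
handling the next item, not after, so that when the assignment changes, pending
batches are drained first and the next item is mapped using the new assignment.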
This change does not affect benchmark results compared to the `main` branch; the
differences in the updated `DataStreamerBenchmark` table are caused by earlier
changes (IGNITE-21931, IGNITE-21490).
---
.../Table/DataStreamerBenchmark.cs | 20 ++++-----
.../dotnet/Apache.Ignite.Tests/FakeServer.cs | 8 +++-
.../Apache.Ignite.Tests/PartitionAwarenessTests.cs | 50 +++++++++++++++++++---
.../Apache.Ignite/Internal/Table/DataStreamer.cs | 25 ++++-------
4 files changed, 68 insertions(+), 35 deletions(-)
diff --git
a/modules/platforms/dotnet/Apache.Ignite.Benchmarks/Table/DataStreamerBenchmark.cs
b/modules/platforms/dotnet/Apache.Ignite.Benchmarks/Table/DataStreamerBenchmark.cs
index cfa7ff566b..b2ec1798bc 100644
---
a/modules/platforms/dotnet/Apache.Ignite.Benchmarks/Table/DataStreamerBenchmark.cs
+++
b/modules/platforms/dotnet/Apache.Ignite.Benchmarks/Table/DataStreamerBenchmark.cs
@@ -33,20 +33,20 @@ using Tests;
/// inserts everything in one batch, and streamer sends multiple batches. With
multiple servers, streamer scales linearly because
/// it sends batches to different nodes in parallel.
/// <para />
-/// Results on i9-12900H, .NET SDK 6.0.408, Ubuntu 22.04:
+/// Results on i9-12900H, .NET SDK 6.0.421, Ubuntu 22.04:
/// | Method | ServerCount | Mean | Error | StdDev | Ratio
| RatioSD | Allocated |
/// |----------------- |------------
|----------:|---------:|---------:|------:|--------:|----------:|
-/// | DataStreamer | 1 | 141.56 ms | 2.725 ms | 3.244 ms | 1.00
| 0.00 | 4 MB |
-/// | UpsertAll | 1 | 112.99 ms | 1.203 ms | 1.125 ms | 0.80
| 0.02 | 4 MB |
-/// | UpsertAllBatched | 1 | 159.11 ms | 3.175 ms | 4.451 ms | 1.12
| 0.04 | 4 MB |
+/// | DataStreamer | 1 | 109.33 ms | 0.805 ms | 0.753 ms | 1.00
| 0.00 | 4 MB |
+/// | UpsertAll | 1 | 112.34 ms | 1.060 ms | 0.991 ms | 1.03
| 0.01 | 4 MB |
+/// | UpsertAllBatched | 1 | 158.85 ms | 3.115 ms | 5.374 ms | 1.44
| 0.06 | 4 MB |
/// | | | | | |
| | |
-/// | DataStreamer | 2 | 67.29 ms | 1.331 ms | 3.058 ms | 1.00
| 0.00 | 4 MB |
-/// | UpsertAll | 2 | 113.68 ms | 0.915 ms | 0.856 ms | 1.64
| 0.05 | 4 MB |
-/// | UpsertAllBatched | 2 | 162.47 ms | 3.169 ms | 5.118 ms | 2.42
| 0.14 | 4 MB |
+/// | DataStreamer | 2 | 56.03 ms | 0.619 ms | 0.579 ms | 1.00
| 0.00 | 4 MB |
+/// | UpsertAll | 2 | 112.38 ms | 1.527 ms | 1.428 ms | 2.01
| 0.03 | 4 MB |
+/// | UpsertAllBatched | 2 | 162.67 ms | 2.833 ms | 3.149 ms | 2.91
| 0.07 | 4 MB |
/// | | | | | |
| | |
-/// | DataStreamer | 4 | 32.64 ms | 0.507 ms | 0.475 ms | 1.00
| 0.00 | 4 MB |
-/// | UpsertAll | 4 | 113.84 ms | 1.276 ms | 1.193 ms | 3.49
| 0.05 | 4 MB |
-/// | UpsertAllBatched | 4 | 159.17 ms | 3.148 ms | 5.172 ms | 4.79
| 0.17 | 4 MB |.
+/// | DataStreamer | 4 | 43.86 ms | 0.528 ms | 0.494 ms | 1.00
| 0.00 | 4 MB |
+/// | UpsertAll | 4 | 113.32 ms | 0.880 ms | 0.823 ms | 2.58
| 0.04 | 4 MB |
+/// | UpsertAllBatched | 4 | 164.51 ms | 3.220 ms | 3.446 ms | 3.76
| 0.10 | 4 MB |.
/// </summary>
[MemoryDiagnoser]
public class DataStreamerBenchmark
diff --git a/modules/platforms/dotnet/Apache.Ignite.Tests/FakeServer.cs
b/modules/platforms/dotnet/Apache.Ignite.Tests/FakeServer.cs
index 68d696f94c..ee6020eae8 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Tests/FakeServer.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Tests/FakeServer.cs
@@ -346,7 +346,13 @@ namespace Apache.Ignite.Tests
case ClientOp.StreamerBatchSend:
reader.Skip(4);
- StreamerRowCount += reader.ReadInt32();
+ var batchSize = reader.ReadInt32();
+ StreamerRowCount += batchSize;
+
+ if (MultiRowOperationDelayPerRow > TimeSpan.Zero)
+ {
+ Thread.Sleep(MultiRowOperationDelayPerRow *
batchSize);
+ }
Send(handler, requestId, Array.Empty<byte>());
continue;
diff --git
a/modules/platforms/dotnet/Apache.Ignite.Tests/PartitionAwarenessTests.cs
b/modules/platforms/dotnet/Apache.Ignite.Tests/PartitionAwarenessTests.cs
index aa66a42d3b..2f2301fc1a 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Tests/PartitionAwarenessTests.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Tests/PartitionAwarenessTests.cs
@@ -20,6 +20,7 @@ namespace Apache.Ignite.Tests;
using System;
using System.Collections.Generic;
using System.Linq;
+using System.Threading.Channels;
using System.Threading.Tasks;
using Compute;
using Ignite.Compute;
@@ -121,6 +122,38 @@ public class PartitionAwarenessTests
(view, _) => view.StreamDataAsync(new[] { 1 }.ToAsyncEnumerable()),
ClientOp.StreamerBatchSend);
+ [Test]
+ public async Task
TestDataStreamerReceivesPartitionAssignmentUpdatesWhileStreaming()
+ {
+ var producer = Channel.CreateUnbounded<int>();
+
+ using var client = await GetClient();
+ var recordView = (await
client.Tables.GetTableAsync(FakeServer.ExistingTableName))!.GetRecordView<int>();
+
+ var streamerTask =
recordView.StreamDataAsync(producer.Reader.ReadAllAsync(), new
DataStreamerOptions { PageSize = 1 });
+ Func<ITransaction?, Task> action = async _ =>
+ {
+ await producer.Writer.WriteAsync(1);
+ TestUtils.WaitForCondition(
+ () => new[] { _server1, _server2 }.SelectMany(x =>
x.ClientOps).Contains(ClientOp.StreamerBatchSend));
+ };
+
+ // Check default assignment.
+ await recordView.UpsertAsync(null, 1);
+ await AssertOpOnNode(action, ClientOp.StreamerBatchSend, _server2);
+
+ // Update assignment - first request receives update flag.
+ ReversePartitionAssignment();
+ await client.Tables.GetTablesAsync();
+
+ // Second request loads and uses new assignment.
+ await AssertOpOnNode(action, ClientOp.StreamerBatchSend, _server1,
allowExtraOps: true);
+
+ // End streaming.
+ producer.Writer.Complete();
+ await streamerTask;
+ }
+
[Test]
[TestCaseSource(nameof(KeyNodeCases))]
public async Task TestAllRecordBinaryViewOperations(int keyId, int node)
@@ -427,7 +460,16 @@ public class PartitionAwarenessTests
await recordView.UpsertAsync(null, 1);
await AssertOpOnNode(tx => func(recordView, tx), op, _server2);
- // Update assignment.
+ // Update assignment - first request receives update flag.
+ ReversePartitionAssignment();
+ await client.Tables.GetTablesAsync();
+
+ // Second request loads and uses new assignment.
+ await AssertOpOnNode(tx => func(recordView, tx), op, _server1,
allowExtraOps: true);
+ }
+
+ private void ReversePartitionAssignment()
+ {
var assignmentTimestamp = DateTime.UtcNow.Ticks;
foreach (var server in new[] { _server1, _server2 })
@@ -436,12 +478,6 @@ public class PartitionAwarenessTests
server.PartitionAssignment =
server.PartitionAssignment.Reverse().ToArray();
server.PartitionAssignmentTimestamp = assignmentTimestamp;
}
-
- // First request receives update flag.
- await client.Tables.GetTablesAsync();
-
- // Second request loads and uses new assignment.
- await AssertOpOnNode(tx => func(recordView, tx), op, _server1,
allowExtraOps: true);
}
private async Task<IIgniteClient> GetClient()
diff --git
a/modules/platforms/dotnet/Apache.Ignite/Internal/Table/DataStreamer.cs
b/modules/platforms/dotnet/Apache.Ignite/Internal/Table/DataStreamer.cs
index 82751b6d7b..fa12c9b433 100644
--- a/modules/platforms/dotnet/Apache.Ignite/Internal/Table/DataStreamer.cs
+++ b/modules/platforms/dotnet/Apache.Ignite/Internal/Table/DataStreamer.cs
@@ -47,8 +47,6 @@ using Serialization;
/// </summary>
internal static class DataStreamer
{
- private static readonly TimeSpan PartitionAssignmentUpdateFrequency =
TimeSpan.FromSeconds(15);
-
/// <summary>
/// Streams the data.
/// </summary>
@@ -93,7 +91,6 @@ internal static class DataStreamer
var partitionAssignment = await
table.GetPartitionAssignmentAsync().ConfigureAwait(false);
var partitionCount = partitionAssignment.Length; // Can't be changed.
Debug.Assert(partitionCount > 0, "partitionCount > 0");
- var lastPartitionsAssignmentCheck = Stopwatch.StartNew();
using var flushCts = new CancellationTokenSource();
@@ -107,24 +104,18 @@ internal static class DataStreamer
// However, not all producers support cancellation, so we need
to check it here as well.
cancellationToken.ThrowIfCancellationRequested();
- var batch = await
AddWithRetryUnmapped(item).ConfigureAwait(false);
- if (batch.Count >= options.PageSize)
+ var newAssignment = await
table.GetPartitionAssignmentAsync().ConfigureAwait(false);
+ if (newAssignment != partitionAssignment)
{
- await SendAsync(batch).ConfigureAwait(false);
+ // Drain all batches to preserve order when partition
assignment changes.
+ await Drain().ConfigureAwait(false);
+ partitionAssignment = newAssignment;
}
- if (lastPartitionsAssignmentCheck.Elapsed >
PartitionAssignmentUpdateFrequency)
+ var batch = await
AddWithRetryUnmapped(item).ConfigureAwait(false);
+ if (batch.Count >= options.PageSize)
{
- var newAssignment = await
table.GetPartitionAssignmentAsync().ConfigureAwait(false);
-
- if (newAssignment != partitionAssignment)
- {
- // Drain all batches to preserve order when partition
assignment changes.
- await Drain().ConfigureAwait(false);
- partitionAssignment = newAssignment;
- }
-
- lastPartitionsAssignmentCheck.Restart();
+ await SendAsync(batch).ConfigureAwait(false);
}
}