This is an automated email from the ASF dual-hosted git repository.

ptupitsyn pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 185e568d45 IGNITE-22031 .NET: Remove 
DataStreamer.PartitionAssignmentUpdateFrequency (#3749)
185e568d45 is described below

commit 185e568d4536b686794dc1011d3f9ba561e91d7b
Author: Pavel Tupitsyn <[email protected]>
AuthorDate: Tue May 14 15:19:59 2024 +0300

    IGNITE-22031 .NET: Remove DataStreamer.PartitionAssignmentUpdateFrequency 
(#3749)
    
    1. Remove the timer-based partition assignment check - it is not necessary. 
`Table.GetPartitionAssignmentAsync` is backed by a cache with double-checked 
locking and uses a `ValueTask`, so there is no overhead when the assignment is 
unchanged - we can call it as often as needed.
    
    2. Add `TestDataStreamerReceivesPartitionAssignmentUpdatesWhileStreaming` 
to cover this use case explicitly.
    
    3. Reorder operations in `DataStreamer`: check partition assignment before 
handling the next item, not after.
    
    
    Benchmark results are not affected by this change compared to the `main` 
branch. The differences in the updated numbers are caused by previous changes 
(IGNITE-21931, IGNITE-21490).
---
 .../Table/DataStreamerBenchmark.cs                 | 20 ++++-----
 .../dotnet/Apache.Ignite.Tests/FakeServer.cs       |  8 +++-
 .../Apache.Ignite.Tests/PartitionAwarenessTests.cs | 50 +++++++++++++++++++---
 .../Apache.Ignite/Internal/Table/DataStreamer.cs   | 25 ++++-------
 4 files changed, 68 insertions(+), 35 deletions(-)

diff --git 
a/modules/platforms/dotnet/Apache.Ignite.Benchmarks/Table/DataStreamerBenchmark.cs
 
b/modules/platforms/dotnet/Apache.Ignite.Benchmarks/Table/DataStreamerBenchmark.cs
index cfa7ff566b..b2ec1798bc 100644
--- 
a/modules/platforms/dotnet/Apache.Ignite.Benchmarks/Table/DataStreamerBenchmark.cs
+++ 
b/modules/platforms/dotnet/Apache.Ignite.Benchmarks/Table/DataStreamerBenchmark.cs
@@ -33,20 +33,20 @@ using Tests;
 /// inserts everything in one batch, and streamer sends multiple batches. With 
multiple servers, streamer scales linearly because
 /// it sends batches to different nodes in parallel.
 /// <para />
-/// Results on i9-12900H, .NET SDK 6.0.408, Ubuntu 22.04:
+/// Results on i9-12900H, .NET SDK 6.0.421, Ubuntu 22.04:
 /// |           Method | ServerCount |      Mean |    Error |   StdDev | Ratio 
| RatioSD | Allocated |
 /// |----------------- |------------ 
|----------:|---------:|---------:|------:|--------:|----------:|
-/// |     DataStreamer |           1 | 141.56 ms | 2.725 ms | 3.244 ms |  1.00 
|    0.00 |      4 MB |
-/// |        UpsertAll |           1 | 112.99 ms | 1.203 ms | 1.125 ms |  0.80 
|    0.02 |      4 MB |
-/// | UpsertAllBatched |           1 | 159.11 ms | 3.175 ms | 4.451 ms |  1.12 
|    0.04 |      4 MB |
+/// |     DataStreamer |           1 | 109.33 ms | 0.805 ms | 0.753 ms |  1.00 
|    0.00 |      4 MB |
+/// |        UpsertAll |           1 | 112.34 ms | 1.060 ms | 0.991 ms |  1.03 
|    0.01 |      4 MB |
+/// | UpsertAllBatched |           1 | 158.85 ms | 3.115 ms | 5.374 ms |  1.44 
|    0.06 |      4 MB |
 /// |                  |             |           |          |          |       
|         |           |
-/// |     DataStreamer |           2 |  67.29 ms | 1.331 ms | 3.058 ms |  1.00 
|    0.00 |      4 MB |
-/// |        UpsertAll |           2 | 113.68 ms | 0.915 ms | 0.856 ms |  1.64 
|    0.05 |      4 MB |
-/// | UpsertAllBatched |           2 | 162.47 ms | 3.169 ms | 5.118 ms |  2.42 
|    0.14 |      4 MB |
+/// |     DataStreamer |           2 |  56.03 ms | 0.619 ms | 0.579 ms |  1.00 
|    0.00 |      4 MB |
+/// |        UpsertAll |           2 | 112.38 ms | 1.527 ms | 1.428 ms |  2.01 
|    0.03 |      4 MB |
+/// | UpsertAllBatched |           2 | 162.67 ms | 2.833 ms | 3.149 ms |  2.91 
|    0.07 |      4 MB |
 /// |                  |             |           |          |          |       
|         |           |
-/// |     DataStreamer |           4 |  32.64 ms | 0.507 ms | 0.475 ms |  1.00 
|    0.00 |      4 MB |
-/// |        UpsertAll |           4 | 113.84 ms | 1.276 ms | 1.193 ms |  3.49 
|    0.05 |      4 MB |
-/// | UpsertAllBatched |           4 | 159.17 ms | 3.148 ms | 5.172 ms |  4.79 
|    0.17 |      4 MB |.
+/// |     DataStreamer |           4 |  43.86 ms | 0.528 ms | 0.494 ms |  1.00 
|    0.00 |      4 MB |
+/// |        UpsertAll |           4 | 113.32 ms | 0.880 ms | 0.823 ms |  2.58 
|    0.04 |      4 MB |
+/// | UpsertAllBatched |           4 | 164.51 ms | 3.220 ms | 3.446 ms |  3.76 
|    0.10 |      4 MB |.
 /// </summary>
 [MemoryDiagnoser]
 public class DataStreamerBenchmark
diff --git a/modules/platforms/dotnet/Apache.Ignite.Tests/FakeServer.cs 
b/modules/platforms/dotnet/Apache.Ignite.Tests/FakeServer.cs
index 68d696f94c..ee6020eae8 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Tests/FakeServer.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Tests/FakeServer.cs
@@ -346,7 +346,13 @@ namespace Apache.Ignite.Tests
 
                     case ClientOp.StreamerBatchSend:
                         reader.Skip(4);
-                        StreamerRowCount += reader.ReadInt32();
+                        var batchSize = reader.ReadInt32();
+                        StreamerRowCount += batchSize;
+
+                        if (MultiRowOperationDelayPerRow > TimeSpan.Zero)
+                        {
+                            Thread.Sleep(MultiRowOperationDelayPerRow * 
batchSize);
+                        }
 
                         Send(handler, requestId, Array.Empty<byte>());
                         continue;
diff --git 
a/modules/platforms/dotnet/Apache.Ignite.Tests/PartitionAwarenessTests.cs 
b/modules/platforms/dotnet/Apache.Ignite.Tests/PartitionAwarenessTests.cs
index aa66a42d3b..2f2301fc1a 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Tests/PartitionAwarenessTests.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Tests/PartitionAwarenessTests.cs
@@ -20,6 +20,7 @@ namespace Apache.Ignite.Tests;
 using System;
 using System.Collections.Generic;
 using System.Linq;
+using System.Threading.Channels;
 using System.Threading.Tasks;
 using Compute;
 using Ignite.Compute;
@@ -121,6 +122,38 @@ public class PartitionAwarenessTests
             (view, _) => view.StreamDataAsync(new[] { 1 }.ToAsyncEnumerable()),
             ClientOp.StreamerBatchSend);
 
+    [Test]
+    public async Task 
TestDataStreamerReceivesPartitionAssignmentUpdatesWhileStreaming()
+    {
+        var producer = Channel.CreateUnbounded<int>();
+
+        using var client = await GetClient();
+        var recordView = (await 
client.Tables.GetTableAsync(FakeServer.ExistingTableName))!.GetRecordView<int>();
+
+        var streamerTask = 
recordView.StreamDataAsync(producer.Reader.ReadAllAsync(), new 
DataStreamerOptions { PageSize = 1 });
+        Func<ITransaction?, Task> action = async _ =>
+        {
+             await producer.Writer.WriteAsync(1);
+             TestUtils.WaitForCondition(
+                 () => new[] { _server1, _server2 }.SelectMany(x => 
x.ClientOps).Contains(ClientOp.StreamerBatchSend));
+        };
+
+        // Check default assignment.
+        await recordView.UpsertAsync(null, 1);
+        await AssertOpOnNode(action, ClientOp.StreamerBatchSend, _server2);
+
+        // Update assignment - first request receives update flag.
+        ReversePartitionAssignment();
+        await client.Tables.GetTablesAsync();
+
+        // Second request loads and uses new assignment.
+        await AssertOpOnNode(action, ClientOp.StreamerBatchSend, _server1, 
allowExtraOps: true);
+
+        // End streaming.
+        producer.Writer.Complete();
+        await streamerTask;
+    }
+
     [Test]
     [TestCaseSource(nameof(KeyNodeCases))]
     public async Task TestAllRecordBinaryViewOperations(int keyId, int node)
@@ -427,7 +460,16 @@ public class PartitionAwarenessTests
         await recordView.UpsertAsync(null, 1);
         await AssertOpOnNode(tx => func(recordView, tx), op, _server2);
 
-        // Update assignment.
+        // Update assignment - first request receives update flag.
+        ReversePartitionAssignment();
+        await client.Tables.GetTablesAsync();
+
+        // Second request loads and uses new assignment.
+        await AssertOpOnNode(tx => func(recordView, tx), op, _server1, 
allowExtraOps: true);
+    }
+
+    private void ReversePartitionAssignment()
+    {
         var assignmentTimestamp = DateTime.UtcNow.Ticks;
 
         foreach (var server in new[] { _server1, _server2 })
@@ -436,12 +478,6 @@ public class PartitionAwarenessTests
             server.PartitionAssignment = 
server.PartitionAssignment.Reverse().ToArray();
             server.PartitionAssignmentTimestamp = assignmentTimestamp;
         }
-
-        // First request receives update flag.
-        await client.Tables.GetTablesAsync();
-
-        // Second request loads and uses new assignment.
-        await AssertOpOnNode(tx => func(recordView, tx), op, _server1, 
allowExtraOps: true);
     }
 
     private async Task<IIgniteClient> GetClient()
diff --git 
a/modules/platforms/dotnet/Apache.Ignite/Internal/Table/DataStreamer.cs 
b/modules/platforms/dotnet/Apache.Ignite/Internal/Table/DataStreamer.cs
index 82751b6d7b..fa12c9b433 100644
--- a/modules/platforms/dotnet/Apache.Ignite/Internal/Table/DataStreamer.cs
+++ b/modules/platforms/dotnet/Apache.Ignite/Internal/Table/DataStreamer.cs
@@ -47,8 +47,6 @@ using Serialization;
 /// </summary>
 internal static class DataStreamer
 {
-    private static readonly TimeSpan PartitionAssignmentUpdateFrequency = 
TimeSpan.FromSeconds(15);
-
     /// <summary>
     /// Streams the data.
     /// </summary>
@@ -93,7 +91,6 @@ internal static class DataStreamer
         var partitionAssignment = await 
table.GetPartitionAssignmentAsync().ConfigureAwait(false);
         var partitionCount = partitionAssignment.Length; // Can't be changed.
         Debug.Assert(partitionCount > 0, "partitionCount > 0");
-        var lastPartitionsAssignmentCheck = Stopwatch.StartNew();
 
         using var flushCts = new CancellationTokenSource();
 
@@ -107,24 +104,18 @@ internal static class DataStreamer
                 // However, not all producers support cancellation, so we need 
to check it here as well.
                 cancellationToken.ThrowIfCancellationRequested();
 
-                var batch = await 
AddWithRetryUnmapped(item).ConfigureAwait(false);
-                if (batch.Count >= options.PageSize)
+                var newAssignment = await 
table.GetPartitionAssignmentAsync().ConfigureAwait(false);
+                if (newAssignment != partitionAssignment)
                 {
-                    await SendAsync(batch).ConfigureAwait(false);
+                    // Drain all batches to preserve order when partition 
assignment changes.
+                    await Drain().ConfigureAwait(false);
+                    partitionAssignment = newAssignment;
                 }
 
-                if (lastPartitionsAssignmentCheck.Elapsed > 
PartitionAssignmentUpdateFrequency)
+                var batch = await 
AddWithRetryUnmapped(item).ConfigureAwait(false);
+                if (batch.Count >= options.PageSize)
                 {
-                    var newAssignment = await 
table.GetPartitionAssignmentAsync().ConfigureAwait(false);
-
-                    if (newAssignment != partitionAssignment)
-                    {
-                        // Drain all batches to preserve order when partition 
assignment changes.
-                        await Drain().ConfigureAwait(false);
-                        partitionAssignment = newAssignment;
-                    }
-
-                    lastPartitionsAssignmentCheck.Restart();
+                    await SendAsync(batch).ConfigureAwait(false);
                 }
             }
 

Reply via email to