This is an automated email from the ASF dual-hosted git repository.

ptupitsyn pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new d3ff291c4c IGNITE-21931 .NET: Refactor DataStreamer to use 
StreamerBatchSend (#3546)
d3ff291c4c is described below

commit d3ff291c4c63694af9dcf3714ec983c2a20fcc21
Author: Pavel Tupitsyn <[email protected]>
AuthorDate: Wed Apr 3 16:29:25 2024 +0300

    IGNITE-21931 .NET: Refactor DataStreamer to use StreamerBatchSend (#3546)
    
    * Refactor streamer to batch by partition instead of by node
    * Use `StreamerBatchSend` instead of `TupleUpsertAll`
---
 .../dotnet/Apache.Ignite.Tests/FakeServer.cs       |  14 ++-
 .../Apache.Ignite.Tests/PartitionAwarenessTests.cs |  10 +-
 .../Apache.Ignite.Tests/Table/DataStreamerTests.cs |   8 +-
 .../Table/SchemaSynchronizationTest.cs             |   2 +-
 .../dotnet/Apache.Ignite/ClientOperationType.cs    |   7 +-
 .../Apache.Ignite/Internal/Proto/ClientOp.cs       |   5 +-
 .../Internal/Proto/ClientOpExtensions.cs           |   1 +
 .../Apache.Ignite/Internal/Table/DataStreamer.cs   | 137 ++++++++++++++-------
 .../Apache.Ignite/Internal/Table/RecordView.cs     |  18 +--
 .../dotnet/Apache.Ignite/RetryReadPolicy.cs        |   1 +
 10 files changed, 121 insertions(+), 82 deletions(-)

diff --git a/modules/platforms/dotnet/Apache.Ignite.Tests/FakeServer.cs 
b/modules/platforms/dotnet/Apache.Ignite.Tests/FakeServer.cs
index 4849b0d3e2..6dfe0f9ef9 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Tests/FakeServer.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Tests/FakeServer.cs
@@ -115,7 +115,7 @@ namespace Apache.Ignite.Tests
 
         public Dictionary<string, object?> LastSqlScriptProps { get; private 
set; } = new();
 
-        public long UpsertAllRowCount { get; set; }
+        public long StreamerRowCount { get; set; }
 
         public long DroppedConnectionCount { get; set; }
 
@@ -281,11 +281,6 @@ namespace Apache.Ignite.Tests
                             Thread.Sleep(MultiRowOperationDelayPerRow * count);
                         }
 
-                        if (opCode == ClientOp.TupleUpsertAll)
-                        {
-                            UpsertAllRowCount += count;
-                        }
-
                         Send(handler, requestId, new byte[] { 1, 0 
}.AsMemory());
                         continue;
 
@@ -334,6 +329,13 @@ namespace Apache.Ignite.Tests
                         Thread.Sleep(HeartbeatDelay);
                         Send(handler, requestId, Array.Empty<byte>());
                         continue;
+
+                    case ClientOp.StreamerBatchSend:
+                        reader.Skip(4);
+                        StreamerRowCount += reader.ReadInt32();
+
+                        Send(handler, requestId, Array.Empty<byte>());
+                        continue;
                 }
 
                 // Fake error message for any other op code.
diff --git 
a/modules/platforms/dotnet/Apache.Ignite.Tests/PartitionAwarenessTests.cs 
b/modules/platforms/dotnet/Apache.Ignite.Tests/PartitionAwarenessTests.cs
index c17bfa49a4..c3c7059aa7 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Tests/PartitionAwarenessTests.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Tests/PartitionAwarenessTests.cs
@@ -112,7 +112,7 @@ public class PartitionAwarenessTests
     public async Task TestDataStreamerReceivesPartitionAssignmentUpdates() =>
         await TestClientReceivesPartitionAssignmentUpdates(
             view => view.StreamDataAsync(new[] { 1 }.ToAsyncEnumerable()),
-            ClientOp.TupleUpsertAll);
+            ClientOp.StreamerBatchSend);
 
     [Test]
     [TestCaseSource(nameof(KeyNodeCases))]
@@ -138,7 +138,7 @@ public class PartitionAwarenessTests
         await AssertOpOnNode(() => recordView.ReplaceAsync(null, key, key), 
ClientOp.TupleReplaceExact, expectedNode);
         await AssertOpOnNode(() => recordView.DeleteAsync(null, key), 
ClientOp.TupleDelete, expectedNode);
         await AssertOpOnNode(() => recordView.DeleteExactAsync(null, key), 
ClientOp.TupleDeleteExact, expectedNode);
-        await AssertOpOnNode(() => recordView.StreamDataAsync(new[] { key 
}.ToAsyncEnumerable()), ClientOp.TupleUpsertAll, expectedNode);
+        await AssertOpOnNode(() => recordView.StreamDataAsync(new[] { key 
}.ToAsyncEnumerable()), ClientOp.StreamerBatchSend, expectedNode);
 
         // Multi-key operations use the first key for colocation.
         var keys = new[] { key, new IgniteTuple { ["ID"] = keyId - 1 }, new 
IgniteTuple { ["ID"] = keyId + 1 } };
@@ -172,7 +172,7 @@ public class PartitionAwarenessTests
         await AssertOpOnNode(() => recordView.ReplaceAsync(null, key, key), 
ClientOp.TupleReplaceExact, expectedNode);
         await AssertOpOnNode(() => recordView.DeleteAsync(null, key), 
ClientOp.TupleDelete, expectedNode);
         await AssertOpOnNode(() => recordView.DeleteExactAsync(null, key), 
ClientOp.TupleDeleteExact, expectedNode);
-        await AssertOpOnNode(() => recordView.StreamDataAsync(new[] { key 
}.ToAsyncEnumerable()), ClientOp.TupleUpsertAll, expectedNode);
+        await AssertOpOnNode(() => recordView.StreamDataAsync(new[] { key 
}.ToAsyncEnumerable()), ClientOp.StreamerBatchSend, expectedNode);
 
         // Multi-key operations use the first key for colocation.
         var keys = new[] { key, key - 1, key + 1 };
@@ -218,7 +218,7 @@ public class PartitionAwarenessTests
         await AssertOpOnNode(() => kvView.PutAllAsync(null, pairs), 
ClientOp.TupleUpsertAll, expectedNode);
         await AssertOpOnNode(() => kvView.RemoveAllAsync(null, keys), 
ClientOp.TupleDeleteAll, expectedNode);
         await AssertOpOnNode(() => kvView.RemoveAllAsync(null, pairs), 
ClientOp.TupleDeleteAllExact, expectedNode);
-        await AssertOpOnNode(() => 
kvView.StreamDataAsync(pairs.ToAsyncEnumerable()), ClientOp.TupleUpsertAll, 
expectedNode);
+        await AssertOpOnNode(() => 
kvView.StreamDataAsync(pairs.ToAsyncEnumerable()), ClientOp.StreamerBatchSend, 
expectedNode);
     }
 
     [Test]
@@ -248,7 +248,7 @@ public class PartitionAwarenessTests
         await AssertOpOnNode(() => kvView.ContainsAsync(null, key), 
ClientOp.TupleContainsKey, expectedNode);
         await AssertOpOnNode(
             () => kvView.StreamDataAsync(new[] { new KeyValuePair<int, 
int>(key, val) }.ToAsyncEnumerable()),
-            ClientOp.TupleUpsertAll,
+            ClientOp.StreamerBatchSend,
             expectedNode);
 
         // Multi-key operations use the first key for colocation.
diff --git 
a/modules/platforms/dotnet/Apache.Ignite.Tests/Table/DataStreamerTests.cs 
b/modules/platforms/dotnet/Apache.Ignite.Tests/Table/DataStreamerTests.cs
index d509c6fc9f..15b9e6aaeb 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Tests/Table/DataStreamerTests.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Tests/Table/DataStreamerTests.cs
@@ -149,7 +149,7 @@ public class DataStreamerTests : IgniteTestsBase
     public async Task TestRetryLimitExhausted()
     {
         using var server = new FakeServer(
-            shouldDropConnection: ctx => ctx is { OpCode: 
ClientOp.TupleUpsertAll, RequestCount: > 7 });
+            shouldDropConnection: ctx => ctx is { OpCode: 
ClientOp.StreamerBatchSend, RequestCount: > 7 });
 
         using var client = await server.ConnectClientAsync();
         var table = await 
client.Tables.GetTableAsync(FakeServer.ExistingTableName);
@@ -157,7 +157,7 @@ public class DataStreamerTests : IgniteTestsBase
         var ex = Assert.ThrowsAsync<IgniteClientConnectionException>(
             async () => await 
table!.RecordBinaryView.StreamDataAsync(GetFakeServerData(10_000)));
 
-        StringAssert.StartsWith("Operation TupleUpsertAll failed after 16 
retries", ex!.Message);
+        StringAssert.StartsWith("Operation StreamerBatchSend failed after 16 
retries", ex!.Message);
     }
 
     [Test]
@@ -167,7 +167,7 @@ public class DataStreamerTests : IgniteTestsBase
         int upsertIdx = 0;
 
         using var server = new FakeServer(
-            shouldDropConnection: ctx => ctx.OpCode == ClientOp.TupleUpsertAll 
&& Interlocked.Increment(ref upsertIdx) % 2 == 1);
+            shouldDropConnection: ctx => ctx.OpCode == 
ClientOp.StreamerBatchSend && Interlocked.Increment(ref upsertIdx) % 2 == 1);
 
         // Streamer has it's own retry policy, so we can disable retries on 
the client.
         using var client = await server.ConnectClientAsync(new 
IgniteClientConfiguration
@@ -178,7 +178,7 @@ public class DataStreamerTests : IgniteTestsBase
         var table = await 
client.Tables.GetTableAsync(FakeServer.ExistingTableName);
         await 
table!.RecordBinaryView.StreamDataAsync(GetFakeServerData(count));
 
-        Assert.AreEqual(count, server.UpsertAllRowCount);
+        Assert.AreEqual(count, server.StreamerRowCount);
         Assert.That(server.DroppedConnectionCount, 
Is.GreaterThanOrEqualTo(count / DataStreamerOptions.Default.PageSize));
     }
 
diff --git 
a/modules/platforms/dotnet/Apache.Ignite.Tests/Table/SchemaSynchronizationTest.cs
 
b/modules/platforms/dotnet/Apache.Ignite.Tests/Table/SchemaSynchronizationTest.cs
index 0c9c64eae3..22ac2370d2 100644
--- 
a/modules/platforms/dotnet/Apache.Ignite.Tests/Table/SchemaSynchronizationTest.cs
+++ 
b/modules/platforms/dotnet/Apache.Ignite.Tests/Table/SchemaSynchronizationTest.cs
@@ -356,7 +356,7 @@ public class SchemaSynchronizationTest : IgniteTestsBase
         async IAsyncEnumerable<IIgniteTuple> GetData()
         {
             // First set of batches uses old schema.
-            for (int i = 0; i < 10; i++)
+            for (int i = 0; i < 20; i++)
             {
                 yield return GetTuple(i);
             }
diff --git a/modules/platforms/dotnet/Apache.Ignite/ClientOperationType.cs 
b/modules/platforms/dotnet/Apache.Ignite/ClientOperationType.cs
index 43e7935f98..0a1d3ee484 100644
--- a/modules/platforms/dotnet/Apache.Ignite/ClientOperationType.cs
+++ b/modules/platforms/dotnet/Apache.Ignite/ClientOperationType.cs
@@ -144,6 +144,11 @@ namespace Apache.Ignite
         /// <summary>
         /// Change compute job priority (<see 
cref="IJobExecution{T}.ChangePriorityAsync"/>).
         /// </summary>
-        ComputeChangePriority
+        ComputeChangePriority,
+
+        /// <summary>
+        /// Send data streamer batch (<see 
cref="IDataStreamerTarget{T}.StreamDataAsync"/>).
+        /// </summary>
+        StreamerBatchSend
     }
 }
diff --git a/modules/platforms/dotnet/Apache.Ignite/Internal/Proto/ClientOp.cs 
b/modules/platforms/dotnet/Apache.Ignite/Internal/Proto/ClientOp.cs
index 80b3cecdfd..7a1e80428e 100644
--- a/modules/platforms/dotnet/Apache.Ignite/Internal/Proto/ClientOp.cs
+++ b/modules/platforms/dotnet/Apache.Ignite/Internal/Proto/ClientOp.cs
@@ -128,6 +128,9 @@ namespace Apache.Ignite.Internal.Proto
         ComputeCancel = 60,
 
         /** Change compute job priority. */
-        ComputeChangePriority = 61
+        ComputeChangePriority = 61,
+
+        /** Send streamer batch. */
+        StreamerBatchSend = 62
     }
 }
diff --git 
a/modules/platforms/dotnet/Apache.Ignite/Internal/Proto/ClientOpExtensions.cs 
b/modules/platforms/dotnet/Apache.Ignite/Internal/Proto/ClientOpExtensions.cs
index abab50c462..0648e0f580 100644
--- 
a/modules/platforms/dotnet/Apache.Ignite/Internal/Proto/ClientOpExtensions.cs
+++ 
b/modules/platforms/dotnet/Apache.Ignite/Internal/Proto/ClientOpExtensions.cs
@@ -68,6 +68,7 @@ namespace Apache.Ignite.Internal.Proto
                 ClientOp.ClusterGetNodes => null,
                 ClientOp.PartitionAssignmentGet => null,
                 ClientOp.SqlParamMeta => null,
+                ClientOp.StreamerBatchSend => 
ClientOperationType.StreamerBatchSend,
 
                 // Do not return null from default arm intentionally so we 
don't forget to update this when new ClientOp values are added.
                 // ReSharper disable once PatternIsRedundant
diff --git 
a/modules/platforms/dotnet/Apache.Ignite/Internal/Table/DataStreamer.cs 
b/modules/platforms/dotnet/Apache.Ignite/Internal/Table/DataStreamer.cs
index ec11ab3065..59655b0533 100644
--- a/modules/platforms/dotnet/Apache.Ignite/Internal/Table/DataStreamer.cs
+++ b/modules/platforms/dotnet/Apache.Ignite/Internal/Table/DataStreamer.cs
@@ -22,7 +22,6 @@ using System.Buffers;
 using System.Collections.Generic;
 using System.Diagnostics;
 using System.Diagnostics.CodeAnalysis;
-using System.Linq;
 using System.Runtime.InteropServices;
 using System.Threading;
 using System.Threading.Tasks;
@@ -54,20 +53,16 @@ internal static class DataStreamer
     /// Streams the data.
     /// </summary>
     /// <param name="data">Data.</param>
-    /// <param name="sender">Batch sender.</param>
+    /// <param name="table">Table.</param>
     /// <param name="writer">Item writer.</param>
-    /// <param name="schemaProvider">Schema provider.</param>
-    /// <param name="partitionAssignmentProvider">Partitioner.</param>
     /// <param name="options">Options.</param>
     /// <param name="cancellationToken">Cancellation token.</param>
     /// <typeparam name="T">Element type.</typeparam>
     /// <returns>A <see cref="Task"/> representing the asynchronous 
operation.</returns>
     internal static async Task StreamDataAsync<T>(
         IAsyncEnumerable<T> data,
-        Func<PooledArrayBuffer, int, string, IRetryPolicy, Task> sender,
+        Table table,
         RecordSerializer<T> writer,
-        Func<int?, Task<Schema>> schemaProvider, // Not a ValueTask because 
Tasks are cached.
-        Func<ValueTask<string?[]>> partitionAssignmentProvider,
         DataStreamerOptions options,
         CancellationToken cancellationToken)
     {
@@ -90,12 +85,16 @@ internal static class DataStreamer
 
         // ConcurrentDictionary is not necessary because we consume the source 
sequentially.
         // However, locking for batches is required due to auto-flush 
background task.
-        var batches = new Dictionary<string, Batch<T>>();
+        var batches = new Dictionary<int, Batch<T>>();
         var retryPolicy = new RetryLimitPolicy { RetryLimit = 
options.RetryLimit };
 
-        var schema = await schemaProvider(null).ConfigureAwait(false);
-        var partitionAssignment = await 
partitionAssignmentProvider().ConfigureAwait(false);
+        var schema = await table.GetSchemaAsync(null).ConfigureAwait(false);
+
+        var partitionAssignment = await 
table.GetPartitionAssignmentAsync().ConfigureAwait(false);
+        var partitionCount = partitionAssignment.Length; // Can't be changed.
+        Debug.Assert(partitionCount > 0, "partitionCount > 0");
         var lastPartitionsAssignmentCheck = Stopwatch.StartNew();
+
         using var flushCts = new CancellationTokenSource();
 
         try
@@ -116,7 +115,7 @@ internal static class DataStreamer
 
                 if (lastPartitionsAssignmentCheck.Elapsed > 
PartitionAssignmentUpdateFrequency)
                 {
-                    var newAssignment = await 
partitionAssignmentProvider().ConfigureAwait(false);
+                    var newAssignment = await 
table.GetPartitionAssignmentAsync().ConfigureAwait(false);
 
                     if (newAssignment != partitionAssignment)
                     {
@@ -146,7 +145,7 @@ internal static class DataStreamer
 
         return;
 
-        async ValueTask<(Batch<T> Batch, string Partition)> 
AddWithRetryUnmapped(T item)
+        async ValueTask<(Batch<T> Batch, int Partition)> 
AddWithRetryUnmapped(T item)
         {
             try
             {
@@ -154,12 +153,12 @@ internal static class DataStreamer
             }
             catch (Exception e) when (e.CausedByUnmappedColumns())
             {
-                schema = await 
schemaProvider(Table.SchemaVersionForceLatest).ConfigureAwait(false);
+                schema = await 
table.GetSchemaAsync(Table.SchemaVersionForceLatest).ConfigureAwait(false);
                 return Add(item);
             }
         }
 
-        (Batch<T> Batch, string Partition) Add(T item)
+        (Batch<T> Batch, int Partition) Add(T item)
         {
             var schema0 = schema;
             var tupleBuilder = new BinaryTupleBuilder(schema0.Columns.Length, 
hashedColumnsPredicate: schema0.HashedColumnIndexProvider);
@@ -174,7 +173,7 @@ internal static class DataStreamer
             }
         }
 
-        (Batch<T> Batch, string Partition) Add0(T item, ref BinaryTupleBuilder 
tupleBuilder, Schema schema0)
+        (Batch<T> Batch, int Partition) Add0(T item, ref BinaryTupleBuilder 
tupleBuilder, Schema schema0)
         {
             var columnCount = schema0.Columns.Length;
 
@@ -185,11 +184,8 @@ internal static class DataStreamer
 
             writer.Handler.Write(ref tupleBuilder, item, schema0, keyOnly: 
false, noValueSetRef);
 
-            // ReSharper disable once AccessToModifiedClosure (reviewed)
-            var partitionAssignment0 = partitionAssignment;
-            var partition = 
partitionAssignment0[Math.Abs(tupleBuilder.GetHash() % 
partitionAssignment0.Length)] ?? string.Empty;
-
-            var batch = GetOrCreateBatch(partition);
+            var partitionId = Math.Abs(tupleBuilder.GetHash() % 
partitionCount);
+            var batch = GetOrCreateBatch(partitionId);
 
             lock (batch)
             {
@@ -214,17 +210,17 @@ internal static class DataStreamer
 
             Metrics.StreamerItemsQueuedIncrement();
 
-            return (batch, partition);
+            return (batch, partitionId);
         }
 
-        Batch<T> GetOrCreateBatch(string partition)
+        Batch<T> GetOrCreateBatch(int partitionId)
         {
-            ref var batchRef = ref 
CollectionsMarshal.GetValueRefOrAddDefault(batches, partition, out _);
+            ref var batchRef = ref 
CollectionsMarshal.GetValueRefOrAddDefault(batches, partitionId, out _);
 
             if (batchRef == null)
             {
                 batchRef = new Batch<T>(options.PageSize, schema);
-                InitBuffer(batchRef);
+                InitBuffer(batchRef, partitionId, schema);
 
                 Metrics.StreamerBatchesActiveIncrement();
             }
@@ -232,7 +228,7 @@ internal static class DataStreamer
             return batchRef;
         }
 
-        async Task SendAsync(Batch<T> batch, string partition)
+        async Task SendAsync(Batch<T> batch, int partitionId)
         {
             var expectedSize = batch.Count;
 
@@ -253,12 +249,12 @@ internal static class DataStreamer
                 buf.WriteByte(MsgPackCode.Int32, batch.CountPos);
                 buf.WriteIntBigEndian(batch.Count, batch.CountPos + 1);
 
-                batch.Task = SendAndDisposeBufAsync(buf, partition, 
batch.Task, batch.Items, batch.Count, batch.SchemaOutdated);
+                batch.Task = SendAndDisposeBufAsync(buf, partitionId, 
batch.Task, batch.Items, batch.Count, batch.SchemaOutdated);
 
                 batch.Items = ArrayPool<T>.Shared.Rent(options.PageSize);
                 batch.Count = 0;
                 batch.Buffer = ProtoCommon.GetMessageWriter(); // Prev buf 
will be disposed in SendAndDisposeBufAsync.
-                InitBuffer(batch);
+                InitBuffer(batch, partitionId, schema);
                 batch.LastFlush = Stopwatch.GetTimestamp();
                 batch.Schema = schema;
                 batch.SchemaOutdated = false;
@@ -269,7 +265,7 @@ internal static class DataStreamer
 
         async Task SendAndDisposeBufAsync(
             PooledArrayBuffer buf,
-            string partition,
+            int partitionId,
             Task oldTask,
             T[] items,
             int count,
@@ -280,10 +276,13 @@ internal static class DataStreamer
             if (batchSchemaOutdated)
             {
                 // Schema update was detected while the batch was being filled.
-                buf.Reset();
-                writer.WriteMultiple(buf, null, schema, items.Take(count));
+                // Re-serialize the whole batch.
+                ReWriteBatch(buf, partitionId, schema, items.AsSpan(0, count), 
writer);
             }
 
+            // ReSharper disable once AccessToModifiedClosure
+            var preferredNode = 
PreferredNode.FromName(partitionAssignment[partitionId] ?? string.Empty);
+
             try
             {
                 int? schemaVersion = null;
@@ -296,17 +295,16 @@ internal static class DataStreamer
                             // Might be updated by another batch.
                             if (schema.Version != schemaVersion)
                             {
-                                schema = await 
schemaProvider(schemaVersion).ConfigureAwait(false);
+                                schema = await 
table.GetSchemaAsync(schemaVersion).ConfigureAwait(false);
                             }
 
                             // Serialize again with the new schema.
-                            buf.Reset();
-                            writer.WriteMultiple(buf, null, schema, 
items.Take(count));
+                            ReWriteBatch(buf, partitionId, schema, 
items.AsSpan(0, count), writer);
                         }
 
                         // Wait for the previous batch for this node to 
preserve item order.
                         await oldTask.ConfigureAwait(false);
-                        await sender(buf, count, partition, 
retryPolicy).ConfigureAwait(false);
+                        await SendBatchAsync(table, buf, count, preferredNode, 
retryPolicy).ConfigureAwait(false);
 
                         return;
                     }
@@ -349,19 +347,6 @@ internal static class DataStreamer
             }
         }
 
-        void InitBuffer(Batch<T> batch)
-        {
-            var buf = batch.Buffer;
-
-            var w = buf.MessageWriter;
-            w.Write(schema.TableId);
-            w.WriteTx(null);
-            w.Write(schema.Version);
-
-            batch.CountPos = buf.Position;
-            buf.Advance(5); // Reserve count.
-        }
-
         async Task Drain()
         {
             foreach (var (partition, batch) in batches)
@@ -376,6 +361,64 @@ internal static class DataStreamer
         }
     }
 
+    private static void InitBuffer<T>(Batch<T> batch, int partitionId, Schema 
schema)
+    {
+        var buf = batch.Buffer;
+        WriteBatchHeader(buf, partitionId, schema);
+
+        batch.CountPos = buf.Position;
+        buf.Advance(5); // Reserve count.
+    }
+
+    private static void WriteBatchHeader(PooledArrayBuffer buf, int 
partitionId, Schema schema)
+    {
+        var w = buf.MessageWriter;
+        w.Write(schema.TableId);
+        w.Write(partitionId);
+        w.WriteNil(); // Deleted rows bit set.
+        w.Write(schema.Version);
+    }
+
+    private static void ReWriteBatch<T>(
+        PooledArrayBuffer buf,
+        int partitionId,
+        Schema schema,
+        ReadOnlySpan<T> items,
+        RecordSerializer<T> writer)
+    {
+        buf.Reset();
+        WriteBatchHeader(buf, partitionId, schema);
+
+        var w = buf.MessageWriter;
+        w.Write(items.Length);
+
+        foreach (var item in items)
+        {
+            writer.Handler.Write(ref w, schema, item, keyOnly: false, 
computeHash: false);
+        }
+    }
+
+    private static async Task SendBatchAsync(
+        Table table,
+        PooledArrayBuffer buf,
+        int count,
+        PreferredNode preferredNode,
+        IRetryPolicy retryPolicy)
+    {
+        var (resBuf, socket) = await table.Socket.DoOutInOpAndGetSocketAsync(
+                ClientOp.StreamerBatchSend,
+                tx: null,
+                buf,
+                preferredNode,
+                retryPolicy)
+            .ConfigureAwait(false);
+
+        resBuf.Dispose();
+
+        Metrics.StreamerBatchesSent.Add(1, socket.MetricsContext.Tags);
+        Metrics.StreamerItemsSent.Add(count, socket.MetricsContext.Tags);
+    }
+
     private sealed record Batch<T>
     {
         public Batch(int capacity, Schema schema)
diff --git 
a/modules/platforms/dotnet/Apache.Ignite/Internal/Table/RecordView.cs 
b/modules/platforms/dotnet/Apache.Ignite/Internal/Table/RecordView.cs
index 9f60de14fc..7e545ab151 100644
--- a/modules/platforms/dotnet/Apache.Ignite/Internal/Table/RecordView.cs
+++ b/modules/platforms/dotnet/Apache.Ignite/Internal/Table/RecordView.cs
@@ -299,24 +299,8 @@ namespace Apache.Ignite.Internal.Table
             CancellationToken cancellationToken = default) =>
             await DataStreamer.StreamDataAsync(
                 data,
-                sender: async (batch, count, preferredNode, retryPolicy) =>
-                {
-                    var (resBuf, socket) = await 
_table.Socket.DoOutInOpAndGetSocketAsync(
-                            ClientOp.TupleUpsertAll,
-                            tx: null,
-                            batch,
-                            PreferredNode.FromName(preferredNode),
-                            retryPolicy)
-                        .ConfigureAwait(false);
-
-                    resBuf.Dispose();
-
-                    Metrics.StreamerBatchesSent.Add(1, 
socket.MetricsContext.Tags);
-                    Metrics.StreamerItemsSent.Add(count, 
socket.MetricsContext.Tags);
-                },
+                _table,
                 writer: _ser,
-                schemaProvider: _table.GetSchemaAsync,
-                partitionAssignmentProvider: () => 
_table.GetPartitionAssignmentAsync(),
                 options ?? DataStreamerOptions.Default,
                 cancellationToken).ConfigureAwait(false);
 
diff --git a/modules/platforms/dotnet/Apache.Ignite/RetryReadPolicy.cs 
b/modules/platforms/dotnet/Apache.Ignite/RetryReadPolicy.cs
index 41dcbcfa53..e364c23302 100644
--- a/modules/platforms/dotnet/Apache.Ignite/RetryReadPolicy.cs
+++ b/modules/platforms/dotnet/Apache.Ignite/RetryReadPolicy.cs
@@ -61,6 +61,7 @@ namespace Apache.Ignite
                 ClientOperationType.ComputeCancel => false,
                 ClientOperationType.ComputeChangePriority => false,
                 ClientOperationType.ComputeGetStatus => true,
+                ClientOperationType.StreamerBatchSend => false,
                 var unsupported => throw new 
NotSupportedException("Unsupported operation type: " + unsupported)
             };
         }

Reply via email to