This is an automated email from the ASF dual-hosted git repository.
ptupitsyn pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new d3ff291c4c IGNITE-21931 .NET: Refactor DataStreamer to use
StreamerBatchSend (#3546)
d3ff291c4c is described below
commit d3ff291c4c63694af9dcf3714ec983c2a20fcc21
Author: Pavel Tupitsyn <[email protected]>
AuthorDate: Wed Apr 3 16:29:25 2024 +0300
IGNITE-21931 .NET: Refactor DataStreamer to use StreamerBatchSend (#3546)
* Refactor streamer to batch by partition instead of by node
* Use `StreamerBatchSend` instead of `TupleUpsertAll`
---
.../dotnet/Apache.Ignite.Tests/FakeServer.cs | 14 ++-
.../Apache.Ignite.Tests/PartitionAwarenessTests.cs | 10 +-
.../Apache.Ignite.Tests/Table/DataStreamerTests.cs | 8 +-
.../Table/SchemaSynchronizationTest.cs | 2 +-
.../dotnet/Apache.Ignite/ClientOperationType.cs | 7 +-
.../Apache.Ignite/Internal/Proto/ClientOp.cs | 5 +-
.../Internal/Proto/ClientOpExtensions.cs | 1 +
.../Apache.Ignite/Internal/Table/DataStreamer.cs | 137 ++++++++++++++-------
.../Apache.Ignite/Internal/Table/RecordView.cs | 18 +--
.../dotnet/Apache.Ignite/RetryReadPolicy.cs | 1 +
10 files changed, 121 insertions(+), 82 deletions(-)
diff --git a/modules/platforms/dotnet/Apache.Ignite.Tests/FakeServer.cs
b/modules/platforms/dotnet/Apache.Ignite.Tests/FakeServer.cs
index 4849b0d3e2..6dfe0f9ef9 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Tests/FakeServer.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Tests/FakeServer.cs
@@ -115,7 +115,7 @@ namespace Apache.Ignite.Tests
public Dictionary<string, object?> LastSqlScriptProps { get; private
set; } = new();
- public long UpsertAllRowCount { get; set; }
+ public long StreamerRowCount { get; set; }
public long DroppedConnectionCount { get; set; }
@@ -281,11 +281,6 @@ namespace Apache.Ignite.Tests
Thread.Sleep(MultiRowOperationDelayPerRow * count);
}
- if (opCode == ClientOp.TupleUpsertAll)
- {
- UpsertAllRowCount += count;
- }
-
Send(handler, requestId, new byte[] { 1, 0
}.AsMemory());
continue;
@@ -334,6 +329,13 @@ namespace Apache.Ignite.Tests
Thread.Sleep(HeartbeatDelay);
Send(handler, requestId, Array.Empty<byte>());
continue;
+
+ case ClientOp.StreamerBatchSend:
+ reader.Skip(4);
+ StreamerRowCount += reader.ReadInt32();
+
+ Send(handler, requestId, Array.Empty<byte>());
+ continue;
}
// Fake error message for any other op code.
diff --git
a/modules/platforms/dotnet/Apache.Ignite.Tests/PartitionAwarenessTests.cs
b/modules/platforms/dotnet/Apache.Ignite.Tests/PartitionAwarenessTests.cs
index c17bfa49a4..c3c7059aa7 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Tests/PartitionAwarenessTests.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Tests/PartitionAwarenessTests.cs
@@ -112,7 +112,7 @@ public class PartitionAwarenessTests
public async Task TestDataStreamerReceivesPartitionAssignmentUpdates() =>
await TestClientReceivesPartitionAssignmentUpdates(
view => view.StreamDataAsync(new[] { 1 }.ToAsyncEnumerable()),
- ClientOp.TupleUpsertAll);
+ ClientOp.StreamerBatchSend);
[Test]
[TestCaseSource(nameof(KeyNodeCases))]
@@ -138,7 +138,7 @@ public class PartitionAwarenessTests
await AssertOpOnNode(() => recordView.ReplaceAsync(null, key, key),
ClientOp.TupleReplaceExact, expectedNode);
await AssertOpOnNode(() => recordView.DeleteAsync(null, key),
ClientOp.TupleDelete, expectedNode);
await AssertOpOnNode(() => recordView.DeleteExactAsync(null, key),
ClientOp.TupleDeleteExact, expectedNode);
- await AssertOpOnNode(() => recordView.StreamDataAsync(new[] { key
}.ToAsyncEnumerable()), ClientOp.TupleUpsertAll, expectedNode);
+ await AssertOpOnNode(() => recordView.StreamDataAsync(new[] { key
}.ToAsyncEnumerable()), ClientOp.StreamerBatchSend, expectedNode);
// Multi-key operations use the first key for colocation.
var keys = new[] { key, new IgniteTuple { ["ID"] = keyId - 1 }, new
IgniteTuple { ["ID"] = keyId + 1 } };
@@ -172,7 +172,7 @@ public class PartitionAwarenessTests
await AssertOpOnNode(() => recordView.ReplaceAsync(null, key, key),
ClientOp.TupleReplaceExact, expectedNode);
await AssertOpOnNode(() => recordView.DeleteAsync(null, key),
ClientOp.TupleDelete, expectedNode);
await AssertOpOnNode(() => recordView.DeleteExactAsync(null, key),
ClientOp.TupleDeleteExact, expectedNode);
- await AssertOpOnNode(() => recordView.StreamDataAsync(new[] { key
}.ToAsyncEnumerable()), ClientOp.TupleUpsertAll, expectedNode);
+ await AssertOpOnNode(() => recordView.StreamDataAsync(new[] { key
}.ToAsyncEnumerable()), ClientOp.StreamerBatchSend, expectedNode);
// Multi-key operations use the first key for colocation.
var keys = new[] { key, key - 1, key + 1 };
@@ -218,7 +218,7 @@ public class PartitionAwarenessTests
await AssertOpOnNode(() => kvView.PutAllAsync(null, pairs),
ClientOp.TupleUpsertAll, expectedNode);
await AssertOpOnNode(() => kvView.RemoveAllAsync(null, keys),
ClientOp.TupleDeleteAll, expectedNode);
await AssertOpOnNode(() => kvView.RemoveAllAsync(null, pairs),
ClientOp.TupleDeleteAllExact, expectedNode);
- await AssertOpOnNode(() =>
kvView.StreamDataAsync(pairs.ToAsyncEnumerable()), ClientOp.TupleUpsertAll,
expectedNode);
+ await AssertOpOnNode(() =>
kvView.StreamDataAsync(pairs.ToAsyncEnumerable()), ClientOp.StreamerBatchSend,
expectedNode);
}
[Test]
@@ -248,7 +248,7 @@ public class PartitionAwarenessTests
await AssertOpOnNode(() => kvView.ContainsAsync(null, key),
ClientOp.TupleContainsKey, expectedNode);
await AssertOpOnNode(
() => kvView.StreamDataAsync(new[] { new KeyValuePair<int,
int>(key, val) }.ToAsyncEnumerable()),
- ClientOp.TupleUpsertAll,
+ ClientOp.StreamerBatchSend,
expectedNode);
// Multi-key operations use the first key for colocation.
diff --git
a/modules/platforms/dotnet/Apache.Ignite.Tests/Table/DataStreamerTests.cs
b/modules/platforms/dotnet/Apache.Ignite.Tests/Table/DataStreamerTests.cs
index d509c6fc9f..15b9e6aaeb 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Tests/Table/DataStreamerTests.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Tests/Table/DataStreamerTests.cs
@@ -149,7 +149,7 @@ public class DataStreamerTests : IgniteTestsBase
public async Task TestRetryLimitExhausted()
{
using var server = new FakeServer(
- shouldDropConnection: ctx => ctx is { OpCode:
ClientOp.TupleUpsertAll, RequestCount: > 7 });
+ shouldDropConnection: ctx => ctx is { OpCode:
ClientOp.StreamerBatchSend, RequestCount: > 7 });
using var client = await server.ConnectClientAsync();
var table = await
client.Tables.GetTableAsync(FakeServer.ExistingTableName);
@@ -157,7 +157,7 @@ public class DataStreamerTests : IgniteTestsBase
var ex = Assert.ThrowsAsync<IgniteClientConnectionException>(
async () => await
table!.RecordBinaryView.StreamDataAsync(GetFakeServerData(10_000)));
- StringAssert.StartsWith("Operation TupleUpsertAll failed after 16
retries", ex!.Message);
+ StringAssert.StartsWith("Operation StreamerBatchSend failed after 16
retries", ex!.Message);
}
[Test]
@@ -167,7 +167,7 @@ public class DataStreamerTests : IgniteTestsBase
int upsertIdx = 0;
using var server = new FakeServer(
- shouldDropConnection: ctx => ctx.OpCode == ClientOp.TupleUpsertAll
&& Interlocked.Increment(ref upsertIdx) % 2 == 1);
+ shouldDropConnection: ctx => ctx.OpCode ==
ClientOp.StreamerBatchSend && Interlocked.Increment(ref upsertIdx) % 2 == 1);
// Streamer has its own retry policy, so we can disable retries on
the client.
using var client = await server.ConnectClientAsync(new
IgniteClientConfiguration
@@ -178,7 +178,7 @@ public class DataStreamerTests : IgniteTestsBase
var table = await
client.Tables.GetTableAsync(FakeServer.ExistingTableName);
await
table!.RecordBinaryView.StreamDataAsync(GetFakeServerData(count));
- Assert.AreEqual(count, server.UpsertAllRowCount);
+ Assert.AreEqual(count, server.StreamerRowCount);
Assert.That(server.DroppedConnectionCount,
Is.GreaterThanOrEqualTo(count / DataStreamerOptions.Default.PageSize));
}
diff --git
a/modules/platforms/dotnet/Apache.Ignite.Tests/Table/SchemaSynchronizationTest.cs
b/modules/platforms/dotnet/Apache.Ignite.Tests/Table/SchemaSynchronizationTest.cs
index 0c9c64eae3..22ac2370d2 100644
---
a/modules/platforms/dotnet/Apache.Ignite.Tests/Table/SchemaSynchronizationTest.cs
+++
b/modules/platforms/dotnet/Apache.Ignite.Tests/Table/SchemaSynchronizationTest.cs
@@ -356,7 +356,7 @@ public class SchemaSynchronizationTest : IgniteTestsBase
async IAsyncEnumerable<IIgniteTuple> GetData()
{
// First set of batches uses old schema.
- for (int i = 0; i < 10; i++)
+ for (int i = 0; i < 20; i++)
{
yield return GetTuple(i);
}
diff --git a/modules/platforms/dotnet/Apache.Ignite/ClientOperationType.cs
b/modules/platforms/dotnet/Apache.Ignite/ClientOperationType.cs
index 43e7935f98..0a1d3ee484 100644
--- a/modules/platforms/dotnet/Apache.Ignite/ClientOperationType.cs
+++ b/modules/platforms/dotnet/Apache.Ignite/ClientOperationType.cs
@@ -144,6 +144,11 @@ namespace Apache.Ignite
/// <summary>
/// Change compute job priority (<see
cref="IJobExecution{T}.ChangePriorityAsync"/>).
/// </summary>
- ComputeChangePriority
+ ComputeChangePriority,
+
+ /// <summary>
+ /// Send data streamer batch (<see
cref="IDataStreamerTarget{T}.StreamDataAsync"/>).
+ /// </summary>
+ StreamerBatchSend
}
}
diff --git a/modules/platforms/dotnet/Apache.Ignite/Internal/Proto/ClientOp.cs
b/modules/platforms/dotnet/Apache.Ignite/Internal/Proto/ClientOp.cs
index 80b3cecdfd..7a1e80428e 100644
--- a/modules/platforms/dotnet/Apache.Ignite/Internal/Proto/ClientOp.cs
+++ b/modules/platforms/dotnet/Apache.Ignite/Internal/Proto/ClientOp.cs
@@ -128,6 +128,9 @@ namespace Apache.Ignite.Internal.Proto
ComputeCancel = 60,
/** Change compute job priority. */
- ComputeChangePriority = 61
+ ComputeChangePriority = 61,
+
+ /** Send streamer batch. */
+ StreamerBatchSend = 62
}
}
diff --git
a/modules/platforms/dotnet/Apache.Ignite/Internal/Proto/ClientOpExtensions.cs
b/modules/platforms/dotnet/Apache.Ignite/Internal/Proto/ClientOpExtensions.cs
index abab50c462..0648e0f580 100644
---
a/modules/platforms/dotnet/Apache.Ignite/Internal/Proto/ClientOpExtensions.cs
+++
b/modules/platforms/dotnet/Apache.Ignite/Internal/Proto/ClientOpExtensions.cs
@@ -68,6 +68,7 @@ namespace Apache.Ignite.Internal.Proto
ClientOp.ClusterGetNodes => null,
ClientOp.PartitionAssignmentGet => null,
ClientOp.SqlParamMeta => null,
+ ClientOp.StreamerBatchSend =>
ClientOperationType.StreamerBatchSend,
// Do not return null from default arm intentionally so we
don't forget to update this when new ClientOp values are added.
// ReSharper disable once PatternIsRedundant
diff --git
a/modules/platforms/dotnet/Apache.Ignite/Internal/Table/DataStreamer.cs
b/modules/platforms/dotnet/Apache.Ignite/Internal/Table/DataStreamer.cs
index ec11ab3065..59655b0533 100644
--- a/modules/platforms/dotnet/Apache.Ignite/Internal/Table/DataStreamer.cs
+++ b/modules/platforms/dotnet/Apache.Ignite/Internal/Table/DataStreamer.cs
@@ -22,7 +22,6 @@ using System.Buffers;
using System.Collections.Generic;
using System.Diagnostics;
using System.Diagnostics.CodeAnalysis;
-using System.Linq;
using System.Runtime.InteropServices;
using System.Threading;
using System.Threading.Tasks;
@@ -54,20 +53,16 @@ internal static class DataStreamer
/// Streams the data.
/// </summary>
/// <param name="data">Data.</param>
- /// <param name="sender">Batch sender.</param>
+ /// <param name="table">Table.</param>
/// <param name="writer">Item writer.</param>
- /// <param name="schemaProvider">Schema provider.</param>
- /// <param name="partitionAssignmentProvider">Partitioner.</param>
/// <param name="options">Options.</param>
/// <param name="cancellationToken">Cancellation token.</param>
/// <typeparam name="T">Element type.</typeparam>
/// <returns>A <see cref="Task"/> representing the asynchronous
operation.</returns>
internal static async Task StreamDataAsync<T>(
IAsyncEnumerable<T> data,
- Func<PooledArrayBuffer, int, string, IRetryPolicy, Task> sender,
+ Table table,
RecordSerializer<T> writer,
- Func<int?, Task<Schema>> schemaProvider, // Not a ValueTask because
Tasks are cached.
- Func<ValueTask<string?[]>> partitionAssignmentProvider,
DataStreamerOptions options,
CancellationToken cancellationToken)
{
@@ -90,12 +85,16 @@ internal static class DataStreamer
// ConcurrentDictionary is not necessary because we consume the source
sequentially.
// However, locking for batches is required due to auto-flush
background task.
- var batches = new Dictionary<string, Batch<T>>();
+ var batches = new Dictionary<int, Batch<T>>();
var retryPolicy = new RetryLimitPolicy { RetryLimit =
options.RetryLimit };
- var schema = await schemaProvider(null).ConfigureAwait(false);
- var partitionAssignment = await
partitionAssignmentProvider().ConfigureAwait(false);
+ var schema = await table.GetSchemaAsync(null).ConfigureAwait(false);
+
+ var partitionAssignment = await
table.GetPartitionAssignmentAsync().ConfigureAwait(false);
+ var partitionCount = partitionAssignment.Length; // Can't be changed.
+ Debug.Assert(partitionCount > 0, "partitionCount > 0");
var lastPartitionsAssignmentCheck = Stopwatch.StartNew();
+
using var flushCts = new CancellationTokenSource();
try
@@ -116,7 +115,7 @@ internal static class DataStreamer
if (lastPartitionsAssignmentCheck.Elapsed >
PartitionAssignmentUpdateFrequency)
{
- var newAssignment = await
partitionAssignmentProvider().ConfigureAwait(false);
+ var newAssignment = await
table.GetPartitionAssignmentAsync().ConfigureAwait(false);
if (newAssignment != partitionAssignment)
{
@@ -146,7 +145,7 @@ internal static class DataStreamer
return;
- async ValueTask<(Batch<T> Batch, string Partition)>
AddWithRetryUnmapped(T item)
+ async ValueTask<(Batch<T> Batch, int Partition)>
AddWithRetryUnmapped(T item)
{
try
{
@@ -154,12 +153,12 @@ internal static class DataStreamer
}
catch (Exception e) when (e.CausedByUnmappedColumns())
{
- schema = await
schemaProvider(Table.SchemaVersionForceLatest).ConfigureAwait(false);
+ schema = await
table.GetSchemaAsync(Table.SchemaVersionForceLatest).ConfigureAwait(false);
return Add(item);
}
}
- (Batch<T> Batch, string Partition) Add(T item)
+ (Batch<T> Batch, int Partition) Add(T item)
{
var schema0 = schema;
var tupleBuilder = new BinaryTupleBuilder(schema0.Columns.Length,
hashedColumnsPredicate: schema0.HashedColumnIndexProvider);
@@ -174,7 +173,7 @@ internal static class DataStreamer
}
}
- (Batch<T> Batch, string Partition) Add0(T item, ref BinaryTupleBuilder
tupleBuilder, Schema schema0)
+ (Batch<T> Batch, int Partition) Add0(T item, ref BinaryTupleBuilder
tupleBuilder, Schema schema0)
{
var columnCount = schema0.Columns.Length;
@@ -185,11 +184,8 @@ internal static class DataStreamer
writer.Handler.Write(ref tupleBuilder, item, schema0, keyOnly:
false, noValueSetRef);
- // ReSharper disable once AccessToModifiedClosure (reviewed)
- var partitionAssignment0 = partitionAssignment;
- var partition =
partitionAssignment0[Math.Abs(tupleBuilder.GetHash() %
partitionAssignment0.Length)] ?? string.Empty;
-
- var batch = GetOrCreateBatch(partition);
+ var partitionId = Math.Abs(tupleBuilder.GetHash() %
partitionCount);
+ var batch = GetOrCreateBatch(partitionId);
lock (batch)
{
@@ -214,17 +210,17 @@ internal static class DataStreamer
Metrics.StreamerItemsQueuedIncrement();
- return (batch, partition);
+ return (batch, partitionId);
}
- Batch<T> GetOrCreateBatch(string partition)
+ Batch<T> GetOrCreateBatch(int partitionId)
{
- ref var batchRef = ref
CollectionsMarshal.GetValueRefOrAddDefault(batches, partition, out _);
+ ref var batchRef = ref
CollectionsMarshal.GetValueRefOrAddDefault(batches, partitionId, out _);
if (batchRef == null)
{
batchRef = new Batch<T>(options.PageSize, schema);
- InitBuffer(batchRef);
+ InitBuffer(batchRef, partitionId, schema);
Metrics.StreamerBatchesActiveIncrement();
}
@@ -232,7 +228,7 @@ internal static class DataStreamer
return batchRef;
}
- async Task SendAsync(Batch<T> batch, string partition)
+ async Task SendAsync(Batch<T> batch, int partitionId)
{
var expectedSize = batch.Count;
@@ -253,12 +249,12 @@ internal static class DataStreamer
buf.WriteByte(MsgPackCode.Int32, batch.CountPos);
buf.WriteIntBigEndian(batch.Count, batch.CountPos + 1);
- batch.Task = SendAndDisposeBufAsync(buf, partition,
batch.Task, batch.Items, batch.Count, batch.SchemaOutdated);
+ batch.Task = SendAndDisposeBufAsync(buf, partitionId,
batch.Task, batch.Items, batch.Count, batch.SchemaOutdated);
batch.Items = ArrayPool<T>.Shared.Rent(options.PageSize);
batch.Count = 0;
batch.Buffer = ProtoCommon.GetMessageWriter(); // Prev buf
will be disposed in SendAndDisposeBufAsync.
- InitBuffer(batch);
+ InitBuffer(batch, partitionId, schema);
batch.LastFlush = Stopwatch.GetTimestamp();
batch.Schema = schema;
batch.SchemaOutdated = false;
@@ -269,7 +265,7 @@ internal static class DataStreamer
async Task SendAndDisposeBufAsync(
PooledArrayBuffer buf,
- string partition,
+ int partitionId,
Task oldTask,
T[] items,
int count,
@@ -280,10 +276,13 @@ internal static class DataStreamer
if (batchSchemaOutdated)
{
// Schema update was detected while the batch was being filled.
- buf.Reset();
- writer.WriteMultiple(buf, null, schema, items.Take(count));
+ // Re-serialize the whole batch.
+ ReWriteBatch(buf, partitionId, schema, items.AsSpan(0, count),
writer);
}
+ // ReSharper disable once AccessToModifiedClosure
+ var preferredNode =
PreferredNode.FromName(partitionAssignment[partitionId] ?? string.Empty);
+
try
{
int? schemaVersion = null;
@@ -296,17 +295,16 @@ internal static class DataStreamer
// Might be updated by another batch.
if (schema.Version != schemaVersion)
{
- schema = await
schemaProvider(schemaVersion).ConfigureAwait(false);
+ schema = await
table.GetSchemaAsync(schemaVersion).ConfigureAwait(false);
}
// Serialize again with the new schema.
- buf.Reset();
- writer.WriteMultiple(buf, null, schema,
items.Take(count));
+ ReWriteBatch(buf, partitionId, schema,
items.AsSpan(0, count), writer);
}
// Wait for the previous batch for this node to
preserve item order.
await oldTask.ConfigureAwait(false);
- await sender(buf, count, partition,
retryPolicy).ConfigureAwait(false);
+ await SendBatchAsync(table, buf, count, preferredNode,
retryPolicy).ConfigureAwait(false);
return;
}
@@ -349,19 +347,6 @@ internal static class DataStreamer
}
}
- void InitBuffer(Batch<T> batch)
- {
- var buf = batch.Buffer;
-
- var w = buf.MessageWriter;
- w.Write(schema.TableId);
- w.WriteTx(null);
- w.Write(schema.Version);
-
- batch.CountPos = buf.Position;
- buf.Advance(5); // Reserve count.
- }
-
async Task Drain()
{
foreach (var (partition, batch) in batches)
@@ -376,6 +361,64 @@ internal static class DataStreamer
}
}
+ private static void InitBuffer<T>(Batch<T> batch, int partitionId, Schema
schema)
+ {
+ var buf = batch.Buffer;
+ WriteBatchHeader(buf, partitionId, schema);
+
+ batch.CountPos = buf.Position;
+ buf.Advance(5); // Reserve count.
+ }
+
+ private static void WriteBatchHeader(PooledArrayBuffer buf, int
partitionId, Schema schema)
+ {
+ var w = buf.MessageWriter;
+ w.Write(schema.TableId);
+ w.Write(partitionId);
+ w.WriteNil(); // Deleted rows bit set.
+ w.Write(schema.Version);
+ }
+
+ private static void ReWriteBatch<T>(
+ PooledArrayBuffer buf,
+ int partitionId,
+ Schema schema,
+ ReadOnlySpan<T> items,
+ RecordSerializer<T> writer)
+ {
+ buf.Reset();
+ WriteBatchHeader(buf, partitionId, schema);
+
+ var w = buf.MessageWriter;
+ w.Write(items.Length);
+
+ foreach (var item in items)
+ {
+ writer.Handler.Write(ref w, schema, item, keyOnly: false,
computeHash: false);
+ }
+ }
+
+ private static async Task SendBatchAsync(
+ Table table,
+ PooledArrayBuffer buf,
+ int count,
+ PreferredNode preferredNode,
+ IRetryPolicy retryPolicy)
+ {
+ var (resBuf, socket) = await table.Socket.DoOutInOpAndGetSocketAsync(
+ ClientOp.StreamerBatchSend,
+ tx: null,
+ buf,
+ preferredNode,
+ retryPolicy)
+ .ConfigureAwait(false);
+
+ resBuf.Dispose();
+
+ Metrics.StreamerBatchesSent.Add(1, socket.MetricsContext.Tags);
+ Metrics.StreamerItemsSent.Add(count, socket.MetricsContext.Tags);
+ }
+
private sealed record Batch<T>
{
public Batch(int capacity, Schema schema)
diff --git
a/modules/platforms/dotnet/Apache.Ignite/Internal/Table/RecordView.cs
b/modules/platforms/dotnet/Apache.Ignite/Internal/Table/RecordView.cs
index 9f60de14fc..7e545ab151 100644
--- a/modules/platforms/dotnet/Apache.Ignite/Internal/Table/RecordView.cs
+++ b/modules/platforms/dotnet/Apache.Ignite/Internal/Table/RecordView.cs
@@ -299,24 +299,8 @@ namespace Apache.Ignite.Internal.Table
CancellationToken cancellationToken = default) =>
await DataStreamer.StreamDataAsync(
data,
- sender: async (batch, count, preferredNode, retryPolicy) =>
- {
- var (resBuf, socket) = await
_table.Socket.DoOutInOpAndGetSocketAsync(
- ClientOp.TupleUpsertAll,
- tx: null,
- batch,
- PreferredNode.FromName(preferredNode),
- retryPolicy)
- .ConfigureAwait(false);
-
- resBuf.Dispose();
-
- Metrics.StreamerBatchesSent.Add(1,
socket.MetricsContext.Tags);
- Metrics.StreamerItemsSent.Add(count,
socket.MetricsContext.Tags);
- },
+ _table,
writer: _ser,
- schemaProvider: _table.GetSchemaAsync,
- partitionAssignmentProvider: () =>
_table.GetPartitionAssignmentAsync(),
options ?? DataStreamerOptions.Default,
cancellationToken).ConfigureAwait(false);
diff --git a/modules/platforms/dotnet/Apache.Ignite/RetryReadPolicy.cs
b/modules/platforms/dotnet/Apache.Ignite/RetryReadPolicy.cs
index 41dcbcfa53..e364c23302 100644
--- a/modules/platforms/dotnet/Apache.Ignite/RetryReadPolicy.cs
+++ b/modules/platforms/dotnet/Apache.Ignite/RetryReadPolicy.cs
@@ -61,6 +61,7 @@ namespace Apache.Ignite
ClientOperationType.ComputeCancel => false,
ClientOperationType.ComputeChangePriority => false,
ClientOperationType.ComputeGetStatus => true,
+ ClientOperationType.StreamerBatchSend => false,
var unsupported => throw new
NotSupportedException("Unsupported operation type: " + unsupported)
};
}