This is an automated email from the ASF dual-hosted git repository.

ptupitsyn pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 79a841d952 IGNITE-19545 .NET: Add Basic Data Streamer (#2182)
79a841d952 is described below

commit 79a841d95247c338b80ab91a93ec18ee7c0344ac
Author: Pavel Tupitsyn <[email protected]>
AuthorDate: Tue Jun 13 15:02:01 2023 +0300

    IGNITE-19545 .NET: Add Basic Data Streamer (#2182)
    
    Add basic data streamer (without receiver) to .NET client (Record and KV 
views).
    
    This covers the first use case from 
https://cwiki.apache.org/confluence/display/IGNITE/IEP-102%3A+Data+Streamer.
    
    * No changes to client protocol: `UPSERT_ALL` is used to upsert batches.
    * Hashing is combined with serialization (unlike Java client), so we write 
binary tuples to a per-node buffer right away.
       - Pros: cheaper happy path;
       - Cons: will require re-serialization on schema update.
     * Iteration and serialization are sequential.
     * Batches are sent asynchronously; we wait for the previous batch only 
when the next batch for the given node is full.
     * The more connections to different servers we have - the more parallelism 
we get (see benchmark).
     * There is no parallelism for the same node, because we need to guarantee 
ordering.
     * Extra: some hashing refactoring (initially I wanted to separate hashing 
from serialization, which turned out to be complicated and less performant).
    
    **Benchmark: upsert 100K rows** (1 ms per 1000 rows delay on server)
    `FakeServer` imitates work by doing `Thread.Sleep` based on the batch size, 
so `UpsertAll` wins in single-server scenario because it inserts everything in 
one batch (while streamer sends 100 batches). With multiple servers, streamer 
scales linearly and wins - because it sends batches to different nodes in 
parallel.
    
    ```
    Results on i9-12900H, .NET SDK 6.0.408, Ubuntu 22.04:
    |           Method | ServerCount |      Mean |    Error |   StdDev | Ratio 
| RatioSD | Allocated |
    |----------------- |------------ 
|----------:|---------:|---------:|------:|--------:|----------:|
    |     DataStreamer |           1 | 141.56 ms | 2.725 ms | 3.244 ms |  1.00 
|    0.00 |      4 MB |
    |        UpsertAll |           1 | 112.99 ms | 1.203 ms | 1.125 ms |  0.80 
|    0.02 |      4 MB |
    | UpsertAllBatched |           1 | 159.11 ms | 3.175 ms | 4.451 ms |  1.12 
|    0.04 |      4 MB |
    |                  |             |           |          |          |       
|         |           |
    |     DataStreamer |           2 |  67.29 ms | 1.331 ms | 3.058 ms |  1.00 
|    0.00 |      4 MB |
    |        UpsertAll |           2 | 113.68 ms | 0.915 ms | 0.856 ms |  1.64 
|    0.05 |      4 MB |
    | UpsertAllBatched |           2 | 162.47 ms | 3.169 ms | 5.118 ms |  2.42 
|    0.14 |      4 MB |
    |                  |             |           |          |          |       
|         |           |
    |     DataStreamer |           4 |  32.64 ms | 0.507 ms | 0.475 ms |  1.00 
|    0.00 |      4 MB |
    |        UpsertAll |           4 | 113.84 ms | 1.276 ms | 1.193 ms |  3.49 
|    0.05 |      4 MB |
    | UpsertAllBatched |           4 | 159.17 ms | 3.148 ms | 5.172 ms |  4.79 
|    0.17 |      4 MB |
    ```
---
 .../dotnet/Apache.Ignite.Benchmarks/Program.cs     |   8 +-
 .../Table/DataStreamerBenchmark.cs                 | 127 ++++++++++
 .../SerializerHandlerBenchmarksBase.cs             |   2 +-
 .../dotnet/Apache.Ignite.Tests/FakeServer.cs       |  26 ++
 .../dotnet/Apache.Ignite.Tests/HeartbeatTests.cs   |   6 +-
 .../dotnet/Apache.Ignite.Tests/IgniteTestsBase.cs  |   6 +-
 .../dotnet/Apache.Ignite.Tests/LoggingTests.cs     |   4 +-
 .../Apache.Ignite.Tests/PartitionAwarenessTests.cs |  65 +++--
 .../Proto/ColocationHashTests.cs                   |   2 +-
 .../dotnet/Apache.Ignite.Tests/SslTests.cs         |   2 +-
 .../Apache.Ignite.Tests/Table/DataStreamerTests.cs | 208 +++++++++++++++
 .../Serialization/ObjectSerializerHandlerTests.cs  |   2 +-
 .../Transactions/TransactionsTests.cs              |   9 +-
 .../Internal/Buffers/PooledArrayBuffer.cs          |  12 +-
 .../Apache.Ignite/Internal/ClientFailoverSocket.cs |  19 +-
 .../dotnet/Apache.Ignite/Internal/ClientSocket.cs  |   3 +-
 .../Internal/Common/IgniteArgumentCheck.cs         |  10 +-
 .../Proto/BinaryTuple/BinaryTupleBuilder.cs        |  73 +-----
 .../Proto/BinaryTuple/BinaryTupleCommon.cs         |  42 +++
 .../Apache.Ignite/Internal/Table/DataStreamer.cs   | 282 +++++++++++++++++++++
 .../Apache.Ignite/Internal/Table/KeyValueView.cs   |  16 ++
 .../Apache.Ignite/Internal/Table/RecordView.cs     |  30 ++-
 .../dotnet/Apache.Ignite/Internal/Table/Schema.cs  |   2 +
 .../dotnet/Apache.Ignite/Internal/Table/Table.cs   |  42 +--
 .../Apache.Ignite/Internal/Table/TemporalTypes.cs  |  20 ++
 .../Apache.Ignite/Table/DataStreamerOptions.cs     |  48 ++++
 .../Apache.Ignite/Table/IDataStreamerTarget.cs     |  39 +++
 .../dotnet/Apache.Ignite/Table/IKeyValueView.cs    |   2 +-
 .../dotnet/Apache.Ignite/Table/IRecordView.cs      |   2 +-
 29 files changed, 964 insertions(+), 145 deletions(-)

diff --git a/modules/platforms/dotnet/Apache.Ignite.Benchmarks/Program.cs 
b/modules/platforms/dotnet/Apache.Ignite.Benchmarks/Program.cs
index 50b2e2bfb0..0c256c1385 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Benchmarks/Program.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Benchmarks/Program.cs
@@ -17,12 +17,10 @@
 
 namespace Apache.Ignite.Benchmarks;
 
-using System.Threading.Tasks;
+using BenchmarkDotNet.Running;
+using Table;
 
 internal static class Program
 {
-    private static async Task Main()
-    {
-        await ManyConnectionsBenchmark.RunAsync();
-    }
+    private static void Main() => BenchmarkRunner.Run<DataStreamerBenchmark>();
 }
diff --git 
a/modules/platforms/dotnet/Apache.Ignite.Benchmarks/Table/DataStreamerBenchmark.cs
 
b/modules/platforms/dotnet/Apache.Ignite.Benchmarks/Table/DataStreamerBenchmark.cs
new file mode 100644
index 0000000000..7208629dd1
--- /dev/null
+++ 
b/modules/platforms/dotnet/Apache.Ignite.Benchmarks/Table/DataStreamerBenchmark.cs
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache.Ignite.Benchmarks.Table;
+
+using System;
+using System.Collections.Generic;
+using System.Diagnostics.CodeAnalysis;
+using System.Linq;
+using System.Threading.Tasks;
+using BenchmarkDotNet.Attributes;
+using Ignite.Table;
+using Tests;
+
+/// <summary>
+/// Data streamer benchmark.
+/// <para />
+/// Server imitates work by doing Thread.Sleep based on the batch size, so 
UpsertAll wins in the single-server scenario because it
+/// inserts everything in one batch, while the streamer sends multiple batches. With 
multiple servers, the streamer scales linearly because
+/// it sends batches to different nodes in parallel.
+/// <para />
+/// Results on i9-12900H, .NET SDK 6.0.408, Ubuntu 22.04:
+/// |           Method | ServerCount |      Mean |    Error |   StdDev | Ratio 
| RatioSD | Allocated |
+/// |----------------- |------------ 
|----------:|---------:|---------:|------:|--------:|----------:|
+/// |     DataStreamer |           1 | 141.56 ms | 2.725 ms | 3.244 ms |  1.00 
|    0.00 |      4 MB |
+/// |        UpsertAll |           1 | 112.99 ms | 1.203 ms | 1.125 ms |  0.80 
|    0.02 |      4 MB |
+/// | UpsertAllBatched |           1 | 159.11 ms | 3.175 ms | 4.451 ms |  1.12 
|    0.04 |      4 MB |
+/// |                  |             |           |          |          |       
|         |           |
+/// |     DataStreamer |           2 |  67.29 ms | 1.331 ms | 3.058 ms |  1.00 
|    0.00 |      4 MB |
+/// |        UpsertAll |           2 | 113.68 ms | 0.915 ms | 0.856 ms |  1.64 
|    0.05 |      4 MB |
+/// | UpsertAllBatched |           2 | 162.47 ms | 3.169 ms | 5.118 ms |  2.42 
|    0.14 |      4 MB |
+/// |                  |             |           |          |          |       
|         |           |
+/// |     DataStreamer |           4 |  32.64 ms | 0.507 ms | 0.475 ms |  1.00 
|    0.00 |      4 MB |
+/// |        UpsertAll |           4 | 113.84 ms | 1.276 ms | 1.193 ms |  3.49 
|    0.05 |      4 MB |
+/// | UpsertAllBatched |           4 | 159.17 ms | 3.148 ms | 5.172 ms |  4.79 
|    0.17 |      4 MB |.
+/// </summary>
+[MemoryDiagnoser]
+public class DataStreamerBenchmark
+{
+    private IList<FakeServer> _servers = null!;
+    private IIgniteClient _client = null!;
+    private ITable _table = null!;
+    private IReadOnlyList<IIgniteTuple> _data = null!;
+
+    [Params(1, 2, 4)]
+    [SuppressMessage("ReSharper", "UnusedAutoPropertyAccessor.Global", 
Justification = "Benchmark parameter")]
+    public int ServerCount { get; set; }
+
+    [GlobalSetup]
+    public async Task GlobalSetup()
+    {
+        _servers = Enumerable.Range(0, ServerCount)
+            .Select(x => new FakeServer(disableOpsTracking: true, nodeName: 
"server-" + x)
+            {
+                MultiRowOperationDelayPerRow = 
TimeSpan.FromMilliseconds(0.001) // 1 ms per 1000 rows
+            })
+            .ToList();
+
+        // 17 partitions per node.
+        var partitionAssignment = Enumerable.Range(0, 17).SelectMany(_ => 
_servers.Select(x => x.Node.Id)).ToArray();
+
+        var cfg = new IgniteClientConfiguration();
+        foreach (var server in _servers)
+        {
+            cfg.Endpoints.Add(server.Endpoint);
+            server.PartitionAssignment = partitionAssignment;
+        }
+
+        _client = await IgniteClient.StartAsync(cfg);
+        _table = (await 
_client.Tables.GetTableAsync(FakeServer.ExistingTableName))!;
+        _data = Enumerable.Range(1, 100_000).Select(x => new IgniteTuple { 
["id"] = x, ["name"] = "name " + x }).ToList();
+    }
+
+    [GlobalCleanup]
+    public void GlobalCleanup()
+    {
+        _client.Dispose();
+
+        foreach (var server in _servers)
+        {
+            server.Dispose();
+        }
+    }
+
+    [Benchmark(Baseline = true)]
+    public async Task DataStreamer() => await 
_table.RecordBinaryView.StreamDataAsync(_data.ToAsyncEnumerable());
+
+    [Benchmark]
+    public async Task UpsertAll() => await 
_table.RecordBinaryView.UpsertAllAsync(null, _data);
+
+    [Benchmark]
+    public async Task UpsertAllBatched()
+    {
+        var batchSize = DataStreamerOptions.Default.BatchSize;
+        var batch = new List<IIgniteTuple>(batchSize);
+
+        foreach (var tuple in _data)
+        {
+            batch.Add(tuple);
+
+            if (batch.Count == batchSize)
+            {
+                await _table.RecordBinaryView.UpsertAllAsync(null, batch);
+                batch.Clear();
+            }
+        }
+
+        if (batch.Count > 0)
+        {
+            await _table.RecordBinaryView.UpsertAllAsync(null, batch);
+        }
+    }
+}
diff --git 
a/modules/platforms/dotnet/Apache.Ignite.Benchmarks/Table/Serialization/SerializerHandlerBenchmarksBase.cs
 
b/modules/platforms/dotnet/Apache.Ignite.Benchmarks/Table/Serialization/SerializerHandlerBenchmarksBase.cs
index e9e49300b5..be02cf68ce 100644
--- 
a/modules/platforms/dotnet/Apache.Ignite.Benchmarks/Table/Serialization/SerializerHandlerBenchmarksBase.cs
+++ 
b/modules/platforms/dotnet/Apache.Ignite.Benchmarks/Table/Serialization/SerializerHandlerBenchmarksBase.cs
@@ -44,7 +44,7 @@ namespace Apache.Ignite.Benchmarks.Table.Serialization
             [nameof(Car.Seats)] = Object.Seats
         };
 
-        internal static readonly Schema Schema = new(1, 1, new[]
+        internal static readonly Schema Schema = new(1, 1, 1, new[]
         {
             new Column(nameof(Car.Id), ColumnType.Uuid, IsNullable: false, 
IsColocation: true, IsKey: true, SchemaIndex: 0, Scale: 0, Precision: 0),
             new Column(nameof(Car.BodyType), ColumnType.String, IsNullable: 
false, IsColocation: false, IsKey: false, SchemaIndex: 1, Scale: 0, Precision: 
0),
diff --git a/modules/platforms/dotnet/Apache.Ignite.Tests/FakeServer.cs 
b/modules/platforms/dotnet/Apache.Ignite.Tests/FakeServer.cs
index b621f433a1..9954a160ea 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Tests/FakeServer.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Tests/FakeServer.cs
@@ -76,6 +76,12 @@ namespace Apache.Ignite.Tests
             // No-op.
         }
 
+        public FakeServer(bool disableOpsTracking, string nodeName = 
"fake-server")
+            : this(null, nodeName, disableOpsTracking: disableOpsTracking)
+        {
+            // No-op.
+        }
+
         internal FakeServer(
             Func<RequestContext, bool>? shouldDropConnection = null,
             string nodeName = "fake-server",
@@ -111,6 +117,8 @@ namespace Apache.Ignite.Tests
 
         public TimeSpan OperationDelay { get; set; }
 
+        public TimeSpan MultiRowOperationDelayPerRow { get; set; }
+
         public TimeSpan HeartbeatDelay { get; set; }
 
         public int Port => ((IPEndPoint)_listener.LocalEndPoint!).Port;
@@ -125,6 +133,10 @@ namespace Apache.Ignite.Tests
 
         public long? LastSqlTxId { get; set; }
 
+        public long UpsertAllRowCount { get; set; }
+
+        public long DroppedConnectionCount { get; set; }
+
         public bool DropNewConnections
         {
             get => _dropNewConnections;
@@ -503,6 +515,7 @@ namespace Apache.Ignite.Tests
 
                     if (_shouldDropConnection(new 
RequestContext(++requestCount, opCode, requestId)))
                     {
+                        DroppedConnectionCount++;
                         break;
                     }
 
@@ -584,6 +597,19 @@ namespace Apache.Ignite.Tests
                         case ClientOp.TupleUpsertAll:
                         case ClientOp.TupleDeleteAll:
                         case ClientOp.TupleDeleteAllExact:
+                            reader.Skip(3);
+                            var count = reader.ReadInt32();
+
+                            if (MultiRowOperationDelayPerRow > TimeSpan.Zero)
+                            {
+                                Thread.Sleep(MultiRowOperationDelayPerRow * 
count);
+                            }
+
+                            if (opCode == ClientOp.TupleUpsertAll)
+                            {
+                                UpsertAllRowCount += count;
+                            }
+
                             Send(handler, requestId, new byte[] { 1, 0 
}.AsMemory());
                             continue;
 
diff --git a/modules/platforms/dotnet/Apache.Ignite.Tests/HeartbeatTests.cs 
b/modules/platforms/dotnet/Apache.Ignite.Tests/HeartbeatTests.cs
index 0efe83faf1..ea83cada88 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Tests/HeartbeatTests.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Tests/HeartbeatTests.cs
@@ -31,7 +31,11 @@ namespace Apache.Ignite.Tests
         {
             var logger = new ListLogger();
 
-            var cfg = new IgniteClientConfiguration(GetConfig()) { Logger = 
logger };
+            var cfg = new IgniteClientConfiguration
+            {
+                Endpoints = { "127.0.0.1:" + ServerPort },
+                Logger = logger
+            };
             using var client = await IgniteClient.StartAsync(cfg);
 
             logger.Clear();
diff --git a/modules/platforms/dotnet/Apache.Ignite.Tests/IgniteTestsBase.cs 
b/modules/platforms/dotnet/Apache.Ignite.Tests/IgniteTestsBase.cs
index cfd2f6c83a..b1d6809f2e 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Tests/IgniteTestsBase.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Tests/IgniteTestsBase.cs
@@ -133,7 +133,11 @@ namespace Apache.Ignite.Tests
 
         protected static IgniteClientConfiguration GetConfig() => new()
         {
-            Endpoints = { "127.0.0.1:" + ServerNode.Port },
+            Endpoints =
+            {
+                "127.0.0.1:" + ServerNode.Port,
+                "127.0.0.1:" + (ServerNode.Port + 1)
+            },
             Logger = new ConsoleLogger { MinLevel = LogLevel.Trace }
         };
 
diff --git a/modules/platforms/dotnet/Apache.Ignite.Tests/LoggingTests.cs 
b/modules/platforms/dotnet/Apache.Ignite.Tests/LoggingTests.cs
index 18ed80bb6d..e3d49bdaa9 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Tests/LoggingTests.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Tests/LoggingTests.cs
@@ -58,8 +58,8 @@ public class LoggingTests
         StringAssert.Contains("Handshake succeeded", log);
         StringAssert.Contains("Trying to establish secondary connections - 
awaiting 2 tasks", log);
         StringAssert.Contains("All secondary connections established", log);
-        StringAssert.Contains("Sending request [opCode=3", log);
-        StringAssert.Contains("Sending request [opCode=50", log);
+        StringAssert.Contains("Sending request [op=TablesGet", log);
+        StringAssert.Contains("Sending request [op=SqlExec", log);
         StringAssert.Contains("Connection closed", log);
     }
 }
diff --git 
a/modules/platforms/dotnet/Apache.Ignite.Tests/PartitionAwarenessTests.cs 
b/modules/platforms/dotnet/Apache.Ignite.Tests/PartitionAwarenessTests.cs
index fbfd8681d6..173f70b665 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Tests/PartitionAwarenessTests.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Tests/PartitionAwarenessTests.cs
@@ -18,6 +18,7 @@
 namespace Apache.Ignite.Tests;
 
 using System;
+using System.Collections.Generic;
 using System.Linq;
 using System.Threading.Tasks;
 using Ignite.Table;
@@ -97,31 +98,14 @@ public class PartitionAwarenessTests
     }
 
     [Test]
-    public async Task TestClientReceivesPartitionAssignmentUpdates()
-    {
-        using var client = await GetClient();
-        var recordView = (await 
client.Tables.GetTableAsync(FakeServer.ExistingTableName))!.GetRecordView<int>();
-
-        // Check default assignment.
-        await recordView.UpsertAsync(null, 1);
-        await AssertOpOnNode(() => recordView.UpsertAsync(null, 1), 
ClientOp.TupleUpsert, _server2);
-
-        // Update assignment.
-        foreach (var server in new[] { _server1, _server2 })
-        {
-            server.ClearOps();
-            server.PartitionAssignment = 
server.PartitionAssignment.Reverse().ToArray();
-            server.PartitionAssignmentChanged = true;
-        }
-
-        // First request on default node receives update flag.
-        // Make two requests because balancing uses round-robin node.
-        await client.Tables.GetTablesAsync();
-        await client.Tables.GetTablesAsync();
+    public async Task TestClientReceivesPartitionAssignmentUpdates() =>
+        await TestClientReceivesPartitionAssignmentUpdates(view => 
view.UpsertAsync(null, 1), ClientOp.TupleUpsert);
 
-        // Second request loads and uses new assignment.
-        await AssertOpOnNode(() => recordView.UpsertAsync(null, 1), 
ClientOp.TupleUpsert, _server1, allowExtraOps: true);
-    }
+    [Test]
+    public async Task TestDataStreamerReceivesPartitionAssignmentUpdates() =>
+        await TestClientReceivesPartitionAssignmentUpdates(
+            view => view.StreamDataAsync(new[] { 1 }.ToAsyncEnumerable()),
+            ClientOp.TupleUpsertAll);
 
     [Test]
     [TestCaseSource(nameof(KeyNodeCases))]
@@ -147,6 +131,7 @@ public class PartitionAwarenessTests
         await AssertOpOnNode(() => recordView.ReplaceAsync(null, key, key), 
ClientOp.TupleReplaceExact, expectedNode);
         await AssertOpOnNode(() => recordView.DeleteAsync(null, key), 
ClientOp.TupleDelete, expectedNode);
         await AssertOpOnNode(() => recordView.DeleteExactAsync(null, key), 
ClientOp.TupleDeleteExact, expectedNode);
+        await AssertOpOnNode(() => recordView.StreamDataAsync(new[] { key 
}.ToAsyncEnumerable()), ClientOp.TupleUpsertAll, expectedNode);
 
         // Multi-key operations use the first key for colocation.
         var keys = new[] { key, new IgniteTuple { ["ID"] = keyId - 1 }, new 
IgniteTuple { ["ID"] = keyId + 1 } };
@@ -180,6 +165,7 @@ public class PartitionAwarenessTests
         await AssertOpOnNode(() => recordView.ReplaceAsync(null, key, key), 
ClientOp.TupleReplaceExact, expectedNode);
         await AssertOpOnNode(() => recordView.DeleteAsync(null, key), 
ClientOp.TupleDelete, expectedNode);
         await AssertOpOnNode(() => recordView.DeleteExactAsync(null, key), 
ClientOp.TupleDeleteExact, expectedNode);
+        await AssertOpOnNode(() => recordView.StreamDataAsync(new[] { key 
}.ToAsyncEnumerable()), ClientOp.TupleUpsertAll, expectedNode);
 
         // Multi-key operations use the first key for colocation.
         var keys = new[] { key, key - 1, key + 1 };
@@ -225,6 +211,7 @@ public class PartitionAwarenessTests
         await AssertOpOnNode(() => kvView.PutAllAsync(null, pairs), 
ClientOp.TupleUpsertAll, expectedNode);
         await AssertOpOnNode(() => kvView.RemoveAllAsync(null, keys), 
ClientOp.TupleDeleteAll, expectedNode);
         await AssertOpOnNode(() => kvView.RemoveAllAsync(null, pairs), 
ClientOp.TupleDeleteAllExact, expectedNode);
+        await AssertOpOnNode(() => 
kvView.StreamDataAsync(pairs.ToAsyncEnumerable()), ClientOp.TupleUpsertAll, 
expectedNode);
     }
 
     [Test]
@@ -252,6 +239,10 @@ public class PartitionAwarenessTests
         await AssertOpOnNode(() => kvView.RemoveAsync(null, key), 
ClientOp.TupleDelete, expectedNode);
         await AssertOpOnNode(() => kvView.RemoveAsync(null, key, val), 
ClientOp.TupleDeleteExact, expectedNode);
         await AssertOpOnNode(() => kvView.ContainsAsync(null, key), 
ClientOp.TupleContainsKey, expectedNode);
+        await AssertOpOnNode(
+            () => kvView.StreamDataAsync(new[] { new KeyValuePair<int, 
int>(key, val) }.ToAsyncEnumerable()),
+            ClientOp.TupleUpsertAll,
+            expectedNode);
 
         // Multi-key operations use the first key for colocation.
         var keys = new[] { key, key - 1, key + 1 };
@@ -358,6 +349,32 @@ public class PartitionAwarenessTests
         }
     }
 
+    private async Task 
TestClientReceivesPartitionAssignmentUpdates(Func<IRecordView<int>, Task> func, 
ClientOp op)
+    {
+        using var client = await GetClient();
+        var recordView = (await 
client.Tables.GetTableAsync(FakeServer.ExistingTableName))!.GetRecordView<int>();
+
+        // Check default assignment.
+        await recordView.UpsertAsync(null, 1);
+        await AssertOpOnNode(() => func(recordView), op, _server2);
+
+        // Update assignment.
+        foreach (var server in new[] { _server1, _server2 })
+        {
+            server.ClearOps();
+            server.PartitionAssignment = 
server.PartitionAssignment.Reverse().ToArray();
+            server.PartitionAssignmentChanged = true;
+        }
+
+        // First request on default node receives update flag.
+        // Make two requests because balancing uses round-robin node.
+        await client.Tables.GetTablesAsync();
+        await client.Tables.GetTablesAsync();
+
+        // Second request loads and uses new assignment.
+        await AssertOpOnNode(() => func(recordView), op, _server1, 
allowExtraOps: true);
+    }
+
     private async Task<IIgniteClient> GetClient()
     {
         var cfg = new IgniteClientConfiguration
diff --git 
a/modules/platforms/dotnet/Apache.Ignite.Tests/Proto/ColocationHashTests.cs 
b/modules/platforms/dotnet/Apache.Ignite.Tests/Proto/ColocationHashTests.cs
index aeb8e9f27b..4b97a4cf1d 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Tests/Proto/ColocationHashTests.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Tests/Proto/ColocationHashTests.cs
@@ -214,7 +214,7 @@ public class ColocationHashTests : IgniteTestsBase
     {
         var columns = arr.Select((obj, ci) => GetColumn(obj, ci, 
timePrecision, timestampPrecision)).ToArray();
 
-        return new Schema(Version: 0, arr.Count, columns);
+        return new Schema(Version: 0, 0, arr.Count, columns);
     }
 
     private static Column GetColumn(object value, int schemaIndex, int 
timePrecision, int timestampPrecision)
diff --git a/modules/platforms/dotnet/Apache.Ignite.Tests/SslTests.cs 
b/modules/platforms/dotnet/Apache.Ignite.Tests/SslTests.cs
index 2a17f930a3..18818cc9a4 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Tests/SslTests.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Tests/SslTests.cs
@@ -173,7 +173,7 @@ public class SslTests : IgniteTestsBase
         cfg.SslStreamFactory = new NullSslStreamFactory();
 
         using var client = await IgniteClient.StartAsync(cfg);
-        Assert.IsNull(client.GetConnections().Single().SslInfo);
+        Assert.IsNull(client.GetConnections().First().SslInfo);
     }
 
     [Test]
diff --git 
a/modules/platforms/dotnet/Apache.Ignite.Tests/Table/DataStreamerTests.cs 
b/modules/platforms/dotnet/Apache.Ignite.Tests/Table/DataStreamerTests.cs
new file mode 100644
index 0000000000..daf3b3f693
--- /dev/null
+++ b/modules/platforms/dotnet/Apache.Ignite.Tests/Table/DataStreamerTests.cs
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache.Ignite.Tests.Table;
+
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Runtime.CompilerServices;
+using System.Threading;
+using System.Threading.Tasks;
+using Ignite.Table;
+using Internal.Proto;
+using NUnit.Framework;
+
+/// <summary>
+/// Tests for <see cref="IDataStreamerTarget{T}.StreamDataAsync"/>.
+/// <para />
+/// See DataStreamer partition awareness tests in <see 
cref="PartitionAwarenessTests"/>.
+/// </summary>
+public class DataStreamerTests : IgniteTestsBase
+{
+    private const int Count = 100;
+
+    [SetUp]
+    public async Task SetUp() =>
+        await TupleView.DeleteAllAsync(null, Enumerable.Range(0, 
Count).Select(x => GetTuple(x)));
+
+    [Test]
+    public async Task TestBasicStreamingRecordBinaryView()
+    {
+        var options = DataStreamerOptions.Default with { BatchSize = 10 };
+        var data = Enumerable.Range(0, Count).Select(x => GetTuple(x, "t" + 
x)).ToList();
+
+        await TupleView.StreamDataAsync(data.ToAsyncEnumerable(), options);
+        await CheckData();
+    }
+
+    [Test]
+    public async Task TestBasicStreamingRecordView()
+    {
+        var options = DataStreamerOptions.Default with { BatchSize = 5 };
+        var data = Enumerable.Range(0, Count).Select(x => GetPoco(x, "t" + 
x)).ToList();
+
+        await 
Table.GetRecordView<Poco>().StreamDataAsync(data.ToAsyncEnumerable(), options);
+        await CheckData();
+    }
+
+    [Test]
+    public async Task TestBasicStreamingKeyValueBinaryView()
+    {
+        var options = DataStreamerOptions.Default with { BatchSize = 10_000 };
+        var data = Enumerable.Range(0, Count)
+            .Select(x => new KeyValuePair<IIgniteTuple, 
IIgniteTuple>(GetTuple(x), GetTuple(x, "t" + x)))
+            .ToList();
+
+        await 
Table.KeyValueBinaryView.StreamDataAsync(data.ToAsyncEnumerable(), options);
+        await CheckData();
+    }
+
+    [Test]
+    public async Task TestBasicStreamingKeyValueView()
+    {
+        var options = DataStreamerOptions.Default with { BatchSize = 1 };
+        var data = Enumerable.Range(0, Count)
+            .Select(x => new KeyValuePair<long, Poco>(x, GetPoco(x, "t" + x)))
+            .ToList();
+
+        await Table.GetKeyValueView<long, 
Poco>().StreamDataAsync(data.ToAsyncEnumerable(), options);
+        await CheckData();
+    }
+
+    [Test]
+    public async Task TestAutoFlushFrequency([Values(true, false)] bool 
enabled)
+    {
+        using var cts = new CancellationTokenSource();
+
+        _ = TupleView.StreamDataAsync(
+            GetTuplesWithDelay(cts.Token),
+            new()
+            {
+                AutoFlushFrequency = enabled
+                    ? TimeSpan.FromMilliseconds(50)
+                    : TimeSpan.MaxValue
+            });
+
+        await Task.Delay(100);
+
+        Assert.AreEqual(enabled, await TupleView.ContainsKeyAsync(null, 
GetTuple(0)));
+        Assert.IsFalse(await TupleView.ContainsKeyAsync(null, GetTuple(1)));
+
+        cts.Cancel();
+    }
+
+    [Test]
+    public async Task TestCancellation()
+    {
+        using var cts = new CancellationTokenSource();
+        var streamTask = TupleView.StreamDataAsync(GetTuplesWithDelay(), 
cancellationToken: cts.Token);
+
+        cts.Cancel();
+        Assert.ThrowsAsync<TaskCanceledException>(async () => await 
streamTask);
+
+        Assert.IsFalse(
+            await TupleView.ContainsKeyAsync(null, GetTuple(0)),
+            "No data was streamed - cancelled before any batches were full.");
+    }
+
+    [Test]
+    public void TestOptionsValidation()
+    {
+        AssertException(DataStreamerOptions.Default with { BatchSize = -10 }, 
"BatchSize should be positive.");
+        AssertException(DataStreamerOptions.Default with { RetryLimit = -1 }, 
"RetryLimit should be non-negative.");
+        AssertException(
+            DataStreamerOptions.Default with { AutoFlushFrequency = 
TimeSpan.FromDays(-1) },
+            "AutoFlushFrequency should be positive.");
+
+        void AssertException(DataStreamerOptions options, string message)
+        {
+            var ex = Assert.ThrowsAsync<ArgumentException>(
+                async () => await 
Table.RecordBinaryView.StreamDataAsync(Array.Empty<IIgniteTuple>().ToAsyncEnumerable(),
 options));
+
+            StringAssert.Contains(message, ex?.Message);
+        }
+    }
+
+    [Test]
+    public async Task TestRetryLimitExhausted()
+    {
+        using var server = new FakeServer(
+            shouldDropConnection: ctx => ctx is { OpCode: 
ClientOp.TupleUpsertAll, RequestCount: > 7 });
+
+        using var client = await server.ConnectClientAsync();
+        var table = await 
client.Tables.GetTableAsync(FakeServer.ExistingTableName);
+
+        var ex = Assert.ThrowsAsync<IgniteClientConnectionException>(
+            async () => await 
table!.RecordBinaryView.StreamDataAsync(GetFakeServerData(10_000)));
+
+        StringAssert.StartsWith("Operation TupleUpsertAll failed after 16 
retries", ex!.Message);
+    }
+
+    [Test]
+    public async Task TestManyItemsWithDisconnectAndRetry()
+    {
+        const int count = 100_000;
+        int upsertIdx = 0;
+
+        using var server = new FakeServer(
+            shouldDropConnection: ctx => ctx.OpCode == ClientOp.TupleUpsertAll 
&& Interlocked.Increment(ref upsertIdx) % 2 == 1);
+
+        // Streamer has its own retry policy, so we can disable retries on 
the client.
+        using var client = await server.ConnectClientAsync(new 
IgniteClientConfiguration
+        {
+            RetryPolicy = new RetryNonePolicy()
+        });
+
+        var table = await 
client.Tables.GetTableAsync(FakeServer.ExistingTableName);
+        await 
table!.RecordBinaryView.StreamDataAsync(GetFakeServerData(count));
+
+        Assert.AreEqual(count, server.UpsertAllRowCount);
+        Assert.AreEqual(count / DataStreamerOptions.Default.BatchSize, 
server.DroppedConnectionCount);
+    }
+
+    private static async IAsyncEnumerable<IIgniteTuple> GetFakeServerData(int 
count)
+    {
+        for (var i = 0; i < count; i++)
+        {
+            yield return new IgniteTuple { ["ID"] = i };
+            await Task.Yield();
+        }
+    }
+
+    private static async IAsyncEnumerable<IIgniteTuple> 
GetTuplesWithDelay([EnumeratorCancellation] CancellationToken ct = default)
+    {
+        for (var i = 0; i < 3; i++)
+        {
+            yield return GetTuple(i, "t" + i);
+            await Task.Delay(15000, ct);
+        }
+    }
+
+    private async Task CheckData()
+    {
+        var data = Enumerable.Range(0, Count).Select(x => GetTuple(x));
+        var res = await TupleView.GetAllAsync(null, data);
+
+        Assert.AreEqual(Count, res.Count);
+
+        foreach (var (_, hasVal) in res)
+        {
+            Assert.IsTrue(hasVal);
+        }
+    }
+}
diff --git 
a/modules/platforms/dotnet/Apache.Ignite.Tests/Table/Serialization/ObjectSerializerHandlerTests.cs
 
b/modules/platforms/dotnet/Apache.Ignite.Tests/Table/Serialization/ObjectSerializerHandlerTests.cs
index f1d60d0479..4ee9a143fb 100644
--- 
a/modules/platforms/dotnet/Apache.Ignite.Tests/Table/Serialization/ObjectSerializerHandlerTests.cs
+++ 
b/modules/platforms/dotnet/Apache.Ignite.Tests/Table/Serialization/ObjectSerializerHandlerTests.cs
@@ -32,7 +32,7 @@ namespace Apache.Ignite.Tests.Table.Serialization
     // ReSharper disable NotAccessedPositionalProperty.Local
     public class ObjectSerializerHandlerTests
     {
-        private static readonly Schema Schema = new(1, 1, new[]
+        private static readonly Schema Schema = new(1, 1, 1, new[]
         {
             new Column("Key", ColumnType.Int64, IsNullable: false, 
IsColocation: true, IsKey: true, SchemaIndex: 0, Scale: 0, Precision: 0),
             new Column("Val", ColumnType.String, IsNullable: false, 
IsColocation: false, IsKey: false, SchemaIndex: 1, Scale: 0, Precision: 0)
diff --git 
a/modules/platforms/dotnet/Apache.Ignite.Tests/Transactions/TransactionsTests.cs
 
b/modules/platforms/dotnet/Apache.Ignite.Tests/Transactions/TransactionsTests.cs
index ac85e20f77..fb499064bf 100644
--- 
a/modules/platforms/dotnet/Apache.Ignite.Tests/Transactions/TransactionsTests.cs
+++ 
b/modules/platforms/dotnet/Apache.Ignite.Tests/Transactions/TransactionsTests.cs
@@ -266,9 +266,12 @@ namespace Apache.Ignite.Tests.Transactions
         [Test]
         public async Task TestToString()
         {
-            await using var tx1 = await Client.Transactions.BeginAsync();
-            await using var tx2 = await 
Client.Transactions.BeginAsync(new(ReadOnly: true));
-            await using var tx3 = await Client.Transactions.BeginAsync();
+            // Single connection.
+            using var client = await IgniteClient.StartAsync(new() { Endpoints 
= { "127.0.0.1:" + ServerPort } });
+
+            await using var tx1 = await client.Transactions.BeginAsync();
+            await using var tx2 = await 
client.Transactions.BeginAsync(new(ReadOnly: true));
+            await using var tx3 = await client.Transactions.BeginAsync();
 
             await tx2.RollbackAsync();
             await tx3.CommitAsync();
diff --git 
a/modules/platforms/dotnet/Apache.Ignite/Internal/Buffers/PooledArrayBuffer.cs 
b/modules/platforms/dotnet/Apache.Ignite/Internal/Buffers/PooledArrayBuffer.cs
index 32ff848e54..1fd36d60d7 100644
--- 
a/modules/platforms/dotnet/Apache.Ignite/Internal/Buffers/PooledArrayBuffer.cs
+++ 
b/modules/platforms/dotnet/Apache.Ignite/Internal/Buffers/PooledArrayBuffer.cs
@@ -207,10 +207,14 @@ namespace Apache.Ignite.Internal.Buffers
         /// </summary>
         /// <param name="val">Value.</param>
         /// <param name="pos">Position.</param>
-        public void WriteInt(int val, int pos)
-        {
-            BinaryPrimitives.WriteInt32LittleEndian(_buffer.AsSpan(pos + 
_prefixSize), val);
-        }
+        public void WriteInt(int val, int pos) => 
BinaryPrimitives.WriteInt32LittleEndian(_buffer.AsSpan(pos + _prefixSize), val);
+
+        /// <summary>
+        /// Writes an int at specified position in big-endian byte order.
+        /// </summary>
+        /// <param name="val">Value.</param>
+        /// <param name="pos">Position.</param>
+        public void WriteIntBigEndian(int val, int pos) => 
BinaryPrimitives.WriteInt32BigEndian(_buffer.AsSpan(pos + _prefixSize), val);
 
         /// <summary>
         /// Writes a long at current position.
diff --git 
a/modules/platforms/dotnet/Apache.Ignite/Internal/ClientFailoverSocket.cs 
b/modules/platforms/dotnet/Apache.Ignite/Internal/ClientFailoverSocket.cs
index 33e5cc3d75..8140c0da54 100644
--- a/modules/platforms/dotnet/Apache.Ignite/Internal/ClientFailoverSocket.cs
+++ b/modules/platforms/dotnet/Apache.Ignite/Internal/ClientFailoverSocket.cs
@@ -157,12 +157,14 @@ namespace Apache.Ignite.Internal
         /// <param name="tx">Transaction.</param>
         /// <param name="request">Request data.</param>
         /// <param name="preferredNode">Preferred node.</param>
+        /// <param name="retryPolicyOverride">Retry policy.</param>
         /// <returns>Response data and socket.</returns>
         public async Task<(PooledBuffer Buffer, ClientSocket Socket)> 
DoOutInOpAndGetSocketAsync(
             ClientOp clientOp,
             Transaction? tx = null,
             PooledArrayBuffer? request = null,
-            PreferredNode preferredNode = default)
+            PreferredNode preferredNode = default,
+            IRetryPolicy? retryPolicyOverride = null)
         {
             if (tx != null)
             {
@@ -194,7 +196,7 @@ namespace Apache.Ignite.Internal
                     // Preferred node connection may not be available, do not 
use it after first failure.
                     preferredNode = default;
 
-                    if (!HandleOpError(e, clientOp, ref attempt, ref errors))
+                    if (!HandleOpError(e, clientOp, ref attempt, ref errors, 
retryPolicyOverride ?? Configuration.RetryPolicy))
                     {
                         throw;
                     }
@@ -514,10 +516,11 @@ namespace Apache.Ignite.Internal
         /// <param name="exception">Exception that caused the operation to 
fail.</param>
         /// <param name="op">Operation code.</param>
         /// <param name="attempt">Current attempt.</param>
+        /// <param name="retryPolicy">Retry policy.</param>
         /// <returns>
         /// <c>true</c> if the operation should be retried on another 
connection, <c>false</c> otherwise.
         /// </returns>
-        private bool ShouldRetry(Exception exception, ClientOp op, int attempt)
+        private bool ShouldRetry(Exception exception, ClientOp op, int 
attempt, IRetryPolicy? retryPolicy)
         {
             var e = exception;
 
@@ -532,7 +535,7 @@ namespace Apache.Ignite.Internal
                 return false;
             }
 
-            if (Configuration.RetryPolicy is null or RetryNonePolicy)
+            if (retryPolicy is null or RetryNonePolicy)
             {
                 return false;
             }
@@ -547,7 +550,7 @@ namespace Apache.Ignite.Internal
 
             var ctx = new RetryPolicyContext(new(Configuration), 
publicOpType.Value, attempt, exception);
 
-            return Configuration.RetryPolicy.ShouldRetry(ctx);
+            return retryPolicy.ShouldRetry(ctx);
         }
 
         /// <summary>
@@ -557,15 +560,17 @@ namespace Apache.Ignite.Internal
         /// <param name="op">Operation code.</param>
         /// <param name="attempt">Current attempt.</param>
         /// <param name="errors">Previous errors.</param>
+        /// <param name="retryPolicy">Retry policy.</param>
         /// <returns>True if the error was handled, false otherwise.</returns>
         [SuppressMessage("Microsoft.Design", "CA1002:DoNotExposeGenericLists", 
Justification = "Private.")]
         private bool HandleOpError(
             Exception exception,
             ClientOp op,
             ref int attempt,
-            ref List<Exception>? errors)
+            ref List<Exception>? errors,
+            IRetryPolicy? retryPolicy)
         {
-            if (!ShouldRetry(exception, op, attempt))
+            if (!ShouldRetry(exception, op, attempt, retryPolicy))
             {
                 if (_logger?.IsEnabled(LogLevel.Debug) == true)
                 {
diff --git a/modules/platforms/dotnet/Apache.Ignite/Internal/ClientSocket.cs 
b/modules/platforms/dotnet/Apache.Ignite/Internal/ClientSocket.cs
index 749e250575..42dd3f8659 100644
--- a/modules/platforms/dotnet/Apache.Ignite/Internal/ClientSocket.cs
+++ b/modules/platforms/dotnet/Apache.Ignite/Internal/ClientSocket.cs
@@ -582,8 +582,7 @@ namespace Apache.Ignite.Internal
 
             if (_logger?.IsEnabled(LogLevel.Trace) == true)
             {
-                _logger.Trace(
-                    $"Sending request [opCode={(int)op}, 
remoteAddress={ConnectionContext.ClusterNode.Address}, requestId={requestId}]");
+                _logger.Trace($"Sending request [op={op}, 
remoteAddress={ConnectionContext.ClusterNode.Address}, requestId={requestId}]");
             }
 
             await 
_sendLock.WaitAsync(_disposeTokenSource.Token).ConfigureAwait(false);
diff --git 
a/modules/platforms/dotnet/Apache.Ignite/Internal/Common/IgniteArgumentCheck.cs 
b/modules/platforms/dotnet/Apache.Ignite/Internal/Common/IgniteArgumentCheck.cs
index 29dc6d64b9..a97c4eae67 100644
--- 
a/modules/platforms/dotnet/Apache.Ignite/Internal/Common/IgniteArgumentCheck.cs
+++ 
b/modules/platforms/dotnet/Apache.Ignite/Internal/Common/IgniteArgumentCheck.cs
@@ -33,9 +33,13 @@ namespace Apache.Ignite.Internal.Common
         /// <param name="arg">The argument.</param>
         /// <param name="argName">Name of the argument.</param>
         /// <typeparam name="T">Arg type.</typeparam>
-        /// <returns>Argument.</returns>
-        public static T NotNull<T>([NoEnumeration] T arg, 
[CallerArgumentExpression("arg")] string? argName = null) =>
-            arg == null ? throw new ArgumentNullException(argName) : arg;
+        public static void NotNull<T>([NoEnumeration] T arg, 
[CallerArgumentExpression("arg")] string? argName = null)
+        {
+            if (arg == null)
+            {
+                throw new ArgumentNullException(argName);
+            }
+        }
 
         /// <summary>
         /// Throws an ArgumentException if specified arg is null or empty 
string.
diff --git 
a/modules/platforms/dotnet/Apache.Ignite/Internal/Proto/BinaryTuple/BinaryTupleBuilder.cs
 
b/modules/platforms/dotnet/Apache.Ignite/Internal/Proto/BinaryTuple/BinaryTupleBuilder.cs
index 85535194af..15314d5e0e 100644
--- 
a/modules/platforms/dotnet/Apache.Ignite/Internal/Proto/BinaryTuple/BinaryTupleBuilder.cs
+++ 
b/modules/platforms/dotnet/Apache.Ignite/Internal/Proto/BinaryTuple/BinaryTupleBuilder.cs
@@ -22,7 +22,6 @@ namespace Apache.Ignite.Internal.Proto.BinaryTuple
     using System.Collections;
     using System.Diagnostics;
     using System.Numerics;
-    using System.Runtime.InteropServices;
     using Buffers;
     using Ignite.Sql;
     using NodaTime;
@@ -482,8 +481,10 @@ namespace Apache.Ignite.Internal.Proto.BinaryTuple
 
                 if (ShouldHash())
                 {
-                    _hash = 
HashUtils.Hash32(BinaryPrimitives.ReadInt64LittleEndian(span[..8]), _hash);
-                    _hash = 
HashUtils.Hash32(BinaryPrimitives.ReadInt64LittleEndian(span[8..]), _hash);
+                    var lo = BinaryPrimitives.ReadInt64LittleEndian(span[..8]);
+                    var hi = BinaryPrimitives.ReadInt64LittleEndian(span[8..]);
+
+                    _hash = HashUtils.Hash32(hi, HashUtils.Hash32(lo, _hash));
                 }
             }
             else
@@ -571,24 +572,8 @@ namespace Apache.Ignite.Internal.Proto.BinaryTuple
         /// </summary>
         /// <param name="value">Value.</param>
         /// <param name="scale">Decimal scale from schema.</param>
-        public void AppendDecimal(decimal value, int scale)
-        {
-            if (value != decimal.Zero)
-            {
-                var (unscaledValue, valueScale) = DeconstructDecimal(value);
-
-                PutDecimal(scale, unscaledValue, valueScale);
-            }
-            else
-            {
-                if (ShouldHash())
-                {
-                    _hash = HashUtils.Hash32(stackalloc byte[1] { 0 }, _hash);
-                }
-            }
-
-            OnWrite();
-        }
+        public void AppendDecimal(decimal value, int scale) =>
+            AppendNumber(BinaryTupleCommon.DecimalToUnscaledBigInteger(value, 
scale));
 
         /// <summary>
         /// Appends a decimal.
@@ -1145,20 +1130,6 @@ namespace Apache.Ignite.Internal.Proto.BinaryTuple
             _buffer.Dispose();
         }
 
-        private static (BigInteger Unscaled, int Scale) 
DeconstructDecimal(decimal value)
-        {
-            Span<int> bits = stackalloc int[4];
-            decimal.GetBits(value, bits);
-
-            var scale = (bits[3] & 0x00FF0000) >> 16;
-            var sign = bits[3] >> 31;
-
-            var bytes = MemoryMarshal.Cast<int, byte>(bits[..3]);
-            var unscaled = new BigInteger(bytes, true);
-
-            return (sign < 0 ? -unscaled : unscaled, scale);
-        }
-
         private static int GetDecimalScale(decimal value)
         {
             Span<int> bits = stackalloc int[4];
@@ -1167,30 +1138,6 @@ namespace Apache.Ignite.Internal.Proto.BinaryTuple
             return (bits[3] & 0x00FF0000) >> 16;
         }
 
-        private void PutDecimal(int scale, BigInteger unscaledValue, int 
valueScale)
-        {
-            if (scale > valueScale)
-            {
-                unscaledValue *= BigInteger.Pow(new BigInteger(10), scale - 
valueScale);
-            }
-            else if (scale < valueScale)
-            {
-                unscaledValue /= BigInteger.Pow(new BigInteger(10), valueScale 
- scale);
-            }
-
-            var size = unscaledValue.GetByteCount();
-            var destination = GetSpan(size);
-            var success = unscaledValue.TryWriteBytes(destination, out int 
written, isBigEndian: true);
-
-            if (ShouldHash())
-            {
-                _hash = HashUtils.Hash32(destination[..written], _hash);
-            }
-
-            Debug.Assert(success, "success");
-            Debug.Assert(written == size, "written == size");
-        }
-
         private void PutByte(sbyte value) => 
_buffer.WriteByte(unchecked((byte)value));
 
         private void PutShort(short value) => _buffer.WriteShort(value);
@@ -1232,13 +1179,7 @@ namespace Apache.Ignite.Internal.Proto.BinaryTuple
 
         private (long Seconds, int Nanos) PutTimestamp(Instant value, int 
precision)
         {
-            // Logic taken from
-            // 
https://github.com/nodatime/nodatime.serialization/blob/main/src/NodaTime.Serialization.Protobuf/NodaExtensions.cs#L69
-            // (Apache License).
-            // See discussion: 
https://github.com/nodatime/nodatime/issues/1644#issuecomment-1260524451
-            long seconds = value.ToUnixTimeSeconds();
-            Duration remainder = value - Instant.FromUnixTimeSeconds(seconds);
-            int nanos = 
TemporalTypes.NormalizeNanos((int)remainder.NanosecondOfDay, precision);
+            var (seconds, nanos) = value.ToSecondsAndNanos(precision);
 
             PutLong(seconds);
 
diff --git 
a/modules/platforms/dotnet/Apache.Ignite/Internal/Proto/BinaryTuple/BinaryTupleCommon.cs
 
b/modules/platforms/dotnet/Apache.Ignite/Internal/Proto/BinaryTuple/BinaryTupleCommon.cs
index 23b45a4a11..ff094582a9 100644
--- 
a/modules/platforms/dotnet/Apache.Ignite/Internal/Proto/BinaryTuple/BinaryTupleCommon.cs
+++ 
b/modules/platforms/dotnet/Apache.Ignite/Internal/Proto/BinaryTuple/BinaryTupleCommon.cs
@@ -17,7 +17,10 @@
 
 namespace Apache.Ignite.Internal.Proto.BinaryTuple
 {
+    using System;
     using System.Diagnostics;
+    using System.Numerics;
+    using System.Runtime.InteropServices;
     using NodaTime;
 
     /// <summary>
@@ -111,5 +114,44 @@ namespace Apache.Ignite.Internal.Proto.BinaryTuple
         {
             return (byte)(1 << (index % 8));
         }
+
+        /// <summary>
+        /// Produces the unscaled integer representation of a decimal, rescaled to the given column scale.
+        /// </summary>
+        /// <param name="value">Decimal value.</param>
+        /// <param name="scale">Column scale.</param>
+        /// <returns>Unscaled BigInteger according to column scale.</returns>
+        public static BigInteger DecimalToUnscaledBigInteger(decimal value, int scale)
+        {
+            if (value == decimal.Zero)
+            {
+                return BigInteger.Zero;
+            }
+
+            // The fourth element carries the flags: the value's own scale (bits 16-23) and the sign (bit 31).
+            Span<int> parts = stackalloc int[4];
+            decimal.GetBits(value, parts);
+
+            var flags = parts[3];
+            var ownScale = (flags & 0x00FF0000) >> 16;
+
+            // The first three elements are read as one unsigned integer.
+            var magnitude = new BigInteger(MemoryMarshal.Cast<int, byte>(parts[..3]), true);
+            var result = flags < 0 ? -magnitude : magnitude;
+
+            if (scale > ownScale)
+            {
+                // Column scale is larger: pad with zeros.
+                return result * BigInteger.Pow(new BigInteger(10), scale - ownScale);
+            }
+
+            if (scale < ownScale)
+            {
+                // Column scale is smaller: drop extra digits (integer division).
+                return result / BigInteger.Pow(new BigInteger(10), ownScale - scale);
+            }
+
+            return result;
+        }
     }
 }
diff --git 
a/modules/platforms/dotnet/Apache.Ignite/Internal/Table/DataStreamer.cs 
b/modules/platforms/dotnet/Apache.Ignite/Internal/Table/DataStreamer.cs
new file mode 100644
index 0000000000..9497885965
--- /dev/null
+++ b/modules/platforms/dotnet/Apache.Ignite/Internal/Table/DataStreamer.cs
@@ -0,0 +1,282 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache.Ignite.Internal.Table;
+
+using System;
+using System.Collections.Generic;
+using System.Diagnostics;
+using System.Runtime.InteropServices;
+using System.Threading;
+using System.Threading.Tasks;
+using Buffers;
+using Common;
+using Ignite.Table;
+using Proto;
+using Proto.BinaryTuple;
+using Proto.MsgPack;
+using Serialization;
+
+/// <summary>
+/// Data streamer.
+/// <para />
+/// Implementation notes:
+/// * Hashing is combined with serialization (unlike Java client), so we write 
binary tuples to a per-node buffer right away.
+///   - Pros: cheaper happy path;
+///   - Cons: will require re-serialization on schema update.
+/// * Iteration and serialization are sequential.
+/// * Batches are sent asynchronously; we wait for the previous batch only 
when the next batch for the given node is full.
+/// * The more connections to different servers we have - the more parallelism 
we get (see benchmark).
+/// * There is no parallelism for the same node, because we need to guarantee 
ordering.
+/// </summary>
+internal static class DataStreamer
+{
+    private static readonly TimeSpan PartitionAssignmentUpdateFrequency = 
TimeSpan.FromSeconds(15);
+
+    /// <summary>
+    /// Streams the data.
+    /// </summary>
+    /// <param name="data">Data.</param>
+    /// <param name="sender">Batch sender.</param>
+    /// <param name="writer">Item writer.</param>
+    /// <param name="schemaProvider">Schema provider.</param>
+    /// <param name="partitionAssignmentProvider">Partitioner.</param>
+    /// <param name="options">Options.</param>
+    /// <param name="cancellationToken">Cancellation token.</param>
+    /// <typeparam name="T">Element type.</typeparam>
+    /// <returns>A <see cref="Task"/> representing the asynchronous 
operation.</returns>
+    internal static async Task StreamDataAsync<T>(
+        IAsyncEnumerable<T> data,
+        Func<PooledArrayBuffer, string, IRetryPolicy, Task> sender,
+        IRecordSerializerHandler<T> writer,
+        Func<Task<Schema>> schemaProvider,
+        Func<ValueTask<string[]?>> partitionAssignmentProvider,
+        DataStreamerOptions options,
+        CancellationToken cancellationToken)
+    {
+        IgniteArgumentCheck.NotNull(data);
+
+        IgniteArgumentCheck.Ensure(options.BatchSize > 0, 
$"{nameof(options.BatchSize)} should be positive.");
+        IgniteArgumentCheck.Ensure(options.AutoFlushFrequency > TimeSpan.Zero, 
$"{nameof(options.AutoFlushFrequency)} should be positive.");
+        IgniteArgumentCheck.Ensure(options.RetryLimit >= 0, 
$"{nameof(options.RetryLimit)} should be non-negative.");
+
+        // ConcurrentDictionary is not necessary because we consume the source 
sequentially.
+        // However, locking for batches is required due to auto-flush 
background task.
+        var batches = new Dictionary<string, Batch>();
+        var retryPolicy = new RetryLimitPolicy { RetryLimit = 
options.RetryLimit };
+
+        // TODO: IGNITE-19710 Data Streamer schema synchronization
+        var schema = await schemaProvider().ConfigureAwait(false);
+        var partitionAssignment = await 
partitionAssignmentProvider().ConfigureAwait(false);
+        var lastPartitionsAssignmentCheck = Stopwatch.StartNew();
+        using var flushCts = new CancellationTokenSource();
+
+        try
+        {
+            _ = AutoFlushAsync(flushCts.Token);
+
+            await foreach (var item in 
data.WithCancellation(cancellationToken))
+            {
+                var (batch, partition) = Add(item);
+
+                if (batch.Count >= options.BatchSize)
+                {
+                    await SendAsync(batch, partition).ConfigureAwait(false);
+                }
+
+                if (lastPartitionsAssignmentCheck.Elapsed > 
PartitionAssignmentUpdateFrequency)
+                {
+                    var newAssignment = await 
partitionAssignmentProvider().ConfigureAwait(false);
+
+                    if (newAssignment != partitionAssignment)
+                    {
+                        // Drain all batches to preserve order when partition 
assignment changes.
+                        await Drain().ConfigureAwait(false);
+                        partitionAssignment = newAssignment;
+                    }
+
+                    lastPartitionsAssignmentCheck.Restart();
+                }
+            }
+
+            await Drain().ConfigureAwait(false);
+        }
+        finally
+        {
+            flushCts.Cancel();
+            foreach (var batch in batches.Values)
+            {
+                batch.Buffer.Dispose();
+            }
+        }
+
+        (Batch Batch, string Partition) Add(T item)
+        {
+            var tupleBuilder = new BinaryTupleBuilder(schema.Columns.Count, 
hashedColumnsPredicate: schema);
+
+            try
+            {
+                return Add0(item, ref tupleBuilder);
+            }
+            finally
+            {
+                tupleBuilder.Dispose();
+            }
+        }
+
+        // Serializes the item into the tuple builder, selects the partition assignment entry by the tuple hash,
+        // and appends the serialized tuple to the batch for that entry.
+        // Returns the batch together with its partition key so the caller can send it when it is full.
+        (Batch Batch, string Partition) Add0(T item, ref BinaryTupleBuilder tupleBuilder)
+        {
+            var columnCount = schema.Columns.Count;
+
+            // Use MemoryMarshal to work around [CS8352]: "Cannot use variable 'noValueSet' in this context
+            // because it may expose referenced variables outside of their declaration scope".
+            // The alias must have exactly the length of the stack buffer (one bit per column, not one byte):
+            // a longer span would permit reads and writes past the end of the stackalloc'ed memory.
+            Span<byte> noValueSet = stackalloc byte[columnCount / 8 + 1];
+            Span<byte> noValueSetRef = MemoryMarshal.CreateSpan(ref MemoryMarshal.GetReference(noValueSet), noValueSet.Length);
+
+            writer.Write(ref tupleBuilder, item, schema, columnCount, noValueSetRef);
+
+            // The remainder is taken before Math.Abs, so the argument can never be int.MinValue.
+            var partition = partitionAssignment == null
+                ? string.Empty // Default connection.
+                : partitionAssignment[Math.Abs(tupleBuilder.Hash % partitionAssignment.Length)];
+
+            var batch = GetOrCreateBatch(partition);
+
+            // The auto-flush background task may swap the batch buffer concurrently.
+            lock (batch)
+            {
+                batch.Count++;
+
+                noValueSet.CopyTo(batch.Buffer.MessageWriter.WriteBitSet(columnCount));
+                batch.Buffer.MessageWriter.Write(tupleBuilder.Build().Span);
+            }
+
+            return (batch, partition);
+        }
+
+        Batch GetOrCreateBatch(string partition)
+        {
+            ref var batchRef = ref 
CollectionsMarshal.GetValueRefOrAddDefault(batches, partition, out _);
+
+            if (batchRef == null)
+            {
+                batchRef = new Batch();
+                InitBuffer(batchRef);
+            }
+
+            return batchRef;
+        }
+
+        async Task SendAsync(Batch batch, string partition)
+        {
+            var expectedSize = batch.Count;
+
+            // Wait for the previous task for this batch to finish: preserve 
order, backpressure control.
+            await batch.Task.ConfigureAwait(false);
+
+            lock (batch)
+            {
+                if (batch.Count != expectedSize || batch.Count == 0)
+                {
+                    // Concurrent update happened.
+                    return;
+                }
+
+                var buf = batch.Buffer;
+
+                // See RecordSerializer.WriteMultiple.
+                buf.WriteByte(MsgPackCode.Int32, batch.CountPos);
+                buf.WriteIntBigEndian(batch.Count, batch.CountPos + 1);
+
+                batch.Task = SendAndDisposeBufAsync(buf, partition, 
batch.Task);
+
+                batch.Count = 0;
+                batch.Buffer = ProtoCommon.GetMessageWriter(); // Prev buf 
will be disposed in SendAndDisposeBufAsync.
+                InitBuffer(batch);
+                batch.LastFlush = Stopwatch.GetTimestamp();
+            }
+        }
+
+        async Task SendAndDisposeBufAsync(PooledArrayBuffer buf, string 
partition, Task oldTask)
+        {
+            try
+            {
+                // Wait for the previous batch for this node to preserve item 
order.
+                await oldTask.ConfigureAwait(false);
+                await sender(buf, partition, 
retryPolicy).ConfigureAwait(false);
+            }
+            finally
+            {
+                buf.Dispose();
+            }
+        }
+
+        // Background loop: wakes up every AutoFlushFrequency and sends non-empty batches
+        // whose last flush happened longer than AutoFlushFrequency ago. Stops when flushCt is cancelled.
+        async Task AutoFlushAsync(CancellationToken flushCt)
+        {
+            // Batch.LastFlush stores Stopwatch.GetTimestamp() values, which are measured in Stopwatch.Frequency
+            // units per second, not in TimeSpan ticks (100 ns). Convert the interval to the same unit once.
+            var flushIntervalTimestampTicks = (long)(options.AutoFlushFrequency.TotalSeconds * Stopwatch.Frequency);
+
+            while (!flushCt.IsCancellationRequested)
+            {
+                await Task.Delay(options.AutoFlushFrequency, flushCt).ConfigureAwait(false);
+                var ts = Stopwatch.GetTimestamp();
+
+                // NOTE(review): batches is a plain Dictionary that the streaming loop can add to while this
+                // enumeration is in progress or suspended on an await - confirm whether a concurrent collection is needed.
+                foreach (var (partition, batch) in batches)
+                {
+                    if (batch.Count > 0 && ts - batch.LastFlush > flushIntervalTimestampTicks)
+                    {
+                        await SendAsync(batch, partition).ConfigureAwait(false);
+                    }
+                }
+            }
+        }
+
+        void InitBuffer(Batch batch)
+        {
+            var buf = batch.Buffer;
+
+            var w = buf.MessageWriter;
+            w.Write(schema.TableId);
+            w.WriteTx(null);
+            w.Write(schema.Version);
+
+            batch.CountPos = buf.Position;
+            buf.Advance(5); // Reserve count.
+        }
+
+        async Task Drain()
+        {
+            foreach (var (partition, batch) in batches)
+            {
+                if (batch.Count > 0)
+                {
+                    await SendAsync(batch, partition).ConfigureAwait(false);
+                }
+
+                await batch.Task.ConfigureAwait(false);
+            }
+        }
+    }
+
+    private sealed record Batch
+    {
+        public PooledArrayBuffer Buffer { get; set; } = 
ProtoCommon.GetMessageWriter();
+
+        public int Count { get; set; }
+
+        public int CountPos { get; set; }
+
+        public Task Task { get; set; } = Task.CompletedTask; // Task for the 
previous buffer.
+
+        public long LastFlush { get; set; }
+    }
+}
diff --git 
a/modules/platforms/dotnet/Apache.Ignite/Internal/Table/KeyValueView.cs 
b/modules/platforms/dotnet/Apache.Ignite/Internal/Table/KeyValueView.cs
index 70efdd5abb..c3174184e8 100644
--- a/modules/platforms/dotnet/Apache.Ignite/Internal/Table/KeyValueView.cs
+++ b/modules/platforms/dotnet/Apache.Ignite/Internal/Table/KeyValueView.cs
@@ -20,6 +20,7 @@ namespace Apache.Ignite.Internal.Table;
 using System;
 using System.Collections.Generic;
 using System.Linq;
+using System.Threading;
 using System.Threading.Tasks;
 using Apache.Ignite.Transactions;
 using Common;
@@ -156,6 +157,13 @@ internal sealed class KeyValueView<TK, TV> : 
IKeyValueView<TK, TV>
         return new IgniteQueryable<KeyValuePair<TK, TV>>(provider);
     }
 
+    /// <inheritdoc/>
+    public async Task StreamDataAsync(
+        IAsyncEnumerable<KeyValuePair<TK, TV>> data,
+        DataStreamerOptions? options = null,
+        CancellationToken cancellationToken = default) =>
+        await _recordView.StreamDataAsync(ToKv(data), options, 
cancellationToken).ConfigureAwait(false);
+
     /// <inheritdoc/>
     public override string ToString() =>
         new IgniteToStringBuilder(GetType())
@@ -184,4 +192,12 @@ internal sealed class KeyValueView<TK, TV> : 
IKeyValueView<TK, TV>
 
         return new(k, v);
     }
+
+    /// <summary>
+    /// Lazily converts a stream of key-value pairs to the <see cref="KvPair{TK,TV}"/> form
+    /// that is passed to the underlying record view.
+    /// </summary>
+    /// <param name="pairs">Source pairs.</param>
+    /// <returns>Converted pairs, produced one by one as the source is consumed.</returns>
+    private static async IAsyncEnumerable<KvPair<TK, TV>> ToKv(IAsyncEnumerable<KeyValuePair<TK, TV>> pairs)
+    {
+        // Library code: do not capture the caller's synchronization context while iterating the source.
+        await foreach (var pair in pairs.ConfigureAwait(false))
+        {
+            yield return ToKv(pair);
+        }
+    }
 }
diff --git 
a/modules/platforms/dotnet/Apache.Ignite/Internal/Table/RecordView.cs 
b/modules/platforms/dotnet/Apache.Ignite/Internal/Table/RecordView.cs
index 6e4f7c1620..ee622d3933 100644
--- a/modules/platforms/dotnet/Apache.Ignite/Internal/Table/RecordView.cs
+++ b/modules/platforms/dotnet/Apache.Ignite/Internal/Table/RecordView.cs
@@ -20,6 +20,7 @@ namespace Apache.Ignite.Internal.Table
     using System;
     using System.Collections.Generic;
     using System.Linq;
+    using System.Threading;
     using System.Threading.Tasks;
     using Buffers;
     using Common;
@@ -323,6 +324,29 @@ namespace Apache.Ignite.Internal.Table
         public async Task<IList<T>> DeleteAllExactAsync(ITransaction? 
transaction, IEnumerable<T> records) =>
             await DeleteAllAsync(transaction, records, exact: 
true).ConfigureAwait(false);
 
+        /// <inheritdoc/>
+        public async Task StreamDataAsync(
+            IAsyncEnumerable<T> data,
+            DataStreamerOptions? options = null,
+            CancellationToken cancellationToken = default) =>
+            await DataStreamer.StreamDataAsync(
+                data,
+                sender: async (batch, preferredNode, retryPolicy) =>
+                {
+                    using var resBuf = await DoOutInOpAsync(
+                            ClientOp.TupleUpsertAll,
+                            tx: null,
+                            batch,
+                            PreferredNode.FromId(preferredNode),
+                            retryPolicy)
+                        .ConfigureAwait(false);
+                },
+                writer: _ser.Handler,
+                schemaProvider: () => _table.GetLatestSchemaAsync(),
+                partitionAssignmentProvider: () => 
_table.GetPartitionAssignmentAsync(),
+                options ?? DataStreamerOptions.Default,
+                cancellationToken).ConfigureAwait(false);
+
         /// <inheritdoc/>
         public override string ToString() =>
             new IgniteToStringBuilder(GetType())
@@ -411,9 +435,11 @@ namespace Apache.Ignite.Internal.Table
             ClientOp clientOp,
             Transaction? tx,
             PooledArrayBuffer? request = null,
-            PreferredNode preferredNode = default)
+            PreferredNode preferredNode = default,
+            IRetryPolicy? retryPolicyOverride = null)
         {
-            var (buf, _) = await 
_table.Socket.DoOutInOpAndGetSocketAsync(clientOp, tx, request, 
preferredNode).ConfigureAwait(false);
+            var (buf, _) = await 
_table.Socket.DoOutInOpAndGetSocketAsync(clientOp, tx, request, preferredNode, 
retryPolicyOverride)
+                .ConfigureAwait(false);
 
             return buf;
         }
diff --git a/modules/platforms/dotnet/Apache.Ignite/Internal/Table/Schema.cs 
b/modules/platforms/dotnet/Apache.Ignite/Internal/Table/Schema.cs
index 65deb37bba..c4bf0fac06 100644
--- a/modules/platforms/dotnet/Apache.Ignite/Internal/Table/Schema.cs
+++ b/modules/platforms/dotnet/Apache.Ignite/Internal/Table/Schema.cs
@@ -24,10 +24,12 @@ namespace Apache.Ignite.Internal.Table
     /// Schema.
     /// </summary>
     /// <param name="Version">Version.</param>
+    /// <param name="TableId">Table id.</param>
     /// <param name="KeyColumnCount">Key column count.</param>
     /// <param name="Columns">Columns in schema order.</param>
     internal sealed record Schema(
         int Version,
+        int TableId,
         int KeyColumnCount,
         IReadOnlyList<Column> Columns) : IHashedColumnIndexProvider
     {
diff --git a/modules/platforms/dotnet/Apache.Ignite/Internal/Table/Table.cs 
b/modules/platforms/dotnet/Apache.Ignite/Internal/Table/Table.cs
index 35d0eaf926..37d92b2972 100644
--- a/modules/platforms/dotnet/Apache.Ignite/Internal/Table/Table.cs
+++ b/modules/platforms/dotnet/Apache.Ignite/Internal/Table/Table.cs
@@ -205,24 +205,11 @@ namespace Apache.Ignite.Internal.Table
             return PreferredNode.FromId(nodeId);
         }
 
-        private Task<Schema> GetCachedSchemaAsync(int version)
-        {
-            var task = GetOrAdd();
-
-            if (!task.IsFaulted)
-            {
-                return task;
-            }
-
-            // Do not return failed task. Remove it from the cache and try 
again.
-            _schemas.TryRemove(new KeyValuePair<int, Task<Schema>>(version, 
task));
-
-            return GetOrAdd();
-
-            Task<Schema> GetOrAdd() => _schemas.GetOrAdd(version, static (ver, 
tbl) => tbl.LoadSchemaAsync(ver), this);
-        }
-
-        private async ValueTask<string[]?> GetPartitionAssignmentAsync()
+        /// <summary>
+        /// Gets the partition assignment.
+        /// </summary>
+        /// <returns>Partition assignment.</returns>
+        internal async ValueTask<string[]?> GetPartitionAssignmentAsync()
         {
             var socketVer = _socket.PartitionAssignmentVersion;
             var assignment = _partitionAssignment;
@@ -258,6 +245,23 @@ namespace Apache.Ignite.Internal.Table
             }
         }
 
+        private Task<Schema> GetCachedSchemaAsync(int version)
+        {
+            var task = GetOrAdd(); // Cached load task for this version, or a newly started one.
+
+            if (!task.IsFaulted)
+            {
+                return task;
+            }
+
+            // Do not return a faulted task. Remove this exact key/task pair from the cache and try 
again.
+            _schemas.TryRemove(new KeyValuePair<int, Task<Schema>>(version, 
task));
+
+            return GetOrAdd();
+
+            Task<Schema> GetOrAdd() => _schemas.GetOrAdd(version, static (ver, 
tbl) => tbl.LoadSchemaAsync(ver), this);
+        }
+
         /// <summary>
         /// Loads the schema.
         /// </summary>
@@ -349,7 +353,7 @@ namespace Apache.Ignite.Internal.Table
                 }
             }
 
-            var schema = new Schema(schemaVersion, keyColumnCount, columns);
+            var schema = new Schema(schemaVersion, Id, keyColumnCount, 
columns);
             _schemas[schemaVersion] = Task.FromResult(schema);
 
             if (_logger?.IsEnabled(LogLevel.Debug) == true)
diff --git 
a/modules/platforms/dotnet/Apache.Ignite/Internal/Table/TemporalTypes.cs 
b/modules/platforms/dotnet/Apache.Ignite/Internal/Table/TemporalTypes.cs
index db89c2362f..f827cd98e4 100644
--- a/modules/platforms/dotnet/Apache.Ignite/Internal/Table/TemporalTypes.cs
+++ b/modules/platforms/dotnet/Apache.Ignite/Internal/Table/TemporalTypes.cs
@@ -19,6 +19,7 @@ namespace Apache.Ignite.Internal.Table;
 
 using System;
 using Ignite.Sql;
+using NodaTime;
 
 /// <summary>
 /// Temporal type utils.
@@ -51,4 +52,23 @@ internal static class TemporalTypes
             9 => nanos, // 1ns precision
             _ => throw new ArgumentException("Unsupported fractional seconds 
precision: " + precision)
         };
+
+    /// <summary>
+    /// Splits an <see cref="Instant"/> into whole Unix seconds and the remaining nanoseconds.
+    /// </summary>
+    /// <param name="value">Instant to split.</param>
+    /// <param name="precision">Column precision, passed to <c>NormalizeNanos</c> for the nanosecond part.</param>
+    /// <returns>Unix seconds and the normalized nanosecond remainder.</returns>
+    public static (long Seconds, int Nanos) ToSecondsAndNanos(this Instant 
value, int precision)
+    {
+        // Logic taken from
+        // 
https://github.com/nodatime/nodatime.serialization/blob/main/src/NodaTime.Serialization.Protobuf/NodaExtensions.cs#L69
+        // (Apache License).
+        // See discussion: 
https://github.com/nodatime/nodatime/issues/1644#issuecomment-1260524451
+        long seconds = value.ToUnixTimeSeconds();
+        Duration remainder = value - Instant.FromUnixTimeSeconds(seconds); // What is left after the whole seconds.
+        int nanos = NormalizeNanos((int)remainder.NanosecondOfDay, precision);
+
+        return (seconds, nanos);
+    }
 }
diff --git 
a/modules/platforms/dotnet/Apache.Ignite/Table/DataStreamerOptions.cs 
b/modules/platforms/dotnet/Apache.Ignite/Table/DataStreamerOptions.cs
new file mode 100644
index 0000000000..cc99ed364b
--- /dev/null
+++ b/modules/platforms/dotnet/Apache.Ignite/Table/DataStreamerOptions.cs
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache.Ignite.Table;
+
+using System;
+
+/// <summary>
+/// Data streamer options: batch size, per-batch retry limit, and auto flush frequency.
+/// </summary>
+/// <param name="BatchSize">Batch size - the number of entries that will be 
sent to the cluster in one network call.</param>
+/// <param name="RetryLimit">Retry limit for a batch. If a batch fails to be 
sent to the cluster,
+/// the streamer will retry it a number of times.</param>
+/// <param name="AutoFlushFrequency">Auto flush frequency - the period of time 
after which the streamer
+/// will flush the per-node buffer even if it is not full.</param>
+public sealed record DataStreamerOptions(int BatchSize, int RetryLimit, 
TimeSpan AutoFlushFrequency)
+{
+    /// <summary>
+    /// Default streamer options, created with the parameterless constructor.
+    /// </summary>
+    public static readonly DataStreamerOptions Default = new();
+
+    /// <summary>
+    /// Initializes a new instance of the <see cref="DataStreamerOptions"/> 
class.
+    /// </summary>
+    public DataStreamerOptions()
+        : this(
+            BatchSize: 1000,
+            RetryLimit: 16,
+            AutoFlushFrequency: TimeSpan.FromSeconds(5))
+    {
+        // No-op: the default values are passed to the primary constructor above.
+    }
+}
diff --git 
a/modules/platforms/dotnet/Apache.Ignite/Table/IDataStreamerTarget.cs 
b/modules/platforms/dotnet/Apache.Ignite/Table/IDataStreamerTarget.cs
new file mode 100644
index 0000000000..899be69f20
--- /dev/null
+++ b/modules/platforms/dotnet/Apache.Ignite/Table/IDataStreamerTarget.cs
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// ReSharper disable TypeParameterCanBeVariant (justification: future 
compatibility for streamer with receiver).
+namespace Apache.Ignite.Table;
+
+using System.Collections.Generic;
+using System.Threading;
+using System.Threading.Tasks;
+
+/// <summary>
+/// Represents an entity that can be used as a target for streaming data.
+/// </summary>
+/// <typeparam name="T">Type of the streamed items.</typeparam>
+public interface IDataStreamerTarget<T>
+{
+    /// <summary>
+    /// Streams data into the underlying table.
+    /// </summary>
+    /// <param name="data">Asynchronous sequence of items to stream.</param>
+    /// <param name="options">Streamer options; optional, <c>null</c> by default.</param>
+    /// <param name="cancellationToken">Cancellation token.</param>
+    /// <returns>A <see cref="Task"/> representing the asynchronous 
operation.</returns>
+    Task StreamDataAsync(IAsyncEnumerable<T> data, DataStreamerOptions? 
options = null, CancellationToken cancellationToken = default);
+}
diff --git a/modules/platforms/dotnet/Apache.Ignite/Table/IKeyValueView.cs 
b/modules/platforms/dotnet/Apache.Ignite/Table/IKeyValueView.cs
index 5b60e08fae..4a8b94decf 100644
--- a/modules/platforms/dotnet/Apache.Ignite/Table/IKeyValueView.cs
+++ b/modules/platforms/dotnet/Apache.Ignite/Table/IKeyValueView.cs
@@ -28,7 +28,7 @@ using Transactions;
 /// </summary>
 /// <typeparam name="TK">Key type.</typeparam>
 /// <typeparam name="TV">Value type.</typeparam>
-public interface IKeyValueView<TK, TV>
+public interface IKeyValueView<TK, TV> : IDataStreamerTarget<KeyValuePair<TK, 
TV>>
     where TK : notnull
     where TV : notnull
 {
diff --git a/modules/platforms/dotnet/Apache.Ignite/Table/IRecordView.cs 
b/modules/platforms/dotnet/Apache.Ignite/Table/IRecordView.cs
index 83b17763b6..83ecf36ee8 100644
--- a/modules/platforms/dotnet/Apache.Ignite/Table/IRecordView.cs
+++ b/modules/platforms/dotnet/Apache.Ignite/Table/IRecordView.cs
@@ -27,7 +27,7 @@ namespace Apache.Ignite.Table
     /// Record view interface provides methods to access table records.
     /// </summary>
     /// <typeparam name="T">Record type.</typeparam>
-    public interface IRecordView<T>
+    public interface IRecordView<T> : IDataStreamerTarget<T>
         where T : notnull
     {
         /// <summary>

Reply via email to