This is an automated email from the ASF dual-hosted git repository.
ptupitsyn pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 79a841d952 IGNITE-19545 .NET: Add Basic Data Streamer (#2182)
79a841d952 is described below
commit 79a841d95247c338b80ab91a93ec18ee7c0344ac
Author: Pavel Tupitsyn <[email protected]>
AuthorDate: Tue Jun 13 15:02:01 2023 +0300
IGNITE-19545 .NET: Add Basic Data Streamer (#2182)
Add basic data streamer (without receiver) to .NET client (Record and KV
views).
This covers the first use case from
https://cwiki.apache.org/confluence/display/IGNITE/IEP-102%3A+Data+Streamer.
* No changes to client protocol: `UPSERT_ALL` is used to upsert batches.
* Hashing is combined with serialization (unlike Java client), so we write
binary tuples to a per-node buffer right away.
- Pros: cheaper happy path;
- Cons: will require re-serialization on schema update.
* Iteration and serialization are sequential.
* Batches are sent asynchronously; we wait for the previous batch only
when the next batch for the given node is full.
* The more connections to different servers we have, the more parallelism
we get (see benchmark).
* There is no parallelism for the same node, because we need to guarantee
ordering.
* Extra: some hashing refactoring (initially I wanted to separate hashing
from serialization, which turned out to be complicated and less performant).
**Benchmark: upsert 100K rows** (1 ms per 1000 rows delay on server)
`FakeServer` imitates work by doing `Thread.Sleep` based on the batch size,
so `UpsertAll` wins in the single-server scenario because it inserts everything
in one batch (while the streamer sends 100 batches). With multiple servers, the
streamer scales linearly and wins because it sends batches to different nodes
in parallel.
```
Results on i9-12900H, .NET SDK 6.0.408, Ubuntu 22.04:
| Method | ServerCount | Mean | Error | StdDev | Ratio
| RatioSD | Allocated |
|----------------- |------------
|----------:|---------:|---------:|------:|--------:|----------:|
| DataStreamer | 1 | 141.56 ms | 2.725 ms | 3.244 ms | 1.00
| 0.00 | 4 MB |
| UpsertAll | 1 | 112.99 ms | 1.203 ms | 1.125 ms | 0.80
| 0.02 | 4 MB |
| UpsertAllBatched | 1 | 159.11 ms | 3.175 ms | 4.451 ms | 1.12
| 0.04 | 4 MB |
| | | | | |
| | |
| DataStreamer | 2 | 67.29 ms | 1.331 ms | 3.058 ms | 1.00
| 0.00 | 4 MB |
| UpsertAll | 2 | 113.68 ms | 0.915 ms | 0.856 ms | 1.64
| 0.05 | 4 MB |
| UpsertAllBatched | 2 | 162.47 ms | 3.169 ms | 5.118 ms | 2.42
| 0.14 | 4 MB |
| | | | | |
| | |
| DataStreamer | 4 | 32.64 ms | 0.507 ms | 0.475 ms | 1.00
| 0.00 | 4 MB |
| UpsertAll | 4 | 113.84 ms | 1.276 ms | 1.193 ms | 3.49
| 0.05 | 4 MB |
| UpsertAllBatched | 4 | 159.17 ms | 3.148 ms | 5.172 ms | 4.79
| 0.17 | 4 MB |.
```
---
.../dotnet/Apache.Ignite.Benchmarks/Program.cs | 8 +-
.../Table/DataStreamerBenchmark.cs | 127 ++++++++++
.../SerializerHandlerBenchmarksBase.cs | 2 +-
.../dotnet/Apache.Ignite.Tests/FakeServer.cs | 26 ++
.../dotnet/Apache.Ignite.Tests/HeartbeatTests.cs | 6 +-
.../dotnet/Apache.Ignite.Tests/IgniteTestsBase.cs | 6 +-
.../dotnet/Apache.Ignite.Tests/LoggingTests.cs | 4 +-
.../Apache.Ignite.Tests/PartitionAwarenessTests.cs | 65 +++--
.../Proto/ColocationHashTests.cs | 2 +-
.../dotnet/Apache.Ignite.Tests/SslTests.cs | 2 +-
.../Apache.Ignite.Tests/Table/DataStreamerTests.cs | 208 +++++++++++++++
.../Serialization/ObjectSerializerHandlerTests.cs | 2 +-
.../Transactions/TransactionsTests.cs | 9 +-
.../Internal/Buffers/PooledArrayBuffer.cs | 12 +-
.../Apache.Ignite/Internal/ClientFailoverSocket.cs | 19 +-
.../dotnet/Apache.Ignite/Internal/ClientSocket.cs | 3 +-
.../Internal/Common/IgniteArgumentCheck.cs | 10 +-
.../Proto/BinaryTuple/BinaryTupleBuilder.cs | 73 +-----
.../Proto/BinaryTuple/BinaryTupleCommon.cs | 42 +++
.../Apache.Ignite/Internal/Table/DataStreamer.cs | 282 +++++++++++++++++++++
.../Apache.Ignite/Internal/Table/KeyValueView.cs | 16 ++
.../Apache.Ignite/Internal/Table/RecordView.cs | 30 ++-
.../dotnet/Apache.Ignite/Internal/Table/Schema.cs | 2 +
.../dotnet/Apache.Ignite/Internal/Table/Table.cs | 42 +--
.../Apache.Ignite/Internal/Table/TemporalTypes.cs | 20 ++
.../Apache.Ignite/Table/DataStreamerOptions.cs | 48 ++++
.../Apache.Ignite/Table/IDataStreamerTarget.cs | 39 +++
.../dotnet/Apache.Ignite/Table/IKeyValueView.cs | 2 +-
.../dotnet/Apache.Ignite/Table/IRecordView.cs | 2 +-
29 files changed, 964 insertions(+), 145 deletions(-)
diff --git a/modules/platforms/dotnet/Apache.Ignite.Benchmarks/Program.cs
b/modules/platforms/dotnet/Apache.Ignite.Benchmarks/Program.cs
index 50b2e2bfb0..0c256c1385 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Benchmarks/Program.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Benchmarks/Program.cs
@@ -17,12 +17,10 @@
namespace Apache.Ignite.Benchmarks;
-using System.Threading.Tasks;
+using BenchmarkDotNet.Running;
+using Table;
internal static class Program
{
- private static async Task Main()
- {
- await ManyConnectionsBenchmark.RunAsync();
- }
+ private static void Main() => BenchmarkRunner.Run<DataStreamerBenchmark>();
}
diff --git
a/modules/platforms/dotnet/Apache.Ignite.Benchmarks/Table/DataStreamerBenchmark.cs
b/modules/platforms/dotnet/Apache.Ignite.Benchmarks/Table/DataStreamerBenchmark.cs
new file mode 100644
index 0000000000..7208629dd1
--- /dev/null
+++
b/modules/platforms/dotnet/Apache.Ignite.Benchmarks/Table/DataStreamerBenchmark.cs
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache.Ignite.Benchmarks.Table;
+
+using System;
+using System.Collections.Generic;
+using System.Diagnostics.CodeAnalysis;
+using System.Linq;
+using System.Threading.Tasks;
+using BenchmarkDotNet.Attributes;
+using Ignite.Table;
+using Tests;
+
+/// <summary>
+/// Data streamer benchmark.
+/// <para />
+/// Server imitates work by doing Thread.Sleep based on the batch size, so
UpsertAll wins in single-server scenario because it
+/// inserts everything in one batch, and streamer sends multiple batches. With
multiple servers, streamer scales linearly because
+/// it sends batches to different nodes in parallel.
+/// <para />
+/// Results on i9-12900H, .NET SDK 6.0.408, Ubuntu 22.04:
+/// | Method | ServerCount | Mean | Error | StdDev | Ratio
| RatioSD | Allocated |
+/// |----------------- |------------
|----------:|---------:|---------:|------:|--------:|----------:|
+/// | DataStreamer | 1 | 141.56 ms | 2.725 ms | 3.244 ms | 1.00
| 0.00 | 4 MB |
+/// | UpsertAll | 1 | 112.99 ms | 1.203 ms | 1.125 ms | 0.80
| 0.02 | 4 MB |
+/// | UpsertAllBatched | 1 | 159.11 ms | 3.175 ms | 4.451 ms | 1.12
| 0.04 | 4 MB |
+/// | | | | | |
| | |
+/// | DataStreamer | 2 | 67.29 ms | 1.331 ms | 3.058 ms | 1.00
| 0.00 | 4 MB |
+/// | UpsertAll | 2 | 113.68 ms | 0.915 ms | 0.856 ms | 1.64
| 0.05 | 4 MB |
+/// | UpsertAllBatched | 2 | 162.47 ms | 3.169 ms | 5.118 ms | 2.42
| 0.14 | 4 MB |
+/// | | | | | |
| | |
+/// | DataStreamer | 4 | 32.64 ms | 0.507 ms | 0.475 ms | 1.00
| 0.00 | 4 MB |
+/// | UpsertAll | 4 | 113.84 ms | 1.276 ms | 1.193 ms | 3.49
| 0.05 | 4 MB |
+/// | UpsertAllBatched | 4 | 159.17 ms | 3.148 ms | 5.172 ms | 4.79
| 0.17 | 4 MB |.
+/// </summary>
+[MemoryDiagnoser]
+public class DataStreamerBenchmark
+{
+ private IList<FakeServer> _servers = null!;
+ private IIgniteClient _client = null!;
+ private ITable _table = null!;
+ private IReadOnlyList<IIgniteTuple> _data = null!;
+
+ [Params(1, 2, 4)]
+ [SuppressMessage("ReSharper", "UnusedAutoPropertyAccessor.Global",
Justification = "Benchmark parameter")]
+ public int ServerCount { get; set; }
+
+ [GlobalSetup]
+ public async Task GlobalSetup()
+ {
+ _servers = Enumerable.Range(0, ServerCount)
+ .Select(x => new FakeServer(disableOpsTracking: true, nodeName:
"server-" + x)
+ {
+ MultiRowOperationDelayPerRow =
TimeSpan.FromMilliseconds(0.001) // 1 ms per 1000 rows
+ })
+ .ToList();
+
+ // 17 partitions per node.
+ var partitionAssignment = Enumerable.Range(0, 17).SelectMany(_ =>
_servers.Select(x => x.Node.Id)).ToArray();
+
+ var cfg = new IgniteClientConfiguration();
+ foreach (var server in _servers)
+ {
+ cfg.Endpoints.Add(server.Endpoint);
+ server.PartitionAssignment = partitionAssignment;
+ }
+
+ _client = await IgniteClient.StartAsync(cfg);
+ _table = (await
_client.Tables.GetTableAsync(FakeServer.ExistingTableName))!;
+ _data = Enumerable.Range(1, 100_000).Select(x => new IgniteTuple {
["id"] = x, ["name"] = "name " + x }).ToList();
+ }
+
+ [GlobalCleanup]
+ public void GlobalCleanup()
+ {
+ _client.Dispose();
+
+ foreach (var server in _servers)
+ {
+ server.Dispose();
+ }
+ }
+
+ [Benchmark(Baseline = true)]
+ public async Task DataStreamer() => await
_table.RecordBinaryView.StreamDataAsync(_data.ToAsyncEnumerable());
+
+ [Benchmark]
+ public async Task UpsertAll() => await
_table.RecordBinaryView.UpsertAllAsync(null, _data);
+
+ [Benchmark]
+ public async Task UpsertAllBatched()
+ {
+ var batchSize = DataStreamerOptions.Default.BatchSize;
+ var batch = new List<IIgniteTuple>(batchSize);
+
+ foreach (var tuple in _data)
+ {
+ batch.Add(tuple);
+
+ if (batch.Count == batchSize)
+ {
+ await _table.RecordBinaryView.UpsertAllAsync(null, batch);
+ batch.Clear();
+ }
+ }
+
+ if (batch.Count > 0)
+ {
+ await _table.RecordBinaryView.UpsertAllAsync(null, batch);
+ }
+ }
+}
diff --git
a/modules/platforms/dotnet/Apache.Ignite.Benchmarks/Table/Serialization/SerializerHandlerBenchmarksBase.cs
b/modules/platforms/dotnet/Apache.Ignite.Benchmarks/Table/Serialization/SerializerHandlerBenchmarksBase.cs
index e9e49300b5..be02cf68ce 100644
---
a/modules/platforms/dotnet/Apache.Ignite.Benchmarks/Table/Serialization/SerializerHandlerBenchmarksBase.cs
+++
b/modules/platforms/dotnet/Apache.Ignite.Benchmarks/Table/Serialization/SerializerHandlerBenchmarksBase.cs
@@ -44,7 +44,7 @@ namespace Apache.Ignite.Benchmarks.Table.Serialization
[nameof(Car.Seats)] = Object.Seats
};
- internal static readonly Schema Schema = new(1, 1, new[]
+ internal static readonly Schema Schema = new(1, 1, 1, new[]
{
new Column(nameof(Car.Id), ColumnType.Uuid, IsNullable: false,
IsColocation: true, IsKey: true, SchemaIndex: 0, Scale: 0, Precision: 0),
new Column(nameof(Car.BodyType), ColumnType.String, IsNullable:
false, IsColocation: false, IsKey: false, SchemaIndex: 1, Scale: 0, Precision:
0),
diff --git a/modules/platforms/dotnet/Apache.Ignite.Tests/FakeServer.cs
b/modules/platforms/dotnet/Apache.Ignite.Tests/FakeServer.cs
index b621f433a1..9954a160ea 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Tests/FakeServer.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Tests/FakeServer.cs
@@ -76,6 +76,12 @@ namespace Apache.Ignite.Tests
// No-op.
}
+ public FakeServer(bool disableOpsTracking, string nodeName =
"fake-server")
+ : this(null, nodeName, disableOpsTracking: disableOpsTracking)
+ {
+ // No-op.
+ }
+
internal FakeServer(
Func<RequestContext, bool>? shouldDropConnection = null,
string nodeName = "fake-server",
@@ -111,6 +117,8 @@ namespace Apache.Ignite.Tests
public TimeSpan OperationDelay { get; set; }
+ public TimeSpan MultiRowOperationDelayPerRow { get; set; }
+
public TimeSpan HeartbeatDelay { get; set; }
public int Port => ((IPEndPoint)_listener.LocalEndPoint!).Port;
@@ -125,6 +133,10 @@ namespace Apache.Ignite.Tests
public long? LastSqlTxId { get; set; }
+ public long UpsertAllRowCount { get; set; }
+
+ public long DroppedConnectionCount { get; set; }
+
public bool DropNewConnections
{
get => _dropNewConnections;
@@ -503,6 +515,7 @@ namespace Apache.Ignite.Tests
if (_shouldDropConnection(new
RequestContext(++requestCount, opCode, requestId)))
{
+ DroppedConnectionCount++;
break;
}
@@ -584,6 +597,19 @@ namespace Apache.Ignite.Tests
case ClientOp.TupleUpsertAll:
case ClientOp.TupleDeleteAll:
case ClientOp.TupleDeleteAllExact:
+ reader.Skip(3);
+ var count = reader.ReadInt32();
+
+ if (MultiRowOperationDelayPerRow > TimeSpan.Zero)
+ {
+ Thread.Sleep(MultiRowOperationDelayPerRow *
count);
+ }
+
+ if (opCode == ClientOp.TupleUpsertAll)
+ {
+ UpsertAllRowCount += count;
+ }
+
Send(handler, requestId, new byte[] { 1, 0
}.AsMemory());
continue;
diff --git a/modules/platforms/dotnet/Apache.Ignite.Tests/HeartbeatTests.cs
b/modules/platforms/dotnet/Apache.Ignite.Tests/HeartbeatTests.cs
index 0efe83faf1..ea83cada88 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Tests/HeartbeatTests.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Tests/HeartbeatTests.cs
@@ -31,7 +31,11 @@ namespace Apache.Ignite.Tests
{
var logger = new ListLogger();
- var cfg = new IgniteClientConfiguration(GetConfig()) { Logger =
logger };
+ var cfg = new IgniteClientConfiguration
+ {
+ Endpoints = { "127.0.0.1:" + ServerPort },
+ Logger = logger
+ };
using var client = await IgniteClient.StartAsync(cfg);
logger.Clear();
diff --git a/modules/platforms/dotnet/Apache.Ignite.Tests/IgniteTestsBase.cs
b/modules/platforms/dotnet/Apache.Ignite.Tests/IgniteTestsBase.cs
index cfd2f6c83a..b1d6809f2e 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Tests/IgniteTestsBase.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Tests/IgniteTestsBase.cs
@@ -133,7 +133,11 @@ namespace Apache.Ignite.Tests
protected static IgniteClientConfiguration GetConfig() => new()
{
- Endpoints = { "127.0.0.1:" + ServerNode.Port },
+ Endpoints =
+ {
+ "127.0.0.1:" + ServerNode.Port,
+ "127.0.0.1:" + (ServerNode.Port + 1)
+ },
Logger = new ConsoleLogger { MinLevel = LogLevel.Trace }
};
diff --git a/modules/platforms/dotnet/Apache.Ignite.Tests/LoggingTests.cs
b/modules/platforms/dotnet/Apache.Ignite.Tests/LoggingTests.cs
index 18ed80bb6d..e3d49bdaa9 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Tests/LoggingTests.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Tests/LoggingTests.cs
@@ -58,8 +58,8 @@ public class LoggingTests
StringAssert.Contains("Handshake succeeded", log);
StringAssert.Contains("Trying to establish secondary connections -
awaiting 2 tasks", log);
StringAssert.Contains("All secondary connections established", log);
- StringAssert.Contains("Sending request [opCode=3", log);
- StringAssert.Contains("Sending request [opCode=50", log);
+ StringAssert.Contains("Sending request [op=TablesGet", log);
+ StringAssert.Contains("Sending request [op=SqlExec", log);
StringAssert.Contains("Connection closed", log);
}
}
diff --git
a/modules/platforms/dotnet/Apache.Ignite.Tests/PartitionAwarenessTests.cs
b/modules/platforms/dotnet/Apache.Ignite.Tests/PartitionAwarenessTests.cs
index fbfd8681d6..173f70b665 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Tests/PartitionAwarenessTests.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Tests/PartitionAwarenessTests.cs
@@ -18,6 +18,7 @@
namespace Apache.Ignite.Tests;
using System;
+using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using Ignite.Table;
@@ -97,31 +98,14 @@ public class PartitionAwarenessTests
}
[Test]
- public async Task TestClientReceivesPartitionAssignmentUpdates()
- {
- using var client = await GetClient();
- var recordView = (await
client.Tables.GetTableAsync(FakeServer.ExistingTableName))!.GetRecordView<int>();
-
- // Check default assignment.
- await recordView.UpsertAsync(null, 1);
- await AssertOpOnNode(() => recordView.UpsertAsync(null, 1),
ClientOp.TupleUpsert, _server2);
-
- // Update assignment.
- foreach (var server in new[] { _server1, _server2 })
- {
- server.ClearOps();
- server.PartitionAssignment =
server.PartitionAssignment.Reverse().ToArray();
- server.PartitionAssignmentChanged = true;
- }
-
- // First request on default node receives update flag.
- // Make two requests because balancing uses round-robin node.
- await client.Tables.GetTablesAsync();
- await client.Tables.GetTablesAsync();
+ public async Task TestClientReceivesPartitionAssignmentUpdates() =>
+ await TestClientReceivesPartitionAssignmentUpdates(view =>
view.UpsertAsync(null, 1), ClientOp.TupleUpsert);
- // Second request loads and uses new assignment.
- await AssertOpOnNode(() => recordView.UpsertAsync(null, 1),
ClientOp.TupleUpsert, _server1, allowExtraOps: true);
- }
+ [Test]
+ public async Task TestDataStreamerReceivesPartitionAssignmentUpdates() =>
+ await TestClientReceivesPartitionAssignmentUpdates(
+ view => view.StreamDataAsync(new[] { 1 }.ToAsyncEnumerable()),
+ ClientOp.TupleUpsertAll);
[Test]
[TestCaseSource(nameof(KeyNodeCases))]
@@ -147,6 +131,7 @@ public class PartitionAwarenessTests
await AssertOpOnNode(() => recordView.ReplaceAsync(null, key, key),
ClientOp.TupleReplaceExact, expectedNode);
await AssertOpOnNode(() => recordView.DeleteAsync(null, key),
ClientOp.TupleDelete, expectedNode);
await AssertOpOnNode(() => recordView.DeleteExactAsync(null, key),
ClientOp.TupleDeleteExact, expectedNode);
+ await AssertOpOnNode(() => recordView.StreamDataAsync(new[] { key
}.ToAsyncEnumerable()), ClientOp.TupleUpsertAll, expectedNode);
// Multi-key operations use the first key for colocation.
var keys = new[] { key, new IgniteTuple { ["ID"] = keyId - 1 }, new
IgniteTuple { ["ID"] = keyId + 1 } };
@@ -180,6 +165,7 @@ public class PartitionAwarenessTests
await AssertOpOnNode(() => recordView.ReplaceAsync(null, key, key),
ClientOp.TupleReplaceExact, expectedNode);
await AssertOpOnNode(() => recordView.DeleteAsync(null, key),
ClientOp.TupleDelete, expectedNode);
await AssertOpOnNode(() => recordView.DeleteExactAsync(null, key),
ClientOp.TupleDeleteExact, expectedNode);
+ await AssertOpOnNode(() => recordView.StreamDataAsync(new[] { key
}.ToAsyncEnumerable()), ClientOp.TupleUpsertAll, expectedNode);
// Multi-key operations use the first key for colocation.
var keys = new[] { key, key - 1, key + 1 };
@@ -225,6 +211,7 @@ public class PartitionAwarenessTests
await AssertOpOnNode(() => kvView.PutAllAsync(null, pairs),
ClientOp.TupleUpsertAll, expectedNode);
await AssertOpOnNode(() => kvView.RemoveAllAsync(null, keys),
ClientOp.TupleDeleteAll, expectedNode);
await AssertOpOnNode(() => kvView.RemoveAllAsync(null, pairs),
ClientOp.TupleDeleteAllExact, expectedNode);
+ await AssertOpOnNode(() =>
kvView.StreamDataAsync(pairs.ToAsyncEnumerable()), ClientOp.TupleUpsertAll,
expectedNode);
}
[Test]
@@ -252,6 +239,10 @@ public class PartitionAwarenessTests
await AssertOpOnNode(() => kvView.RemoveAsync(null, key),
ClientOp.TupleDelete, expectedNode);
await AssertOpOnNode(() => kvView.RemoveAsync(null, key, val),
ClientOp.TupleDeleteExact, expectedNode);
await AssertOpOnNode(() => kvView.ContainsAsync(null, key),
ClientOp.TupleContainsKey, expectedNode);
+ await AssertOpOnNode(
+ () => kvView.StreamDataAsync(new[] { new KeyValuePair<int,
int>(key, val) }.ToAsyncEnumerable()),
+ ClientOp.TupleUpsertAll,
+ expectedNode);
// Multi-key operations use the first key for colocation.
var keys = new[] { key, key - 1, key + 1 };
@@ -358,6 +349,32 @@ public class PartitionAwarenessTests
}
}
+ private async Task
TestClientReceivesPartitionAssignmentUpdates(Func<IRecordView<int>, Task> func,
ClientOp op)
+ {
+ using var client = await GetClient();
+ var recordView = (await
client.Tables.GetTableAsync(FakeServer.ExistingTableName))!.GetRecordView<int>();
+
+ // Check default assignment.
+ await recordView.UpsertAsync(null, 1);
+ await AssertOpOnNode(() => func(recordView), op, _server2);
+
+ // Update assignment.
+ foreach (var server in new[] { _server1, _server2 })
+ {
+ server.ClearOps();
+ server.PartitionAssignment =
server.PartitionAssignment.Reverse().ToArray();
+ server.PartitionAssignmentChanged = true;
+ }
+
+ // First request on default node receives update flag.
+ // Make two requests because balancing uses round-robin node.
+ await client.Tables.GetTablesAsync();
+ await client.Tables.GetTablesAsync();
+
+ // Second request loads and uses new assignment.
+ await AssertOpOnNode(() => func(recordView), op, _server1,
allowExtraOps: true);
+ }
+
private async Task<IIgniteClient> GetClient()
{
var cfg = new IgniteClientConfiguration
diff --git
a/modules/platforms/dotnet/Apache.Ignite.Tests/Proto/ColocationHashTests.cs
b/modules/platforms/dotnet/Apache.Ignite.Tests/Proto/ColocationHashTests.cs
index aeb8e9f27b..4b97a4cf1d 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Tests/Proto/ColocationHashTests.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Tests/Proto/ColocationHashTests.cs
@@ -214,7 +214,7 @@ public class ColocationHashTests : IgniteTestsBase
{
var columns = arr.Select((obj, ci) => GetColumn(obj, ci,
timePrecision, timestampPrecision)).ToArray();
- return new Schema(Version: 0, arr.Count, columns);
+ return new Schema(Version: 0, 0, arr.Count, columns);
}
private static Column GetColumn(object value, int schemaIndex, int
timePrecision, int timestampPrecision)
diff --git a/modules/platforms/dotnet/Apache.Ignite.Tests/SslTests.cs
b/modules/platforms/dotnet/Apache.Ignite.Tests/SslTests.cs
index 2a17f930a3..18818cc9a4 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Tests/SslTests.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Tests/SslTests.cs
@@ -173,7 +173,7 @@ public class SslTests : IgniteTestsBase
cfg.SslStreamFactory = new NullSslStreamFactory();
using var client = await IgniteClient.StartAsync(cfg);
- Assert.IsNull(client.GetConnections().Single().SslInfo);
+ Assert.IsNull(client.GetConnections().First().SslInfo);
}
[Test]
diff --git
a/modules/platforms/dotnet/Apache.Ignite.Tests/Table/DataStreamerTests.cs
b/modules/platforms/dotnet/Apache.Ignite.Tests/Table/DataStreamerTests.cs
new file mode 100644
index 0000000000..daf3b3f693
--- /dev/null
+++ b/modules/platforms/dotnet/Apache.Ignite.Tests/Table/DataStreamerTests.cs
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache.Ignite.Tests.Table;
+
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Runtime.CompilerServices;
+using System.Threading;
+using System.Threading.Tasks;
+using Ignite.Table;
+using Internal.Proto;
+using NUnit.Framework;
+
+/// <summary>
+/// Tests for <see cref="IDataStreamerTarget{T}.StreamDataAsync"/>.
+/// <para />
+/// See DataStreamer partition awareness tests in <see
cref="PartitionAwarenessTests"/>.
+/// </summary>
+public class DataStreamerTests : IgniteTestsBase
+{
+ private const int Count = 100;
+
+ [SetUp]
+ public async Task SetUp() =>
+ await TupleView.DeleteAllAsync(null, Enumerable.Range(0,
Count).Select(x => GetTuple(x)));
+
+ [Test]
+ public async Task TestBasicStreamingRecordBinaryView()
+ {
+ var options = DataStreamerOptions.Default with { BatchSize = 10 };
+ var data = Enumerable.Range(0, Count).Select(x => GetTuple(x, "t" +
x)).ToList();
+
+ await TupleView.StreamDataAsync(data.ToAsyncEnumerable(), options);
+ await CheckData();
+ }
+
+ [Test]
+ public async Task TestBasicStreamingRecordView()
+ {
+ var options = DataStreamerOptions.Default with { BatchSize = 5 };
+ var data = Enumerable.Range(0, Count).Select(x => GetPoco(x, "t" +
x)).ToList();
+
+ await
Table.GetRecordView<Poco>().StreamDataAsync(data.ToAsyncEnumerable(), options);
+ await CheckData();
+ }
+
+ [Test]
+ public async Task TestBasicStreamingKeyValueBinaryView()
+ {
+ var options = DataStreamerOptions.Default with { BatchSize = 10_000 };
+ var data = Enumerable.Range(0, Count)
+ .Select(x => new KeyValuePair<IIgniteTuple,
IIgniteTuple>(GetTuple(x), GetTuple(x, "t" + x)))
+ .ToList();
+
+ await
Table.KeyValueBinaryView.StreamDataAsync(data.ToAsyncEnumerable(), options);
+ await CheckData();
+ }
+
+ [Test]
+ public async Task TestBasicStreamingKeyValueView()
+ {
+ var options = DataStreamerOptions.Default with { BatchSize = 1 };
+ var data = Enumerable.Range(0, Count)
+ .Select(x => new KeyValuePair<long, Poco>(x, GetPoco(x, "t" + x)))
+ .ToList();
+
+ await Table.GetKeyValueView<long,
Poco>().StreamDataAsync(data.ToAsyncEnumerable(), options);
+ await CheckData();
+ }
+
+ [Test]
+ public async Task TestAutoFlushFrequency([Values(true, false)] bool
enabled)
+ {
+ using var cts = new CancellationTokenSource();
+
+ _ = TupleView.StreamDataAsync(
+ GetTuplesWithDelay(cts.Token),
+ new()
+ {
+ AutoFlushFrequency = enabled
+ ? TimeSpan.FromMilliseconds(50)
+ : TimeSpan.MaxValue
+ });
+
+ await Task.Delay(100);
+
+ Assert.AreEqual(enabled, await TupleView.ContainsKeyAsync(null,
GetTuple(0)));
+ Assert.IsFalse(await TupleView.ContainsKeyAsync(null, GetTuple(1)));
+
+ cts.Cancel();
+ }
+
+ [Test]
+ public async Task TestCancellation()
+ {
+ using var cts = new CancellationTokenSource();
+ var streamTask = TupleView.StreamDataAsync(GetTuplesWithDelay(),
cancellationToken: cts.Token);
+
+ cts.Cancel();
+ Assert.ThrowsAsync<TaskCanceledException>(async () => await
streamTask);
+
+ Assert.IsFalse(
+ await TupleView.ContainsKeyAsync(null, GetTuple(0)),
+ "No data was streamed - cancelled before any batches were full.");
+ }
+
+ [Test]
+ public void TestOptionsValidation()
+ {
+ AssertException(DataStreamerOptions.Default with { BatchSize = -10 },
"BatchSize should be positive.");
+ AssertException(DataStreamerOptions.Default with { RetryLimit = -1 },
"RetryLimit should be non-negative.");
+ AssertException(
+ DataStreamerOptions.Default with { AutoFlushFrequency =
TimeSpan.FromDays(-1) },
+ "AutoFlushFrequency should be positive.");
+
+ void AssertException(DataStreamerOptions options, string message)
+ {
+ var ex = Assert.ThrowsAsync<ArgumentException>(
+ async () => await
Table.RecordBinaryView.StreamDataAsync(Array.Empty<IIgniteTuple>().ToAsyncEnumerable(),
options));
+
+ StringAssert.Contains(message, ex?.Message);
+ }
+ }
+
+ [Test]
+ public async Task TestRetryLimitExhausted()
+ {
+ using var server = new FakeServer(
+ shouldDropConnection: ctx => ctx is { OpCode:
ClientOp.TupleUpsertAll, RequestCount: > 7 });
+
+ using var client = await server.ConnectClientAsync();
+ var table = await
client.Tables.GetTableAsync(FakeServer.ExistingTableName);
+
+ var ex = Assert.ThrowsAsync<IgniteClientConnectionException>(
+ async () => await
table!.RecordBinaryView.StreamDataAsync(GetFakeServerData(10_000)));
+
+ StringAssert.StartsWith("Operation TupleUpsertAll failed after 16
retries", ex!.Message);
+ }
+
+ [Test]
+ public async Task TestManyItemsWithDisconnectAndRetry()
+ {
+ const int count = 100_000;
+ int upsertIdx = 0;
+
+ using var server = new FakeServer(
+ shouldDropConnection: ctx => ctx.OpCode == ClientOp.TupleUpsertAll
&& Interlocked.Increment(ref upsertIdx) % 2 == 1);
+
+ // Streamer has its own retry policy, so we can disable retries on
the client.
+ using var client = await server.ConnectClientAsync(new
IgniteClientConfiguration
+ {
+ RetryPolicy = new RetryNonePolicy()
+ });
+
+ var table = await
client.Tables.GetTableAsync(FakeServer.ExistingTableName);
+ await
table!.RecordBinaryView.StreamDataAsync(GetFakeServerData(count));
+
+ Assert.AreEqual(count, server.UpsertAllRowCount);
+ Assert.AreEqual(count / DataStreamerOptions.Default.BatchSize,
server.DroppedConnectionCount);
+ }
+
+ private static async IAsyncEnumerable<IIgniteTuple> GetFakeServerData(int
count)
+ {
+ for (var i = 0; i < count; i++)
+ {
+ yield return new IgniteTuple { ["ID"] = i };
+ await Task.Yield();
+ }
+ }
+
+ private static async IAsyncEnumerable<IIgniteTuple>
GetTuplesWithDelay([EnumeratorCancellation] CancellationToken ct = default)
+ {
+ for (var i = 0; i < 3; i++)
+ {
+ yield return GetTuple(i, "t" + i);
+ await Task.Delay(15000, ct);
+ }
+ }
+
+ private async Task CheckData()
+ {
+ var data = Enumerable.Range(0, Count).Select(x => GetTuple(x));
+ var res = await TupleView.GetAllAsync(null, data);
+
+ Assert.AreEqual(Count, res.Count);
+
+ foreach (var (_, hasVal) in res)
+ {
+ Assert.IsTrue(hasVal);
+ }
+ }
+}
diff --git
a/modules/platforms/dotnet/Apache.Ignite.Tests/Table/Serialization/ObjectSerializerHandlerTests.cs
b/modules/platforms/dotnet/Apache.Ignite.Tests/Table/Serialization/ObjectSerializerHandlerTests.cs
index f1d60d0479..4ee9a143fb 100644
---
a/modules/platforms/dotnet/Apache.Ignite.Tests/Table/Serialization/ObjectSerializerHandlerTests.cs
+++
b/modules/platforms/dotnet/Apache.Ignite.Tests/Table/Serialization/ObjectSerializerHandlerTests.cs
@@ -32,7 +32,7 @@ namespace Apache.Ignite.Tests.Table.Serialization
// ReSharper disable NotAccessedPositionalProperty.Local
public class ObjectSerializerHandlerTests
{
- private static readonly Schema Schema = new(1, 1, new[]
+ private static readonly Schema Schema = new(1, 1, 1, new[]
{
new Column("Key", ColumnType.Int64, IsNullable: false,
IsColocation: true, IsKey: true, SchemaIndex: 0, Scale: 0, Precision: 0),
new Column("Val", ColumnType.String, IsNullable: false,
IsColocation: false, IsKey: false, SchemaIndex: 1, Scale: 0, Precision: 0)
diff --git
a/modules/platforms/dotnet/Apache.Ignite.Tests/Transactions/TransactionsTests.cs
b/modules/platforms/dotnet/Apache.Ignite.Tests/Transactions/TransactionsTests.cs
index ac85e20f77..fb499064bf 100644
---
a/modules/platforms/dotnet/Apache.Ignite.Tests/Transactions/TransactionsTests.cs
+++
b/modules/platforms/dotnet/Apache.Ignite.Tests/Transactions/TransactionsTests.cs
@@ -266,9 +266,12 @@ namespace Apache.Ignite.Tests.Transactions
[Test]
public async Task TestToString()
{
- await using var tx1 = await Client.Transactions.BeginAsync();
- await using var tx2 = await
Client.Transactions.BeginAsync(new(ReadOnly: true));
- await using var tx3 = await Client.Transactions.BeginAsync();
+ // Single connection.
+ using var client = await IgniteClient.StartAsync(new() { Endpoints
= { "127.0.0.1:" + ServerPort } });
+
+ await using var tx1 = await client.Transactions.BeginAsync();
+ await using var tx2 = await
client.Transactions.BeginAsync(new(ReadOnly: true));
+ await using var tx3 = await client.Transactions.BeginAsync();
await tx2.RollbackAsync();
await tx3.CommitAsync();
diff --git
a/modules/platforms/dotnet/Apache.Ignite/Internal/Buffers/PooledArrayBuffer.cs
b/modules/platforms/dotnet/Apache.Ignite/Internal/Buffers/PooledArrayBuffer.cs
index 32ff848e54..1fd36d60d7 100644
---
a/modules/platforms/dotnet/Apache.Ignite/Internal/Buffers/PooledArrayBuffer.cs
+++
b/modules/platforms/dotnet/Apache.Ignite/Internal/Buffers/PooledArrayBuffer.cs
@@ -207,10 +207,14 @@ namespace Apache.Ignite.Internal.Buffers
/// </summary>
/// <param name="val">Value.</param>
/// <param name="pos">Position.</param>
- public void WriteInt(int val, int pos)
- {
- BinaryPrimitives.WriteInt32LittleEndian(_buffer.AsSpan(pos +
_prefixSize), val);
- }
+ public void WriteInt(int val, int pos) =>
BinaryPrimitives.WriteInt32LittleEndian(_buffer.AsSpan(pos + _prefixSize), val);
+
+ /// <summary>
+ /// Writes an int in big-endian byte order at the specified position.
+ /// </summary>
+ /// <param name="val">Value.</param>
+ /// <param name="pos">Position.</param>
+ public void WriteIntBigEndian(int val, int pos) =>
BinaryPrimitives.WriteInt32BigEndian(_buffer.AsSpan(pos + _prefixSize), val);
/// <summary>
/// Writes a long at current position.
diff --git
a/modules/platforms/dotnet/Apache.Ignite/Internal/ClientFailoverSocket.cs
b/modules/platforms/dotnet/Apache.Ignite/Internal/ClientFailoverSocket.cs
index 33e5cc3d75..8140c0da54 100644
--- a/modules/platforms/dotnet/Apache.Ignite/Internal/ClientFailoverSocket.cs
+++ b/modules/platforms/dotnet/Apache.Ignite/Internal/ClientFailoverSocket.cs
@@ -157,12 +157,14 @@ namespace Apache.Ignite.Internal
/// <param name="tx">Transaction.</param>
/// <param name="request">Request data.</param>
/// <param name="preferredNode">Preferred node.</param>
+ /// <param name="retryPolicyOverride">Retry policy.</param>
/// <returns>Response data and socket.</returns>
public async Task<(PooledBuffer Buffer, ClientSocket Socket)>
DoOutInOpAndGetSocketAsync(
ClientOp clientOp,
Transaction? tx = null,
PooledArrayBuffer? request = null,
- PreferredNode preferredNode = default)
+ PreferredNode preferredNode = default,
+ IRetryPolicy? retryPolicyOverride = null)
{
if (tx != null)
{
@@ -194,7 +196,7 @@ namespace Apache.Ignite.Internal
// Preferred node connection may not be available, do not
use it after first failure.
preferredNode = default;
- if (!HandleOpError(e, clientOp, ref attempt, ref errors))
+ if (!HandleOpError(e, clientOp, ref attempt, ref errors,
retryPolicyOverride ?? Configuration.RetryPolicy))
{
throw;
}
@@ -514,10 +516,11 @@ namespace Apache.Ignite.Internal
/// <param name="exception">Exception that caused the operation to
fail.</param>
/// <param name="op">Operation code.</param>
/// <param name="attempt">Current attempt.</param>
+ /// <param name="retryPolicy">Retry policy.</param>
/// <returns>
/// <c>true</c> if the operation should be retried on another
connection, <c>false</c> otherwise.
/// </returns>
- private bool ShouldRetry(Exception exception, ClientOp op, int attempt)
+ private bool ShouldRetry(Exception exception, ClientOp op, int
attempt, IRetryPolicy? retryPolicy)
{
var e = exception;
@@ -532,7 +535,7 @@ namespace Apache.Ignite.Internal
return false;
}
- if (Configuration.RetryPolicy is null or RetryNonePolicy)
+ if (retryPolicy is null or RetryNonePolicy)
{
return false;
}
@@ -547,7 +550,7 @@ namespace Apache.Ignite.Internal
var ctx = new RetryPolicyContext(new(Configuration),
publicOpType.Value, attempt, exception);
- return Configuration.RetryPolicy.ShouldRetry(ctx);
+ return retryPolicy.ShouldRetry(ctx);
}
/// <summary>
@@ -557,15 +560,17 @@ namespace Apache.Ignite.Internal
/// <param name="op">Operation code.</param>
/// <param name="attempt">Current attempt.</param>
/// <param name="errors">Previous errors.</param>
+ /// <param name="retryPolicy">Retry policy.</param>
/// <returns>True if the error was handled, false otherwise.</returns>
[SuppressMessage("Microsoft.Design", "CA1002:DoNotExposeGenericLists",
Justification = "Private.")]
private bool HandleOpError(
Exception exception,
ClientOp op,
ref int attempt,
- ref List<Exception>? errors)
+ ref List<Exception>? errors,
+ IRetryPolicy? retryPolicy)
{
- if (!ShouldRetry(exception, op, attempt))
+ if (!ShouldRetry(exception, op, attempt, retryPolicy))
{
if (_logger?.IsEnabled(LogLevel.Debug) == true)
{
diff --git a/modules/platforms/dotnet/Apache.Ignite/Internal/ClientSocket.cs
b/modules/platforms/dotnet/Apache.Ignite/Internal/ClientSocket.cs
index 749e250575..42dd3f8659 100644
--- a/modules/platforms/dotnet/Apache.Ignite/Internal/ClientSocket.cs
+++ b/modules/platforms/dotnet/Apache.Ignite/Internal/ClientSocket.cs
@@ -582,8 +582,7 @@ namespace Apache.Ignite.Internal
if (_logger?.IsEnabled(LogLevel.Trace) == true)
{
- _logger.Trace(
- $"Sending request [opCode={(int)op},
remoteAddress={ConnectionContext.ClusterNode.Address}, requestId={requestId}]");
+ _logger.Trace($"Sending request [op={op},
remoteAddress={ConnectionContext.ClusterNode.Address}, requestId={requestId}]");
}
await
_sendLock.WaitAsync(_disposeTokenSource.Token).ConfigureAwait(false);
diff --git
a/modules/platforms/dotnet/Apache.Ignite/Internal/Common/IgniteArgumentCheck.cs
b/modules/platforms/dotnet/Apache.Ignite/Internal/Common/IgniteArgumentCheck.cs
index 29dc6d64b9..a97c4eae67 100644
---
a/modules/platforms/dotnet/Apache.Ignite/Internal/Common/IgniteArgumentCheck.cs
+++
b/modules/platforms/dotnet/Apache.Ignite/Internal/Common/IgniteArgumentCheck.cs
@@ -33,9 +33,13 @@ namespace Apache.Ignite.Internal.Common
/// <param name="arg">The argument.</param>
/// <param name="argName">Name of the argument.</param>
/// <typeparam name="T">Arg type.</typeparam>
- /// <returns>Argument.</returns>
- public static T NotNull<T>([NoEnumeration] T arg,
[CallerArgumentExpression("arg")] string? argName = null) =>
- arg == null ? throw new ArgumentNullException(argName) : arg;
+        public static void NotNull<T>([NoEnumeration] T arg, [CallerArgumentExpression("arg")] string? argName = null)
+        {
+            // Guard-clause form: the valid case leaves immediately, the failure is the fall-through.
+            if (arg is not null)
+            {
+                return;
+            }
+
+            throw new ArgumentNullException(argName);
+        }
/// <summary>
/// Throws an ArgumentException if specified arg is null or empty
string.
diff --git
a/modules/platforms/dotnet/Apache.Ignite/Internal/Proto/BinaryTuple/BinaryTupleBuilder.cs
b/modules/platforms/dotnet/Apache.Ignite/Internal/Proto/BinaryTuple/BinaryTupleBuilder.cs
index 85535194af..15314d5e0e 100644
---
a/modules/platforms/dotnet/Apache.Ignite/Internal/Proto/BinaryTuple/BinaryTupleBuilder.cs
+++
b/modules/platforms/dotnet/Apache.Ignite/Internal/Proto/BinaryTuple/BinaryTupleBuilder.cs
@@ -22,7 +22,6 @@ namespace Apache.Ignite.Internal.Proto.BinaryTuple
using System.Collections;
using System.Diagnostics;
using System.Numerics;
- using System.Runtime.InteropServices;
using Buffers;
using Ignite.Sql;
using NodaTime;
@@ -482,8 +481,10 @@ namespace Apache.Ignite.Internal.Proto.BinaryTuple
if (ShouldHash())
{
- _hash =
HashUtils.Hash32(BinaryPrimitives.ReadInt64LittleEndian(span[..8]), _hash);
- _hash =
HashUtils.Hash32(BinaryPrimitives.ReadInt64LittleEndian(span[8..]), _hash);
+ var lo = BinaryPrimitives.ReadInt64LittleEndian(span[..8]);
+ var hi = BinaryPrimitives.ReadInt64LittleEndian(span[8..]);
+
+ _hash = HashUtils.Hash32(hi, HashUtils.Hash32(lo, _hash));
}
}
else
@@ -571,24 +572,8 @@ namespace Apache.Ignite.Internal.Proto.BinaryTuple
/// </summary>
/// <param name="value">Value.</param>
/// <param name="scale">Decimal scale from schema.</param>
- public void AppendDecimal(decimal value, int scale)
- {
- if (value != decimal.Zero)
- {
- var (unscaledValue, valueScale) = DeconstructDecimal(value);
-
- PutDecimal(scale, unscaledValue, valueScale);
- }
- else
- {
- if (ShouldHash())
- {
- _hash = HashUtils.Hash32(stackalloc byte[1] { 0 }, _hash);
- }
- }
-
- OnWrite();
- }
+        public void AppendDecimal(decimal value, int scale)
+        {
+            // Bring the value to the column scale first; the resulting unscaled integer is what gets appended.
+            var unscaled = BinaryTupleCommon.DecimalToUnscaledBigInteger(value, scale);
+
+            AppendNumber(unscaled);
+        }
/// <summary>
/// Appends a decimal.
@@ -1145,20 +1130,6 @@ namespace Apache.Ignite.Internal.Proto.BinaryTuple
_buffer.Dispose();
}
- private static (BigInteger Unscaled, int Scale)
DeconstructDecimal(decimal value)
- {
- Span<int> bits = stackalloc int[4];
- decimal.GetBits(value, bits);
-
- var scale = (bits[3] & 0x00FF0000) >> 16;
- var sign = bits[3] >> 31;
-
- var bytes = MemoryMarshal.Cast<int, byte>(bits[..3]);
- var unscaled = new BigInteger(bytes, true);
-
- return (sign < 0 ? -unscaled : unscaled, scale);
- }
-
private static int GetDecimalScale(decimal value)
{
Span<int> bits = stackalloc int[4];
@@ -1167,30 +1138,6 @@ namespace Apache.Ignite.Internal.Proto.BinaryTuple
return (bits[3] & 0x00FF0000) >> 16;
}
- private void PutDecimal(int scale, BigInteger unscaledValue, int
valueScale)
- {
- if (scale > valueScale)
- {
- unscaledValue *= BigInteger.Pow(new BigInteger(10), scale -
valueScale);
- }
- else if (scale < valueScale)
- {
- unscaledValue /= BigInteger.Pow(new BigInteger(10), valueScale
- scale);
- }
-
- var size = unscaledValue.GetByteCount();
- var destination = GetSpan(size);
- var success = unscaledValue.TryWriteBytes(destination, out int
written, isBigEndian: true);
-
- if (ShouldHash())
- {
- _hash = HashUtils.Hash32(destination[..written], _hash);
- }
-
- Debug.Assert(success, "success");
- Debug.Assert(written == size, "written == size");
- }
-
private void PutByte(sbyte value) =>
_buffer.WriteByte(unchecked((byte)value));
private void PutShort(short value) => _buffer.WriteShort(value);
@@ -1232,13 +1179,7 @@ namespace Apache.Ignite.Internal.Proto.BinaryTuple
private (long Seconds, int Nanos) PutTimestamp(Instant value, int
precision)
{
- // Logic taken from
- //
https://github.com/nodatime/nodatime.serialization/blob/main/src/NodaTime.Serialization.Protobuf/NodaExtensions.cs#L69
- // (Apache License).
- // See discussion:
https://github.com/nodatime/nodatime/issues/1644#issuecomment-1260524451
- long seconds = value.ToUnixTimeSeconds();
- Duration remainder = value - Instant.FromUnixTimeSeconds(seconds);
- int nanos =
TemporalTypes.NormalizeNanos((int)remainder.NanosecondOfDay, precision);
+ var (seconds, nanos) = value.ToSecondsAndNanos(precision);
PutLong(seconds);
diff --git
a/modules/platforms/dotnet/Apache.Ignite/Internal/Proto/BinaryTuple/BinaryTupleCommon.cs
b/modules/platforms/dotnet/Apache.Ignite/Internal/Proto/BinaryTuple/BinaryTupleCommon.cs
index 23b45a4a11..ff094582a9 100644
---
a/modules/platforms/dotnet/Apache.Ignite/Internal/Proto/BinaryTuple/BinaryTupleCommon.cs
+++
b/modules/platforms/dotnet/Apache.Ignite/Internal/Proto/BinaryTuple/BinaryTupleCommon.cs
@@ -17,7 +17,10 @@
namespace Apache.Ignite.Internal.Proto.BinaryTuple
{
+ using System;
using System.Diagnostics;
+ using System.Numerics;
+ using System.Runtime.InteropServices;
using NodaTime;
/// <summary>
@@ -111,5 +114,44 @@ namespace Apache.Ignite.Internal.Proto.BinaryTuple
{
return (byte)(1 << (index % 8));
}
+
+        /// <summary>
+        /// Converts a decimal to the unscaled <see cref="BigInteger"/> that represents it at the given column scale.
+        /// </summary>
+        /// <param name="value">Decimal value.</param>
+        /// <param name="scale">Column scale.</param>
+        /// <returns>
+        /// Unscaled value: multiplied by a power of ten when the column scale is larger than the value scale,
+        /// divided (integer division) when it is smaller. Zero is returned as <see cref="BigInteger.Zero"/>.
+        /// </returns>
+        public static BigInteger DecimalToUnscaledBigInteger(decimal value, int scale)
+        {
+            if (value == decimal.Zero)
+            {
+                return BigInteger.Zero;
+            }
+
+            Span<int> parts = stackalloc int[4];
+            decimal.GetBits(value, parts);
+
+            // The last element carries the flags: scale in bits 16-23, sign in the top bit.
+            var flags = parts[3];
+            var ownScale = (flags & 0x00FF0000) >> 16;
+            var isNegative = (flags >> 31) < 0;
+
+            // The first three elements are the 96-bit magnitude, read as an unsigned little-endian integer.
+            var magnitude = new BigInteger(MemoryMarshal.Cast<int, byte>(parts[..3]), true);
+            var result = isNegative ? -magnitude : magnitude;
+
+            if (scale > ownScale)
+            {
+                return result * BigInteger.Pow(new BigInteger(10), scale - ownScale);
+            }
+
+            if (scale < ownScale)
+            {
+                return result / BigInteger.Pow(new BigInteger(10), ownScale - scale);
+            }
+
+            return result;
+        }
}
}
diff --git
a/modules/platforms/dotnet/Apache.Ignite/Internal/Table/DataStreamer.cs
b/modules/platforms/dotnet/Apache.Ignite/Internal/Table/DataStreamer.cs
new file mode 100644
index 0000000000..9497885965
--- /dev/null
+++ b/modules/platforms/dotnet/Apache.Ignite/Internal/Table/DataStreamer.cs
@@ -0,0 +1,282 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache.Ignite.Internal.Table;
+
+using System;
+using System.Collections.Concurrent;
+using System.Collections.Generic;
+using System.Diagnostics;
+using System.Runtime.InteropServices;
+using System.Threading;
+using System.Threading.Tasks;
+using Buffers;
+using Common;
+using Ignite.Table;
+using Proto;
+using Proto.BinaryTuple;
+using Proto.MsgPack;
+using Serialization;
+
+/// <summary>
+/// Data streamer.
+/// <para />
+/// Implementation notes:
+/// * Hashing is combined with serialization (unlike Java client), so we write binary tuples to a per-node buffer right away.
+///   - Pros: cheaper happy path;
+///   - Cons: will require re-serialization on schema update.
+/// * Iteration and serialization are sequential.
+/// * Batches are sent asynchronously; we wait for the previous batch only when the next batch for the given node is full.
+/// * The more connections to different servers we have - the more parallelism we get (see benchmark).
+/// * There is no parallelism for the same node, because we need to guarantee ordering.
+/// * The auto-flush task runs concurrently with the consumer loop: the batch map is a concurrent dictionary,
+///   and the mutable state of every batch is changed only under a lock on that batch.
+/// </summary>
+internal static class DataStreamer
+{
+    private static readonly TimeSpan PartitionAssignmentUpdateFrequency = TimeSpan.FromSeconds(15);
+
+    /// <summary>
+    /// Streams the data.
+    /// </summary>
+    /// <param name="data">Data.</param>
+    /// <param name="sender">Batch sender: receives the serialized batch, the target node name and the retry policy.</param>
+    /// <param name="writer">Item writer.</param>
+    /// <param name="schemaProvider">Schema provider.</param>
+    /// <param name="partitionAssignmentProvider">Partitioner.</param>
+    /// <param name="options">Options.</param>
+    /// <param name="cancellationToken">Cancellation token, applied to the enumeration of <paramref name="data"/>.</param>
+    /// <typeparam name="T">Element type.</typeparam>
+    /// <returns>A <see cref="Task"/> representing the asynchronous operation.</returns>
+    internal static async Task StreamDataAsync<T>(
+        IAsyncEnumerable<T> data,
+        Func<PooledArrayBuffer, string, IRetryPolicy, Task> sender,
+        IRecordSerializerHandler<T> writer,
+        Func<Task<Schema>> schemaProvider,
+        Func<ValueTask<string[]?>> partitionAssignmentProvider,
+        DataStreamerOptions options,
+        CancellationToken cancellationToken)
+    {
+        IgniteArgumentCheck.NotNull(data);
+
+        IgniteArgumentCheck.Ensure(options.BatchSize > 0, $"{nameof(options.BatchSize)} should be positive.");
+        IgniteArgumentCheck.Ensure(options.AutoFlushFrequency > TimeSpan.Zero, $"{nameof(options.AutoFlushFrequency)} should be positive.");
+        IgniteArgumentCheck.Ensure(options.RetryLimit >= 0, $"{nameof(options.RetryLimit)} should be non-negative.");
+
+        // The consumer loop is the only writer of this map, but the auto-flush task enumerates it concurrently,
+        // which a plain Dictionary does not support ("collection was modified" or worse).
+        // Per-batch state is additionally protected by a lock on the batch.
+        var batches = new ConcurrentDictionary<string, Batch>();
+        var retryPolicy = new RetryLimitPolicy { RetryLimit = options.RetryLimit };
+
+        // TODO: IGNITE-19710 Data Streamer schema synchronization
+        var schema = await schemaProvider().ConfigureAwait(false);
+        var partitionAssignment = await partitionAssignmentProvider().ConfigureAwait(false);
+        var lastPartitionsAssignmentCheck = Stopwatch.StartNew();
+        using var flushCts = new CancellationTokenSource();
+
+        try
+        {
+            _ = AutoFlushAsync(flushCts.Token);
+
+            await foreach (var item in data.WithCancellation(cancellationToken))
+            {
+                var (batch, partition) = Add(item);
+
+                if (batch.Count >= options.BatchSize)
+                {
+                    await SendAsync(batch, partition).ConfigureAwait(false);
+                }
+
+                if (lastPartitionsAssignmentCheck.Elapsed > PartitionAssignmentUpdateFrequency)
+                {
+                    var newAssignment = await partitionAssignmentProvider().ConfigureAwait(false);
+
+                    if (newAssignment != partitionAssignment)
+                    {
+                        // Drain all batches to preserve order when partition assignment changes.
+                        await Drain().ConfigureAwait(false);
+                        partitionAssignment = newAssignment;
+                    }
+
+                    lastPartitionsAssignmentCheck.Restart();
+                }
+            }
+
+            await Drain().ConfigureAwait(false);
+        }
+        finally
+        {
+            flushCts.Cancel();
+
+            foreach (var batch in batches.Values)
+            {
+                lock (batch)
+                {
+                    // An auto-flush iteration may still be in progress after cancellation.
+                    // SendAsync never sends an empty batch, so resetting the count under the lock
+                    // guarantees that the disposed buffer is not touched afterwards.
+                    batch.Count = 0;
+                    batch.Buffer.Dispose();
+                }
+            }
+        }
+
+        // Serializes the item and appends it to the batch of the target node.
+        (Batch Batch, string Partition) Add(T item)
+        {
+            var tupleBuilder = new BinaryTupleBuilder(schema.Columns.Count, hashedColumnsPredicate: schema);
+
+            try
+            {
+                return Add0(item, ref tupleBuilder);
+            }
+            finally
+            {
+                tupleBuilder.Dispose();
+            }
+        }
+
+        (Batch Batch, string Partition) Add0(T item, ref BinaryTupleBuilder tupleBuilder)
+        {
+            var columnCount = schema.Columns.Count;
+
+            // Use MemoryMarshal to work around [CS8352]: "Cannot use variable 'noValueSet' in this context
+            // because it may expose referenced variables outside of their declaration scope".
+            // The alias must have the same length as the stack buffer: one bit per column, not one byte per column.
+            Span<byte> noValueSet = stackalloc byte[columnCount / 8 + 1];
+            Span<byte> noValueSetRef = MemoryMarshal.CreateSpan(ref MemoryMarshal.GetReference(noValueSet), noValueSet.Length);
+
+            writer.Write(ref tupleBuilder, item, schema, columnCount, noValueSetRef);
+
+            var partition = partitionAssignment == null
+                ? string.Empty // Default connection.
+                : partitionAssignment[Math.Abs(tupleBuilder.Hash % partitionAssignment.Length)];
+
+            var batch = GetOrCreateBatch(partition);
+
+            lock (batch)
+            {
+                batch.Count++;
+
+                noValueSet.CopyTo(batch.Buffer.MessageWriter.WriteBitSet(columnCount));
+                batch.Buffer.MessageWriter.Write(tupleBuilder.Build().Span);
+            }
+
+            return (batch, partition);
+        }
+
+        Batch GetOrCreateBatch(string partition)
+        {
+            if (batches.TryGetValue(partition, out var existing))
+            {
+                return existing;
+            }
+
+            // Only the consumer loop adds batches, so there is no race between the lookup above and the insert below.
+            // The buffer header is written before the batch becomes visible to the auto-flush task.
+            var created = new Batch();
+            InitBuffer(created);
+            batches[partition] = created;
+
+            return created;
+        }
+
+        async Task SendAsync(Batch batch, string partition)
+        {
+            var expectedSize = batch.Count;
+
+            // Wait for the previous task for this batch to finish: preserve order, backpressure control.
+            await batch.Task.ConfigureAwait(false);
+
+            lock (batch)
+            {
+                if (batch.Count != expectedSize || batch.Count == 0)
+                {
+                    // Concurrent update happened.
+                    return;
+                }
+
+                var buf = batch.Buffer;
+
+                // See RecordSerializer.WriteMultiple.
+                buf.WriteByte(MsgPackCode.Int32, batch.CountPos);
+                buf.WriteIntBigEndian(batch.Count, batch.CountPos + 1);
+
+                batch.Task = SendAndDisposeBufAsync(buf, partition, batch.Task);
+
+                batch.Count = 0;
+                batch.Buffer = ProtoCommon.GetMessageWriter(); // Prev buf will be disposed in SendAndDisposeBufAsync.
+                InitBuffer(batch);
+                batch.LastFlush = Stopwatch.GetTimestamp();
+            }
+        }
+
+        async Task SendAndDisposeBufAsync(PooledArrayBuffer buf, string partition, Task oldTask)
+        {
+            try
+            {
+                // Wait for the previous batch for this node to preserve item order.
+                await oldTask.ConfigureAwait(false);
+                await sender(buf, partition, retryPolicy).ConfigureAwait(false);
+            }
+            finally
+            {
+                buf.Dispose();
+            }
+        }
+
+        async Task AutoFlushAsync(CancellationToken flushCt)
+        {
+            while (!flushCt.IsCancellationRequested)
+            {
+                await Task.Delay(options.AutoFlushFrequency, flushCt).ConfigureAwait(false);
+                var ts = Stopwatch.GetTimestamp();
+
+                foreach (var (partition, batch) in batches)
+                {
+                    // Stopwatch timestamps are measured in units of 1 / Stopwatch.Frequency seconds,
+                    // which is not the same as TimeSpan ticks on every platform - convert before comparing.
+                    var sinceLastFlush = TimeSpan.FromSeconds((ts - batch.LastFlush) / (double)Stopwatch.Frequency);
+
+                    if (batch.Count > 0 && sinceLastFlush > options.AutoFlushFrequency)
+                    {
+                        await SendAsync(batch, partition).ConfigureAwait(false);
+                    }
+                }
+            }
+        }
+
+        // Writes the request header and reserves space for the item count, which is known only when the batch is sent.
+        void InitBuffer(Batch batch)
+        {
+            var buf = batch.Buffer;
+
+            var w = buf.MessageWriter;
+            w.Write(schema.TableId);
+            w.WriteTx(null);
+            w.Write(schema.Version);
+
+            batch.CountPos = buf.Position;
+            buf.Advance(5); // Reserve count.
+        }
+
+        // Sends all non-empty batches and waits for every in-flight send to complete.
+        async Task Drain()
+        {
+            foreach (var (partition, batch) in batches)
+            {
+                if (batch.Count > 0)
+                {
+                    await SendAsync(batch, partition).ConfigureAwait(false);
+                }
+
+                await batch.Task.ConfigureAwait(false);
+            }
+        }
+    }
+
+    /// <summary>
+    /// Per-node batch state. Instances are also used as lock objects for their own mutable state.
+    /// </summary>
+    private sealed record Batch
+    {
+        /// <summary>Gets or sets the buffer that accumulates items for the next request.</summary>
+        public PooledArrayBuffer Buffer { get; set; } = ProtoCommon.GetMessageWriter();
+
+        /// <summary>Gets or sets the number of items in <see cref="Buffer"/>.</summary>
+        public int Count { get; set; }
+
+        /// <summary>Gets or sets the position in <see cref="Buffer"/> reserved for the item count.</summary>
+        public int CountPos { get; set; }
+
+        /// <summary>Gets or sets the task for the previous buffer.</summary>
+        public Task Task { get; set; } = Task.CompletedTask;
+
+        /// <summary>
+        /// Gets or sets the <see cref="Stopwatch"/> timestamp of the last flush.
+        /// Starts at the creation time, so that a new batch is not considered overdue right away.
+        /// </summary>
+        public long LastFlush { get; set; } = Stopwatch.GetTimestamp();
+    }
+}
diff --git
a/modules/platforms/dotnet/Apache.Ignite/Internal/Table/KeyValueView.cs
b/modules/platforms/dotnet/Apache.Ignite/Internal/Table/KeyValueView.cs
index 70efdd5abb..c3174184e8 100644
--- a/modules/platforms/dotnet/Apache.Ignite/Internal/Table/KeyValueView.cs
+++ b/modules/platforms/dotnet/Apache.Ignite/Internal/Table/KeyValueView.cs
@@ -20,6 +20,7 @@ namespace Apache.Ignite.Internal.Table;
using System;
using System.Collections.Generic;
using System.Linq;
+using System.Threading;
using System.Threading.Tasks;
using Apache.Ignite.Transactions;
using Common;
@@ -156,6 +157,13 @@ internal sealed class KeyValueView<TK, TV> :
IKeyValueView<TK, TV>
return new IgniteQueryable<KeyValuePair<TK, TV>>(provider);
}
+    /// <inheritdoc/>
+    public async Task StreamDataAsync(
+        IAsyncEnumerable<KeyValuePair<TK, TV>> data,
+        DataStreamerOptions? options = null,
+        CancellationToken cancellationToken = default)
+    {
+        // The record view does the streaming; pairs are adapted one by one as the source is consumed.
+        var records = ToKv(data);
+
+        await _recordView.StreamDataAsync(records, options, cancellationToken).ConfigureAwait(false);
+    }
+
/// <inheritdoc/>
public override string ToString() =>
new IgniteToStringBuilder(GetType())
@@ -184,4 +192,12 @@ internal sealed class KeyValueView<TK, TV> :
IKeyValueView<TK, TV>
return new(k, v);
}
+
+    /// <summary>
+    /// Adapts a stream of key-value pairs to a stream of <see cref="KvPair{TK,TV}"/>, one item at a time.
+    /// </summary>
+    /// <param name="pairs">Source pairs.</param>
+    /// <param name="cancellationToken">
+    /// Cancellation token. Receives the token that the consumer passes with <c>WithCancellation</c>
+    /// and forwards it to the enumeration of <paramref name="pairs"/>; without it the consumer's token would be ignored.
+    /// </param>
+    /// <returns>Converted pairs.</returns>
+    private static async IAsyncEnumerable<KvPair<TK, TV>> ToKv(
+        IAsyncEnumerable<KeyValuePair<TK, TV>> pairs,
+        [System.Runtime.CompilerServices.EnumeratorCancellation] CancellationToken cancellationToken = default)
+    {
+        await foreach (var pair in pairs.WithCancellation(cancellationToken).ConfigureAwait(false))
+        {
+            yield return ToKv(pair);
+        }
+    }
}
diff --git
a/modules/platforms/dotnet/Apache.Ignite/Internal/Table/RecordView.cs
b/modules/platforms/dotnet/Apache.Ignite/Internal/Table/RecordView.cs
index 6e4f7c1620..ee622d3933 100644
--- a/modules/platforms/dotnet/Apache.Ignite/Internal/Table/RecordView.cs
+++ b/modules/platforms/dotnet/Apache.Ignite/Internal/Table/RecordView.cs
@@ -20,6 +20,7 @@ namespace Apache.Ignite.Internal.Table
using System;
using System.Collections.Generic;
using System.Linq;
+ using System.Threading;
using System.Threading.Tasks;
using Buffers;
using Common;
@@ -323,6 +324,29 @@ namespace Apache.Ignite.Internal.Table
public async Task<IList<T>> DeleteAllExactAsync(ITransaction?
transaction, IEnumerable<T> records) =>
await DeleteAllAsync(transaction, records, exact:
true).ConfigureAwait(false);
+        /// <inheritdoc/>
+        public async Task StreamDataAsync(
+            IAsyncEnumerable<T> data,
+            DataStreamerOptions? options = null,
+            CancellationToken cancellationToken = default)
+        {
+            await DataStreamer.StreamDataAsync(
+                data,
+                sender: SendBatchAsync,
+                writer: _ser.Handler,
+                schemaProvider: () => _table.GetLatestSchemaAsync(),
+                partitionAssignmentProvider: () => _table.GetPartitionAssignmentAsync(),
+                options ?? DataStreamerOptions.Default,
+                cancellationToken).ConfigureAwait(false);
+
+            // Sends one serialized batch as an upsert-all request outside of any transaction.
+            // The response buffer is not read; it is disposed as soon as the request completes.
+            async Task SendBatchAsync(PooledArrayBuffer batch, string preferredNode, IRetryPolicy retryPolicy)
+            {
+                using var resBuf = await DoOutInOpAsync(
+                    ClientOp.TupleUpsertAll,
+                    tx: null,
+                    batch,
+                    PreferredNode.FromId(preferredNode),
+                    retryPolicy)
+                    .ConfigureAwait(false);
+            }
+        }
+
/// <inheritdoc/>
public override string ToString() =>
new IgniteToStringBuilder(GetType())
@@ -411,9 +435,11 @@ namespace Apache.Ignite.Internal.Table
ClientOp clientOp,
Transaction? tx,
PooledArrayBuffer? request = null,
- PreferredNode preferredNode = default)
+ PreferredNode preferredNode = default,
+ IRetryPolicy? retryPolicyOverride = null)
{
- var (buf, _) = await
_table.Socket.DoOutInOpAndGetSocketAsync(clientOp, tx, request,
preferredNode).ConfigureAwait(false);
+ var (buf, _) = await
_table.Socket.DoOutInOpAndGetSocketAsync(clientOp, tx, request, preferredNode,
retryPolicyOverride)
+ .ConfigureAwait(false);
return buf;
}
diff --git a/modules/platforms/dotnet/Apache.Ignite/Internal/Table/Schema.cs
b/modules/platforms/dotnet/Apache.Ignite/Internal/Table/Schema.cs
index 65deb37bba..c4bf0fac06 100644
--- a/modules/platforms/dotnet/Apache.Ignite/Internal/Table/Schema.cs
+++ b/modules/platforms/dotnet/Apache.Ignite/Internal/Table/Schema.cs
@@ -24,10 +24,12 @@ namespace Apache.Ignite.Internal.Table
/// Schema.
/// </summary>
/// <param name="Version">Version.</param>
+ /// <param name="TableId">Table id.</param>
/// <param name="KeyColumnCount">Key column count.</param>
/// <param name="Columns">Columns in schema order.</param>
internal sealed record Schema(
int Version,
+ int TableId,
int KeyColumnCount,
IReadOnlyList<Column> Columns) : IHashedColumnIndexProvider
{
diff --git a/modules/platforms/dotnet/Apache.Ignite/Internal/Table/Table.cs
b/modules/platforms/dotnet/Apache.Ignite/Internal/Table/Table.cs
index 35d0eaf926..37d92b2972 100644
--- a/modules/platforms/dotnet/Apache.Ignite/Internal/Table/Table.cs
+++ b/modules/platforms/dotnet/Apache.Ignite/Internal/Table/Table.cs
@@ -205,24 +205,11 @@ namespace Apache.Ignite.Internal.Table
return PreferredNode.FromId(nodeId);
}
- private Task<Schema> GetCachedSchemaAsync(int version)
- {
- var task = GetOrAdd();
-
- if (!task.IsFaulted)
- {
- return task;
- }
-
- // Do not return failed task. Remove it from the cache and try
again.
- _schemas.TryRemove(new KeyValuePair<int, Task<Schema>>(version,
task));
-
- return GetOrAdd();
-
- Task<Schema> GetOrAdd() => _schemas.GetOrAdd(version, static (ver,
tbl) => tbl.LoadSchemaAsync(ver), this);
- }
-
- private async ValueTask<string[]?> GetPartitionAssignmentAsync()
+ /// <summary>
+ /// Gets the partition assignment.
+ /// </summary>
+ /// <returns>Partition assignment.</returns>
+ internal async ValueTask<string[]?> GetPartitionAssignmentAsync()
{
var socketVer = _socket.PartitionAssignmentVersion;
var assignment = _partitionAssignment;
@@ -258,6 +245,23 @@ namespace Apache.Ignite.Internal.Table
}
}
+        private Task<Schema> GetCachedSchemaAsync(int version)
+        {
+            var cached = GetOrLoad();
+
+            if (cached.IsFaulted)
+            {
+                // Do not return failed task. Remove exactly that task from the cache and try again.
+                _schemas.TryRemove(new KeyValuePair<int, Task<Schema>>(version, cached));
+                cached = GetOrLoad();
+            }
+
+            return cached;
+
+            Task<Schema> GetOrLoad() => _schemas.GetOrAdd(version, static (ver, tbl) => tbl.LoadSchemaAsync(ver), this);
+        }
+
/// <summary>
/// Loads the schema.
/// </summary>
@@ -349,7 +353,7 @@ namespace Apache.Ignite.Internal.Table
}
}
- var schema = new Schema(schemaVersion, keyColumnCount, columns);
+ var schema = new Schema(schemaVersion, Id, keyColumnCount,
columns);
_schemas[schemaVersion] = Task.FromResult(schema);
if (_logger?.IsEnabled(LogLevel.Debug) == true)
diff --git
a/modules/platforms/dotnet/Apache.Ignite/Internal/Table/TemporalTypes.cs
b/modules/platforms/dotnet/Apache.Ignite/Internal/Table/TemporalTypes.cs
index db89c2362f..f827cd98e4 100644
--- a/modules/platforms/dotnet/Apache.Ignite/Internal/Table/TemporalTypes.cs
+++ b/modules/platforms/dotnet/Apache.Ignite/Internal/Table/TemporalTypes.cs
@@ -19,6 +19,7 @@ namespace Apache.Ignite.Internal.Table;
using System;
using Ignite.Sql;
+using NodaTime;
/// <summary>
/// Temporal type utils.
@@ -51,4 +52,23 @@ internal static class TemporalTypes
9 => nanos, // 1ns precision
_ => throw new ArgumentException("Unsupported fractional seconds
precision: " + precision)
};
+
+    /// <summary>
+    /// Splits an <see cref="Instant"/> into Unix seconds and the remaining nanoseconds.
+    /// </summary>
+    /// <param name="value">Value.</param>
+    /// <param name="precision">Column precision, applied to the nanosecond part with <c>NormalizeNanos</c>.</param>
+    /// <returns>Seconds and nanos.</returns>
+    public static (long Seconds, int Nanos) ToSecondsAndNanos(this Instant value, int precision)
+    {
+        // Logic taken from
+        // https://github.com/nodatime/nodatime.serialization/blob/main/src/NodaTime.Serialization.Protobuf/NodaExtensions.cs#L69
+        // (Apache License).
+        // See discussion: https://github.com/nodatime/nodatime/issues/1644#issuecomment-1260524451
+        var wholeSeconds = value.ToUnixTimeSeconds();
+        var fraction = value - Instant.FromUnixTimeSeconds(wholeSeconds);
+
+        return (wholeSeconds, NormalizeNanos((int)fraction.NanosecondOfDay, precision));
+    }
}
diff --git
a/modules/platforms/dotnet/Apache.Ignite/Table/DataStreamerOptions.cs
b/modules/platforms/dotnet/Apache.Ignite/Table/DataStreamerOptions.cs
new file mode 100644
index 0000000000..cc99ed364b
--- /dev/null
+++ b/modules/platforms/dotnet/Apache.Ignite/Table/DataStreamerOptions.cs
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache.Ignite.Table;
+
+using System;
+
+/// <summary>
+/// Data streamer options.
+/// </summary>
+/// <param name="BatchSize">Batch size - the number of entries that will be sent to the cluster in one network call.</param>
+/// <param name="RetryLimit">Retry limit for a batch. If a batch fails to be sent to the cluster,
+/// the streamer will retry it a number of times.</param>
+/// <param name="AutoFlushFrequency">Auto flush frequency - the period of time after which the streamer
+/// will flush the per-node buffer even if it is not full.</param>
+public sealed record DataStreamerOptions(int BatchSize, int RetryLimit, TimeSpan AutoFlushFrequency)
+{
+    /// <summary>
+    /// Default streamer options.
+    /// </summary>
+    public static readonly DataStreamerOptions Default = new DataStreamerOptions();
+
+    /// <summary>
+    /// Initializes a new instance of the <see cref="DataStreamerOptions"/> class with default values:
+    /// batch size 1000, retry limit 16, auto flush frequency 5 seconds.
+    /// </summary>
+    public DataStreamerOptions()
+        : this(1000, 16, TimeSpan.FromSeconds(5))
+    {
+        // No-op.
+    }
+}
diff --git
a/modules/platforms/dotnet/Apache.Ignite/Table/IDataStreamerTarget.cs
b/modules/platforms/dotnet/Apache.Ignite/Table/IDataStreamerTarget.cs
new file mode 100644
index 0000000000..899be69f20
--- /dev/null
+++ b/modules/platforms/dotnet/Apache.Ignite/Table/IDataStreamerTarget.cs
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// ReSharper disable TypeParameterCanBeVariant (justification: future
compatibility for streamer with receiver).
+namespace Apache.Ignite.Table;
+
+using System.Collections.Generic;
+using System.Threading;
+using System.Threading.Tasks;
+
+/// <summary>
+/// Represents an entity that can be used as a target for streaming data.
+/// </summary>
+/// <typeparam name="T">Data type.</typeparam>
+public interface IDataStreamerTarget<T>
+{
+ /// <summary>
+ /// Streams data into the underlying table.
+ /// </summary>
+ /// <param name="data">Data to stream.</param>
+ /// <param name="options">Streamer options. The record view implementation in this change
+ /// uses <see cref="DataStreamerOptions.Default"/> when the value is <c>null</c>.</param>
+ /// <param name="cancellationToken">Cancellation token, applied to the enumeration of <paramref name="data"/>.</param>
+ /// <returns>A <see cref="Task"/> representing the asynchronous operation.</returns>
+ Task StreamDataAsync(IAsyncEnumerable<T> data, DataStreamerOptions?
options = null, CancellationToken cancellationToken = default);
+}
diff --git a/modules/platforms/dotnet/Apache.Ignite/Table/IKeyValueView.cs
b/modules/platforms/dotnet/Apache.Ignite/Table/IKeyValueView.cs
index 5b60e08fae..4a8b94decf 100644
--- a/modules/platforms/dotnet/Apache.Ignite/Table/IKeyValueView.cs
+++ b/modules/platforms/dotnet/Apache.Ignite/Table/IKeyValueView.cs
@@ -28,7 +28,7 @@ using Transactions;
/// </summary>
/// <typeparam name="TK">Key type.</typeparam>
/// <typeparam name="TV">Value type.</typeparam>
-public interface IKeyValueView<TK, TV>
+public interface IKeyValueView<TK, TV> : IDataStreamerTarget<KeyValuePair<TK,
TV>>
where TK : notnull
where TV : notnull
{
diff --git a/modules/platforms/dotnet/Apache.Ignite/Table/IRecordView.cs
b/modules/platforms/dotnet/Apache.Ignite/Table/IRecordView.cs
index 83b17763b6..83ecf36ee8 100644
--- a/modules/platforms/dotnet/Apache.Ignite/Table/IRecordView.cs
+++ b/modules/platforms/dotnet/Apache.Ignite/Table/IRecordView.cs
@@ -27,7 +27,7 @@ namespace Apache.Ignite.Table
/// Record view interface provides methods to access table records.
/// </summary>
/// <typeparam name="T">Record type.</typeparam>
- public interface IRecordView<T>
+ public interface IRecordView<T> : IDataStreamerTarget<T>
where T : notnull
{
/// <summary>