This is an automated email from the ASF dual-hosted git repository.
ptupitsyn pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 5ac19cfb95 IGNITE-19710 .NET: Add Data Streamer schema synchronization
(#2548)
5ac19cfb95 is described below
commit 5ac19cfb9528ec2a72edd1e5a1ff3d03f24b4537
Author: Pavel Tupitsyn <[email protected]>
AuthorDate: Tue Sep 5 17:26:14 2023 +0300
IGNITE-19710 .NET: Add Data Streamer schema synchronization (#2548)
Add support for schema changes in `DataStreamer`:
* Retry outdated schema error
* Retry unmapped columns error
* Keep original rows (tuples or pocos) along with the serialized data,
re-serialize on retry using new schema
* Perf: `DataStreamerBenchmark` results not affected (pooled arrays don't
increase memory usage, and the overhead to fill them is not noticeable)
---
.../dotnet/Apache.Ignite.Benchmarks/Program.cs | 4 +-
.../Table/DataStreamerBenchmark.cs | 2 +-
.../Table/SchemaSynchronizationTest.cs | 68 +++++++++
.../Internal/Buffers/PooledArrayBuffer.cs | 29 +---
.../Apache.Ignite/Internal/Table/DataStreamer.cs | 158 +++++++++++++++++----
.../Apache.Ignite/Internal/Table/RecordView.cs | 4 +-
.../Table/Serialization/ObjectSerializerHandler.cs | 2 +-
.../Table/Serialization/RecordSerializer.cs | 24 +++-
.../Serialization/TuplePairSerializerHandler.cs | 2 +-
.../Table/Serialization/TupleSerializerHandler.cs | 2 +-
.../dotnet/Apache.Ignite/Internal/Table/Table.cs | 12 --
11 files changed, 233 insertions(+), 74 deletions(-)
diff --git a/modules/platforms/dotnet/Apache.Ignite.Benchmarks/Program.cs
b/modules/platforms/dotnet/Apache.Ignite.Benchmarks/Program.cs
index 366dde3ad2..0c256c1385 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Benchmarks/Program.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Benchmarks/Program.cs
@@ -18,9 +18,9 @@
namespace Apache.Ignite.Benchmarks;
using BenchmarkDotNet.Running;
-using Table.Serialization;
+using Table;
internal static class Program
{
- private static void Main() =>
BenchmarkRunner.Run<SerializerHandlerReadBenchmarks>();
+ private static void Main() => BenchmarkRunner.Run<DataStreamerBenchmark>();
}
diff --git
a/modules/platforms/dotnet/Apache.Ignite.Benchmarks/Table/DataStreamerBenchmark.cs
b/modules/platforms/dotnet/Apache.Ignite.Benchmarks/Table/DataStreamerBenchmark.cs
index 7208629dd1..1859f9d27d 100644
---
a/modules/platforms/dotnet/Apache.Ignite.Benchmarks/Table/DataStreamerBenchmark.cs
+++
b/modules/platforms/dotnet/Apache.Ignite.Benchmarks/Table/DataStreamerBenchmark.cs
@@ -82,7 +82,7 @@ public class DataStreamerBenchmark
_client = await IgniteClient.StartAsync(cfg);
_table = (await
_client.Tables.GetTableAsync(FakeServer.ExistingTableName))!;
- _data = Enumerable.Range(1, 100_000).Select(x => new IgniteTuple {
["id"] = x, ["name"] = "name " + x }).ToList();
+ _data = Enumerable.Range(1, 100_000).Select(x => new IgniteTuple {
["id"] = x }).ToList();
}
[GlobalCleanup]
diff --git
a/modules/platforms/dotnet/Apache.Ignite.Tests/Table/SchemaSynchronizationTest.cs
b/modules/platforms/dotnet/Apache.Ignite.Tests/Table/SchemaSynchronizationTest.cs
index 8993c38557..99f13d0e64 100644
---
a/modules/platforms/dotnet/Apache.Ignite.Tests/Table/SchemaSynchronizationTest.cs
+++
b/modules/platforms/dotnet/Apache.Ignite.Tests/Table/SchemaSynchronizationTest.cs
@@ -18,6 +18,7 @@
namespace Apache.Ignite.Tests.Table;
using System;
+using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Threading.Tasks;
@@ -335,6 +336,73 @@ public class SchemaSynchronizationTest : IgniteTestsBase
Assert.AreEqual("name1", res.Value);
}
+ [Test]
+ public async Task TestSchemaUpdateWhileStreaming([Values(true, false)]
bool insertNewColumn)
+ {
+ await Client.Sql.ExecuteAsync(null, $"CREATE TABLE {TestTableName}
(KEY bigint PRIMARY KEY)");
+
+ var table = await Client.Tables.GetTableAsync(TestTableName);
+ var view = table!.RecordBinaryView;
+
+ var options = DataStreamerOptions.Default with { BatchSize = 2 };
+ await view.StreamDataAsync(GetData(), options);
+
+ // Inserted with old schema.
+ var res1 = await view.GetAsync(null, GetTuple(1));
+ Assert.AreEqual("FOO", res1.Value["VAL"]);
+
+ // Inserted with new schema.
+ var res2 = await view.GetAsync(null, GetTuple(19));
+ Assert.AreEqual(insertNewColumn ? "BAR_19" : "FOO", res2.Value["VAL"]);
+
+ async IAsyncEnumerable<IIgniteTuple> GetData()
+ {
+ // First set of batches uses old schema.
+ for (int i = 0; i < 10; i++)
+ {
+ yield return GetTuple(i);
+ }
+
+ // Update schema.
+ // New schema has a new column with a default value, so it is not
required to provide it in the streamed data.
+ await Client.Sql.ExecuteAsync(null, $"ALTER TABLE {TestTableName}
ADD COLUMN VAL varchar DEFAULT 'FOO'");
+ await WaitForNewSchemaOnAllNodes(TestTableName, 2);
+
+ for (int i = 10; i < 20; i++)
+ {
+ yield return insertNewColumn ? GetTuple(i, "BAR_" + i) :
GetTuple(i);
+ }
+ }
+ }
+
+ [Test]
+ public async Task TestSchemaUpdateBeforeStreaming()
+ {
+ await Client.Sql.ExecuteAsync(null, $"CREATE TABLE {TestTableName}
(KEY bigint PRIMARY KEY)");
+
+ var table = await Client.Tables.GetTableAsync(TestTableName);
+ var view = table!.RecordBinaryView;
+
+ // Insert some data - old schema gets cached.
+ await view.InsertAsync(null, GetTuple(-1));
+
+ // Update schema.
+ await Client.Sql.ExecuteAsync(null, $"ALTER TABLE {TestTableName} ADD
COLUMN VAL varchar DEFAULT 'FOO'");
+ await WaitForNewSchemaOnAllNodes(TestTableName, 2);
+
+ // Stream data with new schema. Client does not yet know about the new
schema,
+ // but unmapped column exception will trigger schema reload.
+ await view.StreamDataAsync(new[] { GetTuple(1, "BAR")
}.ToAsyncEnumerable());
+
+ // Inserted with old schema.
+ var res1 = await view.GetAsync(null, GetTuple(-1));
+ Assert.AreEqual("FOO", res1.Value["VAL"]);
+
+ // Inserted with new schema.
+ var res2 = await view.GetAsync(null, GetTuple(1));
+ Assert.AreEqual("BAR", res2.Value["VAL"]);
+ }
+
private async Task WaitForNewSchemaOnAllNodes(string tableName, int
schemaVer, int timeoutMs = 5000)
{
// TODO IGNITE-18733, IGNITE-18449: remove this workaround when issues
are fixed.
diff --git
a/modules/platforms/dotnet/Apache.Ignite/Internal/Buffers/PooledArrayBuffer.cs
b/modules/platforms/dotnet/Apache.Ignite/Internal/Buffers/PooledArrayBuffer.cs
index 1fd36d60d7..86cf4fd1a6 100644
---
a/modules/platforms/dotnet/Apache.Ignite/Internal/Buffers/PooledArrayBuffer.cs
+++
b/modules/platforms/dotnet/Apache.Ignite/Internal/Buffers/PooledArrayBuffer.cs
@@ -87,15 +87,16 @@ namespace Apache.Ignite.Internal.Buffers
public void Advance(int count)
{
Debug.Assert(count >= 0, "count >= 0");
-
- if (_index > _buffer.Length - count)
- {
- throw new InvalidOperationException("Can't advance past buffer
limit.");
- }
+ Debug.Assert(_index + count < _buffer.Length, "_index + count <
_buffer.Length");
_index += count;
}
+ /// <summary>
+ /// Resets the buffer to the initial state.
+ /// </summary>
+ public void Reset() => _index = _prefixSize;
+
/// <summary>
/// Gets a span for writing.
/// </summary>
@@ -122,24 +123,6 @@ namespace Apache.Ignite.Internal.Buffers
return span;
}
- /// <summary>
- /// Gets a span for writing at the specified position.
- /// </summary>
- /// <param name="position">Position.</param>
- /// <param name="size">Size.</param>
- /// <returns>Span for writing.</returns>
- public Span<byte> GetSpan(int position, int size)
- {
- var overflow = _prefixSize + position + size - _index;
-
- if (overflow > 0)
- {
- CheckAndResizeBuffer(overflow);
- }
-
- return _buffer.AsSpan(_prefixSize + position, size);
- }
-
/// <inheritdoc />
public void Dispose()
{
diff --git
a/modules/platforms/dotnet/Apache.Ignite/Internal/Table/DataStreamer.cs
b/modules/platforms/dotnet/Apache.Ignite/Internal/Table/DataStreamer.cs
index 63a702f992..57f7423a62 100644
--- a/modules/platforms/dotnet/Apache.Ignite/Internal/Table/DataStreamer.cs
+++ b/modules/platforms/dotnet/Apache.Ignite/Internal/Table/DataStreamer.cs
@@ -18,8 +18,11 @@
namespace Apache.Ignite.Internal.Table;
using System;
+using System.Buffers;
using System.Collections.Generic;
using System.Diagnostics;
+using System.Diagnostics.CodeAnalysis;
+using System.Linq;
using System.Runtime.InteropServices;
using System.Threading;
using System.Threading.Tasks;
@@ -62,8 +65,8 @@ internal static class DataStreamer
internal static async Task StreamDataAsync<T>(
IAsyncEnumerable<T> data,
Func<PooledArrayBuffer, string, IRetryPolicy, Task> sender,
- IRecordSerializerHandler<T> writer,
- Func<Task<Schema>> schemaProvider,
+ RecordSerializer<T> writer,
+ Func<int?, Task<Schema>> schemaProvider, // Not a ValueTask because
Tasks are cached.
Func<ValueTask<string[]?>> partitionAssignmentProvider,
DataStreamerOptions options,
CancellationToken cancellationToken)
@@ -76,11 +79,10 @@ internal static class DataStreamer
// ConcurrentDictionary is not necessary because we consume the source
sequentially.
// However, locking for batches is required due to auto-flush
background task.
- var batches = new Dictionary<string, Batch>();
+ var batches = new Dictionary<string, Batch<T>>();
var retryPolicy = new RetryLimitPolicy { RetryLimit =
options.RetryLimit };
- // TODO: IGNITE-19710 Data Streamer schema synchronization
- var schema = await schemaProvider().ConfigureAwait(false);
+ var schema = await schemaProvider(null).ConfigureAwait(false);
var partitionAssignment = await
partitionAssignmentProvider().ConfigureAwait(false);
var lastPartitionsAssignmentCheck = Stopwatch.StartNew();
using var flushCts = new CancellationTokenSource();
@@ -95,7 +97,7 @@ internal static class DataStreamer
// However, not all producers support cancellation, so we need
to check it here as well.
cancellationToken.ThrowIfCancellationRequested();
- var (batch, partition) = Add(item);
+ var (batch, partition) = await
AddWithRetryUnmapped(item).ConfigureAwait(false);
if (batch.Count >= options.BatchSize)
{
await SendAsync(batch, partition).ConfigureAwait(false);
@@ -124,19 +126,36 @@ internal static class DataStreamer
foreach (var batch in batches.Values)
{
batch.Buffer.Dispose();
+ ArrayPool<T>.Shared.Return(batch.Items);
Metrics.StreamerItemsQueuedDecrement(batch.Count);
Metrics.StreamerBatchesActiveDecrement();
}
}
- (Batch Batch, string Partition) Add(T item)
+ return;
+
+ async ValueTask<(Batch<T> Batch, string Partition)>
AddWithRetryUnmapped(T item)
+ {
+ try
+ {
+ return Add(item);
+ }
+ catch (Exception e) when (e.CausedByUnmappedColumns())
+ {
+ schema = await
schemaProvider(Table.SchemaVersionForceLatest).ConfigureAwait(false);
+ return Add(item);
+ }
+ }
+
+ (Batch<T> Batch, string Partition) Add(T item)
{
- var tupleBuilder = new BinaryTupleBuilder(schema.Columns.Count,
hashedColumnsPredicate: schema);
+ var schema0 = schema;
+ var tupleBuilder = new BinaryTupleBuilder(schema0.Columns.Count,
hashedColumnsPredicate: schema0);
try
{
- return Add0(item, ref tupleBuilder);
+ return Add0(item, ref tupleBuilder, schema0);
}
finally
{
@@ -144,29 +163,44 @@ internal static class DataStreamer
}
}
- (Batch Batch, string Partition) Add0(T item, ref BinaryTupleBuilder
tupleBuilder)
+ (Batch<T> Batch, string Partition) Add0(T item, ref BinaryTupleBuilder
tupleBuilder, Schema schema0)
{
- var columnCount = schema.Columns.Count;
+ var columnCount = schema0.Columns.Count;
// Use MemoryMarshal to work around [CS8352]: "Cannot use variable
'noValueSet' in this context
// because it may expose referenced variables outside of their
declaration scope".
Span<byte> noValueSet = stackalloc byte[columnCount / 8 + 1];
Span<byte> noValueSetRef = MemoryMarshal.CreateSpan(ref
MemoryMarshal.GetReference(noValueSet), columnCount);
- writer.Write(ref tupleBuilder, item, schema, columnCount,
noValueSetRef);
+ writer.Handler.Write(ref tupleBuilder, item, schema0, columnCount,
noValueSetRef);
- var partition = partitionAssignment == null
+ // ReSharper disable once AccessToModifiedClosure (reviewed)
+ var partitionAssignment0 = partitionAssignment;
+ var partition = partitionAssignment0 == null
? string.Empty // Default connection.
- : partitionAssignment[Math.Abs(tupleBuilder.GetHash() %
partitionAssignment.Length)];
+ : partitionAssignment0[Math.Abs(tupleBuilder.GetHash() %
partitionAssignment0.Length)];
var batch = GetOrCreateBatch(partition);
lock (batch)
{
- batch.Count++;
+ batch.Items[batch.Count++] = item;
-
noValueSet.CopyTo(batch.Buffer.MessageWriter.WriteBitSet(columnCount));
- batch.Buffer.MessageWriter.Write(tupleBuilder.Build().Span);
+ if (batch.Schema != schema0)
+ {
+ batch.SchemaOutdated = true;
+ }
+
+ // 1. To compute target partition, we need key hash.
+ // 2. To compute key hash, we need to serialize the key.
+ // 3. Since we already serialized the key, we can use it for
the message body and avoid re-serialization.
+ // However, if schema gets updated, we need to re-serialize
the whole batch.
+ // Schema update is rare, so we optimize for the happy path.
+ if (!batch.SchemaOutdated)
+ {
+
noValueSet.CopyTo(batch.Buffer.MessageWriter.WriteBitSet(columnCount));
+
batch.Buffer.MessageWriter.Write(tupleBuilder.Build().Span);
+ }
}
Metrics.StreamerItemsQueuedIncrement();
@@ -174,13 +208,13 @@ internal static class DataStreamer
return (batch, partition);
}
- Batch GetOrCreateBatch(string partition)
+ Batch<T> GetOrCreateBatch(string partition)
{
ref var batchRef = ref
CollectionsMarshal.GetValueRefOrAddDefault(batches, partition, out _);
if (batchRef == null)
{
- batchRef = new Batch();
+ batchRef = new Batch<T>(options.BatchSize, schema);
InitBuffer(batchRef);
Metrics.StreamerBatchesActiveIncrement();
@@ -189,7 +223,7 @@ internal static class DataStreamer
return batchRef;
}
- async Task SendAsync(Batch batch, string partition)
+ async Task SendAsync(Batch<T> batch, string partition)
{
var expectedSize = batch.Count;
@@ -210,31 +244,82 @@ internal static class DataStreamer
buf.WriteByte(MsgPackCode.Int32, batch.CountPos);
buf.WriteIntBigEndian(batch.Count, batch.CountPos + 1);
- batch.Task = SendAndDisposeBufAsync(buf, partition,
batch.Task, batch.Count);
+ batch.Task = SendAndDisposeBufAsync(buf, partition,
batch.Task, batch.Items, batch.Count, batch.SchemaOutdated);
+ batch.Items = ArrayPool<T>.Shared.Rent(options.BatchSize);
batch.Count = 0;
batch.Buffer = ProtoCommon.GetMessageWriter(); // Prev buf
will be disposed in SendAndDisposeBufAsync.
InitBuffer(batch);
batch.LastFlush = Stopwatch.GetTimestamp();
+ batch.Schema = schema;
+ batch.SchemaOutdated = false;
Metrics.StreamerBatchesActiveIncrement();
}
}
- async Task SendAndDisposeBufAsync(PooledArrayBuffer buf, string
partition, Task oldTask, int count)
+ async Task SendAndDisposeBufAsync(
+ PooledArrayBuffer buf,
+ string partition,
+ Task oldTask,
+ T[] items,
+ int count,
+ bool batchSchemaOutdated)
{
- try
+ Debug.Assert(items.Length > 0, "items.Length > 0");
+
+ if (batchSchemaOutdated)
{
- // Wait for the previous batch for this node to preserve item
order.
- await oldTask.ConfigureAwait(false);
- await sender(buf, partition,
retryPolicy).ConfigureAwait(false);
+ // Schema update was detected while the batch was being filled.
+ buf.Reset();
+ writer.WriteMultiple(buf, null, schema, items.Take(count));
+ }
- Metrics.StreamerBatchesSent.Add(1);
- Metrics.StreamerItemsSent.Add(count);
+ try
+ {
+ int? schemaVersion = null;
+ while (true)
+ {
+ try
+ {
+ if (schemaVersion != null)
+ {
+ // Might be updated by another batch.
+ if (schema.Version != schemaVersion)
+ {
+ schema = await
schemaProvider(schemaVersion).ConfigureAwait(false);
+ }
+
+ // Serialize again with the new schema.
+ buf.Reset();
+ writer.WriteMultiple(buf, null, schema,
items.Take(count));
+ }
+
+ // Wait for the previous batch for this node to
preserve item order.
+ await oldTask.ConfigureAwait(false);
+ await sender(buf, partition,
retryPolicy).ConfigureAwait(false);
+
+ Metrics.StreamerBatchesSent.Add(1);
+ Metrics.StreamerItemsSent.Add(count);
+
+ return;
+ }
+ catch (IgniteException e) when (e.Code ==
ErrorGroups.Table.SchemaVersionMismatch &&
+ schemaVersion !=
e.GetExpectedSchemaVersion())
+ {
+ // Schema update detected after the batch was
serialized.
+ schemaVersion = e.GetExpectedSchemaVersion();
+ }
+ catch (Exception e) when (e.CausedByUnmappedColumns() &&
schemaVersion == null)
+ {
+ schemaVersion = Table.SchemaVersionForceLatest;
+ }
+ }
}
finally
{
buf.Dispose();
+ ArrayPool<T>.Shared.Return(items);
Metrics.StreamerItemsQueuedDecrement(count);
Metrics.StreamerBatchesActiveDecrement();
@@ -258,7 +343,7 @@ internal static class DataStreamer
}
}
- void InitBuffer(Batch batch)
+ void InitBuffer(Batch<T> batch)
{
var buf = batch.Buffer;
@@ -285,10 +370,23 @@ internal static class DataStreamer
}
}
- private sealed record Batch
+ private sealed record Batch<T>
{
+ public Batch(int capacity, Schema schema)
+ {
+ Items = ArrayPool<T>.Shared.Rent(capacity);
+ Schema = schema;
+ }
+
public PooledArrayBuffer Buffer { get; set; } =
ProtoCommon.GetMessageWriter();
+ [SuppressMessage("Performance", "CA1819:Properties should not return
arrays", Justification = "Private record")]
+ public T[] Items { get; set; }
+
+ public Schema Schema { get; set; }
+
+ public bool SchemaOutdated { get; set; }
+
public int Count { get; set; }
public int CountPos { get; set; }
diff --git
a/modules/platforms/dotnet/Apache.Ignite/Internal/Table/RecordView.cs
b/modules/platforms/dotnet/Apache.Ignite/Internal/Table/RecordView.cs
index f306c6d665..7273c7dbbd 100644
--- a/modules/platforms/dotnet/Apache.Ignite/Internal/Table/RecordView.cs
+++ b/modules/platforms/dotnet/Apache.Ignite/Internal/Table/RecordView.cs
@@ -309,8 +309,8 @@ namespace Apache.Ignite.Internal.Table
retryPolicy)
.ConfigureAwait(false);
},
- writer: _ser.Handler,
- schemaProvider: () => _table.GetLatestSchemaAsync(), // TODO
IGNITE-19710 retry outdated schema.
+ writer: _ser,
+ schemaProvider: _table.GetSchemaAsync,
partitionAssignmentProvider: () =>
_table.GetPartitionAssignmentAsync(),
options ?? DataStreamerOptions.Default,
cancellationToken).ConfigureAwait(false);
diff --git
a/modules/platforms/dotnet/Apache.Ignite/Internal/Table/Serialization/ObjectSerializerHandler.cs
b/modules/platforms/dotnet/Apache.Ignite/Internal/Table/Serialization/ObjectSerializerHandler.cs
index 987096e4ca..a6af89b283 100644
---
a/modules/platforms/dotnet/Apache.Ignite/Internal/Table/Serialization/ObjectSerializerHandler.cs
+++
b/modules/platforms/dotnet/Apache.Ignite/Internal/Table/Serialization/ObjectSerializerHandler.cs
@@ -35,7 +35,7 @@ namespace Apache.Ignite.Internal.Table.Serialization
/// Object serializer handler.
/// </summary>
/// <typeparam name="T">Object type.</typeparam>
- internal class ObjectSerializerHandler<T> : IRecordSerializerHandler<T>
+ internal sealed class ObjectSerializerHandler<T> :
IRecordSerializerHandler<T>
{
private readonly ConcurrentDictionary<(int, int), WriteDelegate<T>>
_writers = new();
diff --git
a/modules/platforms/dotnet/Apache.Ignite/Internal/Table/Serialization/RecordSerializer.cs
b/modules/platforms/dotnet/Apache.Ignite/Internal/Table/Serialization/RecordSerializer.cs
index 2b6e94ec6b..f7e95bbd75 100644
---
a/modules/platforms/dotnet/Apache.Ignite/Internal/Table/Serialization/RecordSerializer.cs
+++
b/modules/platforms/dotnet/Apache.Ignite/Internal/Table/Serialization/RecordSerializer.cs
@@ -28,7 +28,7 @@ namespace Apache.Ignite.Internal.Table.Serialization
/// Works for tuples and user objects, any differences are handled by the
underlying <see cref="IRecordSerializerHandler{T}"/>.
/// </summary>
/// <typeparam name="T">Record type.</typeparam>
- internal class RecordSerializer<T>
+ internal sealed class RecordSerializer<T>
{
/** Table. */
private readonly Table _table;
@@ -196,6 +196,28 @@ namespace Apache.Ignite.Internal.Table.Serialization
return firstHash;
}
+ /// <summary>
+ /// Write multiple records.
+ /// </summary>
+ /// <param name="buf">Buffer.</param>
+ /// <param name="tx">Transaction.</param>
+ /// <param name="schema">Schema.</param>
+ /// <param name="recs">Records.</param>
+ /// <param name="keyOnly">Key only columns.</param>
+ /// <returns>First record hash.</returns>
+ public int WriteMultiple(
+ PooledArrayBuffer buf,
+ Transactions.Transaction? tx,
+ Schema schema,
+ IEnumerable<T> recs,
+ bool keyOnly = false)
+ {
+ var enumerator = recs.GetEnumerator();
+ enumerator.MoveNext();
+
+ return WriteMultiple(buf, tx, schema, enumerator, keyOnly);
+ }
+
/// <summary>
/// Write multiple records.
/// </summary>
diff --git
a/modules/platforms/dotnet/Apache.Ignite/Internal/Table/Serialization/TuplePairSerializerHandler.cs
b/modules/platforms/dotnet/Apache.Ignite/Internal/Table/Serialization/TuplePairSerializerHandler.cs
index f416746be2..7a4b745757 100644
---
a/modules/platforms/dotnet/Apache.Ignite/Internal/Table/Serialization/TuplePairSerializerHandler.cs
+++
b/modules/platforms/dotnet/Apache.Ignite/Internal/Table/Serialization/TuplePairSerializerHandler.cs
@@ -29,7 +29,7 @@ using Proto.MsgPack;
/// <summary>
/// Serializer handler for <see cref="IIgniteTuple"/>.
/// </summary>
-internal class TuplePairSerializerHandler :
IRecordSerializerHandler<KvPair<IIgniteTuple, IIgniteTuple>>
+internal sealed class TuplePairSerializerHandler :
IRecordSerializerHandler<KvPair<IIgniteTuple, IIgniteTuple>>
{
/// <summary>
/// Singleton instance.
diff --git
a/modules/platforms/dotnet/Apache.Ignite/Internal/Table/Serialization/TupleSerializerHandler.cs
b/modules/platforms/dotnet/Apache.Ignite/Internal/Table/Serialization/TupleSerializerHandler.cs
index b919f39a16..edeccb4c82 100644
---
a/modules/platforms/dotnet/Apache.Ignite/Internal/Table/Serialization/TupleSerializerHandler.cs
+++
b/modules/platforms/dotnet/Apache.Ignite/Internal/Table/Serialization/TupleSerializerHandler.cs
@@ -29,7 +29,7 @@ namespace Apache.Ignite.Internal.Table.Serialization
/// <summary>
/// Serializer handler for <see cref="IIgniteTuple"/>.
/// </summary>
- internal class TupleSerializerHandler :
IRecordSerializerHandler<IIgniteTuple>
+ internal sealed class TupleSerializerHandler :
IRecordSerializerHandler<IIgniteTuple>
{
/// <summary>
/// Singleton instance.
diff --git a/modules/platforms/dotnet/Apache.Ignite/Internal/Table/Table.cs
b/modules/platforms/dotnet/Apache.Ignite/Internal/Table/Table.cs
index 90a15da24c..a6a98fe667 100644
--- a/modules/platforms/dotnet/Apache.Ignite/Internal/Table/Table.cs
+++ b/modules/platforms/dotnet/Apache.Ignite/Internal/Table/Table.cs
@@ -182,18 +182,6 @@ namespace Apache.Ignite.Internal.Table
? LoadSchemaAsync(SchemaVersionUnknown)
: GetCachedSchemaAsync(version ?? _latestSchemaVersion);
- /// <summary>
- /// Gets the latest schema.
- /// </summary>
- /// <returns>Schema.</returns>
- internal Task<Schema> GetLatestSchemaAsync()
- {
- // _latestSchemaVersion can be -1 (unknown) or a valid version.
- // In case of unknown version, we request latest from the server
and cache it with -1 key
- // to avoid duplicate requests for latest schema.
- return GetCachedSchemaAsync(_latestSchemaVersion);
- }
-
/// <summary>
/// Gets the preferred node by colocation hash.
/// </summary>