This is an automated email from the ASF dual-hosted git repository.

ptupitsyn pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 5ac19cfb95 IGNITE-19710 .NET: Add Data Streamer schema synchronization 
(#2548)
5ac19cfb95 is described below

commit 5ac19cfb9528ec2a72edd1e5a1ff3d03f24b4537
Author: Pavel Tupitsyn <[email protected]>
AuthorDate: Tue Sep 5 17:26:14 2023 +0300

    IGNITE-19710 .NET: Add Data Streamer schema synchronization (#2548)
    
    Add support for schema changes in `DataStreamer`:
    * Retry outdated schema error
    * Retry unmapped columns error
    * Keep original rows (tuples or POCOs) along with the serialized data; re-serialize on retry using the new schema
    * Perf: `DataStreamerBenchmark` results not affected (pooled arrays don't 
increase memory usage, and the overhead to fill them is not noticeable)
---
 .../dotnet/Apache.Ignite.Benchmarks/Program.cs     |   4 +-
 .../Table/DataStreamerBenchmark.cs                 |   2 +-
 .../Table/SchemaSynchronizationTest.cs             |  68 +++++++++
 .../Internal/Buffers/PooledArrayBuffer.cs          |  29 +---
 .../Apache.Ignite/Internal/Table/DataStreamer.cs   | 158 +++++++++++++++++----
 .../Apache.Ignite/Internal/Table/RecordView.cs     |   4 +-
 .../Table/Serialization/ObjectSerializerHandler.cs |   2 +-
 .../Table/Serialization/RecordSerializer.cs        |  24 +++-
 .../Serialization/TuplePairSerializerHandler.cs    |   2 +-
 .../Table/Serialization/TupleSerializerHandler.cs  |   2 +-
 .../dotnet/Apache.Ignite/Internal/Table/Table.cs   |  12 --
 11 files changed, 233 insertions(+), 74 deletions(-)

diff --git a/modules/platforms/dotnet/Apache.Ignite.Benchmarks/Program.cs 
b/modules/platforms/dotnet/Apache.Ignite.Benchmarks/Program.cs
index 366dde3ad2..0c256c1385 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Benchmarks/Program.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Benchmarks/Program.cs
@@ -18,9 +18,9 @@
 namespace Apache.Ignite.Benchmarks;
 
 using BenchmarkDotNet.Running;
-using Table.Serialization;
+using Table;
 
 internal static class Program
 {
-    private static void Main() => 
BenchmarkRunner.Run<SerializerHandlerReadBenchmarks>();
+    private static void Main() => BenchmarkRunner.Run<DataStreamerBenchmark>();
 }
diff --git 
a/modules/platforms/dotnet/Apache.Ignite.Benchmarks/Table/DataStreamerBenchmark.cs
 
b/modules/platforms/dotnet/Apache.Ignite.Benchmarks/Table/DataStreamerBenchmark.cs
index 7208629dd1..1859f9d27d 100644
--- 
a/modules/platforms/dotnet/Apache.Ignite.Benchmarks/Table/DataStreamerBenchmark.cs
+++ 
b/modules/platforms/dotnet/Apache.Ignite.Benchmarks/Table/DataStreamerBenchmark.cs
@@ -82,7 +82,7 @@ public class DataStreamerBenchmark
 
         _client = await IgniteClient.StartAsync(cfg);
         _table = (await 
_client.Tables.GetTableAsync(FakeServer.ExistingTableName))!;
-        _data = Enumerable.Range(1, 100_000).Select(x => new IgniteTuple { 
["id"] = x, ["name"] = "name " + x }).ToList();
+        _data = Enumerable.Range(1, 100_000).Select(x => new IgniteTuple { 
["id"] = x }).ToList();
     }
 
     [GlobalCleanup]
diff --git 
a/modules/platforms/dotnet/Apache.Ignite.Tests/Table/SchemaSynchronizationTest.cs
 
b/modules/platforms/dotnet/Apache.Ignite.Tests/Table/SchemaSynchronizationTest.cs
index 8993c38557..99f13d0e64 100644
--- 
a/modules/platforms/dotnet/Apache.Ignite.Tests/Table/SchemaSynchronizationTest.cs
+++ 
b/modules/platforms/dotnet/Apache.Ignite.Tests/Table/SchemaSynchronizationTest.cs
@@ -18,6 +18,7 @@
 namespace Apache.Ignite.Tests.Table;
 
 using System;
+using System.Collections.Generic;
 using System.Diagnostics;
 using System.Linq;
 using System.Threading.Tasks;
@@ -335,6 +336,73 @@ public class SchemaSynchronizationTest : IgniteTestsBase
         Assert.AreEqual("name1", res.Value);
     }
 
+    [Test]
+    public async Task TestSchemaUpdateWhileStreaming([Values(true, false)] 
bool insertNewColumn)
+    {
+        await Client.Sql.ExecuteAsync(null, $"CREATE TABLE {TestTableName} 
(KEY bigint PRIMARY KEY)");
+
+        var table = await Client.Tables.GetTableAsync(TestTableName);
+        var view = table!.RecordBinaryView;
+
+        var options = DataStreamerOptions.Default with { BatchSize = 2 };
+        await view.StreamDataAsync(GetData(), options);
+
+        // Inserted with old schema.
+        var res1 = await view.GetAsync(null, GetTuple(1));
+        Assert.AreEqual("FOO", res1.Value["VAL"]);
+
+        // Inserted with new schema.
+        var res2 = await view.GetAsync(null, GetTuple(19));
+        Assert.AreEqual(insertNewColumn ? "BAR_19" : "FOO", res2.Value["VAL"]);
+
+        async IAsyncEnumerable<IIgniteTuple> GetData()
+        {
+            // First set of batches uses old schema.
+            for (int i = 0; i < 10; i++)
+            {
+                yield return GetTuple(i);
+            }
+
+            // Update schema.
+            // New schema has a new column with a default value, so it is not 
required to provide it in the streamed data.
+            await Client.Sql.ExecuteAsync(null, $"ALTER TABLE {TestTableName} 
ADD COLUMN VAL varchar DEFAULT 'FOO'");
+            await WaitForNewSchemaOnAllNodes(TestTableName, 2);
+
+            for (int i = 10; i < 20; i++)
+            {
+                yield return insertNewColumn ? GetTuple(i, "BAR_" + i) : 
GetTuple(i);
+            }
+        }
+    }
+
+    [Test]
+    public async Task TestSchemaUpdateBeforeStreaming()
+    {
+        await Client.Sql.ExecuteAsync(null, $"CREATE TABLE {TestTableName} 
(KEY bigint PRIMARY KEY)");
+
+        var table = await Client.Tables.GetTableAsync(TestTableName);
+        var view = table!.RecordBinaryView;
+
+        // Insert some data - old schema gets cached.
+        await view.InsertAsync(null, GetTuple(-1));
+
+        // Update schema.
+        await Client.Sql.ExecuteAsync(null, $"ALTER TABLE {TestTableName} ADD 
COLUMN VAL varchar DEFAULT 'FOO'");
+        await WaitForNewSchemaOnAllNodes(TestTableName, 2);
+
+        // Stream data with new schema. Client does not yet know about the new 
schema,
+        // but unmapped column exception will trigger schema reload.
+        await view.StreamDataAsync(new[] { GetTuple(1, "BAR") 
}.ToAsyncEnumerable());
+
+        // Inserted with old schema.
+        var res1 = await view.GetAsync(null, GetTuple(-1));
+        Assert.AreEqual("FOO", res1.Value["VAL"]);
+
+        // Inserted with new schema.
+        var res2 = await view.GetAsync(null, GetTuple(1));
+        Assert.AreEqual("BAR", res2.Value["VAL"]);
+    }
+
     private async Task WaitForNewSchemaOnAllNodes(string tableName, int 
schemaVer, int timeoutMs = 5000)
     {
         // TODO IGNITE-18733, IGNITE-18449: remove this workaround when issues 
are fixed.
diff --git 
a/modules/platforms/dotnet/Apache.Ignite/Internal/Buffers/PooledArrayBuffer.cs 
b/modules/platforms/dotnet/Apache.Ignite/Internal/Buffers/PooledArrayBuffer.cs
index 1fd36d60d7..86cf4fd1a6 100644
--- 
a/modules/platforms/dotnet/Apache.Ignite/Internal/Buffers/PooledArrayBuffer.cs
+++ 
b/modules/platforms/dotnet/Apache.Ignite/Internal/Buffers/PooledArrayBuffer.cs
@@ -87,15 +87,16 @@ namespace Apache.Ignite.Internal.Buffers
         public void Advance(int count)
         {
             Debug.Assert(count >= 0, "count >= 0");
-
-            if (_index > _buffer.Length - count)
-            {
-                throw new InvalidOperationException("Can't advance past buffer 
limit.");
-            }
+            Debug.Assert(_index + count < _buffer.Length, "_index + count < 
_buffer.Length");
 
             _index += count;
         }
 
+        /// <summary>
+        /// Resets the buffer to the initial state.
+        /// </summary>
+        public void Reset() => _index = _prefixSize;
+
         /// <summary>
         /// Gets a span for writing.
         /// </summary>
@@ -122,24 +123,6 @@ namespace Apache.Ignite.Internal.Buffers
             return span;
         }
 
-        /// <summary>
-        /// Gets a span for writing at the specified position.
-        /// </summary>
-        /// <param name="position">Position.</param>
-        /// <param name="size">Size.</param>
-        /// <returns>Span for writing.</returns>
-        public Span<byte> GetSpan(int position, int size)
-        {
-            var overflow = _prefixSize + position + size - _index;
-
-            if (overflow > 0)
-            {
-                CheckAndResizeBuffer(overflow);
-            }
-
-            return _buffer.AsSpan(_prefixSize + position, size);
-        }
-
         /// <inheritdoc />
         public void Dispose()
         {
diff --git 
a/modules/platforms/dotnet/Apache.Ignite/Internal/Table/DataStreamer.cs 
b/modules/platforms/dotnet/Apache.Ignite/Internal/Table/DataStreamer.cs
index 63a702f992..57f7423a62 100644
--- a/modules/platforms/dotnet/Apache.Ignite/Internal/Table/DataStreamer.cs
+++ b/modules/platforms/dotnet/Apache.Ignite/Internal/Table/DataStreamer.cs
@@ -18,8 +18,11 @@
 namespace Apache.Ignite.Internal.Table;
 
 using System;
+using System.Buffers;
 using System.Collections.Generic;
 using System.Diagnostics;
+using System.Diagnostics.CodeAnalysis;
+using System.Linq;
 using System.Runtime.InteropServices;
 using System.Threading;
 using System.Threading.Tasks;
@@ -62,8 +65,8 @@ internal static class DataStreamer
     internal static async Task StreamDataAsync<T>(
         IAsyncEnumerable<T> data,
         Func<PooledArrayBuffer, string, IRetryPolicy, Task> sender,
-        IRecordSerializerHandler<T> writer,
-        Func<Task<Schema>> schemaProvider,
+        RecordSerializer<T> writer,
+        Func<int?, Task<Schema>> schemaProvider, // Not a ValueTask because 
Tasks are cached.
         Func<ValueTask<string[]?>> partitionAssignmentProvider,
         DataStreamerOptions options,
         CancellationToken cancellationToken)
@@ -76,11 +79,10 @@ internal static class DataStreamer
 
         // ConcurrentDictionary is not necessary because we consume the source 
sequentially.
         // However, locking for batches is required due to auto-flush 
background task.
-        var batches = new Dictionary<string, Batch>();
+        var batches = new Dictionary<string, Batch<T>>();
         var retryPolicy = new RetryLimitPolicy { RetryLimit = 
options.RetryLimit };
 
-        // TODO: IGNITE-19710 Data Streamer schema synchronization
-        var schema = await schemaProvider().ConfigureAwait(false);
+        var schema = await schemaProvider(null).ConfigureAwait(false);
         var partitionAssignment = await 
partitionAssignmentProvider().ConfigureAwait(false);
         var lastPartitionsAssignmentCheck = Stopwatch.StartNew();
         using var flushCts = new CancellationTokenSource();
@@ -95,7 +97,7 @@ internal static class DataStreamer
                 // However, not all producers support cancellation, so we need 
to check it here as well.
                 cancellationToken.ThrowIfCancellationRequested();
 
-                var (batch, partition) = Add(item);
+                var (batch, partition) = await 
AddWithRetryUnmapped(item).ConfigureAwait(false);
                 if (batch.Count >= options.BatchSize)
                 {
                     await SendAsync(batch, partition).ConfigureAwait(false);
@@ -124,19 +126,36 @@ internal static class DataStreamer
             foreach (var batch in batches.Values)
             {
                 batch.Buffer.Dispose();
+                ArrayPool<T>.Shared.Return(batch.Items);
 
                 Metrics.StreamerItemsQueuedDecrement(batch.Count);
                 Metrics.StreamerBatchesActiveDecrement();
             }
         }
 
-        (Batch Batch, string Partition) Add(T item)
+        return;
+
+        async ValueTask<(Batch<T> Batch, string Partition)> 
AddWithRetryUnmapped(T item)
+        {
+            try
+            {
+                return Add(item);
+            }
+            catch (Exception e) when (e.CausedByUnmappedColumns())
+            {
+                schema = await 
schemaProvider(Table.SchemaVersionForceLatest).ConfigureAwait(false);
+                return Add(item);
+            }
+        }
+
+        (Batch<T> Batch, string Partition) Add(T item)
         {
-            var tupleBuilder = new BinaryTupleBuilder(schema.Columns.Count, 
hashedColumnsPredicate: schema);
+            var schema0 = schema;
+            var tupleBuilder = new BinaryTupleBuilder(schema0.Columns.Count, 
hashedColumnsPredicate: schema0);
 
             try
             {
-                return Add0(item, ref tupleBuilder);
+                return Add0(item, ref tupleBuilder, schema0);
             }
             finally
             {
@@ -144,29 +163,44 @@ internal static class DataStreamer
             }
         }
 
-        (Batch Batch, string Partition) Add0(T item, ref BinaryTupleBuilder 
tupleBuilder)
+        (Batch<T> Batch, string Partition) Add0(T item, ref BinaryTupleBuilder 
tupleBuilder, Schema schema0)
         {
-            var columnCount = schema.Columns.Count;
+            var columnCount = schema0.Columns.Count;
 
             // Use MemoryMarshal to work around [CS8352]: "Cannot use variable 
'noValueSet' in this context
             // because it may expose referenced variables outside of their 
declaration scope".
             Span<byte> noValueSet = stackalloc byte[columnCount / 8 + 1];
             Span<byte> noValueSetRef = MemoryMarshal.CreateSpan(ref 
MemoryMarshal.GetReference(noValueSet), columnCount);
 
-            writer.Write(ref tupleBuilder, item, schema, columnCount, 
noValueSetRef);
+            writer.Handler.Write(ref tupleBuilder, item, schema0, columnCount, 
noValueSetRef);
 
-            var partition = partitionAssignment == null
+            // ReSharper disable once AccessToModifiedClosure (reviewed)
+            var partitionAssignment0 = partitionAssignment;
+            var partition = partitionAssignment0 == null
                 ? string.Empty // Default connection.
-                : partitionAssignment[Math.Abs(tupleBuilder.GetHash() % 
partitionAssignment.Length)];
+                : partitionAssignment0[Math.Abs(tupleBuilder.GetHash() % 
partitionAssignment0.Length)];
 
             var batch = GetOrCreateBatch(partition);
 
             lock (batch)
             {
-                batch.Count++;
+                batch.Items[batch.Count++] = item;
 
-                
noValueSet.CopyTo(batch.Buffer.MessageWriter.WriteBitSet(columnCount));
-                batch.Buffer.MessageWriter.Write(tupleBuilder.Build().Span);
+                if (batch.Schema != schema0)
+                {
+                    batch.SchemaOutdated = true;
+                }
+
+                // 1. To compute target partition, we need key hash.
+                // 2. To compute key hash, we need to serialize the key.
+                // 3. Since we already serialized the key, we can use it for 
the message body and avoid re-serialization.
+                // However, if schema gets updated, we need to re-serialize 
the whole batch.
+                // Schema update is rare, so we optimize for the happy path.
+                if (!batch.SchemaOutdated)
+                {
+                    
noValueSet.CopyTo(batch.Buffer.MessageWriter.WriteBitSet(columnCount));
+                    
batch.Buffer.MessageWriter.Write(tupleBuilder.Build().Span);
+                }
             }
 
             Metrics.StreamerItemsQueuedIncrement();
@@ -174,13 +208,13 @@ internal static class DataStreamer
             return (batch, partition);
         }
 
-        Batch GetOrCreateBatch(string partition)
+        Batch<T> GetOrCreateBatch(string partition)
         {
             ref var batchRef = ref 
CollectionsMarshal.GetValueRefOrAddDefault(batches, partition, out _);
 
             if (batchRef == null)
             {
-                batchRef = new Batch();
+                batchRef = new Batch<T>(options.BatchSize, schema);
                 InitBuffer(batchRef);
 
                 Metrics.StreamerBatchesActiveIncrement();
@@ -189,7 +223,7 @@ internal static class DataStreamer
             return batchRef;
         }
 
-        async Task SendAsync(Batch batch, string partition)
+        async Task SendAsync(Batch<T> batch, string partition)
         {
             var expectedSize = batch.Count;
 
@@ -210,31 +244,82 @@ internal static class DataStreamer
                 buf.WriteByte(MsgPackCode.Int32, batch.CountPos);
                 buf.WriteIntBigEndian(batch.Count, batch.CountPos + 1);
 
-                batch.Task = SendAndDisposeBufAsync(buf, partition, 
batch.Task, batch.Count);
+                batch.Task = SendAndDisposeBufAsync(buf, partition, 
batch.Task, batch.Items, batch.Count, batch.SchemaOutdated);
 
+                batch.Items = ArrayPool<T>.Shared.Rent(options.BatchSize);
                 batch.Count = 0;
                 batch.Buffer = ProtoCommon.GetMessageWriter(); // Prev buf 
will be disposed in SendAndDisposeBufAsync.
                 InitBuffer(batch);
                 batch.LastFlush = Stopwatch.GetTimestamp();
+                batch.Schema = schema;
+                batch.SchemaOutdated = false;
 
                 Metrics.StreamerBatchesActiveIncrement();
             }
         }
 
-        async Task SendAndDisposeBufAsync(PooledArrayBuffer buf, string 
partition, Task oldTask, int count)
+        async Task SendAndDisposeBufAsync(
+            PooledArrayBuffer buf,
+            string partition,
+            Task oldTask,
+            T[] items,
+            int count,
+            bool batchSchemaOutdated)
         {
-            try
+            Debug.Assert(items.Length > 0, "items.Length > 0");
+
+            if (batchSchemaOutdated)
             {
-                // Wait for the previous batch for this node to preserve item 
order.
-                await oldTask.ConfigureAwait(false);
-                await sender(buf, partition, 
retryPolicy).ConfigureAwait(false);
+                // Schema update was detected while the batch was being filled.
+                buf.Reset();
+                writer.WriteMultiple(buf, null, schema, items.Take(count));
+            }
 
-                Metrics.StreamerBatchesSent.Add(1);
-                Metrics.StreamerItemsSent.Add(count);
+            try
+            {
+                int? schemaVersion = null;
+                while (true)
+                {
+                    try
+                    {
+                        if (schemaVersion != null)
+                        {
+                            // Might be updated by another batch.
+                            if (schema.Version != schemaVersion)
+                            {
+                                schema = await 
schemaProvider(schemaVersion).ConfigureAwait(false);
+                            }
+
+                            // Serialize again with the new schema.
+                            buf.Reset();
+                            writer.WriteMultiple(buf, null, schema, 
items.Take(count));
+                        }
+
+                        // Wait for the previous batch for this node to 
preserve item order.
+                        await oldTask.ConfigureAwait(false);
+                        await sender(buf, partition, 
retryPolicy).ConfigureAwait(false);
+
+                        Metrics.StreamerBatchesSent.Add(1);
+                        Metrics.StreamerItemsSent.Add(count);
+
+                        return;
+                    }
+                    catch (IgniteException e) when (e.Code == 
ErrorGroups.Table.SchemaVersionMismatch &&
+                                                    schemaVersion != 
e.GetExpectedSchemaVersion())
+                    {
+                        // Schema update detected after the batch was 
serialized.
+                        schemaVersion = e.GetExpectedSchemaVersion();
+                    }
+                    catch (Exception e) when (e.CausedByUnmappedColumns() && 
schemaVersion == null)
+                    {
+                        schemaVersion = Table.SchemaVersionForceLatest;
+                    }
+                }
             }
             finally
             {
                 buf.Dispose();
+                ArrayPool<T>.Shared.Return(items);
 
                 Metrics.StreamerItemsQueuedDecrement(count);
                 Metrics.StreamerBatchesActiveDecrement();
@@ -258,7 +343,7 @@ internal static class DataStreamer
             }
         }
 
-        void InitBuffer(Batch batch)
+        void InitBuffer(Batch<T> batch)
         {
             var buf = batch.Buffer;
 
@@ -285,10 +370,23 @@ internal static class DataStreamer
         }
     }
 
-    private sealed record Batch
+    private sealed record Batch<T>
     {
+        public Batch(int capacity, Schema schema)
+        {
+            Items = ArrayPool<T>.Shared.Rent(capacity);
+            Schema = schema;
+        }
+
         public PooledArrayBuffer Buffer { get; set; } = 
ProtoCommon.GetMessageWriter();
 
+        [SuppressMessage("Performance", "CA1819:Properties should not return 
arrays", Justification = "Private record")]
+        public T[] Items { get; set; }
+
+        public Schema Schema { get; set; }
+
+        public bool SchemaOutdated { get; set; }
+
         public int Count { get; set; }
 
         public int CountPos { get; set; }
diff --git 
a/modules/platforms/dotnet/Apache.Ignite/Internal/Table/RecordView.cs 
b/modules/platforms/dotnet/Apache.Ignite/Internal/Table/RecordView.cs
index f306c6d665..7273c7dbbd 100644
--- a/modules/platforms/dotnet/Apache.Ignite/Internal/Table/RecordView.cs
+++ b/modules/platforms/dotnet/Apache.Ignite/Internal/Table/RecordView.cs
@@ -309,8 +309,8 @@ namespace Apache.Ignite.Internal.Table
                             retryPolicy)
                         .ConfigureAwait(false);
                 },
-                writer: _ser.Handler,
-                schemaProvider: () => _table.GetLatestSchemaAsync(), // TODO 
IGNITE-19710 retry outdated schema.
+                writer: _ser,
+                schemaProvider: _table.GetSchemaAsync,
                 partitionAssignmentProvider: () => 
_table.GetPartitionAssignmentAsync(),
                 options ?? DataStreamerOptions.Default,
                 cancellationToken).ConfigureAwait(false);
diff --git 
a/modules/platforms/dotnet/Apache.Ignite/Internal/Table/Serialization/ObjectSerializerHandler.cs
 
b/modules/platforms/dotnet/Apache.Ignite/Internal/Table/Serialization/ObjectSerializerHandler.cs
index 987096e4ca..a6af89b283 100644
--- 
a/modules/platforms/dotnet/Apache.Ignite/Internal/Table/Serialization/ObjectSerializerHandler.cs
+++ 
b/modules/platforms/dotnet/Apache.Ignite/Internal/Table/Serialization/ObjectSerializerHandler.cs
@@ -35,7 +35,7 @@ namespace Apache.Ignite.Internal.Table.Serialization
     /// Object serializer handler.
     /// </summary>
     /// <typeparam name="T">Object type.</typeparam>
-    internal class ObjectSerializerHandler<T> : IRecordSerializerHandler<T>
+    internal sealed class ObjectSerializerHandler<T> : 
IRecordSerializerHandler<T>
     {
         private readonly ConcurrentDictionary<(int, int), WriteDelegate<T>> 
_writers = new();
 
diff --git 
a/modules/platforms/dotnet/Apache.Ignite/Internal/Table/Serialization/RecordSerializer.cs
 
b/modules/platforms/dotnet/Apache.Ignite/Internal/Table/Serialization/RecordSerializer.cs
index 2b6e94ec6b..f7e95bbd75 100644
--- 
a/modules/platforms/dotnet/Apache.Ignite/Internal/Table/Serialization/RecordSerializer.cs
+++ 
b/modules/platforms/dotnet/Apache.Ignite/Internal/Table/Serialization/RecordSerializer.cs
@@ -28,7 +28,7 @@ namespace Apache.Ignite.Internal.Table.Serialization
     /// Works for tuples and user objects, any differences are handled by the 
underlying <see cref="IRecordSerializerHandler{T}"/>.
     /// </summary>
     /// <typeparam name="T">Record type.</typeparam>
-    internal class RecordSerializer<T>
+    internal sealed class RecordSerializer<T>
     {
         /** Table. */
         private readonly Table _table;
@@ -196,6 +196,28 @@ namespace Apache.Ignite.Internal.Table.Serialization
             return firstHash;
         }
 
+        /// <summary>
+        /// Write multiple records.
+        /// </summary>
+        /// <param name="buf">Buffer.</param>
+        /// <param name="tx">Transaction.</param>
+        /// <param name="schema">Schema.</param>
+        /// <param name="recs">Records.</param>
+        /// <param name="keyOnly">Key only columns.</param>
+        /// <returns>First record hash.</returns>
+        public int WriteMultiple(
+            PooledArrayBuffer buf,
+            Transactions.Transaction? tx,
+            Schema schema,
+            IEnumerable<T> recs,
+            bool keyOnly = false)
+        {
+            var enumerator = recs.GetEnumerator();
+            enumerator.MoveNext();
+
+            return WriteMultiple(buf, tx, schema, enumerator, keyOnly);
+        }
+
         /// <summary>
         /// Write multiple records.
         /// </summary>
diff --git 
a/modules/platforms/dotnet/Apache.Ignite/Internal/Table/Serialization/TuplePairSerializerHandler.cs
 
b/modules/platforms/dotnet/Apache.Ignite/Internal/Table/Serialization/TuplePairSerializerHandler.cs
index f416746be2..7a4b745757 100644
--- 
a/modules/platforms/dotnet/Apache.Ignite/Internal/Table/Serialization/TuplePairSerializerHandler.cs
+++ 
b/modules/platforms/dotnet/Apache.Ignite/Internal/Table/Serialization/TuplePairSerializerHandler.cs
@@ -29,7 +29,7 @@ using Proto.MsgPack;
 /// <summary>
 /// Serializer handler for <see cref="IIgniteTuple"/>.
 /// </summary>
-internal class TuplePairSerializerHandler : 
IRecordSerializerHandler<KvPair<IIgniteTuple, IIgniteTuple>>
+internal sealed class TuplePairSerializerHandler : 
IRecordSerializerHandler<KvPair<IIgniteTuple, IIgniteTuple>>
 {
     /// <summary>
     /// Singleton instance.
diff --git 
a/modules/platforms/dotnet/Apache.Ignite/Internal/Table/Serialization/TupleSerializerHandler.cs
 
b/modules/platforms/dotnet/Apache.Ignite/Internal/Table/Serialization/TupleSerializerHandler.cs
index b919f39a16..edeccb4c82 100644
--- 
a/modules/platforms/dotnet/Apache.Ignite/Internal/Table/Serialization/TupleSerializerHandler.cs
+++ 
b/modules/platforms/dotnet/Apache.Ignite/Internal/Table/Serialization/TupleSerializerHandler.cs
@@ -29,7 +29,7 @@ namespace Apache.Ignite.Internal.Table.Serialization
     /// <summary>
     /// Serializer handler for <see cref="IIgniteTuple"/>.
     /// </summary>
-    internal class TupleSerializerHandler : 
IRecordSerializerHandler<IIgniteTuple>
+    internal sealed class TupleSerializerHandler : 
IRecordSerializerHandler<IIgniteTuple>
     {
         /// <summary>
         /// Singleton instance.
diff --git a/modules/platforms/dotnet/Apache.Ignite/Internal/Table/Table.cs 
b/modules/platforms/dotnet/Apache.Ignite/Internal/Table/Table.cs
index 90a15da24c..a6a98fe667 100644
--- a/modules/platforms/dotnet/Apache.Ignite/Internal/Table/Table.cs
+++ b/modules/platforms/dotnet/Apache.Ignite/Internal/Table/Table.cs
@@ -182,18 +182,6 @@ namespace Apache.Ignite.Internal.Table
             ? LoadSchemaAsync(SchemaVersionUnknown)
             : GetCachedSchemaAsync(version ?? _latestSchemaVersion);
 
-        /// <summary>
-        /// Gets the latest schema.
-        /// </summary>
-        /// <returns>Schema.</returns>
-        internal Task<Schema> GetLatestSchemaAsync()
-        {
-            // _latestSchemaVersion can be -1 (unknown) or a valid version.
-            // In case of unknown version, we request latest from the server 
and cache it with -1 key
-            // to avoid duplicate requests for latest schema.
-            return GetCachedSchemaAsync(_latestSchemaVersion);
-        }
-
         /// <summary>
         /// Gets the preferred node by colocation hash.
         /// </summary>

Reply via email to