This is an automated email from the ASF dual-hosted git repository.

ptupitsyn pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new c1a6a07307 IGNITE-19838 .NET: Retry outdated schema error  (#2434)
c1a6a07307 is described below

commit c1a6a073071fecb85ed6aaa96bff2d0948634ef3
Author: Pavel Tupitsyn <[email protected]>
AuthorDate: Thu Aug 10 15:20:28 2023 +0300

    IGNITE-19838 .NET: Retry outdated schema error  (#2434)
    
    * Read known error extensions data from server
    * Retry `SchemaVersionMismatch` error code with specified schema
    * Improve code reuse in `RecordView` (there are still multiple methods 
which repeat the same retry logic; we could unify them with a delegate-based 
helper such as `DoOp(() => ...)`, but that would increase allocations)
---
 .../Table/SchemaSynchronizationTest.cs             | 196 +++++++++++++++++++++
 .../dotnet/Apache.Ignite/Internal/ClientSocket.cs  |  18 +-
 .../Internal/Common/ExceptionExtensions.cs         |  52 ++++++
 .../Apache.Ignite/Internal/Compute/Compute.cs      |   8 +-
 .../Internal/Proto/ErrorExtensions.cs              |  29 +++
 .../Apache.Ignite/Internal/Table/RecordView.cs     | 161 +++++++++--------
 .../dotnet/Apache.Ignite/Internal/Table/Table.cs   |   7 +
 .../ItThinClientSchemaSynchronizationTest.java     |   2 +-
 8 files changed, 398 insertions(+), 75 deletions(-)

diff --git 
a/modules/platforms/dotnet/Apache.Ignite.Tests/Table/SchemaSynchronizationTest.cs
 
b/modules/platforms/dotnet/Apache.Ignite.Tests/Table/SchemaSynchronizationTest.cs
new file mode 100644
index 0000000000..6660876f52
--- /dev/null
+++ 
b/modules/platforms/dotnet/Apache.Ignite.Tests/Table/SchemaSynchronizationTest.cs
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache.Ignite.Tests.Table;
+
+using System;
+using System.Threading.Tasks;
+using Compute;
+using Ignite.Compute;
+using Ignite.Table;
+using NUnit.Framework;
+
+/// <summary>
+/// Tests for client table schema synchronization.
+/// </summary>
+public class SchemaSynchronizationTest : IgniteTestsBase
+{
+    private static readonly TestMode[] TestModes = Enum.GetValues<TestMode>();
+
+    private static readonly TestMode[] ReadTestModes = { TestMode.One, 
TestMode.Multiple };
+
+    public enum TestMode
+    {
+        One,
+        Two,
+        Multiple,
+        Compute
+    }
+
+    private static string TestTableName => TestContext.CurrentContext.Test.Name
+        .Replace("(", string.Empty)
+        .Replace(")", string.Empty);
+
+    [TearDown]
+    public async Task DeleteTable() => await Client.Sql.ExecuteAsync(null, 
$"DROP TABLE {TestTableName}");
+
+    [Test]
+    public async Task 
TestClientUsesLatestSchemaOnWrite([ValueSource(nameof(TestModes))] TestMode 
testMode)
+    {
+        // Create table, insert data.
+        await Client.Sql.ExecuteAsync(null, $"CREATE TABLE {TestTableName} (ID 
INT NOT NULL PRIMARY KEY, NAME VARCHAR NOT NULL)");
+
+        var table = await Client.Tables.GetTableAsync(TestTableName);
+        var view = table!.RecordBinaryView;
+
+        var rec = new IgniteTuple
+        {
+            ["ID"] = 1,
+            ["NAME"] = "name"
+        };
+
+        await view.InsertAsync(null, rec);
+
+        // Modify table, insert data - client will use old schema, receive 
error, retry with new schema.
+        // The process is transparent for the user: updated schema is in 
effect immediately.
+        await Client.Sql.ExecuteAsync(null, $"ALTER TABLE {TestTableName} DROP 
COLUMN NAME");
+
+        var rec2 = new IgniteTuple
+        {
+            ["ID"] = 2,
+            ["NAME"] = "name2"
+        };
+
+        // TODO this should fail when we implement IGNITE-19836 Reject Tuples 
and POCOs with unmapped fields
+        switch (testMode)
+        {
+            case TestMode.One:
+                await view.InsertAsync(null, rec2);
+                break;
+
+            case TestMode.Two:
+                await view.ReplaceAsync(null, rec2, rec2);
+                break;
+
+            case TestMode.Multiple:
+                await view.InsertAllAsync(null, new[] { rec2, rec2, rec2 });
+                break;
+
+            case TestMode.Compute:
+                await Client.Compute.ExecuteColocatedAsync<string>(
+                    table.Name, rec2, Array.Empty<DeploymentUnit>(), 
ComputeTests.NodeNameJob);
+                break;
+
+            default:
+                Assert.Fail("Invalid test mode: " + testMode);
+                break;
+        }
+    }
+
+    [Test]
+    public async Task 
TestClientUsesLatestSchemaOnRead([ValueSource(nameof(ReadTestModes))] TestMode 
testMode)
+    {
+        // Create table, insert data.
+        await Client.Sql.ExecuteAsync(null, $"CREATE TABLE {TestTableName} (ID 
INT NOT NULL PRIMARY KEY)");
+
+        var table = await Client.Tables.GetTableAsync(TestTableName);
+        var view = table!.RecordBinaryView;
+
+        var rec = new IgniteTuple { ["ID"] = 1 };
+        await view.InsertAsync(null, rec);
+
+        // Modify table, read data - client will use old schema, receive 
error, retry with new schema.
+        // The process is transparent for the user: updated schema is in 
effect immediately.
+        await Client.Sql.ExecuteAsync(null, $"ALTER TABLE {TestTableName} ADD 
COLUMN NAME VARCHAR NOT NULL DEFAULT 'name1'");
+
+        switch (testMode)
+        {
+            case TestMode.One:
+            {
+                var res = await view.GetAsync(null, rec);
+
+                Assert.IsTrue(res.HasValue);
+                Assert.AreEqual("name1", res.Value["NAME"]);
+                break;
+            }
+
+            case TestMode.Multiple:
+            {
+                var res = await view.GetAllAsync(null, new[] { rec, rec });
+
+                Assert.AreEqual(2, res.Count);
+
+                foreach (var r in res)
+                {
+                    Assert.IsTrue(r.HasValue);
+                    Assert.AreEqual("name1", r.Value["NAME"]);
+                }
+
+                break;
+            }
+
+            default:
+                Assert.Fail("Invalid test mode: " + testMode);
+                break;
+        }
+    }
+
+    [Test]
+    public async Task TestClientUsesLatestSchemaOnReadPoco()
+    {
+        // Create table, insert data.
+        await Client.Sql.ExecuteAsync(null, $"CREATE TABLE {TestTableName} (ID 
INT NOT NULL PRIMARY KEY)");
+
+        var table = await Client.Tables.GetTableAsync(TestTableName);
+        var view = table!.RecordBinaryView;
+
+        var rec = new IgniteTuple { ["ID"] = 1 };
+        await view.InsertAsync(null, rec);
+
+        await Client.Sql.ExecuteAsync(null, $"ALTER TABLE {TestTableName} ADD 
COLUMN NAME VARCHAR NOT NULL DEFAULT 'name1'");
+
+        var pocoView = table.GetRecordView<Poco>();
+        var res = await pocoView.GetAsync(null, new Poco(1, string.Empty));
+
+        Assert.IsTrue(res.HasValue);
+        Assert.AreEqual(1, res.Value.Id);
+        Assert.AreEqual("name1", res.Value.Name);
+    }
+
+    [Test]
+    public async Task TestClientUsesLatestSchemaOnReadKv()
+    {
+        // Create table, insert data.
+        await Client.Sql.ExecuteAsync(null, $"CREATE TABLE {TestTableName} (ID 
INT NOT NULL PRIMARY KEY)");
+
+        var table = await Client.Tables.GetTableAsync(TestTableName);
+        var view = table!.RecordBinaryView;
+
+        var rec = new IgniteTuple { ["ID"] = 1 };
+        await view.InsertAsync(null, rec);
+
+        await Client.Sql.ExecuteAsync(null, $"ALTER TABLE {TestTableName} ADD 
COLUMN NAME VARCHAR NOT NULL DEFAULT 'name1'");
+
+        var pocoView = table.GetKeyValueView<int, string>();
+        var res = await pocoView.GetAsync(null, 1);
+
+        Assert.IsTrue(res.HasValue);
+        Assert.AreEqual("name1", res.Value);
+    }
+
+    private record Poco(int Id, string Name);
+}
diff --git a/modules/platforms/dotnet/Apache.Ignite/Internal/ClientSocket.cs 
b/modules/platforms/dotnet/Apache.Ignite/Internal/ClientSocket.cs
index 2c5039eb8c..fb26a2eb0a 100644
--- a/modules/platforms/dotnet/Apache.Ignite/Internal/ClientSocket.cs
+++ b/modules/platforms/dotnet/Apache.Ignite/Internal/ClientSocket.cs
@@ -399,11 +399,23 @@ namespace Apache.Ignite.Internal
             string className = reader.ReadString();
             string? message = reader.ReadStringNullable();
             string? javaStackTrace = reader.ReadStringNullable();
+            var ex = ExceptionMapper.GetException(traceId, code, className, 
message, javaStackTrace);
 
-            // TODO IGNITE-19838 Retry outdated schema error
-            reader.Skip(); // Error extensions.
+            int extensionCount = reader.TryReadNil() ? 0 : 
reader.ReadMapHeader();
+            for (int i = 0; i < extensionCount; i++)
+            {
+                var key = reader.ReadString();
+                if (key == ErrorExtensions.ExpectedSchemaVersion)
+                {
+                    ex.Data[key] = reader.ReadInt32();
+                }
+                else
+                {
+                    reader.Skip(); // Unknown extension - ignore.
+                }
+            }
 
-            return ExceptionMapper.GetException(traceId, code, className, 
message, javaStackTrace);
+            return ex;
         }
 
         private static async ValueTask<PooledBuffer> ReadResponseAsync(
diff --git 
a/modules/platforms/dotnet/Apache.Ignite/Internal/Common/ExceptionExtensions.cs 
b/modules/platforms/dotnet/Apache.Ignite/Internal/Common/ExceptionExtensions.cs
new file mode 100644
index 0000000000..8243e3af06
--- /dev/null
+++ 
b/modules/platforms/dotnet/Apache.Ignite/Internal/Common/ExceptionExtensions.cs
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache.Ignite.Internal.Common;
+
+using System;
+using System.Diagnostics;
+using Proto;
+
+/// <summary>
+/// Exception extensions.
+/// </summary>
+internal static class ExceptionExtensions
+{
+    /// <summary>
+    /// Gets expected schema version from the error data.
+    /// </summary>
+    /// <param name="e">Exception.</param>
+    /// <returns>Expected schema version.</returns>
+    /// <exception cref="IgniteException">When specified exception does not 
have schema version in <see cref="Exception.Data"/>.</exception>
+    public static int GetExpectedSchemaVersion(this IgniteException e)
+    {
+        Debug.Assert(
+            e.Code == ErrorGroups.Table.SchemaVersionMismatch,
+            $"e.Code == ErrorGroups.Table.SchemaVersionMismatch: expected 
{ErrorGroups.Table.SchemaVersionMismatch}, actual {e.Code}");
+
+        if (e.Data[ErrorExtensions.ExpectedSchemaVersion] is int schemaVer)
+        {
+            return schemaVer;
+        }
+
+        throw new IgniteException(
+            e.TraceId,
+            ErrorGroups.Client.Protocol,
+            "Expected schema version is not specified in error extension map.",
+            e);
+    }
+}
diff --git a/modules/platforms/dotnet/Apache.Ignite/Internal/Compute/Compute.cs 
b/modules/platforms/dotnet/Apache.Ignite/Internal/Compute/Compute.cs
index 0aec31777e..7012b3beab 100644
--- a/modules/platforms/dotnet/Apache.Ignite/Internal/Compute/Compute.cs
+++ b/modules/platforms/dotnet/Apache.Ignite/Internal/Compute/Compute.cs
@@ -257,11 +257,12 @@ namespace Apache.Ignite.Internal.Compute
             IgniteArgumentCheck.NotNull(jobClassName, nameof(jobClassName));
 
             var units0 = units as ICollection<DeploymentUnit> ?? 
units.ToList(); // Avoid multiple enumeration.
+            int? schemaVersion = null;
 
             while (true)
             {
                 var table = await 
GetTableAsync(tableName).ConfigureAwait(false);
-                var schema = await 
table.GetLatestSchemaAsync().ConfigureAwait(false);
+                var schema = await 
table.GetSchemaAsync(schemaVersion).ConfigureAwait(false);
 
                 using var bufferWriter = ProtoCommon.GetMessageWriter();
                 var colocationHash = Write(bufferWriter, table, schema);
@@ -279,6 +280,11 @@ namespace Apache.Ignite.Internal.Compute
                     // Table was dropped - remove from cache.
                     // Try again in case a new table with the same name exists.
                     _tableCache.TryRemove(tableName, out _);
+                    schemaVersion = null;
+                }
+                catch (IgniteException e) when (e.Code == 
ErrorGroups.Table.SchemaVersionMismatch)
+                {
+                    schemaVersion = e.GetExpectedSchemaVersion();
                 }
             }
 
diff --git 
a/modules/platforms/dotnet/Apache.Ignite/Internal/Proto/ErrorExtensions.cs 
b/modules/platforms/dotnet/Apache.Ignite/Internal/Proto/ErrorExtensions.cs
new file mode 100644
index 0000000000..3fb888ff00
--- /dev/null
+++ b/modules/platforms/dotnet/Apache.Ignite/Internal/Proto/ErrorExtensions.cs
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache.Ignite.Internal.Proto;
+
+/// <summary>
+/// Error data extensions. When the server returns an error response, it may 
contain additional data in a map. Keys are defined here.
+/// </summary>
+internal static class ErrorExtensions
+{
+    /// <summary>
+    /// Expected schema version for <see 
cref="ErrorGroups.Table.SchemaVersionMismatch"/> error.
+    /// </summary>
+    public const string ExpectedSchemaVersion = "expected-schema-ver";
+}
diff --git 
a/modules/platforms/dotnet/Apache.Ignite/Internal/Table/RecordView.cs 
b/modules/platforms/dotnet/Apache.Ignite/Internal/Table/RecordView.cs
index b03b61c441..4d2a318462 100644
--- a/modules/platforms/dotnet/Apache.Ignite/Internal/Table/RecordView.cs
+++ b/modules/platforms/dotnet/Apache.Ignite/Internal/Table/RecordView.cs
@@ -147,25 +147,16 @@ namespace Apache.Ignite.Internal.Table
         {
             IgniteArgumentCheck.NotNull(keys, nameof(keys));
 
-            using var iterator = keys.GetEnumerator();
-
-            if (!iterator.MoveNext())
+            using var resBuf = await 
DoMultiRecordOutOpAsync(ClientOp.TupleGetAll, transaction, keys, 
true).ConfigureAwait(false);
+            if (resBuf == null)
             {
                 return resultFactory(0);
             }
 
-            var schema = await 
_table.GetLatestSchemaAsync().ConfigureAwait(false);
-            var tx = transaction.ToInternal();
-
-            using var writer = ProtoCommon.GetMessageWriter();
-            var colocationHash = _ser.WriteMultiple(writer, tx, schema, 
iterator, keyOnly: true);
-            var preferredNode = await _table.GetPreferredNode(colocationHash, 
transaction).ConfigureAwait(false);
-
-            using var resBuf = await DoOutInOpAsync(ClientOp.TupleGetAll, tx, 
writer, preferredNode).ConfigureAwait(false);
-            var resSchema = await 
_table.ReadSchemaAsync(resBuf).ConfigureAwait(false);
+            var resSchema = await 
_table.ReadSchemaAsync(resBuf.Value).ConfigureAwait(false);
 
             // TODO: Read value parts only (IGNITE-16022).
-            return _ser.ReadMultipleNullable(resBuf, resSchema, resultFactory, 
addAction);
+            return _ser.ReadMultipleNullable(resBuf.Value, resSchema, 
resultFactory, addAction);
         }
 
         /// <inheritdoc/>
@@ -181,21 +172,7 @@ namespace Apache.Ignite.Internal.Table
         {
             IgniteArgumentCheck.NotNull(records, nameof(records));
 
-            using var iterator = records.GetEnumerator();
-
-            if (!iterator.MoveNext())
-            {
-                return;
-            }
-
-            var schema = await 
_table.GetLatestSchemaAsync().ConfigureAwait(false);
-            var tx = transaction.ToInternal();
-
-            using var writer = ProtoCommon.GetMessageWriter();
-            var colocationHash = _ser.WriteMultiple(writer, tx, schema, 
iterator);
-            var preferredNode = await _table.GetPreferredNode(colocationHash, 
transaction).ConfigureAwait(false);
-
-            using var resBuf = await DoOutInOpAsync(ClientOp.TupleUpsertAll, 
tx, writer, preferredNode).ConfigureAwait(false);
+            using var resBuf = await 
DoMultiRecordOutOpAsync(ClientOp.TupleUpsertAll, transaction, 
records).ConfigureAwait(false);
         }
 
         /// <inheritdoc/>
@@ -223,26 +200,17 @@ namespace Apache.Ignite.Internal.Table
         {
             IgniteArgumentCheck.NotNull(records, nameof(records));
 
-            using var iterator = records.GetEnumerator();
-
-            if (!iterator.MoveNext())
+            using var resBuf = await 
DoMultiRecordOutOpAsync(ClientOp.TupleInsertAll, transaction, 
records).ConfigureAwait(false);
+            if (resBuf == null)
             {
                 return Array.Empty<T>();
             }
 
-            var schema = await 
_table.GetLatestSchemaAsync().ConfigureAwait(false);
-            var tx = transaction.ToInternal();
-
-            using var writer = ProtoCommon.GetMessageWriter();
-            var colocationHash = _ser.WriteMultiple(writer, tx, schema, 
iterator);
-            var preferredNode = await _table.GetPreferredNode(colocationHash, 
transaction).ConfigureAwait(false);
-
-            using var resBuf = await DoOutInOpAsync(ClientOp.TupleInsertAll, 
tx, writer, preferredNode).ConfigureAwait(false);
-            var resSchema = await 
_table.ReadSchemaAsync(resBuf).ConfigureAwait(false);
+            var resSchema = await 
_table.ReadSchemaAsync(resBuf.Value).ConfigureAwait(false);
 
             // TODO: Read value parts only (IGNITE-16022).
             return _ser.ReadMultiple(
-                buf: resBuf,
+                buf: resBuf.Value,
                 schema: resSchema,
                 keyOnly: false,
                 resultFactory: static count => count == 0
@@ -265,14 +233,9 @@ namespace Apache.Ignite.Internal.Table
         {
             IgniteArgumentCheck.NotNull(record, nameof(record));
 
-            var schema = await 
_table.GetLatestSchemaAsync().ConfigureAwait(false);
-            var tx = transaction.ToInternal();
-
-            using var writer = ProtoCommon.GetMessageWriter();
-            var colocationHash = _ser.WriteTwo(writer, tx, schema, record, 
newRecord);
-            var preferredNode = await _table.GetPreferredNode(colocationHash, 
transaction).ConfigureAwait(false);
+            using var resBuf = await 
DoTwoRecordOutOpAsync(ClientOp.TupleReplaceExact, transaction, record, 
newRecord)
+                .ConfigureAwait(false);
 
-            using var resBuf = await 
DoOutInOpAsync(ClientOp.TupleReplaceExact, tx, writer, 
preferredNode).ConfigureAwait(false);
             return ReadSchemaAndBoolean(resBuf);
         }
 
@@ -342,7 +305,7 @@ namespace Apache.Ignite.Internal.Table
                         .ConfigureAwait(false);
                 },
                 writer: _ser.Handler,
-                schemaProvider: () => _table.GetLatestSchemaAsync(),
+                schemaProvider: () => _table.GetLatestSchemaAsync(), // TODO 
IGNITE-19710 retry outdated schema.
                 partitionAssignmentProvider: () => 
_table.GetPartitionAssignmentAsync(),
                 options ?? DataStreamerOptions.Default,
                 cancellationToken).ConfigureAwait(false);
@@ -395,27 +358,18 @@ namespace Apache.Ignite.Internal.Table
         {
             IgniteArgumentCheck.NotNull(records, nameof(records));
 
-            using var iterator = records.GetEnumerator();
-
-            if (!iterator.MoveNext())
+            var clientOp = exact ? ClientOp.TupleDeleteAllExact : 
ClientOp.TupleDeleteAll;
+            using var resBuf = await DoMultiRecordOutOpAsync(clientOp, 
transaction, records, keyOnly: !exact).ConfigureAwait(false);
+            if (resBuf == null)
             {
                 return resultFactory(0);
             }
 
-            var schema = await 
_table.GetLatestSchemaAsync().ConfigureAwait(false);
-            var tx = transaction.ToInternal();
-
-            using var writer = ProtoCommon.GetMessageWriter();
-            var colocationHash = _ser.WriteMultiple(writer, tx, schema, 
iterator, keyOnly: !exact);
-            var preferredNode = await _table.GetPreferredNode(colocationHash, 
transaction).ConfigureAwait(false);
-
-            var clientOp = exact ? ClientOp.TupleDeleteAllExact : 
ClientOp.TupleDeleteAll;
-            using var resBuf = await DoOutInOpAsync(clientOp, tx, writer, 
preferredNode).ConfigureAwait(false);
-            var resSchema = await 
_table.ReadSchemaAsync(resBuf).ConfigureAwait(false);
+            var resSchema = await 
_table.ReadSchemaAsync(resBuf.Value).ConfigureAwait(false);
 
             // TODO: Read value parts only (IGNITE-16022).
             return _ser.ReadMultiple(
-                buf: resBuf,
+                buf: resBuf.Value,
                 schema: resSchema,
                 keyOnly: !exact,
                 resultFactory: resultFactory,
@@ -448,16 +402,83 @@ namespace Apache.Ignite.Internal.Table
             ClientOp op,
             ITransaction? transaction,
             T record,
-            bool keyOnly = false)
+            bool keyOnly = false,
+            int? schemaVersionOverride = null)
+        {
+            try
+            {
+                var schema = await 
_table.GetSchemaAsync(schemaVersionOverride).ConfigureAwait(false);
+                var tx = transaction.ToInternal();
+
+                using var writer = ProtoCommon.GetMessageWriter();
+                var colocationHash = _ser.Write(writer, tx, schema, record, 
keyOnly);
+                var preferredNode = await 
_table.GetPreferredNode(colocationHash, transaction).ConfigureAwait(false);
+
+                return await DoOutInOpAsync(op, tx, writer, 
preferredNode).ConfigureAwait(false);
+            }
+            catch (IgniteException e) when (e.Code == 
ErrorGroups.Table.SchemaVersionMismatch)
+            {
+                return await DoRecordOutOpAsync(op, transaction, record, 
keyOnly, e.GetExpectedSchemaVersion()).ConfigureAwait(false);
+            }
+        }
+
+        private async Task<PooledBuffer> DoTwoRecordOutOpAsync(
+            ClientOp op,
+            ITransaction? transaction,
+            T record,
+            T record2,
+            bool keyOnly = false,
+            int? schemaVersionOverride = null)
         {
-            var schema = await 
_table.GetLatestSchemaAsync().ConfigureAwait(false);
-            var tx = transaction.ToInternal();
+            try
+            {
+                var schema = await 
_table.GetSchemaAsync(schemaVersionOverride).ConfigureAwait(false);
+                var tx = transaction.ToInternal();
 
-            using var writer = ProtoCommon.GetMessageWriter();
-            var colocationHash = _ser.Write(writer, tx, schema, record, 
keyOnly);
-            var preferredNode = await _table.GetPreferredNode(colocationHash, 
transaction).ConfigureAwait(false);
+                using var writer = ProtoCommon.GetMessageWriter();
+                var colocationHash = _ser.WriteTwo(writer, tx, schema, record, 
record2, keyOnly);
+                var preferredNode = await 
_table.GetPreferredNode(colocationHash, transaction).ConfigureAwait(false);
+
+                return await DoOutInOpAsync(op, tx, writer, 
preferredNode).ConfigureAwait(false);
+            }
+            catch (IgniteException e) when (e.Code == 
ErrorGroups.Table.SchemaVersionMismatch)
+            {
+                return await DoTwoRecordOutOpAsync(op, transaction, record, 
record2, keyOnly, e.GetExpectedSchemaVersion())
+                    .ConfigureAwait(false);
+            }
+        }
 
-            return await DoOutInOpAsync(op, tx, writer, 
preferredNode).ConfigureAwait(false);
+        private async Task<PooledBuffer?> DoMultiRecordOutOpAsync(
+            ClientOp op,
+            ITransaction? transaction,
+            IEnumerable<T> recs,
+            bool keyOnly = false,
+            int? schemaVersionOverride = null)
+        {
+            // ReSharper disable once PossibleMultipleEnumeration (we may have 
to retry, but this is very rare)
+            using var iterator = recs.GetEnumerator();
+
+            if (!iterator.MoveNext())
+            {
+                return null;
+            }
+
+            try
+            {
+                var schema = await 
_table.GetSchemaAsync(schemaVersionOverride).ConfigureAwait(false);
+                var tx = transaction.ToInternal();
+
+                using var writer = ProtoCommon.GetMessageWriter();
+                var colocationHash = _ser.WriteMultiple(writer, tx, schema, 
iterator, keyOnly);
+                var preferredNode = await 
_table.GetPreferredNode(colocationHash, transaction).ConfigureAwait(false);
+
+                return await DoOutInOpAsync(op, tx, writer, 
preferredNode).ConfigureAwait(false);
+            }
+            catch (IgniteException e) when (e.Code == 
ErrorGroups.Table.SchemaVersionMismatch)
+            {
+                // ReSharper disable once PossibleMultipleEnumeration (we have 
to retry, but this is very rare)
+                return await DoMultiRecordOutOpAsync(op, transaction, recs, 
keyOnly, e.GetExpectedSchemaVersion()).ConfigureAwait(false);
+            }
         }
     }
 }
diff --git a/modules/platforms/dotnet/Apache.Ignite/Internal/Table/Table.cs 
b/modules/platforms/dotnet/Apache.Ignite/Internal/Table/Table.cs
index 859872dcb0..9b50e98b97 100644
--- a/modules/platforms/dotnet/Apache.Ignite/Internal/Table/Table.cs
+++ b/modules/platforms/dotnet/Apache.Ignite/Internal/Table/Table.cs
@@ -166,6 +166,13 @@ namespace Apache.Ignite.Internal.Table
             return GetCachedSchemaAsync(version);
         }
 
+        /// <summary>
+        /// Gets the schema by version.
+        /// </summary>
+        /// <param name="version">Schema version; when null, latest is 
used.</param>
+        /// <returns>Schema.</returns>
+        internal Task<Schema> GetSchemaAsync(int? version) => 
GetCachedSchemaAsync(version ?? _latestSchemaVersion);
+
         /// <summary>
         /// Gets the latest schema.
         /// </summary>
diff --git 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItThinClientSchemaSynchronizationTest.java
 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItThinClientSchemaSynchronizationTest.java
index 174502161f..474f8b1683 100644
--- 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItThinClientSchemaSynchronizationTest.java
+++ 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItThinClientSchemaSynchronizationTest.java
@@ -77,7 +77,7 @@ public class ItThinClientSchemaSynchronizationTest extends 
ItAbstractThinClientT
         Tuple rec = Tuple.create().set("ID", 1);
         recordView.insert(null, rec);
 
-        // Modify table, insert data - client will use old schema, receive 
error, retry with new schema.
+        // Modify table, read data - client will use old schema, receive 
error, retry with new schema.
         // The process is transparent for the user: updated schema is in 
effect immediately.
         ses.execute(null, "ALTER TABLE " + tableName + " ADD COLUMN NAME 
VARCHAR DEFAULT 'def_name'");
         assertEquals("def_name", recordView.get(null, rec).stringValue(1));

Reply via email to