This is an automated email from the ASF dual-hosted git repository.
ptupitsyn pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new c1a6a07307 IGNITE-19838 .NET: Retry outdated schema error (#2434)
c1a6a07307 is described below
commit c1a6a073071fecb85ed6aaa96bff2d0948634ef3
Author: Pavel Tupitsyn <[email protected]>
AuthorDate: Thu Aug 10 15:20:28 2023 +0300
IGNITE-19838 .NET: Retry outdated schema error (#2434)
* Read known error extensions data from server
* Retry `SchemaVersionMismatch` error code with specified schema
* Improve code reuse in `RecordView` (there are still multiple methods
which do the same retry logic; we could fix it with delegates `DoOp(() ->
blabla)`, but this will increase allocations)
---
.../Table/SchemaSynchronizationTest.cs | 196 +++++++++++++++++++++
.../dotnet/Apache.Ignite/Internal/ClientSocket.cs | 18 +-
.../Internal/Common/ExceptionExtensions.cs | 52 ++++++
.../Apache.Ignite/Internal/Compute/Compute.cs | 8 +-
.../Internal/Proto/ErrorExtensions.cs | 29 +++
.../Apache.Ignite/Internal/Table/RecordView.cs | 161 +++++++++--------
.../dotnet/Apache.Ignite/Internal/Table/Table.cs | 7 +
.../ItThinClientSchemaSynchronizationTest.java | 2 +-
8 files changed, 398 insertions(+), 75 deletions(-)
diff --git
a/modules/platforms/dotnet/Apache.Ignite.Tests/Table/SchemaSynchronizationTest.cs
b/modules/platforms/dotnet/Apache.Ignite.Tests/Table/SchemaSynchronizationTest.cs
new file mode 100644
index 0000000000..6660876f52
--- /dev/null
+++
b/modules/platforms/dotnet/Apache.Ignite.Tests/Table/SchemaSynchronizationTest.cs
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache.Ignite.Tests.Table;
+
+using System;
+using System.Threading.Tasks;
+using Compute;
+using Ignite.Compute;
+using Ignite.Table;
+using NUnit.Framework;
+
+/// <summary>
+/// Tests for client table schema synchronization.
+/// </summary>
+public class SchemaSynchronizationTest : IgniteTestsBase
+{
+ private static readonly TestMode[] TestModes = Enum.GetValues<TestMode>();
+
+ private static readonly TestMode[] ReadTestModes = { TestMode.One,
TestMode.Multiple };
+
+ public enum TestMode
+ {
+ One,
+ Two,
+ Multiple,
+ Compute
+ }
+
+ private static string TestTableName => TestContext.CurrentContext.Test.Name
+ .Replace("(", string.Empty)
+ .Replace(")", string.Empty);
+
+ [TearDown]
+ public async Task DeleteTable() => await Client.Sql.ExecuteAsync(null,
$"DROP TABLE {TestTableName}");
+
+ [Test]
+ public async Task
TestClientUsesLatestSchemaOnWrite([ValueSource(nameof(TestModes))] TestMode
testMode)
+ {
+ // Create table, insert data.
+ await Client.Sql.ExecuteAsync(null, $"CREATE TABLE {TestTableName} (ID
INT NOT NULL PRIMARY KEY, NAME VARCHAR NOT NULL)");
+
+ var table = await Client.Tables.GetTableAsync(TestTableName);
+ var view = table!.RecordBinaryView;
+
+ var rec = new IgniteTuple
+ {
+ ["ID"] = 1,
+ ["NAME"] = "name"
+ };
+
+ await view.InsertAsync(null, rec);
+
+ // Modify table, insert data - client will use old schema, receive
error, retry with new schema.
+ // The process is transparent for the user: updated schema is in
effect immediately.
+ await Client.Sql.ExecuteAsync(null, $"ALTER TABLE {TestTableName} DROP
COLUMN NAME");
+
+ var rec2 = new IgniteTuple
+ {
+ ["ID"] = 2,
+ ["NAME"] = "name2"
+ };
+
+ // TODO this should fail when we implement IGNITE-19836 Reject Tuples
and POCOs with unmapped fields
+ switch (testMode)
+ {
+ case TestMode.One:
+ await view.InsertAsync(null, rec2);
+ break;
+
+ case TestMode.Two:
+ await view.ReplaceAsync(null, rec2, rec2);
+ break;
+
+ case TestMode.Multiple:
+ await view.InsertAllAsync(null, new[] { rec2, rec2, rec2 });
+ break;
+
+ case TestMode.Compute:
+ await Client.Compute.ExecuteColocatedAsync<string>(
+ table.Name, rec2, Array.Empty<DeploymentUnit>(),
ComputeTests.NodeNameJob);
+ break;
+
+ default:
+ Assert.Fail("Invalid test mode: " + testMode);
+ break;
+ }
+ }
+
+ [Test]
+ public async Task
TestClientUsesLatestSchemaOnRead([ValueSource(nameof(ReadTestModes))] TestMode
testMode)
+ {
+ // Create table, insert data.
+ await Client.Sql.ExecuteAsync(null, $"CREATE TABLE {TestTableName} (ID
INT NOT NULL PRIMARY KEY)");
+
+ var table = await Client.Tables.GetTableAsync(TestTableName);
+ var view = table!.RecordBinaryView;
+
+ var rec = new IgniteTuple { ["ID"] = 1 };
+ await view.InsertAsync(null, rec);
+
+ // Modify table, insert data - client will use old schema, receive
error, retry with new schema.
+ // The process is transparent for the user: updated schema is in
effect immediately.
+ await Client.Sql.ExecuteAsync(null, $"ALTER TABLE {TestTableName} ADD
COLUMN NAME VARCHAR NOT NULL DEFAULT 'name1'");
+
+ switch (testMode)
+ {
+ case TestMode.One:
+ {
+ var res = await view.GetAsync(null, rec);
+
+ Assert.IsTrue(res.HasValue);
+ Assert.AreEqual("name1", res.Value["NAME"]);
+ break;
+ }
+
+ case TestMode.Multiple:
+ {
+ var res = await view.GetAllAsync(null, new[] { rec, rec });
+
+ Assert.AreEqual(2, res.Count);
+
+ foreach (var r in res)
+ {
+ Assert.IsTrue(r.HasValue);
+ Assert.AreEqual("name1", r.Value["NAME"]);
+ }
+
+ break;
+ }
+
+ default:
+ Assert.Fail("Invalid test mode: " + testMode);
+ break;
+ }
+ }
+
+ [Test]
+ public async Task TestClientUsesLatestSchemaOnReadPoco()
+ {
+ // Create table, insert data.
+ await Client.Sql.ExecuteAsync(null, $"CREATE TABLE {TestTableName} (ID
INT NOT NULL PRIMARY KEY)");
+
+ var table = await Client.Tables.GetTableAsync(TestTableName);
+ var view = table!.RecordBinaryView;
+
+ var rec = new IgniteTuple { ["ID"] = 1 };
+ await view.InsertAsync(null, rec);
+
+ await Client.Sql.ExecuteAsync(null, $"ALTER TABLE {TestTableName} ADD
COLUMN NAME VARCHAR NOT NULL DEFAULT 'name1'");
+
+ var pocoView = table.GetRecordView<Poco>();
+ var res = await pocoView.GetAsync(null, new Poco(1, string.Empty));
+
+ Assert.IsTrue(res.HasValue);
+ Assert.AreEqual(1, res.Value.Id);
+ Assert.AreEqual("name1", res.Value.Name);
+ }
+
+ [Test]
+ public async Task TestClientUsesLatestSchemaOnReadKv()
+ {
+ // Create table, insert data.
+ await Client.Sql.ExecuteAsync(null, $"CREATE TABLE {TestTableName} (ID
INT NOT NULL PRIMARY KEY)");
+
+ var table = await Client.Tables.GetTableAsync(TestTableName);
+ var view = table!.RecordBinaryView;
+
+ var rec = new IgniteTuple { ["ID"] = 1 };
+ await view.InsertAsync(null, rec);
+
+ await Client.Sql.ExecuteAsync(null, $"ALTER TABLE {TestTableName} ADD
COLUMN NAME VARCHAR NOT NULL DEFAULT 'name1'");
+
+ var pocoView = table.GetKeyValueView<int, string>();
+ var res = await pocoView.GetAsync(null, 1);
+
+ Assert.IsTrue(res.HasValue);
+ Assert.AreEqual("name1", res.Value);
+ }
+
+ private record Poco(int Id, string Name);
+}
diff --git a/modules/platforms/dotnet/Apache.Ignite/Internal/ClientSocket.cs
b/modules/platforms/dotnet/Apache.Ignite/Internal/ClientSocket.cs
index 2c5039eb8c..fb26a2eb0a 100644
--- a/modules/platforms/dotnet/Apache.Ignite/Internal/ClientSocket.cs
+++ b/modules/platforms/dotnet/Apache.Ignite/Internal/ClientSocket.cs
@@ -399,11 +399,23 @@ namespace Apache.Ignite.Internal
string className = reader.ReadString();
string? message = reader.ReadStringNullable();
string? javaStackTrace = reader.ReadStringNullable();
+ var ex = ExceptionMapper.GetException(traceId, code, className,
message, javaStackTrace);
- // TODO IGNITE-19838 Retry outdated schema error
- reader.Skip(); // Error extensions.
+ int extensionCount = reader.TryReadNil() ? 0 :
reader.ReadMapHeader();
+ for (int i = 0; i < extensionCount; i++)
+ {
+ var key = reader.ReadString();
+ if (key == ErrorExtensions.ExpectedSchemaVersion)
+ {
+ ex.Data[key] = reader.ReadInt32();
+ }
+ else
+ {
+ reader.Skip(); // Unknown extension - ignore.
+ }
+ }
- return ExceptionMapper.GetException(traceId, code, className,
message, javaStackTrace);
+ return ex;
}
private static async ValueTask<PooledBuffer> ReadResponseAsync(
diff --git
a/modules/platforms/dotnet/Apache.Ignite/Internal/Common/ExceptionExtensions.cs
b/modules/platforms/dotnet/Apache.Ignite/Internal/Common/ExceptionExtensions.cs
new file mode 100644
index 0000000000..8243e3af06
--- /dev/null
+++
b/modules/platforms/dotnet/Apache.Ignite/Internal/Common/ExceptionExtensions.cs
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache.Ignite.Internal.Common;
+
+using System;
+using System.Diagnostics;
+using Proto;
+
+/// <summary>
+/// Exception extensions.
+/// </summary>
+internal static class ExceptionExtensions
+{
+ /// <summary>
+ /// Gets expected schema version from the error data.
+ /// </summary>
+ /// <param name="e">Exception.</param>
+ /// <returns>Expected schema version.</returns>
+ /// <exception cref="IgniteException">When specified exception does not
have schema version in <see cref="Exception.Data"/>.</exception>
+ public static int GetExpectedSchemaVersion(this IgniteException e)
+ {
+ Debug.Assert(
+ e.Code == ErrorGroups.Table.SchemaVersionMismatch,
+ $"e.Code == ErrorGroups.Table.SchemaVersionMismatch: expected
{ErrorGroups.Table.SchemaVersionMismatch}, actual {e.Code}");
+
+ if (e.Data[ErrorExtensions.ExpectedSchemaVersion] is int schemaVer)
+ {
+ return schemaVer;
+ }
+
+ throw new IgniteException(
+ e.TraceId,
+ ErrorGroups.Client.Protocol,
+ "Expected schema version is not specified in error extension map.",
+ e);
+ }
+}
diff --git a/modules/platforms/dotnet/Apache.Ignite/Internal/Compute/Compute.cs
b/modules/platforms/dotnet/Apache.Ignite/Internal/Compute/Compute.cs
index 0aec31777e..7012b3beab 100644
--- a/modules/platforms/dotnet/Apache.Ignite/Internal/Compute/Compute.cs
+++ b/modules/platforms/dotnet/Apache.Ignite/Internal/Compute/Compute.cs
@@ -257,11 +257,12 @@ namespace Apache.Ignite.Internal.Compute
IgniteArgumentCheck.NotNull(jobClassName, nameof(jobClassName));
var units0 = units as ICollection<DeploymentUnit> ??
units.ToList(); // Avoid multiple enumeration.
+ int? schemaVersion = null;
while (true)
{
var table = await
GetTableAsync(tableName).ConfigureAwait(false);
- var schema = await
table.GetLatestSchemaAsync().ConfigureAwait(false);
+ var schema = await
table.GetSchemaAsync(schemaVersion).ConfigureAwait(false);
using var bufferWriter = ProtoCommon.GetMessageWriter();
var colocationHash = Write(bufferWriter, table, schema);
@@ -279,6 +280,11 @@ namespace Apache.Ignite.Internal.Compute
// Table was dropped - remove from cache.
// Try again in case a new table with the same name exists.
_tableCache.TryRemove(tableName, out _);
+ schemaVersion = null;
+ }
+ catch (IgniteException e) when (e.Code ==
ErrorGroups.Table.SchemaVersionMismatch)
+ {
+ schemaVersion = e.GetExpectedSchemaVersion();
}
}
diff --git
a/modules/platforms/dotnet/Apache.Ignite/Internal/Proto/ErrorExtensions.cs
b/modules/platforms/dotnet/Apache.Ignite/Internal/Proto/ErrorExtensions.cs
new file mode 100644
index 0000000000..3fb888ff00
--- /dev/null
+++ b/modules/platforms/dotnet/Apache.Ignite/Internal/Proto/ErrorExtensions.cs
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache.Ignite.Internal.Proto;
+
+/// <summary>
+/// Error data extensions. When the server returns an error response, it may
contain additional data in a map. Keys are defined here.
+/// </summary>
+internal static class ErrorExtensions
+{
+ /// <summary>
+ /// Expected schema version for <see
cref="ErrorGroups.Table.SchemaVersionMismatch"/> error.
+ /// </summary>
+ public const string ExpectedSchemaVersion = "expected-schema-ver";
+}
diff --git
a/modules/platforms/dotnet/Apache.Ignite/Internal/Table/RecordView.cs
b/modules/platforms/dotnet/Apache.Ignite/Internal/Table/RecordView.cs
index b03b61c441..4d2a318462 100644
--- a/modules/platforms/dotnet/Apache.Ignite/Internal/Table/RecordView.cs
+++ b/modules/platforms/dotnet/Apache.Ignite/Internal/Table/RecordView.cs
@@ -147,25 +147,16 @@ namespace Apache.Ignite.Internal.Table
{
IgniteArgumentCheck.NotNull(keys, nameof(keys));
- using var iterator = keys.GetEnumerator();
-
- if (!iterator.MoveNext())
+ using var resBuf = await
DoMultiRecordOutOpAsync(ClientOp.TupleGetAll, transaction, keys,
true).ConfigureAwait(false);
+ if (resBuf == null)
{
return resultFactory(0);
}
- var schema = await
_table.GetLatestSchemaAsync().ConfigureAwait(false);
- var tx = transaction.ToInternal();
-
- using var writer = ProtoCommon.GetMessageWriter();
- var colocationHash = _ser.WriteMultiple(writer, tx, schema,
iterator, keyOnly: true);
- var preferredNode = await _table.GetPreferredNode(colocationHash,
transaction).ConfigureAwait(false);
-
- using var resBuf = await DoOutInOpAsync(ClientOp.TupleGetAll, tx,
writer, preferredNode).ConfigureAwait(false);
- var resSchema = await
_table.ReadSchemaAsync(resBuf).ConfigureAwait(false);
+ var resSchema = await
_table.ReadSchemaAsync(resBuf.Value).ConfigureAwait(false);
// TODO: Read value parts only (IGNITE-16022).
- return _ser.ReadMultipleNullable(resBuf, resSchema, resultFactory,
addAction);
+ return _ser.ReadMultipleNullable(resBuf.Value, resSchema,
resultFactory, addAction);
}
/// <inheritdoc/>
@@ -181,21 +172,7 @@ namespace Apache.Ignite.Internal.Table
{
IgniteArgumentCheck.NotNull(records, nameof(records));
- using var iterator = records.GetEnumerator();
-
- if (!iterator.MoveNext())
- {
- return;
- }
-
- var schema = await
_table.GetLatestSchemaAsync().ConfigureAwait(false);
- var tx = transaction.ToInternal();
-
- using var writer = ProtoCommon.GetMessageWriter();
- var colocationHash = _ser.WriteMultiple(writer, tx, schema,
iterator);
- var preferredNode = await _table.GetPreferredNode(colocationHash,
transaction).ConfigureAwait(false);
-
- using var resBuf = await DoOutInOpAsync(ClientOp.TupleUpsertAll,
tx, writer, preferredNode).ConfigureAwait(false);
+ using var resBuf = await
DoMultiRecordOutOpAsync(ClientOp.TupleUpsertAll, transaction,
records).ConfigureAwait(false);
}
/// <inheritdoc/>
@@ -223,26 +200,17 @@ namespace Apache.Ignite.Internal.Table
{
IgniteArgumentCheck.NotNull(records, nameof(records));
- using var iterator = records.GetEnumerator();
-
- if (!iterator.MoveNext())
+ using var resBuf = await
DoMultiRecordOutOpAsync(ClientOp.TupleInsertAll, transaction,
records).ConfigureAwait(false);
+ if (resBuf == null)
{
return Array.Empty<T>();
}
- var schema = await
_table.GetLatestSchemaAsync().ConfigureAwait(false);
- var tx = transaction.ToInternal();
-
- using var writer = ProtoCommon.GetMessageWriter();
- var colocationHash = _ser.WriteMultiple(writer, tx, schema,
iterator);
- var preferredNode = await _table.GetPreferredNode(colocationHash,
transaction).ConfigureAwait(false);
-
- using var resBuf = await DoOutInOpAsync(ClientOp.TupleInsertAll,
tx, writer, preferredNode).ConfigureAwait(false);
- var resSchema = await
_table.ReadSchemaAsync(resBuf).ConfigureAwait(false);
+ var resSchema = await
_table.ReadSchemaAsync(resBuf.Value).ConfigureAwait(false);
// TODO: Read value parts only (IGNITE-16022).
return _ser.ReadMultiple(
- buf: resBuf,
+ buf: resBuf.Value,
schema: resSchema,
keyOnly: false,
resultFactory: static count => count == 0
@@ -265,14 +233,9 @@ namespace Apache.Ignite.Internal.Table
{
IgniteArgumentCheck.NotNull(record, nameof(record));
- var schema = await
_table.GetLatestSchemaAsync().ConfigureAwait(false);
- var tx = transaction.ToInternal();
-
- using var writer = ProtoCommon.GetMessageWriter();
- var colocationHash = _ser.WriteTwo(writer, tx, schema, record,
newRecord);
- var preferredNode = await _table.GetPreferredNode(colocationHash,
transaction).ConfigureAwait(false);
+ using var resBuf = await
DoTwoRecordOutOpAsync(ClientOp.TupleReplaceExact, transaction, record,
newRecord)
+ .ConfigureAwait(false);
- using var resBuf = await
DoOutInOpAsync(ClientOp.TupleReplaceExact, tx, writer,
preferredNode).ConfigureAwait(false);
return ReadSchemaAndBoolean(resBuf);
}
@@ -342,7 +305,7 @@ namespace Apache.Ignite.Internal.Table
.ConfigureAwait(false);
},
writer: _ser.Handler,
- schemaProvider: () => _table.GetLatestSchemaAsync(),
+ schemaProvider: () => _table.GetLatestSchemaAsync(), // TODO
IGNITE-19710 retry outdated schema.
partitionAssignmentProvider: () =>
_table.GetPartitionAssignmentAsync(),
options ?? DataStreamerOptions.Default,
cancellationToken).ConfigureAwait(false);
@@ -395,27 +358,18 @@ namespace Apache.Ignite.Internal.Table
{
IgniteArgumentCheck.NotNull(records, nameof(records));
- using var iterator = records.GetEnumerator();
-
- if (!iterator.MoveNext())
+ var clientOp = exact ? ClientOp.TupleDeleteAllExact :
ClientOp.TupleDeleteAll;
+ using var resBuf = await DoMultiRecordOutOpAsync(clientOp,
transaction, records, keyOnly: !exact).ConfigureAwait(false);
+ if (resBuf == null)
{
return resultFactory(0);
}
- var schema = await
_table.GetLatestSchemaAsync().ConfigureAwait(false);
- var tx = transaction.ToInternal();
-
- using var writer = ProtoCommon.GetMessageWriter();
- var colocationHash = _ser.WriteMultiple(writer, tx, schema,
iterator, keyOnly: !exact);
- var preferredNode = await _table.GetPreferredNode(colocationHash,
transaction).ConfigureAwait(false);
-
- var clientOp = exact ? ClientOp.TupleDeleteAllExact :
ClientOp.TupleDeleteAll;
- using var resBuf = await DoOutInOpAsync(clientOp, tx, writer,
preferredNode).ConfigureAwait(false);
- var resSchema = await
_table.ReadSchemaAsync(resBuf).ConfigureAwait(false);
+ var resSchema = await
_table.ReadSchemaAsync(resBuf.Value).ConfigureAwait(false);
// TODO: Read value parts only (IGNITE-16022).
return _ser.ReadMultiple(
- buf: resBuf,
+ buf: resBuf.Value,
schema: resSchema,
keyOnly: !exact,
resultFactory: resultFactory,
@@ -448,16 +402,83 @@ namespace Apache.Ignite.Internal.Table
ClientOp op,
ITransaction? transaction,
T record,
- bool keyOnly = false)
+ bool keyOnly = false,
+ int? schemaVersionOverride = null)
+ {
+ try
+ {
+ var schema = await
_table.GetSchemaAsync(schemaVersionOverride).ConfigureAwait(false);
+ var tx = transaction.ToInternal();
+
+ using var writer = ProtoCommon.GetMessageWriter();
+ var colocationHash = _ser.Write(writer, tx, schema, record,
keyOnly);
+ var preferredNode = await
_table.GetPreferredNode(colocationHash, transaction).ConfigureAwait(false);
+
+ return await DoOutInOpAsync(op, tx, writer,
preferredNode).ConfigureAwait(false);
+ }
+ catch (IgniteException e) when (e.Code ==
ErrorGroups.Table.SchemaVersionMismatch)
+ {
+ return await DoRecordOutOpAsync(op, transaction, record,
keyOnly, e.GetExpectedSchemaVersion()).ConfigureAwait(false);
+ }
+ }
+
+ private async Task<PooledBuffer> DoTwoRecordOutOpAsync(
+ ClientOp op,
+ ITransaction? transaction,
+ T record,
+ T record2,
+ bool keyOnly = false,
+ int? schemaVersionOverride = null)
{
- var schema = await
_table.GetLatestSchemaAsync().ConfigureAwait(false);
- var tx = transaction.ToInternal();
+ try
+ {
+ var schema = await
_table.GetSchemaAsync(schemaVersionOverride).ConfigureAwait(false);
+ var tx = transaction.ToInternal();
- using var writer = ProtoCommon.GetMessageWriter();
- var colocationHash = _ser.Write(writer, tx, schema, record,
keyOnly);
- var preferredNode = await _table.GetPreferredNode(colocationHash,
transaction).ConfigureAwait(false);
+ using var writer = ProtoCommon.GetMessageWriter();
+ var colocationHash = _ser.WriteTwo(writer, tx, schema, record,
record2, keyOnly);
+ var preferredNode = await
_table.GetPreferredNode(colocationHash, transaction).ConfigureAwait(false);
+
+ return await DoOutInOpAsync(op, tx, writer,
preferredNode).ConfigureAwait(false);
+ }
+ catch (IgniteException e) when (e.Code ==
ErrorGroups.Table.SchemaVersionMismatch)
+ {
+ return await DoTwoRecordOutOpAsync(op, transaction, record,
record2, keyOnly, e.GetExpectedSchemaVersion())
+ .ConfigureAwait(false);
+ }
+ }
- return await DoOutInOpAsync(op, tx, writer,
preferredNode).ConfigureAwait(false);
+ private async Task<PooledBuffer?> DoMultiRecordOutOpAsync(
+ ClientOp op,
+ ITransaction? transaction,
+ IEnumerable<T> recs,
+ bool keyOnly = false,
+ int? schemaVersionOverride = null)
+ {
+ // ReSharper disable once PossibleMultipleEnumeration (we may have
to retry, but this is very rare)
+ using var iterator = recs.GetEnumerator();
+
+ if (!iterator.MoveNext())
+ {
+ return null;
+ }
+
+ try
+ {
+ var schema = await
_table.GetSchemaAsync(schemaVersionOverride).ConfigureAwait(false);
+ var tx = transaction.ToInternal();
+
+ using var writer = ProtoCommon.GetMessageWriter();
+ var colocationHash = _ser.WriteMultiple(writer, tx, schema,
iterator, keyOnly);
+ var preferredNode = await
_table.GetPreferredNode(colocationHash, transaction).ConfigureAwait(false);
+
+ return await DoOutInOpAsync(op, tx, writer,
preferredNode).ConfigureAwait(false);
+ }
+ catch (IgniteException e) when (e.Code ==
ErrorGroups.Table.SchemaVersionMismatch)
+ {
+ // ReSharper disable once PossibleMultipleEnumeration (we have
to retry, but this is very rare)
+ return await DoMultiRecordOutOpAsync(op, transaction, recs,
keyOnly, e.GetExpectedSchemaVersion()).ConfigureAwait(false);
+ }
}
}
}
diff --git a/modules/platforms/dotnet/Apache.Ignite/Internal/Table/Table.cs
b/modules/platforms/dotnet/Apache.Ignite/Internal/Table/Table.cs
index 859872dcb0..9b50e98b97 100644
--- a/modules/platforms/dotnet/Apache.Ignite/Internal/Table/Table.cs
+++ b/modules/platforms/dotnet/Apache.Ignite/Internal/Table/Table.cs
@@ -166,6 +166,13 @@ namespace Apache.Ignite.Internal.Table
return GetCachedSchemaAsync(version);
}
+ /// <summary>
+ /// Gets the schema by version.
+ /// </summary>
+ /// <param name="version">Schema version; when null, latest is
used.</param>
+ /// <returns>Schema.</returns>
+ internal Task<Schema> GetSchemaAsync(int? version) =>
GetCachedSchemaAsync(version ?? _latestSchemaVersion);
+
/// <summary>
/// Gets the latest schema.
/// </summary>
diff --git
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItThinClientSchemaSynchronizationTest.java
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItThinClientSchemaSynchronizationTest.java
index 174502161f..474f8b1683 100644
---
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItThinClientSchemaSynchronizationTest.java
+++
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItThinClientSchemaSynchronizationTest.java
@@ -77,7 +77,7 @@ public class ItThinClientSchemaSynchronizationTest extends
ItAbstractThinClientT
Tuple rec = Tuple.create().set("ID", 1);
recordView.insert(null, rec);
- // Modify table, insert data - client will use old schema, receive
error, retry with new schema.
+ // Modify table, read data - client will use old schema, receive
error, retry with new schema.
// The process is transparent for the user: updated schema is in
effect immediately.
ses.execute(null, "ALTER TABLE " + tableName + " ADD COLUMN NAME
VARCHAR DEFAULT 'def_name'");
assertEquals("def_name", recordView.get(null, rec).stringValue(1));