This is an automated email from the ASF dual-hosted git repository.
ptupitsyn pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 2e360cc80da IGNITE-24612 .NET: Fix streamer schema update (#5313)
2e360cc80da is described below
commit 2e360cc80da94056fc96d1a935deede3de2b7ac1
Author: Pavel Tupitsyn <[email protected]>
AuthorDate: Fri Feb 28 12:17:38 2025 +0200
IGNITE-24612 .NET: Fix streamer schema update (#5313)
Fix an invalid batch state that caused `IndexOutOfBoundsException` on the
server: the batch header contained the new schema version, while one of the
tuples was written with the old schema version.
* Update streamer schema under lock
* Add batch schema version check
---
.../Table/SchemaSynchronizationTest.cs | 5 +-
.../Apache.Ignite/Internal/Table/DataStreamer.cs | 72 ++++++++++++++--------
2 files changed, 50 insertions(+), 27 deletions(-)
diff --git a/modules/platforms/dotnet/Apache.Ignite.Tests/Table/SchemaSynchronizationTest.cs b/modules/platforms/dotnet/Apache.Ignite.Tests/Table/SchemaSynchronizationTest.cs
index b7547a0f9f4..d2e15319f7e 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Tests/Table/SchemaSynchronizationTest.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Tests/Table/SchemaSynchronizationTest.cs
@@ -344,14 +344,15 @@ public class SchemaSynchronizationTest : IgniteTestsBase
[SuppressMessage("ReSharper", "AccessToDisposedClosure", Justification =
"Reviewed")]
public async Task TestSchemaUpdateWhileStreaming(
[Values(true, false)] bool insertNewColumn,
- [Values(true, false)] bool withRemove)
+ [Values(true, false)] bool withRemove,
+ [Values(1, 2, 10)] int pageSize)
{
await Client.Sql.ExecuteAsync(null, $"CREATE TABLE {TestTableName}
(KEY bigint PRIMARY KEY)");
var table = await Client.Tables.GetTableAsync(TestTableName);
var view = table!.RecordBinaryView;
- var options = DataStreamerOptions.Default with { PageSize = 2 };
+ var options = DataStreamerOptions.Default with { PageSize = pageSize };
await view.StreamDataAsync(GetData(), options);
// Inserted with old schema.
diff --git a/modules/platforms/dotnet/Apache.Ignite/Internal/Table/DataStreamer.cs b/modules/platforms/dotnet/Apache.Ignite/Internal/Table/DataStreamer.cs
index 8fe516fc963..94697a46f08 100644
--- a/modules/platforms/dotnet/Apache.Ignite/Internal/Table/DataStreamer.cs
+++ b/modules/platforms/dotnet/Apache.Ignite/Internal/Table/DataStreamer.cs
@@ -60,6 +60,10 @@ internal static class DataStreamer
/// <returns>A <see cref="Task"/> representing the asynchronous
operation.</returns>
[SuppressMessage("Design", "CA1031:Do not catch general exception types",
Justification = "Cleanup.")]
[SuppressMessage("Usage", "CA2219:Do not raise exceptions in finally
clauses", Justification = "Rethrow.")]
+ [SuppressMessage(
+ "Reliability",
+ "CA2000:Dispose objects before losing scope",
+ Justification = "WaitHandle is not used in SemaphoreSlim, no need to
dispose.")]
internal static async Task StreamDataAsync<T>(
IAsyncEnumerable<DataStreamerItem<T>> data,
Table table,
@@ -77,6 +81,7 @@ internal static class DataStreamer
var retryPolicy = new RetryLimitPolicy { RetryLimit =
options.RetryLimit };
var schema = await table.GetSchemaAsync(null).ConfigureAwait(false);
+ var schemaLock = new SemaphoreSlim(1);
var partitionAssignment = await
table.GetPartitionAssignmentAsync().ConfigureAwait(false);
var partitionCount = partitionAssignment.Length; // Can't be changed.
@@ -180,7 +185,7 @@ internal static class DataStreamer
}
catch (Exception e) when (e.CausedByUnmappedColumns())
{
- schema = await
table.GetSchemaAsync(Table.SchemaVersionForceLatest).ConfigureAwait(false);
+ await UpdateSchema().ConfigureAwait(false);
return Add(item);
}
}
@@ -278,15 +283,15 @@ internal static class DataStreamer
FinalizeBatchHeader(batch);
batch.Task = SendAndDisposeBufAsync(
- batch.Buffer, batch.PartitionId, batch.Task, batch.Items,
batch.Count, batch.SchemaOutdated);
+ batch.Buffer, batch.PartitionId, batch.Task, batch.Items,
batch.Count, batch.SchemaOutdated, batch.Schema.Version);
batch.Items = GetPool<T>().Rent(options.PageSize);
batch.Count = 0;
- batch.Buffer = ProtoCommon.GetMessageWriter(); // Prev buf
will be disposed in SendAndDisposeBufAsync.
- InitBuffer(batch, schema);
- batch.LastFlush = Stopwatch.GetTimestamp();
batch.Schema = schema;
batch.SchemaOutdated = false;
+ batch.Buffer = ProtoCommon.GetMessageWriter(); // Prev buf
will be disposed in SendAndDisposeBufAsync.
+ InitBuffer(batch, batch.Schema);
+ batch.LastFlush = Stopwatch.GetTimestamp();
Metrics.StreamerBatchesActiveIncrement();
}
@@ -298,22 +303,24 @@ internal static class DataStreamer
Task oldTask,
DataStreamerItem<T>[] items,
int count,
- bool batchSchemaOutdated)
+ bool batchSchemaOutdated,
+ int batchSchemaVer)
{
- Debug.Assert(items.Length > 0, "items.Length > 0");
-
- if (batchSchemaOutdated)
+ try
{
- // Schema update was detected while the batch was being filled.
- // Re-serialize the whole batch.
- ReWriteBatch(buf, partitionId, schema, items.AsSpan(0, count),
writer);
- }
+ Debug.Assert(items.Length > 0, "items.Length > 0");
+ var schema0 = schema;
- // ReSharper disable once AccessToModifiedClosure
- var preferredNode =
PreferredNode.FromName(partitionAssignment[partitionId] ?? string.Empty);
+ if (batchSchemaOutdated || batchSchemaVer < schema0.Version)
+ {
+ // Schema update was detected while the batch was being
filled.
+ // Re-serialize the whole batch.
+ ReWriteBatch(buf, partitionId, schema0, items.AsSpan(0,
count), writer);
+ }
+
+ // ReSharper disable once AccessToModifiedClosure
+ var preferredNode =
PreferredNode.FromName(partitionAssignment[partitionId] ?? string.Empty);
- try
- {
int? schemaVersion = null;
while (true)
{
@@ -321,14 +328,9 @@ internal static class DataStreamer
{
if (schemaVersion is { })
{
- // Might be updated by another batch.
- if (schema.Version != schemaVersion)
- {
- schema = await
table.GetSchemaAsync(schemaVersion).ConfigureAwait(false);
- }
-
// Serialize again with the new schema.
- ReWriteBatch(buf, partitionId, schema,
items.AsSpan(0, count), writer);
+ schema0 = await
UpdateSchema(schemaVersion.Value).ConfigureAwait(false);
+ ReWriteBatch(buf, partitionId, schema0,
items.AsSpan(0, count), writer);
}
// Wait for the previous batch for this node to
preserve item order.
@@ -343,7 +345,7 @@ internal static class DataStreamer
// Schema update detected after the batch was
serialized.
schemaVersion = e.GetExpectedSchemaVersion();
}
- catch (Exception e) when (e.CausedByUnmappedColumns() &&
schemaVersion == null)
+ catch (Exception e) when (e.CausedByUnmappedColumns())
{
schemaVersion = Table.SchemaVersionForceLatest;
}
@@ -397,6 +399,26 @@ internal static class DataStreamer
await batch.Task.ConfigureAwait(false);
}
}
+
+ async ValueTask<Schema> UpdateSchema(int ver =
Table.SchemaVersionForceLatest)
+ {
+ await
schemaLock.WaitAsync(cancellationToken).ConfigureAwait(false);
+
+ try
+ {
+ if (ver != Table.SchemaVersionForceLatest && schema.Version >=
ver)
+ {
+ return schema;
+ }
+
+ schema = await table.GetSchemaAsync(ver).ConfigureAwait(false);
+ return schema;
+ }
+ finally
+ {
+ schemaLock.Release();
+ }
+ }
}
/// <summary>