This is an automated email from the ASF dual-hosted git repository.

ptupitsyn pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 2e360cc80da IGNITE-24612 .NET: Fix streamer schema update (#5313)
2e360cc80da is described below

commit 2e360cc80da94056fc96d1a935deede3de2b7ac1
Author: Pavel Tupitsyn <[email protected]>
AuthorDate: Fri Feb 28 12:17:38 2025 +0200

    IGNITE-24612 .NET: Fix streamer schema update (#5313)
    
    Fix an invalid batch state that caused an `IndexOutOfBoundsException` on the
    server: the batch header contained the new schema version, while one of the
    tuples was written with the old schema version.
    * Update streamer schema under lock
    * Add batch schema version check
---
 .../Table/SchemaSynchronizationTest.cs             |  5 +-
 .../Apache.Ignite/Internal/Table/DataStreamer.cs   | 72 ++++++++++++++--------
 2 files changed, 50 insertions(+), 27 deletions(-)

diff --git a/modules/platforms/dotnet/Apache.Ignite.Tests/Table/SchemaSynchronizationTest.cs b/modules/platforms/dotnet/Apache.Ignite.Tests/Table/SchemaSynchronizationTest.cs
index b7547a0f9f4..d2e15319f7e 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Tests/Table/SchemaSynchronizationTest.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Tests/Table/SchemaSynchronizationTest.cs
@@ -344,14 +344,15 @@ public class SchemaSynchronizationTest : IgniteTestsBase
     [SuppressMessage("ReSharper", "AccessToDisposedClosure", Justification = 
"Reviewed")]
     public async Task TestSchemaUpdateWhileStreaming(
         [Values(true, false)] bool insertNewColumn,
-        [Values(true, false)] bool withRemove)
+        [Values(true, false)] bool withRemove,
+        [Values(1, 2, 10)] int pageSize)
     {
         await Client.Sql.ExecuteAsync(null, $"CREATE TABLE {TestTableName} 
(KEY bigint PRIMARY KEY)");
 
         var table = await Client.Tables.GetTableAsync(TestTableName);
         var view = table!.RecordBinaryView;
 
-        var options = DataStreamerOptions.Default with { PageSize = 2 };
+        var options = DataStreamerOptions.Default with { PageSize = pageSize };
         await view.StreamDataAsync(GetData(), options);
 
         // Inserted with old schema.
diff --git a/modules/platforms/dotnet/Apache.Ignite/Internal/Table/DataStreamer.cs b/modules/platforms/dotnet/Apache.Ignite/Internal/Table/DataStreamer.cs
index 8fe516fc963..94697a46f08 100644
--- a/modules/platforms/dotnet/Apache.Ignite/Internal/Table/DataStreamer.cs
+++ b/modules/platforms/dotnet/Apache.Ignite/Internal/Table/DataStreamer.cs
@@ -60,6 +60,10 @@ internal static class DataStreamer
     /// <returns>A <see cref="Task"/> representing the asynchronous 
operation.</returns>
     [SuppressMessage("Design", "CA1031:Do not catch general exception types", 
Justification = "Cleanup.")]
     [SuppressMessage("Usage", "CA2219:Do not raise exceptions in finally 
clauses", Justification = "Rethrow.")]
+    [SuppressMessage(
+        "Reliability",
+        "CA2000:Dispose objects before losing scope",
+        Justification = "WaitHandle is not used in SemaphoreSlim, no need to 
dispose.")]
     internal static async Task StreamDataAsync<T>(
         IAsyncEnumerable<DataStreamerItem<T>> data,
         Table table,
@@ -77,6 +81,7 @@ internal static class DataStreamer
         var retryPolicy = new RetryLimitPolicy { RetryLimit = 
options.RetryLimit };
 
         var schema = await table.GetSchemaAsync(null).ConfigureAwait(false);
+        var schemaLock = new SemaphoreSlim(1);
 
         var partitionAssignment = await 
table.GetPartitionAssignmentAsync().ConfigureAwait(false);
         var partitionCount = partitionAssignment.Length; // Can't be changed.
@@ -180,7 +185,7 @@ internal static class DataStreamer
             }
             catch (Exception e) when (e.CausedByUnmappedColumns())
             {
-                schema = await 
table.GetSchemaAsync(Table.SchemaVersionForceLatest).ConfigureAwait(false);
+                await UpdateSchema().ConfigureAwait(false);
                 return Add(item);
             }
         }
@@ -278,15 +283,15 @@ internal static class DataStreamer
 
                 FinalizeBatchHeader(batch);
                 batch.Task = SendAndDisposeBufAsync(
-                    batch.Buffer, batch.PartitionId, batch.Task, batch.Items, 
batch.Count, batch.SchemaOutdated);
+                    batch.Buffer, batch.PartitionId, batch.Task, batch.Items, 
batch.Count, batch.SchemaOutdated, batch.Schema.Version);
 
                 batch.Items = GetPool<T>().Rent(options.PageSize);
                 batch.Count = 0;
-                batch.Buffer = ProtoCommon.GetMessageWriter(); // Prev buf 
will be disposed in SendAndDisposeBufAsync.
-                InitBuffer(batch, schema);
-                batch.LastFlush = Stopwatch.GetTimestamp();
                 batch.Schema = schema;
                 batch.SchemaOutdated = false;
+                batch.Buffer = ProtoCommon.GetMessageWriter(); // Prev buf 
will be disposed in SendAndDisposeBufAsync.
+                InitBuffer(batch, batch.Schema);
+                batch.LastFlush = Stopwatch.GetTimestamp();
 
                 Metrics.StreamerBatchesActiveIncrement();
             }
@@ -298,22 +303,24 @@ internal static class DataStreamer
             Task oldTask,
             DataStreamerItem<T>[] items,
             int count,
-            bool batchSchemaOutdated)
+            bool batchSchemaOutdated,
+            int batchSchemaVer)
         {
-            Debug.Assert(items.Length > 0, "items.Length > 0");
-
-            if (batchSchemaOutdated)
+            try
             {
-                // Schema update was detected while the batch was being filled.
-                // Re-serialize the whole batch.
-                ReWriteBatch(buf, partitionId, schema, items.AsSpan(0, count), 
writer);
-            }
+                Debug.Assert(items.Length > 0, "items.Length > 0");
+                var schema0 = schema;
 
-            // ReSharper disable once AccessToModifiedClosure
-            var preferredNode = 
PreferredNode.FromName(partitionAssignment[partitionId] ?? string.Empty);
+                if (batchSchemaOutdated || batchSchemaVer < schema0.Version)
+                {
+                    // Schema update was detected while the batch was being 
filled.
+                    // Re-serialize the whole batch.
+                    ReWriteBatch(buf, partitionId, schema0, items.AsSpan(0, 
count), writer);
+                }
+
+                // ReSharper disable once AccessToModifiedClosure
+                var preferredNode = 
PreferredNode.FromName(partitionAssignment[partitionId] ?? string.Empty);
 
-            try
-            {
                 int? schemaVersion = null;
                 while (true)
                 {
@@ -321,14 +328,9 @@ internal static class DataStreamer
                     {
                         if (schemaVersion is { })
                         {
-                            // Might be updated by another batch.
-                            if (schema.Version != schemaVersion)
-                            {
-                                schema = await 
table.GetSchemaAsync(schemaVersion).ConfigureAwait(false);
-                            }
-
                             // Serialize again with the new schema.
-                            ReWriteBatch(buf, partitionId, schema, 
items.AsSpan(0, count), writer);
+                            schema0 = await 
UpdateSchema(schemaVersion.Value).ConfigureAwait(false);
+                            ReWriteBatch(buf, partitionId, schema0, 
items.AsSpan(0, count), writer);
                         }
 
                         // Wait for the previous batch for this node to 
preserve item order.
@@ -343,7 +345,7 @@ internal static class DataStreamer
                         // Schema update detected after the batch was 
serialized.
                         schemaVersion = e.GetExpectedSchemaVersion();
                     }
-                    catch (Exception e) when (e.CausedByUnmappedColumns() && 
schemaVersion == null)
+                    catch (Exception e) when (e.CausedByUnmappedColumns())
                     {
                         schemaVersion = Table.SchemaVersionForceLatest;
                     }
@@ -397,6 +399,26 @@ internal static class DataStreamer
                 await batch.Task.ConfigureAwait(false);
             }
         }
+
+        async ValueTask<Schema> UpdateSchema(int ver = 
Table.SchemaVersionForceLatest)
+        {
+            await 
schemaLock.WaitAsync(cancellationToken).ConfigureAwait(false);
+
+            try
+            {
+                if (ver != Table.SchemaVersionForceLatest && schema.Version >= 
ver)
+                {
+                    return schema;
+                }
+
+                schema = await table.GetSchemaAsync(ver).ConfigureAwait(false);
+                return schema;
+            }
+            finally
+            {
+                schemaLock.Release();
+            }
+        }
     }
 
     /// <summary>

Reply via email to