This is an automated email from the ASF dual-hosted git repository.

ptupitsyn pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 9a24cc6145 IGNITE-21490 .NET: Add DataStreamer data removal (#3560)
9a24cc6145 is described below

commit 9a24cc614535c2e668cc0871bec1f1fee5928d29
Author: Pavel Tupitsyn <[email protected]>
AuthorDate: Mon Apr 8 11:56:47 2024 +0300

    IGNITE-21490 .NET: Add DataStreamer data removal (#3560)
    
    Change `DataStreamer` API to allow data removal:
    * `IDataStreamerTarget` now accepts `IAsyncEnumerable<DataStreamerItem<T>>`
    * Simplified overload accepts `IAsyncEnumerable<T>` as it was before
    * `DataStreamerItem<T>` is a struct record (no allocations), accepts data 
item and `DataStreamerOperationType` enum
    * `DataStreamerItem` factory class added to simplify generics (similar to 
standard library `KeyValuePair.Create`):
      * Constructor: `new DataStreamerItem<MyEntityClass<Guid>>(entity)` - 
entity type must be specified
      * Factory: `DataStreamerItem.Create(entity)` - entity type is inferred
---
 .../Apache.Ignite.Tests/ProjectFilesTests.cs       |   5 +-
 .../Apache.Ignite.Tests/Table/DataStreamerTests.cs |  97 ++++++++++--
 .../dotnet/Apache.Ignite.Tests/Table/Poco.cs       |   2 +
 .../Table/SchemaSynchronizationTest.cs             |  27 +++-
 .../dotnet/Apache.Ignite/ClientOperationType.cs    |   2 +-
 .../Internal/Buffers/PooledArrayBuffer.cs          |  34 +++--
 .../Apache.Ignite/Internal/Table/DataStreamer.cs   | 170 ++++++++++++++++-----
 .../Apache.Ignite/Internal/Table/KeyValueView.cs   |   7 +-
 .../Apache.Ignite/Internal/Table/RecordView.cs     |   2 +-
 .../Table/Serialization/RecordSerializer.cs        |  22 ---
 .../dotnet/Apache.Ignite/Table/DataStreamerItem.cs |  53 +++++++
 .../Table/DataStreamerOperationType.cs}            |  28 ++--
 .../Apache.Ignite/Table/IDataStreamerTarget.cs     |  31 +++-
 .../streamer/ItAbstractDataStreamerTest.java       |  21 +++
 14 files changed, 391 insertions(+), 110 deletions(-)

diff --git a/modules/platforms/dotnet/Apache.Ignite.Tests/ProjectFilesTests.cs 
b/modules/platforms/dotnet/Apache.Ignite.Tests/ProjectFilesTests.cs
index fa3715a6e3..8c83306089 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Tests/ProjectFilesTests.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Tests/ProjectFilesTests.cs
@@ -78,7 +78,10 @@ namespace Apache.Ignite.Tests
                 if (text.Contains("public class", StringComparison.Ordinal) ||
                     text.Contains("public record", StringComparison.Ordinal))
                 {
-                    Assert.Fail("Public classes must be sealed: " + file);
+                    if (!text.Contains("public record struct"))
+                    {
+                        Assert.Fail("Public classes must be sealed: " + file);
+                    }
                 }
             }
         }
diff --git 
a/modules/platforms/dotnet/Apache.Ignite.Tests/Table/DataStreamerTests.cs 
b/modules/platforms/dotnet/Apache.Ignite.Tests/Table/DataStreamerTests.cs
index 15b9e6aaeb..4c6210ac15 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Tests/Table/DataStreamerTests.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Tests/Table/DataStreamerTests.cs
@@ -28,7 +28,7 @@ using Internal.Proto;
 using NUnit.Framework;
 
 /// <summary>
-/// Tests for <see cref="IDataStreamerTarget{T}.StreamDataAsync"/>.
+/// Tests for <see cref="IDataStreamerTarget{T}"/>.
 /// <para />
 /// See DataStreamer partition awareness tests in <see 
cref="PartitionAwarenessTests"/>.
 /// </summary>
@@ -36,25 +36,48 @@ public class DataStreamerTests : IgniteTestsBase
 {
     private const int Count = 100;
 
+    private const int UpdatedKey = Count / 2;
+
+    private const int DeletedKey = Count + 1;
+
+    private static int _unknownKey = 333000;
+
     [SetUp]
-    public async Task DeleteAll() =>
-        await TupleView.DeleteAllAsync(null, Enumerable.Range(0, 
Count).Select(x => GetTuple(x)));
+    public async Task PrepareData()
+    {
+        await TupleView.UpsertAsync(null, GetTuple(UpdatedKey, "update me"));
+        await TupleView.UpsertAsync(null, GetTuple(DeletedKey, "delete me"));
+    }
+
+    [TearDown]
+    public async Task DeleteAll() => await Client.Sql.ExecuteAsync(null, 
$"DELETE FROM {TableName}");
 
     [Test]
     public async Task TestBasicStreamingRecordBinaryView()
     {
-        var options = DataStreamerOptions.Default with { PageSize = 10 };
-        var data = Enumerable.Range(0, Count).Select(x => GetTuple(x, "t" + 
x)).ToList();
-
-        await TupleView.StreamDataAsync(data.ToAsyncEnumerable(), options);
+        await TupleView.StreamDataAsync(GetData(), DataStreamerOptions.Default 
with { PageSize = 10 });
         await CheckData();
+
+        static async IAsyncEnumerable<DataStreamerItem<IIgniteTuple>> GetData()
+        {
+            for (int i = 0; i < Count; i++)
+            {
+                yield return DataStreamerItem.Create(GetTuple(i, "t" + i));
+            }
+
+            await Task.Yield();
+            yield return DataStreamerItem.Create(GetTuple(DeletedKey), 
DataStreamerOperationType.Remove);
+        }
     }
 
     [Test]
     public async Task TestBasicStreamingRecordView()
     {
         var options = DataStreamerOptions.Default with { PageSize = 5 };
-        var data = Enumerable.Range(0, Count).Select(x => GetPoco(x, "t" + 
x)).ToList();
+        var data = Enumerable.Range(0, Count)
+            .Select(x => DataStreamerItem.Create(GetPoco(x, "t" + x)))
+            .Concat(new[] { DataStreamerItem.Create(GetPoco(DeletedKey), 
DataStreamerOperationType.Remove) })
+            .ToList();
 
         await 
Table.GetRecordView<Poco>().StreamDataAsync(data.ToAsyncEnumerable(), options);
         await CheckData();
@@ -65,7 +88,8 @@ public class DataStreamerTests : IgniteTestsBase
     {
         var options = DataStreamerOptions.Default with { PageSize = 10_000 };
         var data = Enumerable.Range(0, Count)
-            .Select(x => new KeyValuePair<IIgniteTuple, 
IIgniteTuple>(GetTuple(x), GetTuple("t" + x)))
+            .Select(x => 
DataStreamerItem.Create(KeyValuePair.Create(GetTuple(x), GetTuple("t" + x))))
+            .Concat(new[] { 
DataStreamerItem.Create(KeyValuePair.Create(GetTuple(DeletedKey), 
default(IIgniteTuple)!), DataStreamerOperationType.Remove) })
             .ToList();
 
         await 
Table.KeyValueBinaryView.StreamDataAsync(data.ToAsyncEnumerable(), options);
@@ -77,7 +101,8 @@ public class DataStreamerTests : IgniteTestsBase
     {
         var options = DataStreamerOptions.Default with { PageSize = 1 };
         var data = Enumerable.Range(0, Count)
-            .Select(x => new KeyValuePair<long, Poco>(x, GetPoco(x, "t" + x)))
+            .Select(x => DataStreamerItem.Create(KeyValuePair.Create((long)x, 
GetPoco(x, "t" + x))))
+            .Concat(new[] { 
DataStreamerItem.Create(KeyValuePair.Create((long)DeletedKey, default(Poco)!), 
DataStreamerOperationType.Remove) })
             .ToList();
 
         await Table.GetKeyValueView<long, 
Poco>().StreamDataAsync(data.ToAsyncEnumerable(), options);
@@ -182,6 +207,55 @@ public class DataStreamerTests : IgniteTestsBase
         Assert.That(server.DroppedConnectionCount, 
Is.GreaterThanOrEqualTo(count / DataStreamerOptions.Default.PageSize));
     }
 
+    [Test]
+    public async Task TestAddUpdateRemoveMixed(
+        [Values(1, 2, 100)] int pageSize,
+        [Values(true, false)] bool existingMinKey)
+    {
+        if (pageSize > 1)
+        {
+            // TODO: IGNITE-21992 Data Streamer removal does not work for a 
new key in the same batch
+            return;
+        }
+
+        var minKey = existingMinKey ? UpdatedKey : Interlocked.Add(ref 
_unknownKey, 10);
+        await Table.GetRecordView<Poco>().StreamDataAsync(
+            GetData(),
+            DataStreamerOptions.Default with { PageSize = pageSize });
+
+        IList<Option<Poco>> res = await PocoView.GetAllAsync(null, 
Enumerable.Range(minKey, 4).Select(x => GetPoco(x)));
+        Assert.AreEqual(4, res.Count);
+
+        Assert.IsFalse(res[0].HasValue, "Deleted key should not exist: " + 
res[0]);
+
+        Assert.IsTrue(res[1].HasValue);
+        Assert.AreEqual("created2", res[1].Value.Val);
+
+        Assert.IsTrue(res[2].HasValue);
+        Assert.AreEqual("updated", res[2].Value.Val);
+
+        Assert.IsTrue(res[3].HasValue);
+        Assert.AreEqual("created", res[3].Value.Val);
+
+        async IAsyncEnumerable<DataStreamerItem<Poco>> GetData()
+        {
+            await Task.Yield();
+            yield return DataStreamerItem.Create(GetPoco(minKey, "created"));
+            yield return DataStreamerItem.Create(GetPoco(minKey, "updated"));
+            yield return DataStreamerItem.Create(GetPoco(minKey, "deleted"), 
DataStreamerOperationType.Remove);
+
+            yield return DataStreamerItem.Create(GetPoco(minKey + 1, 
"created"));
+            yield return DataStreamerItem.Create(GetPoco(minKey + 1, 
"updated"));
+            yield return DataStreamerItem.Create(GetPoco(minKey + 1, 
"deleted"), DataStreamerOperationType.Remove);
+            yield return DataStreamerItem.Create(GetPoco(minKey + 1, 
"created2"));
+
+            yield return DataStreamerItem.Create(GetPoco(minKey + 2, 
"created"));
+            yield return DataStreamerItem.Create(GetPoco(minKey + 2, 
"updated"));
+
+            yield return DataStreamerItem.Create(GetPoco(minKey + 3, 
"created"));
+        }
+    }
+
     private static async IAsyncEnumerable<IIgniteTuple> GetFakeServerData(int 
count)
     {
         for (var i = 0; i < count; i++)
@@ -211,5 +285,8 @@ public class DataStreamerTests : IgniteTestsBase
         {
             Assert.IsTrue(hasVal);
         }
+
+        var deletedExists = await TupleView.ContainsKeyAsync(null, 
GetTuple(DeletedKey));
+        Assert.IsFalse(deletedExists);
     }
 }
diff --git a/modules/platforms/dotnet/Apache.Ignite.Tests/Table/Poco.cs 
b/modules/platforms/dotnet/Apache.Ignite.Tests/Table/Poco.cs
index 240588d506..a26d94150b 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Tests/Table/Poco.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Tests/Table/Poco.cs
@@ -34,5 +34,7 @@ namespace Apache.Ignite.Tests.Table
 
         [NotMapped]
         public string? UnmappedStr { get; set; }
+
+        public override string ToString() => $"Poco [Key={Key}, Val={Val}]";
     }
 }
diff --git 
a/modules/platforms/dotnet/Apache.Ignite.Tests/Table/SchemaSynchronizationTest.cs
 
b/modules/platforms/dotnet/Apache.Ignite.Tests/Table/SchemaSynchronizationTest.cs
index 22ac2370d2..7bc9cb63c4 100644
--- 
a/modules/platforms/dotnet/Apache.Ignite.Tests/Table/SchemaSynchronizationTest.cs
+++ 
b/modules/platforms/dotnet/Apache.Ignite.Tests/Table/SchemaSynchronizationTest.cs
@@ -45,7 +45,8 @@ public class SchemaSynchronizationTest : IgniteTestsBase
     }
 
     private static string TestTableName => TestContext.CurrentContext.Test.Name
-        .Replace("(", string.Empty)
+        .Replace("(", "_")
+        .Replace(",", "_")
         .Replace(")", string.Empty);
 
     [TearDown]
@@ -334,7 +335,9 @@ public class SchemaSynchronizationTest : IgniteTestsBase
 
     [Test]
     [SuppressMessage("ReSharper", "AccessToDisposedClosure", Justification = 
"Reviewed")]
-    public async Task TestSchemaUpdateWhileStreaming([Values(true, false)] 
bool insertNewColumn)
+    public async Task TestSchemaUpdateWhileStreaming(
+        [Values(true, false)] bool insertNewColumn,
+        [Values(true, false)] bool withRemove)
     {
         using var metricListener = new MetricsTests.Listener();
         await Client.Sql.ExecuteAsync(null, $"CREATE TABLE {TestTableName} 
(KEY bigint PRIMARY KEY)");
@@ -353,12 +356,17 @@ public class SchemaSynchronizationTest : IgniteTestsBase
         var res2 = await view.GetAsync(null, GetTuple(19));
         Assert.AreEqual(insertNewColumn ? "BAR_19" : "FOO", res2.Value["VAL"]);
 
-        async IAsyncEnumerable<IIgniteTuple> GetData()
+        async IAsyncEnumerable<DataStreamerItem<IIgniteTuple>> GetData()
         {
             // First set of batches uses old schema.
             for (int i = 0; i < 20; i++)
             {
-                yield return GetTuple(i);
+                if (withRemove)
+                {
+                    yield return DataStreamerItem.Create(GetTuple(-i), 
DataStreamerOperationType.Remove);
+                }
+
+                yield return DataStreamerItem.Create(GetTuple(i));
             }
 
             // Wait for background streaming to complete.
@@ -369,9 +377,16 @@ public class SchemaSynchronizationTest : IgniteTestsBase
             // New schema has a new column with a default value, so it is not 
required to provide it in the streamed data.
             await Client.Sql.ExecuteAsync(null, $"ALTER TABLE {TestTableName} 
ADD COLUMN VAL varchar DEFAULT 'FOO'");
 
-            for (int i = 10; i < 20; i++)
+            for (int i = 10; i < 30; i++)
             {
-                yield return insertNewColumn ? GetTuple(i, "BAR_" + i) : 
GetTuple(i);
+                if (withRemove)
+                {
+                    yield return DataStreamerItem.Create(GetTuple(-i), 
DataStreamerOperationType.Remove);
+                }
+
+                yield return insertNewColumn
+                    ? DataStreamerItem.Create(GetTuple(i, "BAR_" + i))
+                    : DataStreamerItem.Create(GetTuple(i));
             }
         }
     }
diff --git a/modules/platforms/dotnet/Apache.Ignite/ClientOperationType.cs 
b/modules/platforms/dotnet/Apache.Ignite/ClientOperationType.cs
index 0a1d3ee484..44c034e1f8 100644
--- a/modules/platforms/dotnet/Apache.Ignite/ClientOperationType.cs
+++ b/modules/platforms/dotnet/Apache.Ignite/ClientOperationType.cs
@@ -147,7 +147,7 @@ namespace Apache.Ignite
         ComputeChangePriority,
 
         /// <summary>
-        /// Send data streamer batch (<see 
cref="IDataStreamerTarget{T}.StreamDataAsync"/>).
+        /// Send data streamer batch (<see cref="IDataStreamerTarget{T}"/>).
         /// </summary>
         StreamerBatchSend
     }
diff --git 
a/modules/platforms/dotnet/Apache.Ignite/Internal/Buffers/PooledArrayBuffer.cs 
b/modules/platforms/dotnet/Apache.Ignite/Internal/Buffers/PooledArrayBuffer.cs
index 86cf4fd1a6..aaef2724f9 100644
--- 
a/modules/platforms/dotnet/Apache.Ignite/Internal/Buffers/PooledArrayBuffer.cs
+++ 
b/modules/platforms/dotnet/Apache.Ignite/Internal/Buffers/PooledArrayBuffer.cs
@@ -60,9 +60,24 @@ namespace Apache.Ignite.Internal.Buffers
         public MsgPackWriter MessageWriter => new(this);
 
         /// <summary>
-        /// Gets the current position.
+        /// Gets or sets the current position.
         /// </summary>
-        public int Position => _index - _prefixSize;
+        public int Position
+        {
+            get => _index - _prefixSize;
+            set
+            {
+                Debug.Assert(value >= 0, "value >= 0");
+                Debug.Assert(value <= _buffer.Length - _prefixSize, "value <= 
_buffer.Length - _prefixSize");
+
+                _index = value + _prefixSize;
+            }
+        }
+
+        /// <summary>
+        /// Gets or sets the data offset from the start of the buffer.
+        /// </summary>
+        public int Offset { get; set; }
 
         /// <summary>
         /// Gets the free capacity.
@@ -77,7 +92,7 @@ namespace Apache.Ignite.Internal.Buffers
         {
             Debug.Assert(!_disposed, "!_disposed");
 
-            return new(_buffer, start: 0, length: _index);
+            return new(_buffer, start: Offset, length: _index);
         }
 
         /// <summary>
@@ -95,7 +110,11 @@ namespace Apache.Ignite.Internal.Buffers
         /// <summary>
         /// Resets the buffer to the initial state.
         /// </summary>
-        public void Reset() => _index = _prefixSize;
+        public void Reset()
+        {
+            _index = _prefixSize;
+            Offset = 0;
+        }
 
         /// <summary>
         /// Gets a span for writing.
@@ -210,13 +229,6 @@ namespace Apache.Ignite.Internal.Buffers
             _index += 8;
         }
 
-        /// <summary>
-        /// Reads a byte at specified position.
-        /// </summary>
-        /// <param name="pos">Position.</param>
-        /// <returns>Result.</returns>
-        public byte ReadByte(int pos) => _buffer[pos + _prefixSize];
-
         /// <summary>
         /// Reads a short at specified position.
         /// </summary>
diff --git 
a/modules/platforms/dotnet/Apache.Ignite/Internal/Table/DataStreamer.cs 
b/modules/platforms/dotnet/Apache.Ignite/Internal/Table/DataStreamer.cs
index 59655b0533..52d5402292 100644
--- a/modules/platforms/dotnet/Apache.Ignite/Internal/Table/DataStreamer.cs
+++ b/modules/platforms/dotnet/Apache.Ignite/Internal/Table/DataStreamer.cs
@@ -22,6 +22,7 @@ using System.Buffers;
 using System.Collections.Generic;
 using System.Diagnostics;
 using System.Diagnostics.CodeAnalysis;
+using System.Linq;
 using System.Runtime.InteropServices;
 using System.Threading;
 using System.Threading.Tasks;
@@ -60,7 +61,7 @@ internal static class DataStreamer
     /// <typeparam name="T">Element type.</typeparam>
     /// <returns>A <see cref="Task"/> representing the asynchronous 
operation.</returns>
     internal static async Task StreamDataAsync<T>(
-        IAsyncEnumerable<T> data,
+        IAsyncEnumerable<DataStreamerItem<T>> data,
         Table table,
         RecordSerializer<T> writer,
         DataStreamerOptions options,
@@ -107,10 +108,10 @@ internal static class DataStreamer
                 // However, not all producers support cancellation, so we need 
to check it here as well.
                 cancellationToken.ThrowIfCancellationRequested();
 
-                var (batch, partition) = await 
AddWithRetryUnmapped(item).ConfigureAwait(false);
+                var batch = await 
AddWithRetryUnmapped(item).ConfigureAwait(false);
                 if (batch.Count >= options.PageSize)
                 {
-                    await SendAsync(batch, partition).ConfigureAwait(false);
+                    await SendAsync(batch).ConfigureAwait(false);
                 }
 
                 if (lastPartitionsAssignmentCheck.Elapsed > 
PartitionAssignmentUpdateFrequency)
@@ -136,7 +137,7 @@ internal static class DataStreamer
             foreach (var batch in batches.Values)
             {
                 batch.Buffer.Dispose();
-                ArrayPool<T>.Shared.Return(batch.Items);
+                GetPool<T>().Return(batch.Items);
 
                 Metrics.StreamerItemsQueuedDecrement(batch.Count);
                 Metrics.StreamerBatchesActiveDecrement();
@@ -145,7 +146,7 @@ internal static class DataStreamer
 
         return;
 
-        async ValueTask<(Batch<T> Batch, int Partition)> 
AddWithRetryUnmapped(T item)
+        async ValueTask<Batch<T>> AddWithRetryUnmapped(DataStreamerItem<T> 
item)
         {
             try
             {
@@ -158,10 +159,15 @@ internal static class DataStreamer
             }
         }
 
-        (Batch<T> Batch, int Partition) Add(T item)
+        Batch<T> Add(DataStreamerItem<T> item)
         {
             var schema0 = schema;
-            var tupleBuilder = new BinaryTupleBuilder(schema0.Columns.Length, 
hashedColumnsPredicate: schema0.HashedColumnIndexProvider);
+
+            var columnCount = item.OperationType == 
DataStreamerOperationType.Remove
+                ? schema0.KeyColumns.Length
+                : schema0.Columns.Length;
+
+            var tupleBuilder = new BinaryTupleBuilder(columnCount, 
hashedColumnsPredicate: schema0.HashedColumnIndexProvider);
 
             try
             {
@@ -173,7 +179,7 @@ internal static class DataStreamer
             }
         }
 
-        (Batch<T> Batch, int Partition) Add0(T item, ref BinaryTupleBuilder 
tupleBuilder, Schema schema0)
+        Batch<T> Add0(DataStreamerItem<T> item, ref BinaryTupleBuilder 
tupleBuilder, Schema schema0)
         {
             var columnCount = schema0.Columns.Length;
 
@@ -182,7 +188,8 @@ internal static class DataStreamer
             Span<byte> noValueSet = stackalloc byte[columnCount / 8 + 1];
             Span<byte> noValueSetRef = MemoryMarshal.CreateSpan(ref 
MemoryMarshal.GetReference(noValueSet), columnCount);
 
-            writer.Handler.Write(ref tupleBuilder, item, schema0, keyOnly: 
false, noValueSetRef);
+            var keyOnly = item.OperationType == 
DataStreamerOperationType.Remove;
+            writer.Handler.Write(ref tupleBuilder, item.Data, schema0, 
keyOnly: keyOnly, noValueSetRef);
 
             var partitionId = Math.Abs(tupleBuilder.GetHash() % 
partitionCount);
             var batch = GetOrCreateBatch(partitionId);
@@ -210,7 +217,7 @@ internal static class DataStreamer
 
             Metrics.StreamerItemsQueuedIncrement();
 
-            return (batch, partitionId);
+            return batch;
         }
 
         Batch<T> GetOrCreateBatch(int partitionId)
@@ -219,8 +226,8 @@ internal static class DataStreamer
 
             if (batchRef == null)
             {
-                batchRef = new Batch<T>(options.PageSize, schema);
-                InitBuffer(batchRef, partitionId, schema);
+                batchRef = new Batch<T>(options.PageSize, schema, partitionId);
+                InitBuffer(batchRef, schema);
 
                 Metrics.StreamerBatchesActiveIncrement();
             }
@@ -228,7 +235,7 @@ internal static class DataStreamer
             return batchRef;
         }
 
-        async Task SendAsync(Batch<T> batch, int partitionId)
+        async Task SendAsync(Batch<T> batch)
         {
             var expectedSize = batch.Count;
 
@@ -243,18 +250,14 @@ internal static class DataStreamer
                     return;
                 }
 
-                var buf = batch.Buffer;
-
-                // See RecordSerializer.WriteMultiple.
-                buf.WriteByte(MsgPackCode.Int32, batch.CountPos);
-                buf.WriteIntBigEndian(batch.Count, batch.CountPos + 1);
-
-                batch.Task = SendAndDisposeBufAsync(buf, partitionId, 
batch.Task, batch.Items, batch.Count, batch.SchemaOutdated);
+                FinalizeBatchHeader(batch);
+                batch.Task = SendAndDisposeBufAsync(
+                    batch.Buffer, batch.PartitionId, batch.Task, batch.Items, 
batch.Count, batch.SchemaOutdated);
 
-                batch.Items = ArrayPool<T>.Shared.Rent(options.PageSize);
+                batch.Items = GetPool<T>().Rent(options.PageSize);
                 batch.Count = 0;
                 batch.Buffer = ProtoCommon.GetMessageWriter(); // Prev buf 
will be disposed in SendAndDisposeBufAsync.
-                InitBuffer(batch, partitionId, schema);
+                InitBuffer(batch, schema);
                 batch.LastFlush = Stopwatch.GetTimestamp();
                 batch.Schema = schema;
                 batch.SchemaOutdated = false;
@@ -267,12 +270,14 @@ internal static class DataStreamer
             PooledArrayBuffer buf,
             int partitionId,
             Task oldTask,
-            T[] items,
+            DataStreamerItem<T>[] items,
             int count,
             bool batchSchemaOutdated)
         {
             Debug.Assert(items.Length > 0, "items.Length > 0");
 
+            Console.WriteLine("Sending batch: " + 
items.Take(count).StringJoin());
+
             if (batchSchemaOutdated)
             {
                 // Schema update was detected while the batch was being filled.
@@ -323,7 +328,7 @@ internal static class DataStreamer
             finally
             {
                 buf.Dispose();
-                ArrayPool<T>.Shared.Return(items);
+                GetPool<T>().Return(items);
 
                 Metrics.StreamerItemsQueuedDecrement(count);
                 Metrics.StreamerBatchesActiveDecrement();
@@ -337,11 +342,11 @@ internal static class DataStreamer
                 await Task.Delay(options.AutoFlushFrequency, 
flushCt).ConfigureAwait(false);
                 var ts = Stopwatch.GetTimestamp();
 
-                foreach (var (partition, batch) in batches)
+                foreach (var batch in batches.Values)
                 {
                     if (batch.Count > 0 && ts - batch.LastFlush > 
options.AutoFlushFrequency.Ticks)
                     {
-                        await SendAsync(batch, 
partition).ConfigureAwait(false);
+                        await SendAsync(batch).ConfigureAwait(false);
                     }
                 }
             }
@@ -349,11 +354,11 @@ internal static class DataStreamer
 
         async Task Drain()
         {
-            foreach (var (partition, batch) in batches)
+            foreach (var batch in batches.Values)
             {
                 if (batch.Count > 0)
                 {
-                    await SendAsync(batch, partition).ConfigureAwait(false);
+                    await SendAsync(batch).ConfigureAwait(false);
                 }
 
                 await batch.Task.ConfigureAwait(false);
@@ -361,43 +366,127 @@ internal static class DataStreamer
         }
     }
 
-    private static void InitBuffer<T>(Batch<T> batch, int partitionId, Schema 
schema)
+    private static void InitBuffer<T>(Batch<T> batch, Schema schema)
     {
         var buf = batch.Buffer;
-        WriteBatchHeader(buf, partitionId, schema);
+
+        WriteBatchHeader(buf, batch.PartitionId, schema, 
deletedSetReserveSize: batch.Items.Length);
 
         batch.CountPos = buf.Position;
         buf.Advance(5); // Reserve count.
     }
 
-    private static void WriteBatchHeader(PooledArrayBuffer buf, int 
partitionId, Schema schema)
+    private static void WriteBatchHeader(PooledArrayBuffer buf, int 
partitionId, Schema schema, int deletedSetReserveSize)
     {
         var w = buf.MessageWriter;
+
+        // Reserve space for deleted set - we don't know if we need it or not, 
and which size.
+        w.WriteBitSet(deletedSetReserveSize);
+        buf.Offset = buf.Position;
+
+        // Write header.
         w.Write(schema.TableId);
         w.Write(partitionId);
-        w.WriteNil(); // Deleted rows bit set.
+        w.WriteNil(); // Deleted set. We assume there are no deleted items by 
default. The header will be rewritten if needed.
         w.Write(schema.Version);
     }
 
+    private static void FinalizeBatchHeader<T>(Batch<T> batch)
+    {
+        var buf = batch.Buffer;
+
+        if (HasDeletedItems<T>(batch.Items.AsSpan(0, batch.Count)))
+        {
+            // Re-write the entire header with the deleted set of actual size.
+            var reservedBitSetSize = buf.Offset;
+            var oldPos = buf.Position;
+
+            buf.Position = 0;
+            buf.MessageWriter.WriteBitSet(batch.Count);
+            var actualBitSetSize = buf.Position;
+
+            buf.Offset = 1 + reservedBitSetSize - actualBitSetSize; // 1 byte 
for null bit set used before.
+            buf.Position = buf.Offset;
+
+            var w = buf.MessageWriter;
+            w.Write(batch.Schema.TableId);
+            w.Write(batch.PartitionId);
+
+            var deletedSet = w.WriteBitSet(batch.Count);
+
+            for (var i = 0; i < batch.Count; i++)
+            {
+                if (batch.Items[i].OperationType == 
DataStreamerOperationType.Remove)
+                {
+                    deletedSet.SetBit(i);
+                }
+            }
+
+            w.Write(batch.Schema.Version);
+
+            // Count position should not change - we only rearrange the header 
above it.
+            Debug.Assert(buf.Position == batch.CountPos, $"buf.Position = 
{buf.Position}, batch.CountPos = {batch.CountPos}");
+            buf.Position = oldPos;
+        }
+
+        // Update count.
+        buf.WriteByte(MsgPackCode.Int32, batch.CountPos);
+        buf.WriteIntBigEndian(batch.Count, batch.CountPos + 1);
+    }
+
     private static void ReWriteBatch<T>(
         PooledArrayBuffer buf,
         int partitionId,
         Schema schema,
-        ReadOnlySpan<T> items,
+        ReadOnlySpan<DataStreamerItem<T>> items,
         RecordSerializer<T> writer)
     {
         buf.Reset();
-        WriteBatchHeader(buf, partitionId, schema);
 
         var w = buf.MessageWriter;
+        w.Write(schema.TableId);
+        w.Write(partitionId);
+
+        if (HasDeletedItems(items))
+        {
+            var deletedSet = w.WriteBitSet(items.Length);
+
+            for (var i = 0; i < items.Length; i++)
+            {
+                if (items[i].OperationType == DataStreamerOperationType.Remove)
+                {
+                    deletedSet.SetBit(i);
+                }
+            }
+        }
+        else
+        {
+            w.WriteNil();
+        }
+
+        w.Write(schema.Version);
         w.Write(items.Length);
 
         foreach (var item in items)
         {
-            writer.Handler.Write(ref w, schema, item, keyOnly: false, 
computeHash: false);
+            var remove = item.OperationType == 
DataStreamerOperationType.Remove;
+            writer.Handler.Write(ref w, schema, item.Data, keyOnly: remove, 
computeHash: false);
         }
     }
 
+    private static bool HasDeletedItems<T>(ReadOnlySpan<DataStreamerItem<T>> 
items)
+    {
+        foreach (var t in items)
+        {
+            if (t.OperationType == DataStreamerOperationType.Remove)
+            {
+                return true;
+            }
+        }
+
+        return false;
+    }
+
     private static async Task SendBatchAsync(
         Table table,
         PooledArrayBuffer buf,
@@ -419,18 +508,23 @@ internal static class DataStreamer
         Metrics.StreamerItemsSent.Add(count, socket.MetricsContext.Tags);
     }
 
+    private static ArrayPool<DataStreamerItem<T>> GetPool<T>() => 
ArrayPool<DataStreamerItem<T>>.Shared;
+
     private sealed record Batch<T>
     {
-        public Batch(int capacity, Schema schema)
+        public Batch(int capacity, Schema schema, int partitionId)
         {
-            Items = ArrayPool<T>.Shared.Rent(capacity);
+            PartitionId = partitionId;
+            Items = GetPool<T>().Rent(capacity);
             Schema = schema;
         }
 
+        public int PartitionId { get; }
+
         public PooledArrayBuffer Buffer { get; set; } = 
ProtoCommon.GetMessageWriter();
 
         [SuppressMessage("Performance", "CA1819:Properties should not return 
arrays", Justification = "Private record")]
-        public T[] Items { get; set; }
+        public DataStreamerItem<T>[] Items { get; set; }
 
         public Schema Schema { get; set; }
 
diff --git 
a/modules/platforms/dotnet/Apache.Ignite/Internal/Table/KeyValueView.cs 
b/modules/platforms/dotnet/Apache.Ignite/Internal/Table/KeyValueView.cs
index 64973f7604..2182b43845 100644
--- a/modules/platforms/dotnet/Apache.Ignite/Internal/Table/KeyValueView.cs
+++ b/modules/platforms/dotnet/Apache.Ignite/Internal/Table/KeyValueView.cs
@@ -158,7 +158,7 @@ internal sealed class KeyValueView<TK, TV> : 
IKeyValueView<TK, TV>
 
     /// <inheritdoc/>
     public async Task StreamDataAsync(
-        IAsyncEnumerable<KeyValuePair<TK, TV>> data,
+        IAsyncEnumerable<DataStreamerItem<KeyValuePair<TK, TV>>> data,
         DataStreamerOptions? options = null,
         CancellationToken cancellationToken = default) =>
         await _recordView.StreamDataAsync(ToKv(data), options, 
cancellationToken).ConfigureAwait(false);
@@ -190,11 +190,12 @@ internal sealed class KeyValueView<TK, TV> : 
IKeyValueView<TK, TV>
         return new(key, val);
     }
 
-    private static async IAsyncEnumerable<KvPair<TK, TV>> 
ToKv(IAsyncEnumerable<KeyValuePair<TK, TV>> pairs)
+    private static async IAsyncEnumerable<DataStreamerItem<KvPair<TK, TV>>> 
ToKv(
+        IAsyncEnumerable<DataStreamerItem<KeyValuePair<TK, TV>>> pairs)
     {
         await foreach (var pair in pairs)
         {
-            yield return ToKv(pair);
+            yield return DataStreamerItem.Create(ToKv(pair.Data), 
pair.OperationType);
         }
     }
 }
diff --git 
a/modules/platforms/dotnet/Apache.Ignite/Internal/Table/RecordView.cs 
b/modules/platforms/dotnet/Apache.Ignite/Internal/Table/RecordView.cs
index 7e545ab151..051baf2ce5 100644
--- a/modules/platforms/dotnet/Apache.Ignite/Internal/Table/RecordView.cs
+++ b/modules/platforms/dotnet/Apache.Ignite/Internal/Table/RecordView.cs
@@ -294,7 +294,7 @@ namespace Apache.Ignite.Internal.Table
 
         /// <inheritdoc/>
         public async Task StreamDataAsync(
-            IAsyncEnumerable<T> data,
+            IAsyncEnumerable<DataStreamerItem<T>> data,
             DataStreamerOptions? options = null,
             CancellationToken cancellationToken = default) =>
             await DataStreamer.StreamDataAsync(
diff --git 
a/modules/platforms/dotnet/Apache.Ignite/Internal/Table/Serialization/RecordSerializer.cs
 
b/modules/platforms/dotnet/Apache.Ignite/Internal/Table/Serialization/RecordSerializer.cs
index f7e95bbd75..46d4df001f 100644
--- 
a/modules/platforms/dotnet/Apache.Ignite/Internal/Table/Serialization/RecordSerializer.cs
+++ 
b/modules/platforms/dotnet/Apache.Ignite/Internal/Table/Serialization/RecordSerializer.cs
@@ -196,28 +196,6 @@ namespace Apache.Ignite.Internal.Table.Serialization
             return firstHash;
         }
 
-        /// <summary>
-        /// Write multiple records.
-        /// </summary>
-        /// <param name="buf">Buffer.</param>
-        /// <param name="tx">Transaction.</param>
-        /// <param name="schema">Schema.</param>
-        /// <param name="recs">Records.</param>
-        /// <param name="keyOnly">Key only columns.</param>
-        /// <returns>First record hash.</returns>
-        public int WriteMultiple(
-            PooledArrayBuffer buf,
-            Transactions.Transaction? tx,
-            Schema schema,
-            IEnumerable<T> recs,
-            bool keyOnly = false)
-        {
-            var enumerator = recs.GetEnumerator();
-            enumerator.MoveNext();
-
-            return WriteMultiple(buf, tx, schema, enumerator, keyOnly);
-        }
-
         /// <summary>
         /// Write multiple records.
         /// </summary>
diff --git a/modules/platforms/dotnet/Apache.Ignite/Table/DataStreamerItem.cs 
b/modules/platforms/dotnet/Apache.Ignite/Table/DataStreamerItem.cs
new file mode 100644
index 0000000000..648f04e9bf
--- /dev/null
+++ b/modules/platforms/dotnet/Apache.Ignite/Table/DataStreamerItem.cs
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache.Ignite.Table;
+
+/// <summary>
+/// Data streamer item.
+/// </summary>
+/// <param name="Data">Data item.</param>
+/// <param name="OperationType">Operation type.</param>
+/// <typeparam name="T">Data type.</typeparam>
+public record struct DataStreamerItem<T>(
+    T Data,
+    DataStreamerOperationType OperationType);
+
+/// <summary>
+/// Creates instances of the <see cref="DataStreamerItem{T}"/> struct.
+/// </summary>
+public static class DataStreamerItem
+{
+    /// <summary>
+    /// Creates a new data streamer item with the <see 
cref="DataStreamerOperationType.Put"/> operation type.
+    /// </summary>
+    /// <param name="data">Data.</param>
+    /// <typeparam name="T">Data type.</typeparam>
+    /// <returns>Data streamer item.</returns>
+    public static DataStreamerItem<T> Create<T>(T data) =>
+        new(data, DataStreamerOperationType.Put);
+
+    /// <summary>
+    /// Creates a new data streamer item instance using provided values.
+    /// </summary>
+    /// <param name="data">Data.</param>
+    /// <param name="operationType">Operation type.</param>
+    /// <typeparam name="T">Data type.</typeparam>
+    /// <returns>Data streamer item.</returns>
+    public static DataStreamerItem<T> Create<T>(T data, 
DataStreamerOperationType operationType) =>
+        new(data, operationType);
+}
diff --git a/modules/platforms/dotnet/Apache.Ignite.Tests/Table/Poco.cs 
b/modules/platforms/dotnet/Apache.Ignite/Table/DataStreamerOperationType.cs
similarity index 68%
copy from modules/platforms/dotnet/Apache.Ignite.Tests/Table/Poco.cs
copy to 
modules/platforms/dotnet/Apache.Ignite/Table/DataStreamerOperationType.cs
index 240588d506..71c65c9051 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Tests/Table/Poco.cs
+++ b/modules/platforms/dotnet/Apache.Ignite/Table/DataStreamerOperationType.cs
@@ -15,24 +15,20 @@
  * limitations under the License.
  */
 
-namespace Apache.Ignite.Tests.Table
-{
-    using System;
-    using System.ComponentModel.DataAnnotations.Schema;
+namespace Apache.Ignite.Table;
 
+/// <summary>
+/// Data streamer operation type.
+/// </summary>
+public enum DataStreamerOperationType
+{
     /// <summary>
-    /// Test user object.
+    /// Put operation.
     /// </summary>
-    public class Poco
-    {
-        public long Key { get; set; }
-
-        public string? Val { get; set; }
+    Put,
 
-        [NotMapped]
-        public Guid UnmappedId { get; set; }
-
-        [NotMapped]
-        public string? UnmappedStr { get; set; }
-    }
+    /// <summary>
+    /// Remove operation.
+    /// </summary>
+    Remove
 }
diff --git 
a/modules/platforms/dotnet/Apache.Ignite/Table/IDataStreamerTarget.cs 
b/modules/platforms/dotnet/Apache.Ignite/Table/IDataStreamerTarget.cs
index 899be69f20..4d18f65e10 100644
--- a/modules/platforms/dotnet/Apache.Ignite/Table/IDataStreamerTarget.cs
+++ b/modules/platforms/dotnet/Apache.Ignite/Table/IDataStreamerTarget.cs
@@ -21,6 +21,7 @@ namespace Apache.Ignite.Table;
 using System.Collections.Generic;
 using System.Threading;
 using System.Threading.Tasks;
+using Internal.Common;
 
 /// <summary>
 /// Represents an entity that can be used as a target for streaming data.
@@ -35,5 +36,33 @@ public interface IDataStreamerTarget<T>
     /// <param name="options">Streamer options.</param>
     /// <param name="cancellationToken">Cancellation token.</param>
     /// <returns>A <see cref="Task"/> representing the asynchronous 
operation.</returns>
-    Task StreamDataAsync(IAsyncEnumerable<T> data, DataStreamerOptions? 
options = null, CancellationToken cancellationToken = default);
+    Task StreamDataAsync(
+        IAsyncEnumerable<DataStreamerItem<T>> data,
+        DataStreamerOptions? options = null,
+        CancellationToken cancellationToken = default);
+
+    /// <summary>
+    /// Streams data into the underlying table.
+    /// </summary>
+    /// <param name="data">Data.</param>
+    /// <param name="options">Streamer options.</param>
+    /// <param name="cancellationToken">Cancellation token.</param>
+    /// <returns>A <see cref="Task"/> representing the asynchronous 
operation.</returns>
+    Task StreamDataAsync(
+        IAsyncEnumerable<T> data,
+        DataStreamerOptions? options = null,
+        CancellationToken cancellationToken = default)
+    {
+        IgniteArgumentCheck.NotNull(data);
+
+        return StreamDataAsync(ConvertToItems(), options, cancellationToken);
+
+        async IAsyncEnumerable<DataStreamerItem<T>> ConvertToItems()
+        {
+            await foreach (var item in 
data.WithCancellation(cancellationToken))
+            {
+                yield return DataStreamerItem.Create(item);
+            }
+        }
+    }
 }
diff --git 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/streamer/ItAbstractDataStreamerTest.java
 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/streamer/ItAbstractDataStreamerTest.java
index a966f8f6e1..af8e4a38af 100644
--- 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/streamer/ItAbstractDataStreamerTest.java
+++ 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/streamer/ItAbstractDataStreamerTest.java
@@ -48,6 +48,7 @@ import org.apache.ignite.table.mapper.Mapper;
 import org.apache.ignite.tx.TransactionOptions;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.CsvSource;
@@ -287,6 +288,26 @@ public abstract class ItAbstractDataStreamerTest extends 
ClusterPerClassIntegrat
         assertEquals("bar-99", view.get(null, 
tupleKey(id)).stringValue("name"));
     }
 
+    @ParameterizedTest
+    @ValueSource(ints = {1, 2, 3})
+    @Disabled("IGNITE-21992 Data Streamer removal does not work for a new key 
in the same batch")
+    public void testSameItemInsertUpdateRemove(int pageSize) {
+        RecordView<Tuple> view = defaultTable().recordView();
+        CompletableFuture<Void> streamerFut;
+        int key = 333000;
+
+        try (var publisher = new 
SubmissionPublisher<DataStreamerItem<Tuple>>()) {
+            streamerFut = view.streamData(publisher, 
DataStreamerOptions.builder().pageSize(pageSize).build());
+
+            publisher.submit(DataStreamerItem.of(tuple(key, "foo")));
+            publisher.submit(DataStreamerItem.removed(tupleKey(key)));
+        }
+
+        streamerFut.orTimeout(1, TimeUnit.SECONDS).join();
+
+        assertNull(view.get(null, tupleKey(key)));
+    }
+
     @SuppressWarnings("resource")
     @Test
     public void testSchemaUpdateWhileStreaming() throws InterruptedException {


Reply via email to