This is an automated email from the ASF dual-hosted git repository.
ptupitsyn pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 9a24cc6145 IGNITE-21490 .NET: Add DataStreamer data removal (#3560)
9a24cc6145 is described below
commit 9a24cc614535c2e668cc0871bec1f1fee5928d29
Author: Pavel Tupitsyn <[email protected]>
AuthorDate: Mon Apr 8 11:56:47 2024 +0300
IGNITE-21490 .NET: Add DataStreamer data removal (#3560)
Change `DataStreamer` API to allow data removal:
* `IDataStreamerTarget` now accepts `IAsyncEnumerable<DataStreamerItem<T>>`
* Simplified overload accepts `IAsyncEnumerable<T>` as it was before
* `DataStreamerItem<T>` is a struct record (no allocations), accepts data
item and `DataStreamerOperationType` enum
* `DataStreamerItem` factory class added to simplify generics (similar to
standard library `KeyValuePair.Create`):
* Constructor: `new DataStreamerItem<MyEntityClass<Guid>>(entity)` -
entity type must be specified
* Factory: `DataStreamerItem.Create(entity)` - entity type is inferred
---
.../Apache.Ignite.Tests/ProjectFilesTests.cs | 5 +-
.../Apache.Ignite.Tests/Table/DataStreamerTests.cs | 97 ++++++++++--
.../dotnet/Apache.Ignite.Tests/Table/Poco.cs | 2 +
.../Table/SchemaSynchronizationTest.cs | 27 +++-
.../dotnet/Apache.Ignite/ClientOperationType.cs | 2 +-
.../Internal/Buffers/PooledArrayBuffer.cs | 34 +++--
.../Apache.Ignite/Internal/Table/DataStreamer.cs | 170 ++++++++++++++++-----
.../Apache.Ignite/Internal/Table/KeyValueView.cs | 7 +-
.../Apache.Ignite/Internal/Table/RecordView.cs | 2 +-
.../Table/Serialization/RecordSerializer.cs | 22 ---
.../dotnet/Apache.Ignite/Table/DataStreamerItem.cs | 53 +++++++
.../Table/DataStreamerOperationType.cs} | 28 ++--
.../Apache.Ignite/Table/IDataStreamerTarget.cs | 31 +++-
.../streamer/ItAbstractDataStreamerTest.java | 21 +++
14 files changed, 391 insertions(+), 110 deletions(-)
diff --git a/modules/platforms/dotnet/Apache.Ignite.Tests/ProjectFilesTests.cs
b/modules/platforms/dotnet/Apache.Ignite.Tests/ProjectFilesTests.cs
index fa3715a6e3..8c83306089 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Tests/ProjectFilesTests.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Tests/ProjectFilesTests.cs
@@ -78,7 +78,10 @@ namespace Apache.Ignite.Tests
if (text.Contains("public class", StringComparison.Ordinal) ||
text.Contains("public record", StringComparison.Ordinal))
{
- Assert.Fail("Public classes must be sealed: " + file);
+ if (!text.Contains("public record struct"))
+ {
+ Assert.Fail("Public classes must be sealed: " + file);
+ }
}
}
}
diff --git
a/modules/platforms/dotnet/Apache.Ignite.Tests/Table/DataStreamerTests.cs
b/modules/platforms/dotnet/Apache.Ignite.Tests/Table/DataStreamerTests.cs
index 15b9e6aaeb..4c6210ac15 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Tests/Table/DataStreamerTests.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Tests/Table/DataStreamerTests.cs
@@ -28,7 +28,7 @@ using Internal.Proto;
using NUnit.Framework;
/// <summary>
-/// Tests for <see cref="IDataStreamerTarget{T}.StreamDataAsync"/>.
+/// Tests for <see cref="IDataStreamerTarget{T}"/>.
/// <para />
/// See DataStreamer partition awareness tests in <see
cref="PartitionAwarenessTests"/>.
/// </summary>
@@ -36,25 +36,48 @@ public class DataStreamerTests : IgniteTestsBase
{
private const int Count = 100;
+ private const int UpdatedKey = Count / 2;
+
+ private const int DeletedKey = Count + 1;
+
+ private static int _unknownKey = 333000;
+
[SetUp]
- public async Task DeleteAll() =>
- await TupleView.DeleteAllAsync(null, Enumerable.Range(0,
Count).Select(x => GetTuple(x)));
+ public async Task PrepareData()
+ {
+ await TupleView.UpsertAsync(null, GetTuple(UpdatedKey, "update me"));
+ await TupleView.UpsertAsync(null, GetTuple(DeletedKey, "delete me"));
+ }
+
+ [TearDown]
+ public async Task DeleteAll() => await Client.Sql.ExecuteAsync(null,
$"DELETE FROM {TableName}");
[Test]
public async Task TestBasicStreamingRecordBinaryView()
{
- var options = DataStreamerOptions.Default with { PageSize = 10 };
- var data = Enumerable.Range(0, Count).Select(x => GetTuple(x, "t" +
x)).ToList();
-
- await TupleView.StreamDataAsync(data.ToAsyncEnumerable(), options);
+ await TupleView.StreamDataAsync(GetData(), DataStreamerOptions.Default
with { PageSize = 10 });
await CheckData();
+
+ static async IAsyncEnumerable<DataStreamerItem<IIgniteTuple>> GetData()
+ {
+ for (int i = 0; i < Count; i++)
+ {
+ yield return DataStreamerItem.Create(GetTuple(i, "t" + i));
+ }
+
+ await Task.Yield();
+ yield return DataStreamerItem.Create(GetTuple(DeletedKey),
DataStreamerOperationType.Remove);
+ }
}
[Test]
public async Task TestBasicStreamingRecordView()
{
var options = DataStreamerOptions.Default with { PageSize = 5 };
- var data = Enumerable.Range(0, Count).Select(x => GetPoco(x, "t" +
x)).ToList();
+ var data = Enumerable.Range(0, Count)
+ .Select(x => DataStreamerItem.Create(GetPoco(x, "t" + x)))
+ .Concat(new[] { DataStreamerItem.Create(GetPoco(DeletedKey),
DataStreamerOperationType.Remove) })
+ .ToList();
await
Table.GetRecordView<Poco>().StreamDataAsync(data.ToAsyncEnumerable(), options);
await CheckData();
@@ -65,7 +88,8 @@ public class DataStreamerTests : IgniteTestsBase
{
var options = DataStreamerOptions.Default with { PageSize = 10_000 };
var data = Enumerable.Range(0, Count)
- .Select(x => new KeyValuePair<IIgniteTuple,
IIgniteTuple>(GetTuple(x), GetTuple("t" + x)))
+ .Select(x =>
DataStreamerItem.Create(KeyValuePair.Create(GetTuple(x), GetTuple("t" + x))))
+ .Concat(new[] {
DataStreamerItem.Create(KeyValuePair.Create(GetTuple(DeletedKey),
default(IIgniteTuple)!), DataStreamerOperationType.Remove) })
.ToList();
await
Table.KeyValueBinaryView.StreamDataAsync(data.ToAsyncEnumerable(), options);
@@ -77,7 +101,8 @@ public class DataStreamerTests : IgniteTestsBase
{
var options = DataStreamerOptions.Default with { PageSize = 1 };
var data = Enumerable.Range(0, Count)
- .Select(x => new KeyValuePair<long, Poco>(x, GetPoco(x, "t" + x)))
+ .Select(x => DataStreamerItem.Create(KeyValuePair.Create((long)x,
GetPoco(x, "t" + x))))
+ .Concat(new[] {
DataStreamerItem.Create(KeyValuePair.Create((long)DeletedKey, default(Poco)!),
DataStreamerOperationType.Remove) })
.ToList();
await Table.GetKeyValueView<long,
Poco>().StreamDataAsync(data.ToAsyncEnumerable(), options);
@@ -182,6 +207,55 @@ public class DataStreamerTests : IgniteTestsBase
Assert.That(server.DroppedConnectionCount,
Is.GreaterThanOrEqualTo(count / DataStreamerOptions.Default.PageSize));
}
+ [Test]
+ public async Task TestAddUpdateRemoveMixed(
+ [Values(1, 2, 100)] int pageSize,
+ [Values(true, false)] bool existingMinKey)
+ {
+ if (pageSize > 1)
+ {
+ // TODO: IGNITE-21992 Data Streamer removal does not work for a
new key in the same batch
+ return;
+ }
+
+ var minKey = existingMinKey ? UpdatedKey : Interlocked.Add(ref
_unknownKey, 10);
+ await Table.GetRecordView<Poco>().StreamDataAsync(
+ GetData(),
+ DataStreamerOptions.Default with { PageSize = pageSize });
+
+ IList<Option<Poco>> res = await PocoView.GetAllAsync(null,
Enumerable.Range(minKey, 4).Select(x => GetPoco(x)));
+ Assert.AreEqual(4, res.Count);
+
+ Assert.IsFalse(res[0].HasValue, "Deleted key should not exist: " +
res[0]);
+
+ Assert.IsTrue(res[1].HasValue);
+ Assert.AreEqual("created2", res[1].Value.Val);
+
+ Assert.IsTrue(res[2].HasValue);
+ Assert.AreEqual("updated", res[2].Value.Val);
+
+ Assert.IsTrue(res[3].HasValue);
+ Assert.AreEqual("created", res[3].Value.Val);
+
+ async IAsyncEnumerable<DataStreamerItem<Poco>> GetData()
+ {
+ await Task.Yield();
+ yield return DataStreamerItem.Create(GetPoco(minKey, "created"));
+ yield return DataStreamerItem.Create(GetPoco(minKey, "updated"));
+ yield return DataStreamerItem.Create(GetPoco(minKey, "deleted"),
DataStreamerOperationType.Remove);
+
+ yield return DataStreamerItem.Create(GetPoco(minKey + 1,
"created"));
+ yield return DataStreamerItem.Create(GetPoco(minKey + 1,
"updated"));
+ yield return DataStreamerItem.Create(GetPoco(minKey + 1,
"deleted"), DataStreamerOperationType.Remove);
+ yield return DataStreamerItem.Create(GetPoco(minKey + 1,
"created2"));
+
+ yield return DataStreamerItem.Create(GetPoco(minKey + 2,
"created"));
+ yield return DataStreamerItem.Create(GetPoco(minKey + 2,
"updated"));
+
+ yield return DataStreamerItem.Create(GetPoco(minKey + 3,
"created"));
+ }
+ }
+
private static async IAsyncEnumerable<IIgniteTuple> GetFakeServerData(int
count)
{
for (var i = 0; i < count; i++)
@@ -211,5 +285,8 @@ public class DataStreamerTests : IgniteTestsBase
{
Assert.IsTrue(hasVal);
}
+
+ var deletedExists = await TupleView.ContainsKeyAsync(null,
GetTuple(DeletedKey));
+ Assert.IsFalse(deletedExists);
}
}
diff --git a/modules/platforms/dotnet/Apache.Ignite.Tests/Table/Poco.cs
b/modules/platforms/dotnet/Apache.Ignite.Tests/Table/Poco.cs
index 240588d506..a26d94150b 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Tests/Table/Poco.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Tests/Table/Poco.cs
@@ -34,5 +34,7 @@ namespace Apache.Ignite.Tests.Table
[NotMapped]
public string? UnmappedStr { get; set; }
+
+ public override string ToString() => $"Poco [Key={Key}, Val={Val}]";
}
}
diff --git
a/modules/platforms/dotnet/Apache.Ignite.Tests/Table/SchemaSynchronizationTest.cs
b/modules/platforms/dotnet/Apache.Ignite.Tests/Table/SchemaSynchronizationTest.cs
index 22ac2370d2..7bc9cb63c4 100644
---
a/modules/platforms/dotnet/Apache.Ignite.Tests/Table/SchemaSynchronizationTest.cs
+++
b/modules/platforms/dotnet/Apache.Ignite.Tests/Table/SchemaSynchronizationTest.cs
@@ -45,7 +45,8 @@ public class SchemaSynchronizationTest : IgniteTestsBase
}
private static string TestTableName => TestContext.CurrentContext.Test.Name
- .Replace("(", string.Empty)
+ .Replace("(", "_")
+ .Replace(",", "_")
.Replace(")", string.Empty);
[TearDown]
@@ -334,7 +335,9 @@ public class SchemaSynchronizationTest : IgniteTestsBase
[Test]
[SuppressMessage("ReSharper", "AccessToDisposedClosure", Justification =
"Reviewed")]
- public async Task TestSchemaUpdateWhileStreaming([Values(true, false)]
bool insertNewColumn)
+ public async Task TestSchemaUpdateWhileStreaming(
+ [Values(true, false)] bool insertNewColumn,
+ [Values(true, false)] bool withRemove)
{
using var metricListener = new MetricsTests.Listener();
await Client.Sql.ExecuteAsync(null, $"CREATE TABLE {TestTableName}
(KEY bigint PRIMARY KEY)");
@@ -353,12 +356,17 @@ public class SchemaSynchronizationTest : IgniteTestsBase
var res2 = await view.GetAsync(null, GetTuple(19));
Assert.AreEqual(insertNewColumn ? "BAR_19" : "FOO", res2.Value["VAL"]);
- async IAsyncEnumerable<IIgniteTuple> GetData()
+ async IAsyncEnumerable<DataStreamerItem<IIgniteTuple>> GetData()
{
// First set of batches uses old schema.
for (int i = 0; i < 20; i++)
{
- yield return GetTuple(i);
+ if (withRemove)
+ {
+ yield return DataStreamerItem.Create(GetTuple(-i),
DataStreamerOperationType.Remove);
+ }
+
+ yield return DataStreamerItem.Create(GetTuple(i));
}
// Wait for background streaming to complete.
@@ -369,9 +377,16 @@ public class SchemaSynchronizationTest : IgniteTestsBase
// New schema has a new column with a default value, so it is not
required to provide it in the streamed data.
await Client.Sql.ExecuteAsync(null, $"ALTER TABLE {TestTableName}
ADD COLUMN VAL varchar DEFAULT 'FOO'");
- for (int i = 10; i < 20; i++)
+ for (int i = 10; i < 30; i++)
{
- yield return insertNewColumn ? GetTuple(i, "BAR_" + i) :
GetTuple(i);
+ if (withRemove)
+ {
+ yield return DataStreamerItem.Create(GetTuple(-i),
DataStreamerOperationType.Remove);
+ }
+
+ yield return insertNewColumn
+ ? DataStreamerItem.Create(GetTuple(i, "BAR_" + i))
+ : DataStreamerItem.Create(GetTuple(i));
}
}
}
diff --git a/modules/platforms/dotnet/Apache.Ignite/ClientOperationType.cs
b/modules/platforms/dotnet/Apache.Ignite/ClientOperationType.cs
index 0a1d3ee484..44c034e1f8 100644
--- a/modules/platforms/dotnet/Apache.Ignite/ClientOperationType.cs
+++ b/modules/platforms/dotnet/Apache.Ignite/ClientOperationType.cs
@@ -147,7 +147,7 @@ namespace Apache.Ignite
ComputeChangePriority,
/// <summary>
- /// Send data streamer batch (<see
cref="IDataStreamerTarget{T}.StreamDataAsync"/>).
+ /// Send data streamer batch (<see cref="IDataStreamerTarget{T}"/>).
/// </summary>
StreamerBatchSend
}
diff --git
a/modules/platforms/dotnet/Apache.Ignite/Internal/Buffers/PooledArrayBuffer.cs
b/modules/platforms/dotnet/Apache.Ignite/Internal/Buffers/PooledArrayBuffer.cs
index 86cf4fd1a6..aaef2724f9 100644
---
a/modules/platforms/dotnet/Apache.Ignite/Internal/Buffers/PooledArrayBuffer.cs
+++
b/modules/platforms/dotnet/Apache.Ignite/Internal/Buffers/PooledArrayBuffer.cs
@@ -60,9 +60,24 @@ namespace Apache.Ignite.Internal.Buffers
public MsgPackWriter MessageWriter => new(this);
/// <summary>
- /// Gets the current position.
+ /// Gets or sets the current position.
/// </summary>
- public int Position => _index - _prefixSize;
+ public int Position
+ {
+ get => _index - _prefixSize;
+ set
+ {
+ Debug.Assert(value >= 0, "value >= 0");
+ Debug.Assert(value <= _buffer.Length - _prefixSize, "value <=
_buffer.Length - _prefixSize");
+
+ _index = value + _prefixSize;
+ }
+ }
+
+ /// <summary>
+ /// Gets or sets the data offset from the start of the buffer.
+ /// </summary>
+ public int Offset { get; set; }
/// <summary>
/// Gets the free capacity.
@@ -77,7 +92,7 @@ namespace Apache.Ignite.Internal.Buffers
{
Debug.Assert(!_disposed, "!_disposed");
- return new(_buffer, start: 0, length: _index);
+ return new(_buffer, start: Offset, length: _index);
}
/// <summary>
@@ -95,7 +110,11 @@ namespace Apache.Ignite.Internal.Buffers
/// <summary>
/// Resets the buffer to the initial state.
/// </summary>
- public void Reset() => _index = _prefixSize;
+ public void Reset()
+ {
+ _index = _prefixSize;
+ Offset = 0;
+ }
/// <summary>
/// Gets a span for writing.
@@ -210,13 +229,6 @@ namespace Apache.Ignite.Internal.Buffers
_index += 8;
}
- /// <summary>
- /// Reads a byte at specified position.
- /// </summary>
- /// <param name="pos">Position.</param>
- /// <returns>Result.</returns>
- public byte ReadByte(int pos) => _buffer[pos + _prefixSize];
-
/// <summary>
/// Reads a short at specified position.
/// </summary>
diff --git
a/modules/platforms/dotnet/Apache.Ignite/Internal/Table/DataStreamer.cs
b/modules/platforms/dotnet/Apache.Ignite/Internal/Table/DataStreamer.cs
index 59655b0533..52d5402292 100644
--- a/modules/platforms/dotnet/Apache.Ignite/Internal/Table/DataStreamer.cs
+++ b/modules/platforms/dotnet/Apache.Ignite/Internal/Table/DataStreamer.cs
@@ -22,6 +22,7 @@ using System.Buffers;
using System.Collections.Generic;
using System.Diagnostics;
using System.Diagnostics.CodeAnalysis;
+using System.Linq;
using System.Runtime.InteropServices;
using System.Threading;
using System.Threading.Tasks;
@@ -60,7 +61,7 @@ internal static class DataStreamer
/// <typeparam name="T">Element type.</typeparam>
/// <returns>A <see cref="Task"/> representing the asynchronous
operation.</returns>
internal static async Task StreamDataAsync<T>(
- IAsyncEnumerable<T> data,
+ IAsyncEnumerable<DataStreamerItem<T>> data,
Table table,
RecordSerializer<T> writer,
DataStreamerOptions options,
@@ -107,10 +108,10 @@ internal static class DataStreamer
// However, not all producers support cancellation, so we need
to check it here as well.
cancellationToken.ThrowIfCancellationRequested();
- var (batch, partition) = await
AddWithRetryUnmapped(item).ConfigureAwait(false);
+ var batch = await
AddWithRetryUnmapped(item).ConfigureAwait(false);
if (batch.Count >= options.PageSize)
{
- await SendAsync(batch, partition).ConfigureAwait(false);
+ await SendAsync(batch).ConfigureAwait(false);
}
if (lastPartitionsAssignmentCheck.Elapsed >
PartitionAssignmentUpdateFrequency)
@@ -136,7 +137,7 @@ internal static class DataStreamer
foreach (var batch in batches.Values)
{
batch.Buffer.Dispose();
- ArrayPool<T>.Shared.Return(batch.Items);
+ GetPool<T>().Return(batch.Items);
Metrics.StreamerItemsQueuedDecrement(batch.Count);
Metrics.StreamerBatchesActiveDecrement();
@@ -145,7 +146,7 @@ internal static class DataStreamer
return;
- async ValueTask<(Batch<T> Batch, int Partition)>
AddWithRetryUnmapped(T item)
+ async ValueTask<Batch<T>> AddWithRetryUnmapped(DataStreamerItem<T>
item)
{
try
{
@@ -158,10 +159,15 @@ internal static class DataStreamer
}
}
- (Batch<T> Batch, int Partition) Add(T item)
+ Batch<T> Add(DataStreamerItem<T> item)
{
var schema0 = schema;
- var tupleBuilder = new BinaryTupleBuilder(schema0.Columns.Length,
hashedColumnsPredicate: schema0.HashedColumnIndexProvider);
+
+ var columnCount = item.OperationType ==
DataStreamerOperationType.Remove
+ ? schema0.KeyColumns.Length
+ : schema0.Columns.Length;
+
+ var tupleBuilder = new BinaryTupleBuilder(columnCount,
hashedColumnsPredicate: schema0.HashedColumnIndexProvider);
try
{
@@ -173,7 +179,7 @@ internal static class DataStreamer
}
}
- (Batch<T> Batch, int Partition) Add0(T item, ref BinaryTupleBuilder
tupleBuilder, Schema schema0)
+ Batch<T> Add0(DataStreamerItem<T> item, ref BinaryTupleBuilder
tupleBuilder, Schema schema0)
{
var columnCount = schema0.Columns.Length;
@@ -182,7 +188,8 @@ internal static class DataStreamer
Span<byte> noValueSet = stackalloc byte[columnCount / 8 + 1];
Span<byte> noValueSetRef = MemoryMarshal.CreateSpan(ref
MemoryMarshal.GetReference(noValueSet), columnCount);
- writer.Handler.Write(ref tupleBuilder, item, schema0, keyOnly:
false, noValueSetRef);
+ var keyOnly = item.OperationType ==
DataStreamerOperationType.Remove;
+ writer.Handler.Write(ref tupleBuilder, item.Data, schema0,
keyOnly: keyOnly, noValueSetRef);
var partitionId = Math.Abs(tupleBuilder.GetHash() %
partitionCount);
var batch = GetOrCreateBatch(partitionId);
@@ -210,7 +217,7 @@ internal static class DataStreamer
Metrics.StreamerItemsQueuedIncrement();
- return (batch, partitionId);
+ return batch;
}
Batch<T> GetOrCreateBatch(int partitionId)
@@ -219,8 +226,8 @@ internal static class DataStreamer
if (batchRef == null)
{
- batchRef = new Batch<T>(options.PageSize, schema);
- InitBuffer(batchRef, partitionId, schema);
+ batchRef = new Batch<T>(options.PageSize, schema, partitionId);
+ InitBuffer(batchRef, schema);
Metrics.StreamerBatchesActiveIncrement();
}
@@ -228,7 +235,7 @@ internal static class DataStreamer
return batchRef;
}
- async Task SendAsync(Batch<T> batch, int partitionId)
+ async Task SendAsync(Batch<T> batch)
{
var expectedSize = batch.Count;
@@ -243,18 +250,14 @@ internal static class DataStreamer
return;
}
- var buf = batch.Buffer;
-
- // See RecordSerializer.WriteMultiple.
- buf.WriteByte(MsgPackCode.Int32, batch.CountPos);
- buf.WriteIntBigEndian(batch.Count, batch.CountPos + 1);
-
- batch.Task = SendAndDisposeBufAsync(buf, partitionId,
batch.Task, batch.Items, batch.Count, batch.SchemaOutdated);
+ FinalizeBatchHeader(batch);
+ batch.Task = SendAndDisposeBufAsync(
+ batch.Buffer, batch.PartitionId, batch.Task, batch.Items,
batch.Count, batch.SchemaOutdated);
- batch.Items = ArrayPool<T>.Shared.Rent(options.PageSize);
+ batch.Items = GetPool<T>().Rent(options.PageSize);
batch.Count = 0;
batch.Buffer = ProtoCommon.GetMessageWriter(); // Prev buf
will be disposed in SendAndDisposeBufAsync.
- InitBuffer(batch, partitionId, schema);
+ InitBuffer(batch, schema);
batch.LastFlush = Stopwatch.GetTimestamp();
batch.Schema = schema;
batch.SchemaOutdated = false;
@@ -267,12 +270,14 @@ internal static class DataStreamer
PooledArrayBuffer buf,
int partitionId,
Task oldTask,
- T[] items,
+ DataStreamerItem<T>[] items,
int count,
bool batchSchemaOutdated)
{
Debug.Assert(items.Length > 0, "items.Length > 0");
+ Console.WriteLine("Sending batch: " +
items.Take(count).StringJoin());
+
if (batchSchemaOutdated)
{
// Schema update was detected while the batch was being filled.
@@ -323,7 +328,7 @@ internal static class DataStreamer
finally
{
buf.Dispose();
- ArrayPool<T>.Shared.Return(items);
+ GetPool<T>().Return(items);
Metrics.StreamerItemsQueuedDecrement(count);
Metrics.StreamerBatchesActiveDecrement();
@@ -337,11 +342,11 @@ internal static class DataStreamer
await Task.Delay(options.AutoFlushFrequency,
flushCt).ConfigureAwait(false);
var ts = Stopwatch.GetTimestamp();
- foreach (var (partition, batch) in batches)
+ foreach (var batch in batches.Values)
{
if (batch.Count > 0 && ts - batch.LastFlush >
options.AutoFlushFrequency.Ticks)
{
- await SendAsync(batch,
partition).ConfigureAwait(false);
+ await SendAsync(batch).ConfigureAwait(false);
}
}
}
@@ -349,11 +354,11 @@ internal static class DataStreamer
async Task Drain()
{
- foreach (var (partition, batch) in batches)
+ foreach (var batch in batches.Values)
{
if (batch.Count > 0)
{
- await SendAsync(batch, partition).ConfigureAwait(false);
+ await SendAsync(batch).ConfigureAwait(false);
}
await batch.Task.ConfigureAwait(false);
@@ -361,43 +366,127 @@ internal static class DataStreamer
}
}
- private static void InitBuffer<T>(Batch<T> batch, int partitionId, Schema
schema)
+ private static void InitBuffer<T>(Batch<T> batch, Schema schema)
{
var buf = batch.Buffer;
- WriteBatchHeader(buf, partitionId, schema);
+
+ WriteBatchHeader(buf, batch.PartitionId, schema,
deletedSetReserveSize: batch.Items.Length);
batch.CountPos = buf.Position;
buf.Advance(5); // Reserve count.
}
- private static void WriteBatchHeader(PooledArrayBuffer buf, int
partitionId, Schema schema)
+ private static void WriteBatchHeader(PooledArrayBuffer buf, int
partitionId, Schema schema, int deletedSetReserveSize)
{
var w = buf.MessageWriter;
+
+ // Reserve space for deleted set - we don't know if we need it or not,
and which size.
+ w.WriteBitSet(deletedSetReserveSize);
+ buf.Offset = buf.Position;
+
+ // Write header.
w.Write(schema.TableId);
w.Write(partitionId);
- w.WriteNil(); // Deleted rows bit set.
+ w.WriteNil(); // Deleted set. We assume there are no deleted items by
default. The header will be rewritten if needed.
w.Write(schema.Version);
}
+ private static void FinalizeBatchHeader<T>(Batch<T> batch)
+ {
+ var buf = batch.Buffer;
+
+ if (HasDeletedItems<T>(batch.Items.AsSpan(0, batch.Count)))
+ {
+ // Re-write the entire header with the deleted set of actual size.
+ var reservedBitSetSize = buf.Offset;
+ var oldPos = buf.Position;
+
+ buf.Position = 0;
+ buf.MessageWriter.WriteBitSet(batch.Count);
+ var actualBitSetSize = buf.Position;
+
+ buf.Offset = 1 + reservedBitSetSize - actualBitSetSize; // 1 byte
for null bit set used before.
+ buf.Position = buf.Offset;
+
+ var w = buf.MessageWriter;
+ w.Write(batch.Schema.TableId);
+ w.Write(batch.PartitionId);
+
+ var deletedSet = w.WriteBitSet(batch.Count);
+
+ for (var i = 0; i < batch.Count; i++)
+ {
+ if (batch.Items[i].OperationType ==
DataStreamerOperationType.Remove)
+ {
+ deletedSet.SetBit(i);
+ }
+ }
+
+ w.Write(batch.Schema.Version);
+
+ // Count position should not change - we only rearrange the header
above it.
+ Debug.Assert(buf.Position == batch.CountPos, $"buf.Position =
{buf.Position}, batch.CountPos = {batch.CountPos}");
+ buf.Position = oldPos;
+ }
+
+ // Update count.
+ buf.WriteByte(MsgPackCode.Int32, batch.CountPos);
+ buf.WriteIntBigEndian(batch.Count, batch.CountPos + 1);
+ }
+
private static void ReWriteBatch<T>(
PooledArrayBuffer buf,
int partitionId,
Schema schema,
- ReadOnlySpan<T> items,
+ ReadOnlySpan<DataStreamerItem<T>> items,
RecordSerializer<T> writer)
{
buf.Reset();
- WriteBatchHeader(buf, partitionId, schema);
var w = buf.MessageWriter;
+ w.Write(schema.TableId);
+ w.Write(partitionId);
+
+ if (HasDeletedItems(items))
+ {
+ var deletedSet = w.WriteBitSet(items.Length);
+
+ for (var i = 0; i < items.Length; i++)
+ {
+ if (items[i].OperationType == DataStreamerOperationType.Remove)
+ {
+ deletedSet.SetBit(i);
+ }
+ }
+ }
+ else
+ {
+ w.WriteNil();
+ }
+
+ w.Write(schema.Version);
w.Write(items.Length);
foreach (var item in items)
{
- writer.Handler.Write(ref w, schema, item, keyOnly: false,
computeHash: false);
+ var remove = item.OperationType ==
DataStreamerOperationType.Remove;
+ writer.Handler.Write(ref w, schema, item.Data, keyOnly: remove,
computeHash: false);
}
}
+ private static bool HasDeletedItems<T>(ReadOnlySpan<DataStreamerItem<T>>
items)
+ {
+ foreach (var t in items)
+ {
+ if (t.OperationType == DataStreamerOperationType.Remove)
+ {
+ return true;
+ }
+ }
+
+ return false;
+ }
+
private static async Task SendBatchAsync(
Table table,
PooledArrayBuffer buf,
@@ -419,18 +508,23 @@ internal static class DataStreamer
Metrics.StreamerItemsSent.Add(count, socket.MetricsContext.Tags);
}
+ private static ArrayPool<DataStreamerItem<T>> GetPool<T>() =>
ArrayPool<DataStreamerItem<T>>.Shared;
+
private sealed record Batch<T>
{
- public Batch(int capacity, Schema schema)
+ public Batch(int capacity, Schema schema, int partitionId)
{
- Items = ArrayPool<T>.Shared.Rent(capacity);
+ PartitionId = partitionId;
+ Items = GetPool<T>().Rent(capacity);
Schema = schema;
}
+ public int PartitionId { get; }
+
public PooledArrayBuffer Buffer { get; set; } =
ProtoCommon.GetMessageWriter();
[SuppressMessage("Performance", "CA1819:Properties should not return
arrays", Justification = "Private record")]
- public T[] Items { get; set; }
+ public DataStreamerItem<T>[] Items { get; set; }
public Schema Schema { get; set; }
diff --git
a/modules/platforms/dotnet/Apache.Ignite/Internal/Table/KeyValueView.cs
b/modules/platforms/dotnet/Apache.Ignite/Internal/Table/KeyValueView.cs
index 64973f7604..2182b43845 100644
--- a/modules/platforms/dotnet/Apache.Ignite/Internal/Table/KeyValueView.cs
+++ b/modules/platforms/dotnet/Apache.Ignite/Internal/Table/KeyValueView.cs
@@ -158,7 +158,7 @@ internal sealed class KeyValueView<TK, TV> :
IKeyValueView<TK, TV>
/// <inheritdoc/>
public async Task StreamDataAsync(
- IAsyncEnumerable<KeyValuePair<TK, TV>> data,
+ IAsyncEnumerable<DataStreamerItem<KeyValuePair<TK, TV>>> data,
DataStreamerOptions? options = null,
CancellationToken cancellationToken = default) =>
await _recordView.StreamDataAsync(ToKv(data), options,
cancellationToken).ConfigureAwait(false);
@@ -190,11 +190,12 @@ internal sealed class KeyValueView<TK, TV> :
IKeyValueView<TK, TV>
return new(key, val);
}
- private static async IAsyncEnumerable<KvPair<TK, TV>>
ToKv(IAsyncEnumerable<KeyValuePair<TK, TV>> pairs)
+ private static async IAsyncEnumerable<DataStreamerItem<KvPair<TK, TV>>>
ToKv(
+ IAsyncEnumerable<DataStreamerItem<KeyValuePair<TK, TV>>> pairs)
{
await foreach (var pair in pairs)
{
- yield return ToKv(pair);
+ yield return DataStreamerItem.Create(ToKv(pair.Data),
pair.OperationType);
}
}
}
diff --git
a/modules/platforms/dotnet/Apache.Ignite/Internal/Table/RecordView.cs
b/modules/platforms/dotnet/Apache.Ignite/Internal/Table/RecordView.cs
index 7e545ab151..051baf2ce5 100644
--- a/modules/platforms/dotnet/Apache.Ignite/Internal/Table/RecordView.cs
+++ b/modules/platforms/dotnet/Apache.Ignite/Internal/Table/RecordView.cs
@@ -294,7 +294,7 @@ namespace Apache.Ignite.Internal.Table
/// <inheritdoc/>
public async Task StreamDataAsync(
- IAsyncEnumerable<T> data,
+ IAsyncEnumerable<DataStreamerItem<T>> data,
DataStreamerOptions? options = null,
CancellationToken cancellationToken = default) =>
await DataStreamer.StreamDataAsync(
diff --git
a/modules/platforms/dotnet/Apache.Ignite/Internal/Table/Serialization/RecordSerializer.cs
b/modules/platforms/dotnet/Apache.Ignite/Internal/Table/Serialization/RecordSerializer.cs
index f7e95bbd75..46d4df001f 100644
---
a/modules/platforms/dotnet/Apache.Ignite/Internal/Table/Serialization/RecordSerializer.cs
+++
b/modules/platforms/dotnet/Apache.Ignite/Internal/Table/Serialization/RecordSerializer.cs
@@ -196,28 +196,6 @@ namespace Apache.Ignite.Internal.Table.Serialization
return firstHash;
}
- /// <summary>
- /// Write multiple records.
- /// </summary>
- /// <param name="buf">Buffer.</param>
- /// <param name="tx">Transaction.</param>
- /// <param name="schema">Schema.</param>
- /// <param name="recs">Records.</param>
- /// <param name="keyOnly">Key only columns.</param>
- /// <returns>First record hash.</returns>
- public int WriteMultiple(
- PooledArrayBuffer buf,
- Transactions.Transaction? tx,
- Schema schema,
- IEnumerable<T> recs,
- bool keyOnly = false)
- {
- var enumerator = recs.GetEnumerator();
- enumerator.MoveNext();
-
- return WriteMultiple(buf, tx, schema, enumerator, keyOnly);
- }
-
/// <summary>
/// Write multiple records.
/// </summary>
diff --git a/modules/platforms/dotnet/Apache.Ignite/Table/DataStreamerItem.cs
b/modules/platforms/dotnet/Apache.Ignite/Table/DataStreamerItem.cs
new file mode 100644
index 0000000000..648f04e9bf
--- /dev/null
+++ b/modules/platforms/dotnet/Apache.Ignite/Table/DataStreamerItem.cs
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache.Ignite.Table;
+
+/// <summary>
+/// Data streamer item.
+/// </summary>
+/// <param name="Data">Data item.</param>
+/// <param name="OperationType">Operation type.</param>
+/// <typeparam name="T">Data type.</typeparam>
+public record struct DataStreamerItem<T>(
+ T Data,
+ DataStreamerOperationType OperationType);
+
+/// <summary>
+/// Creates instances of the <see cref="DataStreamerItem{T}"/> struct.
+/// </summary>
+public static class DataStreamerItem
+{
+ /// <summary>
+ /// Creates a new data streamer item with the <see
cref="DataStreamerOperationType.Put"/> operation type.
+ /// </summary>
+ /// <param name="data">Data.</param>
+ /// <typeparam name="T">Data type.</typeparam>
+ /// <returns>Data streamer item.</returns>
+ public static DataStreamerItem<T> Create<T>(T data) =>
+ new(data, DataStreamerOperationType.Put);
+
+ /// <summary>
+ /// Creates a new data streamer item instance using provided values.
+ /// </summary>
+ /// <param name="data">Data.</param>
+ /// <param name="operationType">Operation type.</param>
+ /// <typeparam name="T">Data type.</typeparam>
+ /// <returns>Data streamer item.</returns>
+ public static DataStreamerItem<T> Create<T>(T data,
DataStreamerOperationType operationType) =>
+ new(data, operationType);
+}
diff --git a/modules/platforms/dotnet/Apache.Ignite.Tests/Table/Poco.cs
b/modules/platforms/dotnet/Apache.Ignite/Table/DataStreamerOperationType.cs
similarity index 68%
copy from modules/platforms/dotnet/Apache.Ignite.Tests/Table/Poco.cs
copy to
modules/platforms/dotnet/Apache.Ignite/Table/DataStreamerOperationType.cs
index 240588d506..71c65c9051 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Tests/Table/Poco.cs
+++ b/modules/platforms/dotnet/Apache.Ignite/Table/DataStreamerOperationType.cs
@@ -15,24 +15,20 @@
* limitations under the License.
*/
-namespace Apache.Ignite.Tests.Table
-{
- using System;
- using System.ComponentModel.DataAnnotations.Schema;
+namespace Apache.Ignite.Table;
+/// <summary>
+/// Data streamer operation type.
+/// </summary>
+public enum DataStreamerOperationType
+{
/// <summary>
- /// Test user object.
+ /// Put operation.
/// </summary>
- public class Poco
- {
- public long Key { get; set; }
-
- public string? Val { get; set; }
+ Put,
- [NotMapped]
- public Guid UnmappedId { get; set; }
-
- [NotMapped]
- public string? UnmappedStr { get; set; }
- }
+ /// <summary>
+ /// Remove operation.
+ /// </summary>
+ Remove
}
diff --git
a/modules/platforms/dotnet/Apache.Ignite/Table/IDataStreamerTarget.cs
b/modules/platforms/dotnet/Apache.Ignite/Table/IDataStreamerTarget.cs
index 899be69f20..4d18f65e10 100644
--- a/modules/platforms/dotnet/Apache.Ignite/Table/IDataStreamerTarget.cs
+++ b/modules/platforms/dotnet/Apache.Ignite/Table/IDataStreamerTarget.cs
@@ -21,6 +21,7 @@ namespace Apache.Ignite.Table;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
+using Internal.Common;
/// <summary>
/// Represents an entity that can be used as a target for streaming data.
@@ -35,5 +36,33 @@ public interface IDataStreamerTarget<T>
/// <param name="options">Streamer options.</param>
/// <param name="cancellationToken">Cancellation token.</param>
/// <returns>A <see cref="Task"/> representing the asynchronous
operation.</returns>
- Task StreamDataAsync(IAsyncEnumerable<T> data, DataStreamerOptions?
options = null, CancellationToken cancellationToken = default);
+ Task StreamDataAsync(
+ IAsyncEnumerable<DataStreamerItem<T>> data,
+ DataStreamerOptions? options = null,
+ CancellationToken cancellationToken = default);
+
+ /// <summary>
+ /// Streams data into the underlying table.
+ /// </summary>
+ /// <param name="data">Data.</param>
+ /// <param name="options">Streamer options.</param>
+ /// <param name="cancellationToken">Cancellation token.</param>
+ /// <returns>A <see cref="Task"/> representing the asynchronous
operation.</returns>
+ Task StreamDataAsync(
+ IAsyncEnumerable<T> data,
+ DataStreamerOptions? options = null,
+ CancellationToken cancellationToken = default)
+ {
+ IgniteArgumentCheck.NotNull(data);
+
+ return StreamDataAsync(ConvertToItems(), options, cancellationToken);
+
+ async IAsyncEnumerable<DataStreamerItem<T>> ConvertToItems()
+ {
+ await foreach (var item in
data.WithCancellation(cancellationToken))
+ {
+ yield return DataStreamerItem.Create(item);
+ }
+ }
+ }
}
diff --git
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/streamer/ItAbstractDataStreamerTest.java
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/streamer/ItAbstractDataStreamerTest.java
index a966f8f6e1..af8e4a38af 100644
---
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/streamer/ItAbstractDataStreamerTest.java
+++
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/streamer/ItAbstractDataStreamerTest.java
@@ -48,6 +48,7 @@ import org.apache.ignite.table.mapper.Mapper;
import org.apache.ignite.tx.TransactionOptions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.CsvSource;
@@ -287,6 +288,26 @@ public abstract class ItAbstractDataStreamerTest extends
ClusterPerClassIntegrat
assertEquals("bar-99", view.get(null,
tupleKey(id)).stringValue("name"));
}
+ @ParameterizedTest
+ @ValueSource(ints = {1, 2, 3})
+ @Disabled("IGNITE-21992 Data Streamer removal does not work for a new key
in the same batch")
+ public void testSameItemInsertUpdateRemove(int pageSize) {
+ RecordView<Tuple> view = defaultTable().recordView();
+ CompletableFuture<Void> streamerFut;
+ int key = 333000;
+
+ try (var publisher = new
SubmissionPublisher<DataStreamerItem<Tuple>>()) {
+ streamerFut = view.streamData(publisher,
DataStreamerOptions.builder().pageSize(pageSize).build());
+
+ publisher.submit(DataStreamerItem.of(tuple(key, "foo")));
+ publisher.submit(DataStreamerItem.removed(tupleKey(key)));
+ }
+
+ streamerFut.orTimeout(1, TimeUnit.SECONDS).join();
+
+ assertNull(view.get(null, tupleKey(key)));
+ }
+
@SuppressWarnings("resource")
@Test
public void testSchemaUpdateWhileStreaming() throws InterruptedException {