This is an automated email from the ASF dual-hosted git repository.
ptupitsyn pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 9a467ff103 IGNITE-22356 .NET: Add results support to Data Streamer
with receiver (#3935)
9a467ff103 is described below
commit 9a467ff103743ec1f78dfe7aa34201de781ef41e
Author: Pavel Tupitsyn <[email protected]>
AuthorDate: Mon Jun 17 17:19:58 2024 +0300
IGNITE-22356 .NET: Add results support to Data Streamer with receiver
(#3935)
* Implement receiver results deserialization
* Pass the results to the user as `IAsyncEnumerable<TResult>`
* The resulting `IAsyncEnumerable` applies back-pressure to the streamer (we
don't want to buffer an infinite number of results), so the user has to consume
the results to complete the streaming
* The user can choose to consume the results partially by disposing the
enumerator (or ending the `await foreach` loop). We still complete the
streaming in this case, but discard the remaining results
* If the resulting `IAsyncEnumerable` is cancelled, the streamer is
cancelled too
---
.../Proto/BinaryTuple/BinaryTupleTests.cs | 8 +-
.../Apache.Ignite.Tests/Table/DataStreamerTests.cs | 261 ++++++++++++++++++++-
.../Proto/BinaryTuple/BinaryTupleBuilder.cs | 15 ++
.../Proto/BinaryTuple/BinaryTupleReader.cs | 37 +++
.../Internal/Table/DataStreamerWithReceiver.cs | 77 ++++--
.../Apache.Ignite/Internal/Table/RecordView.cs | 108 +++++++--
.../Apache.Ignite/Table/IDataStreamerTarget.cs | 7 +-
.../runner/app/PlatformTestNodeRunner.java | 24 +-
8 files changed, 487 insertions(+), 50 deletions(-)
diff --git
a/modules/platforms/dotnet/Apache.Ignite.Tests/Proto/BinaryTuple/BinaryTupleTests.cs
b/modules/platforms/dotnet/Apache.Ignite.Tests/Proto/BinaryTuple/BinaryTupleTests.cs
index 02e5691722..9b93e66287 100644
---
a/modules/platforms/dotnet/Apache.Ignite.Tests/Proto/BinaryTuple/BinaryTupleTests.cs
+++
b/modules/platforms/dotnet/Apache.Ignite.Tests/Proto/BinaryTuple/BinaryTupleTests.cs
@@ -839,8 +839,11 @@ namespace Apache.Ignite.Tests.Proto.BinaryTuple
b.AppendObjectWithType(date);
b.AppendObjectWithType(dateTime);
b.AppendObjectWithType(Instant.FromDateTimeUtc(utcNow));
+ b.AppendObjectWithType(true);
+ b.AppendObjectWithType(Period.FromDays(2));
+ b.AppendObjectWithType(Duration.FromDays(3));
},
- 17 * 3);
+ 20 * 3);
Assert.IsNull(reader.GetObject(0));
Assert.AreEqual(sbyte.MaxValue, reader.GetObject(3));
@@ -859,6 +862,9 @@ namespace Apache.Ignite.Tests.Proto.BinaryTuple
Assert.AreEqual(date, reader.GetObject(42));
Assert.AreEqual(dateTime, reader.GetObject(45));
Assert.AreEqual(Instant.FromDateTimeUtc(utcNow),
reader.GetObject(48));
+ Assert.IsTrue((bool)reader.GetObject(51)!);
+ Assert.AreEqual(Period.FromDays(2), reader.GetObject(54));
+ Assert.AreEqual(Duration.FromDays(3), reader.GetObject(57));
}
[Test]
diff --git
a/modules/platforms/dotnet/Apache.Ignite.Tests/Table/DataStreamerTests.cs
b/modules/platforms/dotnet/Apache.Ignite.Tests/Table/DataStreamerTests.cs
index 9db142969c..7b7d73f958 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Tests/Table/DataStreamerTests.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Tests/Table/DataStreamerTests.cs
@@ -22,6 +22,7 @@ using System.Collections;
using System.Collections.Generic;
using System.Data;
using System.Linq;
+using System.Numerics;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;
@@ -42,6 +43,8 @@ public class DataStreamerTests : IgniteTestsBase
{
private const string TestReceiverClassName =
ComputeTests.PlatformTestNodeRunner + "$TestReceiver";
+ private const string EchoArgsReceiverClassName =
ComputeTests.PlatformTestNodeRunner + "$EchoArgsReceiver";
+
private const string UpsertElementTypeNameReceiverClassName =
ComputeTests.PlatformTestNodeRunner + "$UpsertElementTypeNameReceiver";
private const int Count = 100;
@@ -50,6 +53,29 @@ public class DataStreamerTests : IgniteTestsBase
private const int DeletedKey = Count + 1;
+ private static readonly object[] AllSupportedTypes =
+ {
+ true,
+ sbyte.MaxValue,
+ short.MinValue,
+ int.MaxValue,
+ long.MinValue,
+ float.MaxValue,
+ double.MinValue,
+ decimal.One,
+ new LocalDate(1234, 5, 6),
+ new LocalTime(12, 3, 4, 567),
+ new LocalDateTime(1234, 5, 6, 7, 8, 9),
+ Instant.FromUnixTimeSeconds(123456),
+ Guid.Empty,
+ new BitArray(new[] { byte.MaxValue }),
+ "str123",
+ new byte[] { 1, 2, 3 },
+ Period.FromDays(999),
+ Duration.FromSeconds(12345),
+ new BigInteger(12.34)
+ };
+
private static int _unknownKey = 333000;
[SetUp]
@@ -179,7 +205,7 @@ public class DataStreamerTests : IgniteTestsBase
}
[Test]
- public void TestOptionsValidation()
+ public void TestOptionsValidation([Values(true, false, null)] bool?
withReceiverResults)
{
AssertException(DataStreamerOptions.Default with { PageSize = -10 },
"PageSize should be positive.");
AssertException(DataStreamerOptions.Default with { RetryLimit = -1 },
"RetryLimit should be non-negative.");
@@ -190,7 +216,42 @@ public class DataStreamerTests : IgniteTestsBase
void AssertException(DataStreamerOptions options, string message)
{
var ex = Assert.ThrowsAsync<ArgumentException>(
- async () => await
Table.RecordBinaryView.StreamDataAsync(Array.Empty<IIgniteTuple>().ToAsyncEnumerable(),
options));
+ async () =>
+ {
+ switch (withReceiverResults)
+ {
+ // No receiver.
+ case null:
+ await
Table.RecordBinaryView.StreamDataAsync(Array.Empty<IIgniteTuple>().ToAsyncEnumerable(),
options);
+ break;
+
+ // Receiver without results.
+ case false:
+ await
Table.RecordBinaryView.StreamDataAsync<IIgniteTuple, string>(
+
Array.Empty<IIgniteTuple>().ToAsyncEnumerable(),
+ t => t,
+ t => t.ToString()!,
+ Array.Empty<DeploymentUnit>(),
+ TestReceiverClassName,
+ null,
+ options);
+
+ break;
+
+ // Receiver with results.
+ case true:
+ await
Table.RecordBinaryView.StreamDataAsync<IIgniteTuple, string, string>(
+
Array.Empty<IIgniteTuple>().ToAsyncEnumerable(),
+ t => t,
+ t => t.ToString()!,
+ Array.Empty<DeploymentUnit>(),
+ TestReceiverClassName,
+ null,
+ options).ToListAsync();
+
+ break;
+ }
+ });
StringAssert.Contains(message, ex?.Message);
}
@@ -332,6 +393,35 @@ public class DataStreamerTests : IgniteTestsBase
}
}
+ [Test]
+ public async Task TestWithReceiverWithResultsRecordBinaryView()
+ {
+ IAsyncEnumerable<string> results = TupleView.StreamDataAsync<int,
string, string>(
+ Enumerable.Range(0, Count).ToAsyncEnumerable(),
+ keySelector: x => GetTuple(x),
+ payloadSelector: x => $"{x}-value{x * 10}",
+ units: Array.Empty<DeploymentUnit>(),
+ receiverClassName: TestReceiverClassName,
+ receiverArgs: new object[] { Table.Name, "arg1", 22 },
+ options: DataStreamerOptions.Default);
+
+ var resultSet = await results.ToHashSetAsync();
+
+ for (int i = 0; i < Count; i++)
+ {
+ var res = await TupleView.GetAsync(null, GetTuple(i));
+
+ var expectedVal = $"value{i * 10}_arg1_22";
+
+ Assert.IsTrue(res.HasValue);
+ Assert.AreEqual(expectedVal, res.Value[ValCol]);
+
+ CollectionAssert.Contains(resultSet, expectedVal);
+ }
+
+ Assert.AreEqual(Count, resultSet.Count);
+ }
+
[Test]
public async Task TestWithReceiverRecordView()
{
@@ -353,6 +443,35 @@ public class DataStreamerTests : IgniteTestsBase
}
}
+ [Test]
+ public async Task TestWithReceiverResultsRecordView()
+ {
+ IAsyncEnumerable<string> results = PocoView.StreamDataAsync<int,
string, string>(
+ Enumerable.Range(0, Count).ToAsyncEnumerable(),
+ keySelector: x => GetPoco(x),
+ payloadSelector: x => $"{x}-value{x * 10}",
+ units: Array.Empty<DeploymentUnit>(),
+ receiverClassName: TestReceiverClassName,
+ receiverArgs: new object[] { Table.Name, "arg1", 22 },
+ options: DataStreamerOptions.Default);
+
+ var resultSet = await results.ToHashSetAsync();
+
+ for (int i = 0; i < Count; i++)
+ {
+ var res = await TupleView.GetAsync(null, GetTuple(i));
+
+ var expectedVal = $"value{i * 10}_arg1_22";
+
+ Assert.IsTrue(res.HasValue);
+ Assert.AreEqual(expectedVal, res.Value[ValCol]);
+
+ CollectionAssert.Contains(resultSet, expectedVal);
+ }
+
+ Assert.AreEqual(Count, resultSet.Count);
+ }
+
[Test]
public async Task TestWithReceiverKeyValueBinaryView()
{
@@ -373,6 +492,34 @@ public class DataStreamerTests : IgniteTestsBase
}
}
+ [Test]
+ public async Task TestWithReceiverResultsKeyValueBinaryView()
+ {
+ IAsyncEnumerable<string> results =
Table.KeyValueBinaryView.StreamDataAsync<int, string, string>(
+ Enumerable.Range(0, Count).ToAsyncEnumerable(),
+ keySelector: x => new KeyValuePair<IIgniteTuple,
IIgniteTuple>(GetTuple(x), new IgniteTuple()),
+ payloadSelector: x => $"{x}-value{x * 10}",
+ units: Array.Empty<DeploymentUnit>(),
+ receiverClassName: TestReceiverClassName,
+ receiverArgs: new object[] { Table.Name, "arg1", 22 });
+
+ var resultSet = await results.ToHashSetAsync();
+
+ for (int i = 0; i < Count; i++)
+ {
+ var res = await TupleView.GetAsync(null, GetTuple(i));
+
+ var expectedVal = $"value{i * 10}_arg1_22";
+
+ Assert.IsTrue(res.HasValue);
+ Assert.AreEqual(expectedVal, res.Value[ValCol]);
+
+ CollectionAssert.Contains(resultSet, expectedVal);
+ }
+
+ Assert.AreEqual(Count, resultSet.Count);
+ }
+
[Test]
public async Task TestWithReceiverKeyValueView()
{
@@ -393,6 +540,34 @@ public class DataStreamerTests : IgniteTestsBase
}
}
+ [Test]
+ public async Task TestWithReceiverResultsKeyValueView()
+ {
+ IAsyncEnumerable<string> results = Table.GetKeyValueView<long,
Poco>().StreamDataAsync<int, string, string>(
+ Enumerable.Range(0, Count).ToAsyncEnumerable(),
+ keySelector: x => new KeyValuePair<long, Poco>(x, null!),
+ payloadSelector: x => $"{x}-value{x * 10}",
+ units: Array.Empty<DeploymentUnit>(),
+ receiverClassName: TestReceiverClassName,
+ receiverArgs: new object[] { Table.Name, "arg11", 55});
+
+ var resultSet = await results.ToHashSetAsync();
+
+ for (int i = 0; i < Count; i++)
+ {
+ var res = await TupleView.GetAsync(null, GetTuple(i));
+
+ var expectedVal = $"value{i * 10}_arg11_55";
+
+ Assert.IsTrue(res.HasValue);
+ Assert.AreEqual(expectedVal, res.Value[ValCol]);
+
+ CollectionAssert.Contains(resultSet, expectedVal);
+ }
+
+ Assert.AreEqual(Count, resultSet.Count);
+ }
+
[Test]
public void TestUnknownReceiverClass()
{
@@ -422,6 +597,21 @@ public class DataStreamerTests : IgniteTestsBase
Assert.AreEqual("Streamer receiver failed: Job execution failed:
java.lang.ArithmeticException: Test exception: 1", ex.Message);
}
+ [Test]
+ public void TestReceiverWithResultsException()
+ {
+ var ex = Assert.ThrowsAsync<IgniteException>(async () =>
+ await PocoView.StreamDataAsync<int, string, string>(
+ Enumerable.Range(0, 1).ToAsyncEnumerable(),
+ keySelector: x => GetPoco(x),
+ payloadSelector: _ => string.Empty,
+ units: Array.Empty<DeploymentUnit>(),
+ receiverClassName: TestReceiverClassName,
+ receiverArgs: new object[] { "throw", "throw", 1
}).ToListAsync());
+
+ Assert.AreEqual("Streamer receiver failed: Job execution failed:
java.lang.ArithmeticException: Test exception: 1", ex.Message);
+ }
+
[Test]
public void TestReceiverSelectorException([Values(true, false)] bool
keySelector)
{
@@ -509,6 +699,73 @@ public class DataStreamerTests : IgniteTestsBase
ex.Message);
}
+ [TestCaseSource(nameof(AllSupportedTypes))]
+ public async Task TestEchoReceiverAllDataTypes(object arg)
+ {
+ var res = await PocoView.StreamDataAsync<object, object, object>(
+ new object[] { 1 }.ToAsyncEnumerable(),
+ keySelector: x => new Poco(),
+ payloadSelector: x => x.ToString()!,
+ units: Array.Empty<DeploymentUnit>(),
+ receiverClassName: EchoArgsReceiverClassName,
+ receiverArgs: new[] { arg }).SingleAsync();
+
+ Assert.AreEqual(arg, res);
+ }
+
+ [Test]
+ public async Task TestResultConsumerEarlyExit()
+ {
+ IAsyncEnumerable<string> results = PocoView.StreamDataAsync<int,
string, string>(
+ Enumerable.Range(0, Count).ToAsyncEnumerable(),
+ keySelector: x => GetPoco(x),
+ payloadSelector: x => $"{x}-value{x * 10}",
+ units: Array.Empty<DeploymentUnit>(),
+ receiverClassName: TestReceiverClassName,
+ receiverArgs: new object[] { Table.Name, "arg1", 22 },
+ options: DataStreamerOptions.Default with { PageSize = 1 });
+
+ // Read only part of the results.
+ var resultSet = await results.Take(3).ToListAsync();
+ Assert.AreEqual(3, resultSet.Count);
+
+ for (int i = 0; i < Count; i++)
+ {
+ var res = await TupleView.GetAsync(null, GetTuple(i));
+
+ var expectedVal = $"value{i * 10}_arg1_22";
+
+ Assert.IsTrue(res.HasValue, $"Key {i} not found");
+ Assert.AreEqual(expectedVal, res.Value[ValCol]);
+ }
+ }
+
+ [Test]
+ public async Task TestResultConsumerCancellation()
+ {
+ IAsyncEnumerable<string> results = PocoView.StreamDataAsync<int,
string, string>(
+ Enumerable.Range(0, Count).ToAsyncEnumerable(),
+ keySelector: x => GetPoco(x),
+ payloadSelector: x => $"{x}-value{x * 10}",
+ units: Array.Empty<DeploymentUnit>(),
+ receiverClassName: TestReceiverClassName,
+ receiverArgs: new object[] { Table.Name, "arg1", 22 },
+ options: DataStreamerOptions.Default with { PageSize = 1 });
+
+ var cts = new CancellationTokenSource();
+
+ await using var enumerator = results.GetAsyncEnumerator(cts.Token);
+ Assert.IsTrue(await enumerator.MoveNextAsync());
+
+ // Cancel the resulting enumerator before it's fully consumed. This
stops the streamer.
+ cts.Cancel();
+ Assert.ThrowsAsync<TaskCanceledException>(async () => await
enumerator.MoveNextAsync());
+
+ // Only part of the data was streamed.
+ var streamedData = await TupleView.GetAllAsync(null,
Enumerable.Range(0, Count).Select(x => GetTuple(x)));
+ Assert.Less(streamedData.Count(x => x.HasValue), Count / 2);
+ }
+
private static async IAsyncEnumerable<IIgniteTuple> GetFakeServerData(int
count)
{
for (var i = 0; i < count; i++)
diff --git
a/modules/platforms/dotnet/Apache.Ignite/Internal/Proto/BinaryTuple/BinaryTupleBuilder.cs
b/modules/platforms/dotnet/Apache.Ignite/Internal/Proto/BinaryTuple/BinaryTupleBuilder.cs
index 5b9d3c2aa8..e7ccd6c478 100644
---
a/modules/platforms/dotnet/Apache.Ignite/Internal/Proto/BinaryTuple/BinaryTupleBuilder.cs
+++
b/modules/platforms/dotnet/Apache.Ignite/Internal/Proto/BinaryTuple/BinaryTupleBuilder.cs
@@ -919,6 +919,11 @@ namespace Apache.Ignite.Internal.Proto.BinaryTuple
AppendNull(); // Value.
break;
+ case bool b:
+ AppendTypeAndScale(ColumnType.Boolean);
+ AppendBool(b);
+ break;
+
case int i32:
AppendTypeAndScale(ColumnType.Int32);
AppendInt(i32);
@@ -995,6 +1000,16 @@ namespace Apache.Ignite.Internal.Proto.BinaryTuple
AppendTimestamp(instant, timestampPrecision);
break;
+ case Period period:
+ AppendTypeAndScale(ColumnType.Period);
+ AppendPeriod(period);
+ break;
+
+ case Duration duration:
+ AppendTypeAndScale(ColumnType.Duration);
+ AppendDuration(duration);
+ break;
+
case BitArray bitArray:
AppendTypeAndScale(ColumnType.Bitmask);
AppendBitmask(bitArray);
diff --git
a/modules/platforms/dotnet/Apache.Ignite/Internal/Proto/BinaryTuple/BinaryTupleReader.cs
b/modules/platforms/dotnet/Apache.Ignite/Internal/Proto/BinaryTuple/BinaryTupleReader.cs
index bc6993187b..1931ea9115 100644
---
a/modules/platforms/dotnet/Apache.Ignite/Internal/Proto/BinaryTuple/BinaryTupleReader.cs
+++
b/modules/platforms/dotnet/Apache.Ignite/Internal/Proto/BinaryTuple/BinaryTupleReader.cs
@@ -18,6 +18,7 @@
namespace Apache.Ignite.Internal.Proto.BinaryTuple
{
using System;
+ using System.Buffers;
using System.Buffers.Binary;
using System.Collections;
using System.Diagnostics;
@@ -507,6 +508,42 @@ namespace Apache.Ignite.Internal.Proto.BinaryTuple
return GetObject(index + 2, type, scale);
}
+ /// <summary>
+ /// Gets an object collection with the specified element type.
+ /// Opposite of <see
cref="BinaryTupleBuilder.AppendObjectCollectionWithType{T}"/>.
+ /// </summary>
+ /// <param name="index">Start index.</param>
+ /// <typeparam name="T">Element type.</typeparam>
+ /// <returns>Pooled array with items and actual item count.</returns>
+ public (T[] Items, int Count) GetObjectCollectionWithType<T>(int index
= 0)
+ {
+ int typeId = GetInt(index++);
+ int count = GetInt(index++);
+
+ if (count == 0)
+ {
+ return (Array.Empty<T>(), 0);
+ }
+
+ var items = ArrayPool<T>.Shared.Rent(count);
+ var type = (ColumnType)typeId;
+
+ try
+ {
+ for (int i = 0; i < count; i++)
+ {
+ items[i] = (T)GetObject(index + i, type)!;
+ }
+
+ return (items, count);
+ }
+ catch (Exception)
+ {
+ ArrayPool<T>.Shared.Return(items);
+ throw;
+ }
+ }
+
private static LocalDate ReadDate(ReadOnlySpan<byte> span)
{
// Read int32 from 3 bytes, preserving sign.
diff --git
a/modules/platforms/dotnet/Apache.Ignite/Internal/Table/DataStreamerWithReceiver.cs
b/modules/platforms/dotnet/Apache.Ignite/Internal/Table/DataStreamerWithReceiver.cs
index bab2437a96..3dbc64269d 100644
---
a/modules/platforms/dotnet/Apache.Ignite/Internal/Table/DataStreamerWithReceiver.cs
+++
b/modules/platforms/dotnet/Apache.Ignite/Internal/Table/DataStreamerWithReceiver.cs
@@ -23,9 +23,9 @@ using System.Collections.Generic;
using System.Diagnostics;
using System.Diagnostics.CodeAnalysis;
using System.Linq;
-using System.Runtime.CompilerServices;
using System.Runtime.InteropServices;
using System.Threading;
+using System.Threading.Channels;
using System.Threading.Tasks;
using Buffers;
using Common;
@@ -60,7 +60,7 @@ internal static class DataStreamerWithReceiver
/// <param name="payloadSelector">Payload func.</param>
/// <param name="keyWriter">Key writer.</param>
/// <param name="options">Options.</param>
- /// <param name="expectResults">Whether to expect results from the
receiver.</param>
+ /// <param name="resultChannel">Channel for results from the receiver.
Null when results are not expected.</param>
/// <param name="units">Deployment units. Can be empty.</param>
/// <param name="receiverClassName">Java class name of the streamer
receiver to execute on the server.</param>
/// <param name="receiverArgs">Receiver args.</param>
@@ -70,30 +70,24 @@ internal static class DataStreamerWithReceiver
/// <typeparam name="TPayload">Payload type.</typeparam>
/// <typeparam name="TResult">Result type.</typeparam>
/// <returns>A <see cref="Task"/> representing the asynchronous
operation.</returns>
- internal static async IAsyncEnumerable<TResult> StreamDataAsync<TSource,
TKey, TPayload, TResult>(
+ internal static async Task StreamDataAsync<TSource, TKey, TPayload,
TResult>(
IAsyncEnumerable<TSource> data,
Table table,
Func<TSource, TKey> keySelector,
Func<TSource, TPayload> payloadSelector,
IRecordSerializerHandler<TKey> keyWriter,
DataStreamerOptions options,
- bool expectResults,
+ Channel<TResult>? resultChannel,
IEnumerable<DeploymentUnit> units,
string receiverClassName,
ICollection<object>? receiverArgs,
- [EnumeratorCancellation] CancellationToken cancellationToken)
+ CancellationToken cancellationToken)
where TKey : notnull
where TPayload : notnull
{
IgniteArgumentCheck.NotNull(data);
DataStreamer.ValidateOptions(options);
- if (expectResults)
- {
- // TODO IGNITE-22356 Support result retrieval.
- throw new NotSupportedException("Result retrieval is not yet
supported.");
- }
-
// ConcurrentDictionary is not necessary because we consume the source
sequentially.
// However, locking for batches is required due to auto-flush
background task.
var batches = new Dictionary<int, Batch<TPayload>>();
@@ -148,8 +142,7 @@ internal static class DataStreamerWithReceiver
}
}
- // TODO IGNITE-22356 Support result retrieval.
- yield break;
+ return;
Batch<TPayload> Add(TSource item)
{
@@ -246,6 +239,7 @@ internal static class DataStreamerWithReceiver
await Task.Yield();
var buf = ProtoCommon.GetMessageWriter();
+ TResult[]? results = null;
try
{
@@ -256,13 +250,33 @@ internal static class DataStreamerWithReceiver
// Wait for the previous batch for this node to preserve item
order.
await oldTask.ConfigureAwait(false);
- await SendBatchAsync(table, buf, count, preferredNode,
retryPolicy).ConfigureAwait(false);
+ (results, int resultsCount) = await SendBatchAsync<TResult>(
+ table, buf, count, preferredNode, retryPolicy,
expectResults: resultChannel != null).ConfigureAwait(false);
+
+ if (results != null && resultChannel != null)
+ {
+ for (var i = 0; i < resultsCount; i++)
+ {
+ TResult result = results[i];
+ await resultChannel.Writer.WriteAsync(result,
cancellationToken).ConfigureAwait(false);
+ }
+ }
+ }
+ catch (ChannelClosedException)
+ {
+ // Consumer does not want more results, stop returning them,
but keep streaming.
+ resultChannel = null;
}
finally
{
buf.Dispose();
GetPool<TPayload>().Return(items);
+ if (results != null)
+ {
+ GetPool<TResult>().Return(results);
+ }
+
Metrics.StreamerItemsQueuedDecrement(count);
Metrics.StreamerBatchesActiveDecrement();
}
@@ -311,6 +325,7 @@ internal static class DataStreamerWithReceiver
Compute.WriteUnits(units0, buf);
+ var expectResults = resultChannel != null;
w.Write(expectResults);
WriteReceiverPayload(ref w, receiverClassName, receiverArgs ??
Array.Empty<object>(), items);
}
@@ -339,12 +354,13 @@ internal static class DataStreamerWithReceiver
w.Write(builder.Build().Span);
}
- private static async Task SendBatchAsync(
+ private static async Task<(T[]? ResultsPooledArray, int ResultsCount)>
SendBatchAsync<T>(
Table table,
PooledArrayBuffer buf,
int count,
PreferredNode preferredNode,
- IRetryPolicy retryPolicy)
+ IRetryPolicy retryPolicy,
+ bool expectResults)
{
var (resBuf, socket) = await table.Socket.DoOutInOpAndGetSocketAsync(
ClientOp.StreamerWithReceiverBatchSend,
@@ -354,10 +370,33 @@ internal static class DataStreamerWithReceiver
retryPolicy)
.ConfigureAwait(false);
- resBuf.Dispose();
+ using (resBuf)
+ {
+ Metrics.StreamerBatchesSent.Add(1, socket.MetricsContext.Tags);
+ Metrics.StreamerItemsSent.Add(count, socket.MetricsContext.Tags);
+
+ return expectResults
+ ? Read(resBuf.GetReader())
+ : (null, 0);
+ }
+
+ static (T[]? ResultsPooledArray, int ResultsCount) Read(MsgPackReader
reader)
+ {
+ if (reader.TryReadNil())
+ {
+ return (null, 0);
+ }
+
+ var numElements = reader.ReadInt32();
+ if (numElements == 0)
+ {
+ return (null, 0);
+ }
+
+ var tuple = new BinaryTupleReader(reader.ReadBinary(),
numElements);
- Metrics.StreamerBatchesSent.Add(1, socket.MetricsContext.Tags);
- Metrics.StreamerItemsSent.Add(count, socket.MetricsContext.Tags);
+ return tuple.GetObjectCollectionWithType<T>();
+ }
}
private static ArrayPool<T> GetPool<T>() => ArrayPool<T>.Shared;
diff --git
a/modules/platforms/dotnet/Apache.Ignite/Internal/Table/RecordView.cs
b/modules/platforms/dotnet/Apache.Ignite/Internal/Table/RecordView.cs
index f731a65a43..f96973864e 100644
--- a/modules/platforms/dotnet/Apache.Ignite/Internal/Table/RecordView.cs
+++ b/modules/platforms/dotnet/Apache.Ignite/Internal/Table/RecordView.cs
@@ -19,10 +19,11 @@ namespace Apache.Ignite.Internal.Table
{
using System;
using System.Collections.Generic;
- using System.Diagnostics;
using System.Diagnostics.CodeAnalysis;
using System.Linq;
+ using System.Runtime.CompilerServices;
using System.Threading;
+ using System.Threading.Channels;
using System.Threading.Tasks;
using Buffers;
using Common;
@@ -307,7 +308,7 @@ namespace Apache.Ignite.Internal.Table
cancellationToken).ConfigureAwait(false);
/// <inheritdoc/>
- public IAsyncEnumerable<TResult> StreamDataAsync<TSource, TPayload,
TResult>(
+ public async IAsyncEnumerable<TResult> StreamDataAsync<TSource,
TPayload, TResult>(
IAsyncEnumerable<TSource> data,
Func<TSource, T> keySelector,
Func<TSource, TPayload> payloadSelector,
@@ -315,20 +316,83 @@ namespace Apache.Ignite.Internal.Table
string receiverClassName,
ICollection<object>? receiverArgs,
DataStreamerOptions? options,
- CancellationToken cancellationToken = default)
- where TPayload : notnull =>
- DataStreamerWithReceiver.StreamDataAsync<TSource, T, TPayload,
TResult>(
- data,
- _table,
- keySelector,
- payloadSelector,
- keyWriter: _ser.Handler,
- options ?? DataStreamerOptions.Default,
- expectResults: true,
- units,
- receiverClassName,
- receiverArgs,
- cancellationToken);
+ [EnumeratorCancellation] CancellationToken cancellationToken =
default)
+ where TPayload : notnull
+ {
+ options ??= DataStreamerOptions.Default;
+
+ // Validate before using for channel capacity.
+ DataStreamer.ValidateOptions(options);
+
+ // Double the page size to read the next page while the previous
one is being consumed.
+ var resultChannelCapacity = options.PageSize * 2;
+
+ Channel<TResult> resultChannel =
Channel.CreateBounded<TResult>(new BoundedChannelOptions(resultChannelCapacity)
+ {
+ // Backpressure - streamer will wait for results to be
consumed before streaming more.
+ FullMode = BoundedChannelFullMode.Wait,
+
+ // One reader: resulting IAsyncEnumerable.
+ SingleReader = true,
+
+ // Many writers: batches may complete in parallel.
+ SingleWriter = false
+ });
+
+ // Stream in background.
+ var streamTask = Stream();
+
+ // Result async enumerable is returned immediately. It will be
completed when the streaming completes.
+ var reader = resultChannel.Reader;
+
+ try
+ {
+ while (await
reader.WaitToReadAsync(cancellationToken).ConfigureAwait(false))
+ {
+ while (reader.TryRead(out var item))
+ {
+ yield return item;
+ }
+ }
+ }
+ finally
+ {
+ // Consumer has stopped reading, complete the channel.
+ resultChannel.Writer.TryComplete();
+
+ // Wait for the streamer to complete even if the result
consumer has stopped reading.
+ await streamTask.ConfigureAwait(false);
+ }
+
+ [SuppressMessage(
+ "Design",
+ "CA1031:Do not catch general exception types",
+ Justification = "All exceptions should be propagated to the
result channel.")]
+ async Task Stream()
+ {
+ try
+ {
+ await DataStreamerWithReceiver.StreamDataAsync(
+ data,
+ _table,
+ keySelector,
+ payloadSelector,
+ keyWriter: _ser.Handler,
+ options,
+ resultChannel,
+ units,
+ receiverClassName,
+ receiverArgs,
+ cancellationToken).ConfigureAwait(false);
+
+ resultChannel.Writer.Complete();
+ }
+ catch (Exception e)
+ {
+ resultChannel.Writer.TryComplete(e);
+ }
+ }
+ }
/// <inheritdoc/>
public async Task StreamDataAsync<TSource, TPayload>(
@@ -342,24 +406,18 @@ namespace Apache.Ignite.Internal.Table
CancellationToken cancellationToken = default)
where TPayload : notnull
{
- IAsyncEnumerable<object> results =
DataStreamerWithReceiver.StreamDataAsync<TSource, T, TPayload, object>(
+ await DataStreamerWithReceiver.StreamDataAsync<TSource, T,
TPayload, object>(
data,
_table,
keySelector,
payloadSelector,
keyWriter: _ser.Handler,
options ?? DataStreamerOptions.Default,
- expectResults: false,
+ resultChannel: null,
units,
receiverClassName,
receiverArgs,
- cancellationToken);
-
- // Await streaming completion.
- await foreach (var unused in results)
- {
- Debug.Fail("Got results with expectResults=false: " + unused);
- }
+ cancellationToken).ConfigureAwait(false);
}
/// <inheritdoc/>
diff --git
a/modules/platforms/dotnet/Apache.Ignite/Table/IDataStreamerTarget.cs
b/modules/platforms/dotnet/Apache.Ignite/Table/IDataStreamerTarget.cs
index 31d3aae978..c5dd1aef5d 100644
--- a/modules/platforms/dotnet/Apache.Ignite/Table/IDataStreamerTarget.cs
+++ b/modules/platforms/dotnet/Apache.Ignite/Table/IDataStreamerTarget.cs
@@ -79,7 +79,12 @@ public interface IDataStreamerTarget<T>
/// <param name="receiverArgs">Receiver args.</param>
/// <param name="options">Streamer options.</param>
/// <param name="cancellationToken">Cancellation token.</param>
- /// <returns>A <see cref="Task"/> representing the asynchronous
operation.</returns>
+ /// <returns>
+ /// A <see cref="IAsyncEnumerable{T}"/> with the results from the receiver.
+ /// <para />
+ /// The resulting async enumerator applies back-pressure to the data
source, so it should be either fully consumed
+ /// or disposed to complete the streaming. Disposing the enumerator before
it is fully consumed will ignore the remaining results.
+ /// </returns>
/// <typeparam name="TSource">Source item type.</typeparam>
/// <typeparam name="TPayload">Payload type.</typeparam>
/// <typeparam name="TResult">Result type.</typeparam>
diff --git
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/PlatformTestNodeRunner.java
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/PlatformTestNodeRunner.java
index d66ba86956..2539d24935 100644
---
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/PlatformTestNodeRunner.java
+++
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/PlatformTestNodeRunner.java
@@ -790,18 +790,22 @@ public class PlatformTestNodeRunner {
Table table = ctx.ignite().tables().table(tableName);
RecordView<Tuple> recordView = table.recordView();
+ List<String> res = new ArrayList<>();
for (String s : page) {
String[] parts = s.split("-", 2);
+ String val = parts[1] + "_" + arg1 + "_" + arg2;
Tuple rec = Tuple.create()
.set("key", Long.parseLong(parts[0]))
- .set("val", parts[1] + "_" + arg1 + "_" + arg2);
+ .set("val", val);
+
+ res.add(val);
recordView.upsert(null, rec);
}
- return null;
+ return CompletableFuture.completedFuture(res);
}
}
@@ -834,4 +838,20 @@ public class PlatformTestNodeRunner {
return null;
}
}
+
+ @SuppressWarnings("unused") // Used by platform tests.
+ private static class EchoArgsReceiver implements
DataStreamerReceiver<Object, Object> {
+ @Override
+ public CompletableFuture<List<Object>> receive(List<Object> page,
DataStreamerReceiverContext ctx, Object... args) {
+ return CompletableFuture.completedFuture(List.of(args));
+ }
+ }
+
+ @SuppressWarnings("unused") // Used by platform tests.
+ private static class EchoReceiver implements DataStreamerReceiver<Object,
Object> {
+ @Override
+ public CompletableFuture<List<Object>> receive(List<Object> page,
DataStreamerReceiverContext ctx, Object... args) {
+ return CompletableFuture.completedFuture(page);
+ }
+ }
}