This is an automated email from the ASF dual-hosted git repository.
ptupitsyn pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 351c235c4e IGNITE-22540 .NET: Add ReceiverDescriptor to Data Streamer (#3996)
351c235c4e is described below
commit 351c235c4ef68b2057ebacb6fd2147190d701644
Author: Pavel Tupitsyn <[email protected]>
AuthorDate: Thu Jun 27 13:23:17 2024 +0300
IGNITE-22540 .NET: Add ReceiverDescriptor to Data Streamer (#3996)
Similar to IGNITE-22441, add `ReceiverDescriptor` to .NET.
* Non-generic class for receiver without results
* Generic class for receiver with results
This also removes the need for the user to specify generic arguments explicitly.
Before: `view.StreamDataAsync<SourceClass, PayloadClass, ResultClass>(...)`;
after: `view.StreamDataAsync(...)` - type arguments are inferred by the compiler
from the call arguments (the result type comes from `ReceiverDescriptor<TResult>`).
---
.../Apache.Ignite.Tests/PartitionAwarenessTests.cs | 5 +-
.../Apache.Ignite.Tests/Table/DataStreamerTests.cs | 108 +++++++++------------
.../Apache.Ignite/Internal/Table/KeyValueView.cs | 15 +--
.../Apache.Ignite/Internal/Table/RecordView.cs | 14 ++-
.../Apache.Ignite/Table/IDataStreamerTarget.cs | 13 +--
.../Apache.Ignite/Table/ReceiverDescriptor.cs | 47 +++++++++
6 files changed, 109 insertions(+), 93 deletions(-)
diff --git
a/modules/platforms/dotnet/Apache.Ignite.Tests/PartitionAwarenessTests.cs
b/modules/platforms/dotnet/Apache.Ignite.Tests/PartitionAwarenessTests.cs
index 6e088134ce..d0ad654b42 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Tests/PartitionAwarenessTests.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Tests/PartitionAwarenessTests.cs
@@ -128,8 +128,7 @@ public class PartitionAwarenessTests
new[] { 1 }.ToAsyncEnumerable(),
keySelector: x => x,
payloadSelector: x => x.ToString(),
- units: Array.Empty<DeploymentUnit>(),
- receiverClassName: "x"),
+ new("x")),
ClientOp.StreamerWithReceiverBatchSend);
[Test]
@@ -144,7 +143,7 @@ public class PartitionAwarenessTests
var options = new DataStreamerOptions { PageSize = 1 };
var data = producer.Reader.ReadAllAsync();
var streamerTask = withReceiver
- ? recordView.StreamDataAsync(data, x => x, x => x.ToString(),
Array.Empty<DeploymentUnit>(), "x", null, options)
+ ? recordView.StreamDataAsync(data, x => x, x => x.ToString(),
new("x"), null, options)
: recordView.StreamDataAsync(data, options);
Func<ITransaction?, Task> action = async _ =>
diff --git
a/modules/platforms/dotnet/Apache.Ignite.Tests/Table/DataStreamerTests.cs
b/modules/platforms/dotnet/Apache.Ignite.Tests/Table/DataStreamerTests.cs
index 7b7d73f958..1103400fc0 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Tests/Table/DataStreamerTests.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Tests/Table/DataStreamerTests.cs
@@ -27,7 +27,6 @@ using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;
using Compute;
-using Ignite.Compute;
using Ignite.Table;
using Internal.Proto;
using Microsoft.Extensions.Logging;
@@ -53,6 +52,12 @@ public class DataStreamerTests : IgniteTestsBase
private const int DeletedKey = Count + 1;
+ private static readonly ReceiverDescriptor<string> TestReceiver =
new(TestReceiverClassName);
+
+ private static readonly ReceiverDescriptor TestReceiverNoResults =
new(TestReceiverClassName);
+
+ private static readonly ReceiverDescriptor<object> EchoArgsReceiver =
new(EchoArgsReceiverClassName);
+
private static readonly object[] AllSupportedTypes =
{
true,
@@ -165,8 +170,7 @@ public class DataStreamerTests : IgniteTestsBase
GetTuplesWithDelay(cts.Token),
x => GetTuple((long)x[0]!),
x => $"{x[0]}-value",
- Array.Empty<DeploymentUnit>(),
- TestReceiverClassName,
+ TestReceiverNoResults,
receiverArgs: new object[] { Table.Name, "arg1", 22 },
options: options);
}
@@ -231,8 +235,7 @@ public class DataStreamerTests : IgniteTestsBase
Array.Empty<IIgniteTuple>().ToAsyncEnumerable(),
t => t,
t => t.ToString()!,
- Array.Empty<DeploymentUnit>(),
- TestReceiverClassName,
+ TestReceiverNoResults,
null,
options);
@@ -240,12 +243,11 @@ public class DataStreamerTests : IgniteTestsBase
// Receiver with results.
case true:
- await
Table.RecordBinaryView.StreamDataAsync<IIgniteTuple, string, string>(
+ await Table.RecordBinaryView.StreamDataAsync(
Array.Empty<IIgniteTuple>().ToAsyncEnumerable(),
t => t,
t => t.ToString()!,
- Array.Empty<DeploymentUnit>(),
- TestReceiverClassName,
+ TestReceiver,
null,
options).ToListAsync();
@@ -286,8 +288,7 @@ public class DataStreamerTests : IgniteTestsBase
GetFakeServerData(10_000),
keySelector: t => t,
payloadSelector: t => t[0]!.ToString()!,
- units: Array.Empty<DeploymentUnit>(),
- receiverClassName: TestReceiverClassName));
+ TestReceiverNoResults));
StringAssert.StartsWith("Operation StreamerWithReceiverBatchSend
failed after 16 retries", ex!.Message);
}
@@ -313,12 +314,11 @@ public class DataStreamerTests : IgniteTestsBase
if (withReceiver)
{
- await table!.RecordBinaryView.StreamDataAsync<IIgniteTuple,
string>(
+ await table!.RecordBinaryView.StreamDataAsync(
GetFakeServerData(count),
keySelector: t => t,
payloadSelector: t => t[0]!.ToString()!,
- units: Array.Empty<DeploymentUnit>(),
- receiverClassName: TestReceiverClassName);
+ TestReceiverNoResults);
}
else
{
@@ -375,12 +375,11 @@ public class DataStreamerTests : IgniteTestsBase
[Test]
public async Task TestWithReceiverRecordBinaryView()
{
- await TupleView.StreamDataAsync<int, string>(
+ await TupleView.StreamDataAsync(
Enumerable.Range(0, Count).ToAsyncEnumerable(),
keySelector: x => GetTuple(x),
payloadSelector: x => $"{x}-value{x * 10}",
- units: Array.Empty<DeploymentUnit>(),
- receiverClassName: TestReceiverClassName,
+ receiver: TestReceiverNoResults,
receiverArgs: new object[] { Table.Name, "arg1", 22 },
options: DataStreamerOptions.Default);
@@ -396,12 +395,11 @@ public class DataStreamerTests : IgniteTestsBase
[Test]
public async Task TestWithReceiverWithResultsRecordBinaryView()
{
- IAsyncEnumerable<string> results = TupleView.StreamDataAsync<int,
string, string>(
+ IAsyncEnumerable<string> results = TupleView.StreamDataAsync(
Enumerable.Range(0, Count).ToAsyncEnumerable(),
keySelector: x => GetTuple(x),
payloadSelector: x => $"{x}-value{x * 10}",
- units: Array.Empty<DeploymentUnit>(),
- receiverClassName: TestReceiverClassName,
+ TestReceiver,
receiverArgs: new object[] { Table.Name, "arg1", 22 },
options: DataStreamerOptions.Default);
@@ -425,12 +423,11 @@ public class DataStreamerTests : IgniteTestsBase
[Test]
public async Task TestWithReceiverRecordView()
{
- await PocoView.StreamDataAsync<int, string>(
+ await PocoView.StreamDataAsync(
Enumerable.Range(0, Count).ToAsyncEnumerable(),
keySelector: x => GetPoco(x),
payloadSelector: x => $"{x}-value{x * 10}",
- units: Array.Empty<DeploymentUnit>(),
- receiverClassName: TestReceiverClassName,
+ receiver: TestReceiverNoResults,
receiverArgs: new object[] { Table.Name, "arg1", 22 },
options: DataStreamerOptions.Default);
@@ -446,12 +443,11 @@ public class DataStreamerTests : IgniteTestsBase
[Test]
public async Task TestWithReceiverResultsRecordView()
{
- IAsyncEnumerable<string> results = PocoView.StreamDataAsync<int,
string, string>(
+ IAsyncEnumerable<string> results = PocoView.StreamDataAsync(
Enumerable.Range(0, Count).ToAsyncEnumerable(),
keySelector: x => GetPoco(x),
payloadSelector: x => $"{x}-value{x * 10}",
- units: Array.Empty<DeploymentUnit>(),
- receiverClassName: TestReceiverClassName,
+ TestReceiver,
receiverArgs: new object[] { Table.Name, "arg1", 22 },
options: DataStreamerOptions.Default);
@@ -475,12 +471,11 @@ public class DataStreamerTests : IgniteTestsBase
[Test]
public async Task TestWithReceiverKeyValueBinaryView()
{
- await Table.KeyValueBinaryView.StreamDataAsync<int, string>(
+ await Table.KeyValueBinaryView.StreamDataAsync(
Enumerable.Range(0, Count).ToAsyncEnumerable(),
keySelector: x => new KeyValuePair<IIgniteTuple,
IIgniteTuple>(GetTuple(x), new IgniteTuple()),
payloadSelector: x => $"{x}-value{x * 10}",
- units: Array.Empty<DeploymentUnit>(),
- receiverClassName: TestReceiverClassName,
+ receiver: TestReceiverNoResults,
receiverArgs: new object[] { Table.Name, "arg1", 22 });
for (int i = 0; i < Count; i++)
@@ -495,12 +490,11 @@ public class DataStreamerTests : IgniteTestsBase
[Test]
public async Task TestWithReceiverResultsKeyValueBinaryView()
{
- IAsyncEnumerable<string> results =
Table.KeyValueBinaryView.StreamDataAsync<int, string, string>(
+ IAsyncEnumerable<string> results =
Table.KeyValueBinaryView.StreamDataAsync(
Enumerable.Range(0, Count).ToAsyncEnumerable(),
keySelector: x => new KeyValuePair<IIgniteTuple,
IIgniteTuple>(GetTuple(x), new IgniteTuple()),
payloadSelector: x => $"{x}-value{x * 10}",
- units: Array.Empty<DeploymentUnit>(),
- receiverClassName: TestReceiverClassName,
+ TestReceiver,
receiverArgs: new object[] { Table.Name, "arg1", 22 });
var resultSet = await results.ToHashSetAsync();
@@ -523,12 +517,11 @@ public class DataStreamerTests : IgniteTestsBase
[Test]
public async Task TestWithReceiverKeyValueView()
{
- await Table.GetKeyValueView<long, Poco>().StreamDataAsync<int, string>(
+ await Table.GetKeyValueView<long, Poco>().StreamDataAsync(
Enumerable.Range(0, Count).ToAsyncEnumerable(),
keySelector: x => new KeyValuePair<long, Poco>(x, null!),
payloadSelector: x => $"{x}-value{x * 10}",
- units: Array.Empty<DeploymentUnit>(),
- receiverClassName: TestReceiverClassName,
+ receiver: TestReceiverNoResults,
receiverArgs: new object[] { Table.Name, "arg11", 55});
for (int i = 0; i < Count; i++)
@@ -543,12 +536,11 @@ public class DataStreamerTests : IgniteTestsBase
[Test]
public async Task TestWithReceiverResultsKeyValueView()
{
- IAsyncEnumerable<string> results = Table.GetKeyValueView<long,
Poco>().StreamDataAsync<int, string, string>(
+ IAsyncEnumerable<string> results = Table.GetKeyValueView<long,
Poco>().StreamDataAsync(
Enumerable.Range(0, Count).ToAsyncEnumerable(),
keySelector: x => new KeyValuePair<long, Poco>(x, null!),
payloadSelector: x => $"{x}-value{x * 10}",
- units: Array.Empty<DeploymentUnit>(),
- receiverClassName: TestReceiverClassName,
+ TestReceiver,
receiverArgs: new object[] { Table.Name, "arg11", 55});
var resultSet = await results.ToHashSetAsync();
@@ -572,12 +564,11 @@ public class DataStreamerTests : IgniteTestsBase
public void TestUnknownReceiverClass()
{
var ex = Assert.ThrowsAsync<IgniteException>(async () =>
- await TupleView.StreamDataAsync<int, string>(
+ await TupleView.StreamDataAsync(
Enumerable.Range(0, 1).ToAsyncEnumerable(),
keySelector: x => GetTuple(x),
payloadSelector: _ => string.Empty,
- units: Array.Empty<DeploymentUnit>(),
- receiverClassName: "_unknown_"));
+ new("_unknown_")));
Assert.AreEqual("Streamer receiver failed: Cannot load receiver class
by name '_unknown_'", ex.Message);
}
@@ -590,8 +581,7 @@ public class DataStreamerTests : IgniteTestsBase
Enumerable.Range(0, 1).ToAsyncEnumerable(),
keySelector: x => GetPoco(x),
payloadSelector: _ => string.Empty,
- units: Array.Empty<DeploymentUnit>(),
- receiverClassName: TestReceiverClassName,
+ receiver: TestReceiverNoResults,
receiverArgs: new object[] { "throw", "throw", 1 }));
Assert.AreEqual("Streamer receiver failed: Job execution failed:
java.lang.ArithmeticException: Test exception: 1", ex.Message);
@@ -601,12 +591,11 @@ public class DataStreamerTests : IgniteTestsBase
public void TestReceiverWithResultsException()
{
var ex = Assert.ThrowsAsync<IgniteException>(async () =>
- await PocoView.StreamDataAsync<int, string, string>(
+ await PocoView.StreamDataAsync(
Enumerable.Range(0, 1).ToAsyncEnumerable(),
keySelector: x => GetPoco(x),
payloadSelector: _ => string.Empty,
- units: Array.Empty<DeploymentUnit>(),
- receiverClassName: TestReceiverClassName,
+ TestReceiver,
receiverArgs: new object[] { "throw", "throw", 1
}).ToListAsync());
Assert.AreEqual("Streamer receiver failed: Job execution failed:
java.lang.ArithmeticException: Test exception: 1", ex.Message);
@@ -620,8 +609,7 @@ public class DataStreamerTests : IgniteTestsBase
Enumerable.Range(0, 1).ToAsyncEnumerable(),
keySelector: x => keySelector ? throw new DataException("key")
: GetPoco(x),
payloadSelector: _ => throw new DataException("payload"),
- units: Array.Empty<DeploymentUnit>(),
- receiverClassName: TestReceiverClassName,
+ receiver: TestReceiverNoResults,
receiverArgs: new object[] { "throw", "throw", 1 }));
Assert.AreEqual(keySelector ? "key" : "payload", ex.Message);
@@ -671,10 +659,9 @@ public class DataStreamerTests : IgniteTestsBase
var ex = Assert.ThrowsAsync<InvalidOperationException>(async () =>
await PocoView.StreamDataAsync<object, object>(
new object[] { 1, "2" }.ToAsyncEnumerable(),
- keySelector: x => new Poco(),
+ keySelector: _ => new Poco(),
payloadSelector: x => x,
- units: Array.Empty<DeploymentUnit>(),
- receiverClassName: TestReceiverClassName,
+ receiver: TestReceiverNoResults,
receiverArgs: new object[] { TableName, "1", 2 }));
Assert.AreEqual(
@@ -688,10 +675,9 @@ public class DataStreamerTests : IgniteTestsBase
var ex = Assert.ThrowsAsync<ArgumentNullException>(async () =>
await PocoView.StreamDataAsync<object, object>(
new object[] { "2", null! }.ToAsyncEnumerable(),
- keySelector: x => new Poco(),
+ keySelector: _ => new Poco(),
payloadSelector: x => x,
- units: Array.Empty<DeploymentUnit>(),
- receiverClassName: TestReceiverClassName,
+ receiver: TestReceiverNoResults,
receiverArgs: new object[] { TableName, "1", 2 }));
Assert.AreEqual(
@@ -704,10 +690,9 @@ public class DataStreamerTests : IgniteTestsBase
{
var res = await PocoView.StreamDataAsync<object, object, object>(
new object[] { 1 }.ToAsyncEnumerable(),
- keySelector: x => new Poco(),
+ keySelector: _ => new Poco(),
payloadSelector: x => x.ToString()!,
- units: Array.Empty<DeploymentUnit>(),
- receiverClassName: EchoArgsReceiverClassName,
+ EchoArgsReceiver,
receiverArgs: new[] { arg }).SingleAsync();
Assert.AreEqual(arg, res);
@@ -716,12 +701,11 @@ public class DataStreamerTests : IgniteTestsBase
[Test]
public async Task TestResultConsumerEarlyExit()
{
- IAsyncEnumerable<string> results = PocoView.StreamDataAsync<int,
string, string>(
+ IAsyncEnumerable<string> results = PocoView.StreamDataAsync(
Enumerable.Range(0, Count).ToAsyncEnumerable(),
keySelector: x => GetPoco(x),
payloadSelector: x => $"{x}-value{x * 10}",
- units: Array.Empty<DeploymentUnit>(),
- receiverClassName: TestReceiverClassName,
+ TestReceiver,
receiverArgs: new object[] { Table.Name, "arg1", 22 },
options: DataStreamerOptions.Default with { PageSize = 1 });
@@ -747,8 +731,7 @@ public class DataStreamerTests : IgniteTestsBase
Enumerable.Range(0, Count).ToAsyncEnumerable(),
keySelector: x => GetPoco(x),
payloadSelector: x => $"{x}-value{x * 10}",
- units: Array.Empty<DeploymentUnit>(),
- receiverClassName: TestReceiverClassName,
+ TestReceiver,
receiverArgs: new object[] { Table.Name, "arg1", 22 },
options: DataStreamerOptions.Default with { PageSize = 1 });
@@ -793,8 +776,7 @@ public class DataStreamerTests : IgniteTestsBase
Enumerable.Range(0, 1).ToAsyncEnumerable(),
keySelector: x => GetPoco(x),
payloadSelector: _ => value,
- units: Array.Empty<DeploymentUnit>(),
- receiverClassName: UpsertElementTypeNameReceiverClassName,
+ receiver: new
ReceiverDescriptor(UpsertElementTypeNameReceiverClassName),
receiverArgs: new object[] { TableName, key1, key2 });
var className = (await TupleView.GetAsync(null,
GetTuple(key1))).Value[1];
diff --git
a/modules/platforms/dotnet/Apache.Ignite/Internal/Table/KeyValueView.cs
b/modules/platforms/dotnet/Apache.Ignite/Internal/Table/KeyValueView.cs
index 1ed35d1354..30f5da1c97 100644
--- a/modules/platforms/dotnet/Apache.Ignite/Internal/Table/KeyValueView.cs
+++ b/modules/platforms/dotnet/Apache.Ignite/Internal/Table/KeyValueView.cs
@@ -24,7 +24,6 @@ using System.Threading;
using System.Threading.Tasks;
using Apache.Ignite.Transactions;
using Common;
-using Ignite.Compute;
using Ignite.Sql;
using Ignite.Table;
using Linq;
@@ -169,18 +168,16 @@ internal sealed class KeyValueView<TK, TV> :
IKeyValueView<TK, TV>
IAsyncEnumerable<TSource> data,
Func<TSource, KeyValuePair<TK, TV>> keySelector,
Func<TSource, TPayload> payloadSelector,
- IEnumerable<DeploymentUnit> units,
- string receiverClassName,
+ ReceiverDescriptor<TResult> receiver,
ICollection<object>? receiverArgs,
DataStreamerOptions? options,
CancellationToken cancellationToken = default)
where TPayload : notnull =>
- _recordView.StreamDataAsync<TSource, TPayload, TResult>(
+ _recordView.StreamDataAsync(
data,
src => ToKv(keySelector(src)),
payloadSelector,
- units,
- receiverClassName,
+ receiver,
receiverArgs,
options,
cancellationToken);
@@ -190,8 +187,7 @@ internal sealed class KeyValueView<TK, TV> :
IKeyValueView<TK, TV>
IAsyncEnumerable<TSource> data,
Func<TSource, KeyValuePair<TK, TV>> keySelector,
Func<TSource, TPayload> payloadSelector,
- IEnumerable<DeploymentUnit> units,
- string receiverClassName,
+ ReceiverDescriptor receiver,
ICollection<object>? receiverArgs,
DataStreamerOptions? options,
CancellationToken cancellationToken = default)
@@ -200,8 +196,7 @@ internal sealed class KeyValueView<TK, TV> :
IKeyValueView<TK, TV>
data,
src => ToKv(keySelector(src)),
payloadSelector,
- units,
- receiverClassName,
+ receiver,
receiverArgs,
options,
cancellationToken);
diff --git
a/modules/platforms/dotnet/Apache.Ignite/Internal/Table/RecordView.cs
b/modules/platforms/dotnet/Apache.Ignite/Internal/Table/RecordView.cs
index f96973864e..518da6ee88 100644
--- a/modules/platforms/dotnet/Apache.Ignite/Internal/Table/RecordView.cs
+++ b/modules/platforms/dotnet/Apache.Ignite/Internal/Table/RecordView.cs
@@ -312,8 +312,7 @@ namespace Apache.Ignite.Internal.Table
IAsyncEnumerable<TSource> data,
Func<TSource, T> keySelector,
Func<TSource, TPayload> payloadSelector,
- IEnumerable<DeploymentUnit> units,
- string receiverClassName,
+ ReceiverDescriptor<TResult> receiver,
ICollection<object>? receiverArgs,
DataStreamerOptions? options,
[EnumeratorCancellation] CancellationToken cancellationToken =
default)
@@ -380,8 +379,8 @@ namespace Apache.Ignite.Internal.Table
keyWriter: _ser.Handler,
options,
resultChannel,
- units,
- receiverClassName,
+ receiver.DeploymentUnits ??
Array.Empty<DeploymentUnit>(),
+ receiver.ReceiverClassName,
receiverArgs,
cancellationToken).ConfigureAwait(false);
@@ -399,8 +398,7 @@ namespace Apache.Ignite.Internal.Table
IAsyncEnumerable<TSource> data,
Func<TSource, T> keySelector,
Func<TSource, TPayload> payloadSelector,
- IEnumerable<DeploymentUnit> units,
- string receiverClassName,
+ ReceiverDescriptor receiver,
ICollection<object>? receiverArgs,
DataStreamerOptions? options,
CancellationToken cancellationToken = default)
@@ -414,8 +412,8 @@ namespace Apache.Ignite.Internal.Table
keyWriter: _ser.Handler,
options ?? DataStreamerOptions.Default,
resultChannel: null,
- units,
- receiverClassName,
+ receiver.DeploymentUnits ?? Array.Empty<DeploymentUnit>(),
+ receiver.ReceiverClassName,
receiverArgs,
cancellationToken).ConfigureAwait(false);
}
diff --git
a/modules/platforms/dotnet/Apache.Ignite/Table/IDataStreamerTarget.cs
b/modules/platforms/dotnet/Apache.Ignite/Table/IDataStreamerTarget.cs
index c5dd1aef5d..0612dfc3f4 100644
--- a/modules/platforms/dotnet/Apache.Ignite/Table/IDataStreamerTarget.cs
+++ b/modules/platforms/dotnet/Apache.Ignite/Table/IDataStreamerTarget.cs
@@ -22,7 +22,6 @@ using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
-using Compute;
using Internal.Common;
/// <summary>
@@ -74,8 +73,7 @@ public interface IDataStreamerTarget<T>
/// <param name="data">Data.</param>
/// <param name="keySelector">Key selector.</param>
/// <param name="payloadSelector">Payload selector.</param>
- /// <param name="units">Deployment units. Can be empty.</param>
- /// <param name="receiverClassName">Java class name of the streamer
receiver to execute on the server.</param>
+ /// <param name="receiver">Streamer receiver descriptor.</param>
/// <param name="receiverArgs">Receiver args.</param>
/// <param name="options">Streamer options.</param>
/// <param name="cancellationToken">Cancellation token.</param>
@@ -92,8 +90,7 @@ public interface IDataStreamerTarget<T>
IAsyncEnumerable<TSource> data,
Func<TSource, T> keySelector,
Func<TSource, TPayload> payloadSelector,
- IEnumerable<DeploymentUnit> units,
- string receiverClassName,
+ ReceiverDescriptor<TResult> receiver,
ICollection<object>? receiverArgs = null,
DataStreamerOptions? options = null,
CancellationToken cancellationToken = default)
@@ -105,8 +102,7 @@ public interface IDataStreamerTarget<T>
/// <param name="data">Data.</param>
/// <param name="keySelector">Key selector.</param>
/// <param name="payloadSelector">Payload selector.</param>
- /// <param name="units">Deployment units. Can be empty.</param>
- /// <param name="receiverClassName">Java class name of the streamer
receiver to execute on the server.</param>
+ /// <param name="receiver">Receiver descriptor.</param>
/// <param name="receiverArgs">Receiver args.</param>
/// <param name="options">Streamer options.</param>
/// <param name="cancellationToken">Cancellation token.</param>
@@ -117,8 +113,7 @@ public interface IDataStreamerTarget<T>
IAsyncEnumerable<TSource> data,
Func<TSource, T> keySelector,
Func<TSource, TPayload> payloadSelector,
- IEnumerable<DeploymentUnit> units,
- string receiverClassName,
+ ReceiverDescriptor receiver,
ICollection<object>? receiverArgs = null,
DataStreamerOptions? options = null,
CancellationToken cancellationToken = default)
diff --git a/modules/platforms/dotnet/Apache.Ignite/Table/ReceiverDescriptor.cs
b/modules/platforms/dotnet/Apache.Ignite/Table/ReceiverDescriptor.cs
new file mode 100644
index 0000000000..2b9afb8c68
--- /dev/null
+++ b/modules/platforms/dotnet/Apache.Ignite/Table/ReceiverDescriptor.cs
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache.Ignite.Table;
+
+using System;
+using System.Collections.Generic;
+using Compute;
+
+/// <summary>
+/// Stream receiver descriptor without results. If the specified receiver returns results, they will be discarded on the server.
+/// </summary>
+/// <param name="ReceiverClassName">Java class name of the streamer receiver to execute.</param>
+/// <param name="DeploymentUnits">Deployment units.</param>
+public sealed record ReceiverDescriptor(
+ string ReceiverClassName,
+ IEnumerable<DeploymentUnit>? DeploymentUnits = null);
+
+/// <summary>
+/// Stream receiver descriptor with result type.
+/// </summary>
+/// <param name="ReceiverClassName">Java class name of the streamer receiver to execute.</param>
+/// <param name="DeploymentUnits">Deployment units.</param>
+/// <typeparam name="TResult">Result type.</typeparam>
+public sealed record ReceiverDescriptor<TResult>(
+ string ReceiverClassName,
+ IEnumerable<DeploymentUnit>? DeploymentUnits = null)
+{
+ /// <summary>
+ /// Gets the result type.
+ /// </summary>
+ public Type ResultType => typeof(TResult);
+}