This is an automated email from the ASF dual-hosted git repository.

ptupitsyn pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 351c235c4e IGNITE-22540 .NET: Add ReceiverDescriptor to Data Streamer (#3996)
351c235c4e is described below

commit 351c235c4ef68b2057ebacb6fd2147190d701644
Author: Pavel Tupitsyn <[email protected]>
AuthorDate: Thu Jun 27 13:23:17 2024 +0300

    IGNITE-22540 .NET: Add ReceiverDescriptor to Data Streamer (#3996)
    
    Similar to IGNITE-22441, add `ReceiverDescriptor` to .NET.
    
    * Non-generic class for receiver without results
    * Generic class for receiver with results
    
    This also removes the need for the user to specify generic arguments.
    Before: `view.StreamDataAsync<SourceClass, PayloadClass, ResultClass>(...)`;
    after: `view.StreamDataAsync(...)` - type arguments are inferred by the compiler
    from the receiver descriptor.
---
 .../Apache.Ignite.Tests/PartitionAwarenessTests.cs |   5 +-
 .../Apache.Ignite.Tests/Table/DataStreamerTests.cs | 108 +++++++++------------
 .../Apache.Ignite/Internal/Table/KeyValueView.cs   |  15 +--
 .../Apache.Ignite/Internal/Table/RecordView.cs     |  14 ++-
 .../Apache.Ignite/Table/IDataStreamerTarget.cs     |  13 +--
 .../Apache.Ignite/Table/ReceiverDescriptor.cs      |  47 +++++++++
 6 files changed, 109 insertions(+), 93 deletions(-)

diff --git 
a/modules/platforms/dotnet/Apache.Ignite.Tests/PartitionAwarenessTests.cs 
b/modules/platforms/dotnet/Apache.Ignite.Tests/PartitionAwarenessTests.cs
index 6e088134ce..d0ad654b42 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Tests/PartitionAwarenessTests.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Tests/PartitionAwarenessTests.cs
@@ -128,8 +128,7 @@ public class PartitionAwarenessTests
                 new[] { 1 }.ToAsyncEnumerable(),
                 keySelector: x => x,
                 payloadSelector: x => x.ToString(),
-                units: Array.Empty<DeploymentUnit>(),
-                receiverClassName: "x"),
+                new("x")),
             ClientOp.StreamerWithReceiverBatchSend);
 
     [Test]
@@ -144,7 +143,7 @@ public class PartitionAwarenessTests
         var options = new DataStreamerOptions { PageSize = 1 };
         var data = producer.Reader.ReadAllAsync();
         var streamerTask = withReceiver
-            ? recordView.StreamDataAsync(data, x => x, x => x.ToString(), 
Array.Empty<DeploymentUnit>(), "x", null, options)
+            ? recordView.StreamDataAsync(data, x => x, x => x.ToString(), 
new("x"), null, options)
             : recordView.StreamDataAsync(data, options);
 
         Func<ITransaction?, Task> action = async _ =>
diff --git 
a/modules/platforms/dotnet/Apache.Ignite.Tests/Table/DataStreamerTests.cs 
b/modules/platforms/dotnet/Apache.Ignite.Tests/Table/DataStreamerTests.cs
index 7b7d73f958..1103400fc0 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Tests/Table/DataStreamerTests.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Tests/Table/DataStreamerTests.cs
@@ -27,7 +27,6 @@ using System.Runtime.CompilerServices;
 using System.Threading;
 using System.Threading.Tasks;
 using Compute;
-using Ignite.Compute;
 using Ignite.Table;
 using Internal.Proto;
 using Microsoft.Extensions.Logging;
@@ -53,6 +52,12 @@ public class DataStreamerTests : IgniteTestsBase
 
     private const int DeletedKey = Count + 1;
 
+    private static readonly ReceiverDescriptor<string> TestReceiver = 
new(TestReceiverClassName);
+
+    private static readonly ReceiverDescriptor TestReceiverNoResults = 
new(TestReceiverClassName);
+
+    private static readonly ReceiverDescriptor<object> EchoArgsReceiver = 
new(EchoArgsReceiverClassName);
+
     private static readonly object[] AllSupportedTypes =
     {
         true,
@@ -165,8 +170,7 @@ public class DataStreamerTests : IgniteTestsBase
                 GetTuplesWithDelay(cts.Token),
                 x => GetTuple((long)x[0]!),
                 x => $"{x[0]}-value",
-                Array.Empty<DeploymentUnit>(),
-                TestReceiverClassName,
+                TestReceiverNoResults,
                 receiverArgs: new object[] { Table.Name, "arg1", 22 },
                 options: options);
         }
@@ -231,8 +235,7 @@ public class DataStreamerTests : IgniteTestsBase
                                 
Array.Empty<IIgniteTuple>().ToAsyncEnumerable(),
                                 t => t,
                                 t => t.ToString()!,
-                                Array.Empty<DeploymentUnit>(),
-                                TestReceiverClassName,
+                                TestReceiverNoResults,
                                 null,
                                 options);
 
@@ -240,12 +243,11 @@ public class DataStreamerTests : IgniteTestsBase
 
                         // Receiver with results.
                         case true:
-                            await 
Table.RecordBinaryView.StreamDataAsync<IIgniteTuple, string, string>(
+                            await Table.RecordBinaryView.StreamDataAsync(
                                 
Array.Empty<IIgniteTuple>().ToAsyncEnumerable(),
                                 t => t,
                                 t => t.ToString()!,
-                                Array.Empty<DeploymentUnit>(),
-                                TestReceiverClassName,
+                                TestReceiver,
                                 null,
                                 options).ToListAsync();
 
@@ -286,8 +288,7 @@ public class DataStreamerTests : IgniteTestsBase
                 GetFakeServerData(10_000),
                 keySelector: t => t,
                 payloadSelector: t => t[0]!.ToString()!,
-                units: Array.Empty<DeploymentUnit>(),
-                receiverClassName: TestReceiverClassName));
+                TestReceiverNoResults));
 
         StringAssert.StartsWith("Operation StreamerWithReceiverBatchSend 
failed after 16 retries", ex!.Message);
     }
@@ -313,12 +314,11 @@ public class DataStreamerTests : IgniteTestsBase
 
         if (withReceiver)
         {
-            await table!.RecordBinaryView.StreamDataAsync<IIgniteTuple, 
string>(
+            await table!.RecordBinaryView.StreamDataAsync(
                 GetFakeServerData(count),
                 keySelector: t => t,
                 payloadSelector: t => t[0]!.ToString()!,
-                units: Array.Empty<DeploymentUnit>(),
-                receiverClassName: TestReceiverClassName);
+                TestReceiverNoResults);
         }
         else
         {
@@ -375,12 +375,11 @@ public class DataStreamerTests : IgniteTestsBase
     [Test]
     public async Task TestWithReceiverRecordBinaryView()
     {
-        await TupleView.StreamDataAsync<int, string>(
+        await TupleView.StreamDataAsync(
             Enumerable.Range(0, Count).ToAsyncEnumerable(),
             keySelector: x => GetTuple(x),
             payloadSelector: x => $"{x}-value{x * 10}",
-            units: Array.Empty<DeploymentUnit>(),
-            receiverClassName: TestReceiverClassName,
+            receiver: TestReceiverNoResults,
             receiverArgs: new object[] { Table.Name, "arg1", 22 },
             options: DataStreamerOptions.Default);
 
@@ -396,12 +395,11 @@ public class DataStreamerTests : IgniteTestsBase
     [Test]
     public async Task TestWithReceiverWithResultsRecordBinaryView()
     {
-        IAsyncEnumerable<string> results = TupleView.StreamDataAsync<int, 
string, string>(
+        IAsyncEnumerable<string> results = TupleView.StreamDataAsync(
             Enumerable.Range(0, Count).ToAsyncEnumerable(),
             keySelector: x => GetTuple(x),
             payloadSelector: x => $"{x}-value{x * 10}",
-            units: Array.Empty<DeploymentUnit>(),
-            receiverClassName: TestReceiverClassName,
+            TestReceiver,
             receiverArgs: new object[] { Table.Name, "arg1", 22 },
             options: DataStreamerOptions.Default);
 
@@ -425,12 +423,11 @@ public class DataStreamerTests : IgniteTestsBase
     [Test]
     public async Task TestWithReceiverRecordView()
     {
-        await PocoView.StreamDataAsync<int, string>(
+        await PocoView.StreamDataAsync(
             Enumerable.Range(0, Count).ToAsyncEnumerable(),
             keySelector: x => GetPoco(x),
             payloadSelector: x => $"{x}-value{x * 10}",
-            units: Array.Empty<DeploymentUnit>(),
-            receiverClassName: TestReceiverClassName,
+            receiver: TestReceiverNoResults,
             receiverArgs: new object[] { Table.Name, "arg1", 22 },
             options: DataStreamerOptions.Default);
 
@@ -446,12 +443,11 @@ public class DataStreamerTests : IgniteTestsBase
     [Test]
     public async Task TestWithReceiverResultsRecordView()
     {
-        IAsyncEnumerable<string> results = PocoView.StreamDataAsync<int, 
string, string>(
+        IAsyncEnumerable<string> results = PocoView.StreamDataAsync(
             Enumerable.Range(0, Count).ToAsyncEnumerable(),
             keySelector: x => GetPoco(x),
             payloadSelector: x => $"{x}-value{x * 10}",
-            units: Array.Empty<DeploymentUnit>(),
-            receiverClassName: TestReceiverClassName,
+            TestReceiver,
             receiverArgs: new object[] { Table.Name, "arg1", 22 },
             options: DataStreamerOptions.Default);
 
@@ -475,12 +471,11 @@ public class DataStreamerTests : IgniteTestsBase
     [Test]
     public async Task TestWithReceiverKeyValueBinaryView()
     {
-        await Table.KeyValueBinaryView.StreamDataAsync<int, string>(
+        await Table.KeyValueBinaryView.StreamDataAsync(
             Enumerable.Range(0, Count).ToAsyncEnumerable(),
             keySelector: x => new KeyValuePair<IIgniteTuple, 
IIgniteTuple>(GetTuple(x), new IgniteTuple()),
             payloadSelector: x => $"{x}-value{x * 10}",
-            units: Array.Empty<DeploymentUnit>(),
-            receiverClassName: TestReceiverClassName,
+            receiver: TestReceiverNoResults,
             receiverArgs: new object[] { Table.Name, "arg1", 22 });
 
         for (int i = 0; i < Count; i++)
@@ -495,12 +490,11 @@ public class DataStreamerTests : IgniteTestsBase
     [Test]
     public async Task TestWithReceiverResultsKeyValueBinaryView()
     {
-        IAsyncEnumerable<string> results = 
Table.KeyValueBinaryView.StreamDataAsync<int, string, string>(
+        IAsyncEnumerable<string> results = 
Table.KeyValueBinaryView.StreamDataAsync(
             Enumerable.Range(0, Count).ToAsyncEnumerable(),
             keySelector: x => new KeyValuePair<IIgniteTuple, 
IIgniteTuple>(GetTuple(x), new IgniteTuple()),
             payloadSelector: x => $"{x}-value{x * 10}",
-            units: Array.Empty<DeploymentUnit>(),
-            receiverClassName: TestReceiverClassName,
+            TestReceiver,
             receiverArgs: new object[] { Table.Name, "arg1", 22 });
 
         var resultSet = await results.ToHashSetAsync();
@@ -523,12 +517,11 @@ public class DataStreamerTests : IgniteTestsBase
     [Test]
     public async Task TestWithReceiverKeyValueView()
     {
-        await Table.GetKeyValueView<long, Poco>().StreamDataAsync<int, string>(
+        await Table.GetKeyValueView<long, Poco>().StreamDataAsync(
             Enumerable.Range(0, Count).ToAsyncEnumerable(),
             keySelector: x => new KeyValuePair<long, Poco>(x, null!),
             payloadSelector: x => $"{x}-value{x * 10}",
-            units: Array.Empty<DeploymentUnit>(),
-            receiverClassName: TestReceiverClassName,
+            receiver: TestReceiverNoResults,
             receiverArgs: new object[] { Table.Name, "arg11", 55});
 
         for (int i = 0; i < Count; i++)
@@ -543,12 +536,11 @@ public class DataStreamerTests : IgniteTestsBase
     [Test]
     public async Task TestWithReceiverResultsKeyValueView()
     {
-        IAsyncEnumerable<string> results = Table.GetKeyValueView<long, 
Poco>().StreamDataAsync<int, string, string>(
+        IAsyncEnumerable<string> results = Table.GetKeyValueView<long, 
Poco>().StreamDataAsync(
             Enumerable.Range(0, Count).ToAsyncEnumerable(),
             keySelector: x => new KeyValuePair<long, Poco>(x, null!),
             payloadSelector: x => $"{x}-value{x * 10}",
-            units: Array.Empty<DeploymentUnit>(),
-            receiverClassName: TestReceiverClassName,
+            TestReceiver,
             receiverArgs: new object[] { Table.Name, "arg11", 55});
 
         var resultSet = await results.ToHashSetAsync();
@@ -572,12 +564,11 @@ public class DataStreamerTests : IgniteTestsBase
     public void TestUnknownReceiverClass()
     {
         var ex = Assert.ThrowsAsync<IgniteException>(async () =>
-            await TupleView.StreamDataAsync<int, string>(
+            await TupleView.StreamDataAsync(
                 Enumerable.Range(0, 1).ToAsyncEnumerable(),
                 keySelector: x => GetTuple(x),
                 payloadSelector: _ => string.Empty,
-                units: Array.Empty<DeploymentUnit>(),
-                receiverClassName: "_unknown_"));
+                new("_unknown_")));
 
         Assert.AreEqual("Streamer receiver failed: Cannot load receiver class 
by name '_unknown_'", ex.Message);
     }
@@ -590,8 +581,7 @@ public class DataStreamerTests : IgniteTestsBase
                 Enumerable.Range(0, 1).ToAsyncEnumerable(),
                 keySelector: x => GetPoco(x),
                 payloadSelector: _ => string.Empty,
-                units: Array.Empty<DeploymentUnit>(),
-                receiverClassName: TestReceiverClassName,
+                receiver: TestReceiverNoResults,
                 receiverArgs: new object[] { "throw", "throw", 1 }));
 
         Assert.AreEqual("Streamer receiver failed: Job execution failed: 
java.lang.ArithmeticException: Test exception: 1", ex.Message);
@@ -601,12 +591,11 @@ public class DataStreamerTests : IgniteTestsBase
     public void TestReceiverWithResultsException()
     {
         var ex = Assert.ThrowsAsync<IgniteException>(async () =>
-            await PocoView.StreamDataAsync<int, string, string>(
+            await PocoView.StreamDataAsync(
                 Enumerable.Range(0, 1).ToAsyncEnumerable(),
                 keySelector: x => GetPoco(x),
                 payloadSelector: _ => string.Empty,
-                units: Array.Empty<DeploymentUnit>(),
-                receiverClassName: TestReceiverClassName,
+                TestReceiver,
                 receiverArgs: new object[] { "throw", "throw", 1 
}).ToListAsync());
 
         Assert.AreEqual("Streamer receiver failed: Job execution failed: 
java.lang.ArithmeticException: Test exception: 1", ex.Message);
@@ -620,8 +609,7 @@ public class DataStreamerTests : IgniteTestsBase
                 Enumerable.Range(0, 1).ToAsyncEnumerable(),
                 keySelector: x => keySelector ? throw new DataException("key") 
: GetPoco(x),
                 payloadSelector: _ => throw new DataException("payload"),
-                units: Array.Empty<DeploymentUnit>(),
-                receiverClassName: TestReceiverClassName,
+                receiver: TestReceiverNoResults,
                 receiverArgs: new object[] { "throw", "throw", 1 }));
 
         Assert.AreEqual(keySelector ? "key" : "payload", ex.Message);
@@ -671,10 +659,9 @@ public class DataStreamerTests : IgniteTestsBase
         var ex = Assert.ThrowsAsync<InvalidOperationException>(async () =>
             await PocoView.StreamDataAsync<object, object>(
                 new object[] { 1, "2" }.ToAsyncEnumerable(),
-                keySelector: x => new Poco(),
+                keySelector: _ => new Poco(),
                 payloadSelector: x => x,
-                units: Array.Empty<DeploymentUnit>(),
-                receiverClassName: TestReceiverClassName,
+                receiver: TestReceiverNoResults,
                 receiverArgs: new object[] { TableName, "1", 2 }));
 
         Assert.AreEqual(
@@ -688,10 +675,9 @@ public class DataStreamerTests : IgniteTestsBase
         var ex = Assert.ThrowsAsync<ArgumentNullException>(async () =>
             await PocoView.StreamDataAsync<object, object>(
                 new object[] { "2", null! }.ToAsyncEnumerable(),
-                keySelector: x => new Poco(),
+                keySelector: _ => new Poco(),
                 payloadSelector: x => x,
-                units: Array.Empty<DeploymentUnit>(),
-                receiverClassName: TestReceiverClassName,
+                receiver: TestReceiverNoResults,
                 receiverArgs: new object[] { TableName, "1", 2 }));
 
         Assert.AreEqual(
@@ -704,10 +690,9 @@ public class DataStreamerTests : IgniteTestsBase
     {
         var res = await PocoView.StreamDataAsync<object, object, object>(
             new object[] { 1 }.ToAsyncEnumerable(),
-            keySelector: x => new Poco(),
+            keySelector: _ => new Poco(),
             payloadSelector: x => x.ToString()!,
-            units: Array.Empty<DeploymentUnit>(),
-            receiverClassName: EchoArgsReceiverClassName,
+            EchoArgsReceiver,
             receiverArgs: new[] { arg }).SingleAsync();
 
         Assert.AreEqual(arg, res);
@@ -716,12 +701,11 @@ public class DataStreamerTests : IgniteTestsBase
     [Test]
     public async Task TestResultConsumerEarlyExit()
     {
-        IAsyncEnumerable<string> results = PocoView.StreamDataAsync<int, 
string, string>(
+        IAsyncEnumerable<string> results = PocoView.StreamDataAsync(
             Enumerable.Range(0, Count).ToAsyncEnumerable(),
             keySelector: x => GetPoco(x),
             payloadSelector: x => $"{x}-value{x * 10}",
-            units: Array.Empty<DeploymentUnit>(),
-            receiverClassName: TestReceiverClassName,
+            TestReceiver,
             receiverArgs: new object[] { Table.Name, "arg1", 22 },
             options: DataStreamerOptions.Default with { PageSize = 1 });
 
@@ -747,8 +731,7 @@ public class DataStreamerTests : IgniteTestsBase
             Enumerable.Range(0, Count).ToAsyncEnumerable(),
             keySelector: x => GetPoco(x),
             payloadSelector: x => $"{x}-value{x * 10}",
-            units: Array.Empty<DeploymentUnit>(),
-            receiverClassName: TestReceiverClassName,
+            TestReceiver,
             receiverArgs: new object[] { Table.Name, "arg1", 22 },
             options: DataStreamerOptions.Default with { PageSize = 1 });
 
@@ -793,8 +776,7 @@ public class DataStreamerTests : IgniteTestsBase
             Enumerable.Range(0, 1).ToAsyncEnumerable(),
             keySelector: x => GetPoco(x),
             payloadSelector: _ => value,
-            units: Array.Empty<DeploymentUnit>(),
-            receiverClassName: UpsertElementTypeNameReceiverClassName,
+            receiver: new 
ReceiverDescriptor(UpsertElementTypeNameReceiverClassName),
             receiverArgs: new object[] { TableName, key1, key2 });
 
         var className = (await TupleView.GetAsync(null, 
GetTuple(key1))).Value[1];
diff --git 
a/modules/platforms/dotnet/Apache.Ignite/Internal/Table/KeyValueView.cs 
b/modules/platforms/dotnet/Apache.Ignite/Internal/Table/KeyValueView.cs
index 1ed35d1354..30f5da1c97 100644
--- a/modules/platforms/dotnet/Apache.Ignite/Internal/Table/KeyValueView.cs
+++ b/modules/platforms/dotnet/Apache.Ignite/Internal/Table/KeyValueView.cs
@@ -24,7 +24,6 @@ using System.Threading;
 using System.Threading.Tasks;
 using Apache.Ignite.Transactions;
 using Common;
-using Ignite.Compute;
 using Ignite.Sql;
 using Ignite.Table;
 using Linq;
@@ -169,18 +168,16 @@ internal sealed class KeyValueView<TK, TV> : 
IKeyValueView<TK, TV>
         IAsyncEnumerable<TSource> data,
         Func<TSource, KeyValuePair<TK, TV>> keySelector,
         Func<TSource, TPayload> payloadSelector,
-        IEnumerable<DeploymentUnit> units,
-        string receiverClassName,
+        ReceiverDescriptor<TResult> receiver,
         ICollection<object>? receiverArgs,
         DataStreamerOptions? options,
         CancellationToken cancellationToken = default)
         where TPayload : notnull =>
-        _recordView.StreamDataAsync<TSource, TPayload, TResult>(
+        _recordView.StreamDataAsync(
             data,
             src => ToKv(keySelector(src)),
             payloadSelector,
-            units,
-            receiverClassName,
+            receiver,
             receiverArgs,
             options,
             cancellationToken);
@@ -190,8 +187,7 @@ internal sealed class KeyValueView<TK, TV> : 
IKeyValueView<TK, TV>
         IAsyncEnumerable<TSource> data,
         Func<TSource, KeyValuePair<TK, TV>> keySelector,
         Func<TSource, TPayload> payloadSelector,
-        IEnumerable<DeploymentUnit> units,
-        string receiverClassName,
+        ReceiverDescriptor receiver,
         ICollection<object>? receiverArgs,
         DataStreamerOptions? options,
         CancellationToken cancellationToken = default)
@@ -200,8 +196,7 @@ internal sealed class KeyValueView<TK, TV> : 
IKeyValueView<TK, TV>
             data,
             src => ToKv(keySelector(src)),
             payloadSelector,
-            units,
-            receiverClassName,
+            receiver,
             receiverArgs,
             options,
             cancellationToken);
diff --git 
a/modules/platforms/dotnet/Apache.Ignite/Internal/Table/RecordView.cs 
b/modules/platforms/dotnet/Apache.Ignite/Internal/Table/RecordView.cs
index f96973864e..518da6ee88 100644
--- a/modules/platforms/dotnet/Apache.Ignite/Internal/Table/RecordView.cs
+++ b/modules/platforms/dotnet/Apache.Ignite/Internal/Table/RecordView.cs
@@ -312,8 +312,7 @@ namespace Apache.Ignite.Internal.Table
             IAsyncEnumerable<TSource> data,
             Func<TSource, T> keySelector,
             Func<TSource, TPayload> payloadSelector,
-            IEnumerable<DeploymentUnit> units,
-            string receiverClassName,
+            ReceiverDescriptor<TResult> receiver,
             ICollection<object>? receiverArgs,
             DataStreamerOptions? options,
             [EnumeratorCancellation] CancellationToken cancellationToken = 
default)
@@ -380,8 +379,8 @@ namespace Apache.Ignite.Internal.Table
                         keyWriter: _ser.Handler,
                         options,
                         resultChannel,
-                        units,
-                        receiverClassName,
+                        receiver.DeploymentUnits ?? 
Array.Empty<DeploymentUnit>(),
+                        receiver.ReceiverClassName,
                         receiverArgs,
                         cancellationToken).ConfigureAwait(false);
 
@@ -399,8 +398,7 @@ namespace Apache.Ignite.Internal.Table
             IAsyncEnumerable<TSource> data,
             Func<TSource, T> keySelector,
             Func<TSource, TPayload> payloadSelector,
-            IEnumerable<DeploymentUnit> units,
-            string receiverClassName,
+            ReceiverDescriptor receiver,
             ICollection<object>? receiverArgs,
             DataStreamerOptions? options,
             CancellationToken cancellationToken = default)
@@ -414,8 +412,8 @@ namespace Apache.Ignite.Internal.Table
                 keyWriter: _ser.Handler,
                 options ?? DataStreamerOptions.Default,
                 resultChannel: null,
-                units,
-                receiverClassName,
+                receiver.DeploymentUnits ?? Array.Empty<DeploymentUnit>(),
+                receiver.ReceiverClassName,
                 receiverArgs,
                 cancellationToken).ConfigureAwait(false);
         }
diff --git 
a/modules/platforms/dotnet/Apache.Ignite/Table/IDataStreamerTarget.cs 
b/modules/platforms/dotnet/Apache.Ignite/Table/IDataStreamerTarget.cs
index c5dd1aef5d..0612dfc3f4 100644
--- a/modules/platforms/dotnet/Apache.Ignite/Table/IDataStreamerTarget.cs
+++ b/modules/platforms/dotnet/Apache.Ignite/Table/IDataStreamerTarget.cs
@@ -22,7 +22,6 @@ using System;
 using System.Collections.Generic;
 using System.Threading;
 using System.Threading.Tasks;
-using Compute;
 using Internal.Common;
 
 /// <summary>
@@ -74,8 +73,7 @@ public interface IDataStreamerTarget<T>
     /// <param name="data">Data.</param>
     /// <param name="keySelector">Key selector.</param>
     /// <param name="payloadSelector">Payload selector.</param>
-    /// <param name="units">Deployment units. Can be empty.</param>
-    /// <param name="receiverClassName">Java class name of the streamer 
receiver to execute on the server.</param>
+    /// <param name="receiver">Streamer receiver descriptor.</param>
     /// <param name="receiverArgs">Receiver args.</param>
     /// <param name="options">Streamer options.</param>
     /// <param name="cancellationToken">Cancellation token.</param>
@@ -92,8 +90,7 @@ public interface IDataStreamerTarget<T>
         IAsyncEnumerable<TSource> data,
         Func<TSource, T> keySelector,
         Func<TSource, TPayload> payloadSelector,
-        IEnumerable<DeploymentUnit> units,
-        string receiverClassName,
+        ReceiverDescriptor<TResult> receiver,
         ICollection<object>? receiverArgs = null,
         DataStreamerOptions? options = null,
         CancellationToken cancellationToken = default)
@@ -105,8 +102,7 @@ public interface IDataStreamerTarget<T>
     /// <param name="data">Data.</param>
     /// <param name="keySelector">Key selector.</param>
     /// <param name="payloadSelector">Payload selector.</param>
-    /// <param name="units">Deployment units. Can be empty.</param>
-    /// <param name="receiverClassName">Java class name of the streamer 
receiver to execute on the server.</param>
+    /// <param name="receiver">Receiver descriptor.</param>
     /// <param name="receiverArgs">Receiver args.</param>
     /// <param name="options">Streamer options.</param>
     /// <param name="cancellationToken">Cancellation token.</param>
@@ -117,8 +113,7 @@ public interface IDataStreamerTarget<T>
         IAsyncEnumerable<TSource> data,
         Func<TSource, T> keySelector,
         Func<TSource, TPayload> payloadSelector,
-        IEnumerable<DeploymentUnit> units,
-        string receiverClassName,
+        ReceiverDescriptor receiver,
         ICollection<object>? receiverArgs = null,
         DataStreamerOptions? options = null,
         CancellationToken cancellationToken = default)
diff --git a/modules/platforms/dotnet/Apache.Ignite/Table/ReceiverDescriptor.cs 
b/modules/platforms/dotnet/Apache.Ignite/Table/ReceiverDescriptor.cs
new file mode 100644
index 0000000000..2b9afb8c68
--- /dev/null
+++ b/modules/platforms/dotnet/Apache.Ignite/Table/ReceiverDescriptor.cs
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache.Ignite.Table;
+
+using System;
+using System.Collections.Generic;
+using Compute;
+
+/// <summary>
+/// Stream receiver descriptor without results. If the specified receiver 
returns results, they will be discarded on the server.
+/// </summary>
+/// <param name="ReceiverClassName">Java class name of the streamer receiver 
to execute.</param>
+/// <param name="DeploymentUnits">Deployment units.</param>
+public sealed record ReceiverDescriptor(
+    string ReceiverClassName,
+    IEnumerable<DeploymentUnit>? DeploymentUnits = null);
+
+/// <summary>
+/// Stream receiver descriptor with result type.
+/// </summary>
+/// <param name="ReceiverClassName">Java class name of the streamer receiver 
to execute.</param>
+/// <param name="DeploymentUnits">Deployment units.</param>
+/// <typeparam name="TResult">Result type.</typeparam>
+public sealed record ReceiverDescriptor<TResult>(
+    string ReceiverClassName,
+    IEnumerable<DeploymentUnit>? DeploymentUnits = null)
+{
+    /// <summary>
+    /// Gets the result type.
+    /// </summary>
+    public Type ResultType => typeof(TResult);
+}

Reply via email to