This is an automated email from the ASF dual-hosted git repository.

ptupitsyn pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 8dcf85a5943 IGNITE-25344 .NET: Add custom marshallers to data streamer (#6242)
8dcf85a5943 is described below

commit 8dcf85a59432aa06761fdb6895fdd1b1a520077e
Author: Pavel Tupitsyn <[email protected]>
AuthorDate: Tue Jul 15 12:20:45 2025 +0300

    IGNITE-25344 .NET: Add custom marshallers to data streamer (#6242)
    
    Add payload, argument and result marshallers to data streamer APIs, including .NET receiver.
---
 .../Apache.Ignite.Tests/Compute/ComputeTests.cs    |   8 +-
 .../Compute/Executor/JobLoadContextTests.cs        |   4 +-
 .../Table/DataStreamerPlatformReceiverTests.cs     |  35 +++++++
 .../Apache.Ignite.Tests/Table/DataStreamerTests.cs |  27 +++++
 .../Apache.Ignite.Tests/Table/DotNetReceivers.cs   |  31 ++++++
 .../Table/TestJsonMarshaller.cs                    |  55 ++++++++++
 .../Internal/Table/DataStreamerWithReceiver.cs     |  23 +++--
 .../Apache.Ignite/Internal/Table/RecordView.cs     |   8 +-
 .../Serialization/StreamerReceiverSerializer.cs    | 111 ++++++++++++++++-----
 .../DataStreamerReceiverWrapper.cs                 |   6 +-
 .../Apache.Ignite/Table/IDataStreamerReceiver.cs   |  16 +++
 .../Apache.Ignite/Table/ReceiverDescriptor.cs      |  28 +++++-
 .../runner/app/PlatformTestNodeRunner.java         |  33 ++++++
 13 files changed, 343 insertions(+), 42 deletions(-)

diff --git 
a/modules/platforms/dotnet/Apache.Ignite.Tests/Compute/ComputeTests.cs 
b/modules/platforms/dotnet/Apache.Ignite.Tests/Compute/ComputeTests.cs
index 7ba73bce1d6..055ea5e7f98 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Tests/Compute/ComputeTests.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Tests/Compute/ComputeTests.cs
@@ -1097,14 +1097,14 @@ namespace Apache.Ignite.Tests.Compute
             JobTarget.Node(
                 (await Client.GetClusterNodesAsync()).OrderBy(n => 
n.Name).Skip(index).First());
 
-        private record Nested(Guid Id, decimal Price);
+        internal record Nested(Guid Id, decimal Price);
 
         [SuppressMessage("ReSharper", "NotAccessedPositionalProperty.Local", 
Justification = "Tests.")]
-        private record MyArg(int Id, string Name, Nested Nested);
+        internal record MyArg(int Id, string Name, Nested Nested);
 
-        private record MyResult(string Data, Nested Nested);
+        internal record MyResult(string Data, Nested Nested);
 
-        private class ToStringMarshaller : IMarshaller<Nested>
+        internal class ToStringMarshaller : IMarshaller<Nested>
         {
             public void Marshal(Nested obj, IBufferWriter<byte> writer)
             {
diff --git 
a/modules/platforms/dotnet/Apache.Ignite.Tests/Compute/Executor/JobLoadContextTests.cs
 
b/modules/platforms/dotnet/Apache.Ignite.Tests/Compute/Executor/JobLoadContextTests.cs
index 44aea22eebf..b594a414ec5 100644
--- 
a/modules/platforms/dotnet/Apache.Ignite.Tests/Compute/Executor/JobLoadContextTests.cs
+++ 
b/modules/platforms/dotnet/Apache.Ignite.Tests/Compute/Executor/JobLoadContextTests.cs
@@ -131,7 +131,9 @@ public class JobLoadContextTests
         static PooledBuffer WriteReceiverInfo(string typeName, object arg)
         {
             var items = new object[] { "hello" };
-            using var receiverInfoBuilder = 
StreamerReceiverSerializer.BuildReceiverInfo<object>(typeName, arg, items, 
prefixSize: 4);
+            using var receiverInfoBuilder = 
StreamerReceiverSerializer.BuildReceiverInfo<object, object>(
+                typeName, arg, items, null, null, prefixSize: 4);
+
             Memory<byte> receiverInfoMem = receiverInfoBuilder.Build();
             BinaryPrimitives.WriteInt32LittleEndian(receiverInfoMem.Span, 
receiverInfoBuilder.NumElements);
 
diff --git 
a/modules/platforms/dotnet/Apache.Ignite.Tests/Table/DataStreamerPlatformReceiverTests.cs
 
b/modules/platforms/dotnet/Apache.Ignite.Tests/Table/DataStreamerPlatformReceiverTests.cs
index c66430151c3..9b226454241 100644
--- 
a/modules/platforms/dotnet/Apache.Ignite.Tests/Table/DataStreamerPlatformReceiverTests.cs
+++ 
b/modules/platforms/dotnet/Apache.Ignite.Tests/Table/DataStreamerPlatformReceiverTests.cs
@@ -17,6 +17,7 @@
 
 namespace Apache.Ignite.Tests.Table;
 
+using System;
 using System.Collections.Generic;
 using System.Diagnostics.CodeAnalysis;
 using System.Linq;
@@ -233,6 +234,40 @@ public class DataStreamerPlatformReceiverTests : 
IgniteTestsBase
         Assert.AreEqual(1, ex.FailedItems.Count);
     }
 
+    [Test]
+    public async Task TestMarshallerReceiver()
+    {
+        var receiverItem = new 
DotNetReceivers.ReceiverItem<string>(Guid.NewGuid(), "hello");
+        var receiverArg = new DotNetReceivers.ReceiverArg(123, "345");
+
+        DotNetReceivers.ReceiverResult<string> res = await 
PocoView.StreamDataAsync(
+            new object[] { "unused" }.ToAsyncEnumerable(),
+            DotNetReceivers.Marshaller with { DeploymentUnits = 
[_defaultTestUnit] },
+            keySelector: _ => new Poco(),
+            payloadSelector: _ => receiverItem,
+            receiverArg: receiverArg).FirstAsync();
+
+        Assert.AreEqual(receiverArg, res.Arg);
+        Assert.AreEqual(receiverItem, res.Item);
+    }
+
+    [Test]
+    public void TestErrorInMarshaller()
+    {
+        var ex = Assert.ThrowsAsync<DataStreamerException>(async () => await 
PocoView.StreamDataAsync(
+            new object[] { "unused" }.ToAsyncEnumerable(),
+            DotNetReceivers.Marshaller with { DeploymentUnits = 
[_defaultTestUnit] },
+            keySelector: _ => new Poco(),
+            payloadSelector: _ => new 
DotNetReceivers.ReceiverItem<string>(Guid.Empty, "error!"),
+            receiverArg: new DotNetReceivers.ReceiverArg(1, 
"1")).FirstAsync());
+
+        Assert.AreEqual(
+            ".NET job failed: Test marshaller error: ReceiverItem { Id = 
00000000-0000-0000-0000-000000000000, Value = error! }",
+            ex.Message);
+
+        Assert.AreEqual(1, ex.FailedItems.Count);
+    }
+
     private async Task<object> RunEchoArgReceiver(object arg, 
IRecordView<Poco>? view = null)
     {
         view ??= PocoView;
diff --git 
a/modules/platforms/dotnet/Apache.Ignite.Tests/Table/DataStreamerTests.cs 
b/modules/platforms/dotnet/Apache.Ignite.Tests/Table/DataStreamerTests.cs
index 3561af812f3..266ae08060c 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Tests/Table/DataStreamerTests.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Tests/Table/DataStreamerTests.cs
@@ -25,6 +25,7 @@ using System.Runtime.CompilerServices;
 using System.Threading;
 using System.Threading.Tasks;
 using Compute;
+using Ignite.Marshalling;
 using Ignite.Table;
 using Internal.Proto;
 using Microsoft.Extensions.Logging;
@@ -46,6 +47,8 @@ public class DataStreamerTests : IgniteTestsBase
 
     private const string UpsertElementTypeNameReceiverClassName = 
ComputeTests.PlatformTestNodeRunner + "$UpsertElementTypeNameReceiver";
 
+    private const string MarshallerReceiverClassName = 
ComputeTests.PlatformTestNodeRunner + "$MarshallerReceiver";
+
     private const int Count = 100;
 
     private const int UpdatedKey = Count / 2;
@@ -63,6 +66,13 @@ public class DataStreamerTests : IgniteTestsBase
 
     private static readonly ReceiverDescriptor<object, object, object> 
EchoArgsReceiver = new(EchoArgsReceiverClassName);
 
+    private static readonly ReceiverDescriptor<ComputeTests.Nested, 
ComputeTests.MyArg, ComputeTests.MyResult> MarshallerReceiver
+        = new(
+            MarshallerReceiverClassName,
+            PayloadMarshaller: new ComputeTests.ToStringMarshaller(),
+            ArgumentMarshaller: new JsonMarshaller<ComputeTests.MyArg>(),
+            ResultMarshaller: new JsonMarshaller<ComputeTests.MyResult>());
+
     private static int _unknownKey = 333000;
 
     [SetUp]
@@ -899,6 +909,23 @@ public class DataStreamerTests : IgniteTestsBase
         Assert.AreEqual(arg, res);
     }
 
+    [Test]
+    public async Task TestMarshallerReceiver()
+    {
+        var payload = new ComputeTests.Nested(Guid.NewGuid(), 1.23m);
+        var arg = new ComputeTests.MyArg(1, "foo", new 
ComputeTests.Nested(Guid.NewGuid(), 2.2m));
+
+        ComputeTests.MyResult res = await PocoView.StreamDataAsync(
+            new[] { payload }.ToAsyncEnumerable(),
+            MarshallerReceiver,
+            keySelector: _ => new Poco(),
+            payloadSelector: x => x,
+            receiverArg: arg).FirstAsync();
+
+        Assert.AreEqual("foo_1", res.Data);
+        Assert.AreEqual(payload, res.Nested);
+    }
+
     [Test]
     public async Task TestResultConsumerEarlyExit()
     {
diff --git 
a/modules/platforms/dotnet/Apache.Ignite.Tests/Table/DotNetReceivers.cs 
b/modules/platforms/dotnet/Apache.Ignite.Tests/Table/DotNetReceivers.cs
index c7cd2ae394a..5dc78e6011b 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Tests/Table/DotNetReceivers.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Tests/Table/DotNetReceivers.cs
@@ -20,8 +20,10 @@ namespace Apache.Ignite.Tests.Table;
 using System;
 using System.Collections.Generic;
 using System.Diagnostics.CodeAnalysis;
+using System.Linq;
 using System.Threading;
 using System.Threading.Tasks;
+using Ignite.Marshalling;
 using Ignite.Table;
 
 /// <summary>
@@ -41,6 +43,9 @@ public static class DotNetReceivers
 
     public static readonly ReceiverDescriptor<IIgniteTuple, object?, 
IIgniteTuple> UpdateTuple = ReceiverDescriptor.Of(new UpdateTupleReceiver());
 
+    public static readonly ReceiverDescriptor<ReceiverItem<string>, 
ReceiverArg, ReceiverResult<string>> Marshaller =
+        ReceiverDescriptor.Of(new MarshallerReceiver());
+
     public class EchoReceiver : IDataStreamerReceiver<object, object, object>
     {
         public ValueTask<IList<object>?> ReceiveAsync(
@@ -124,4 +129,30 @@ public static class DotNetReceivers
             return ValueTask.FromResult<IList<IIgniteTuple>?>(page);
         }
     }
+
+    public class MarshallerReceiver : 
IDataStreamerReceiver<ReceiverItem<string>, ReceiverArg, ReceiverResult<string>>
+    {
+        public IMarshaller<ReceiverItem<string>> PayloadMarshaller => new 
TestJsonMarshaller<ReceiverItem<string>>(new());
+
+        public IMarshaller<ReceiverArg> ArgumentMarshaller => new 
TestJsonMarshaller<ReceiverArg>(new());
+
+        public IMarshaller<ReceiverResult<string>> ResultMarshaller => new 
TestJsonMarshaller<ReceiverResult<string>>(new());
+
+        public async ValueTask<IList<ReceiverResult<string>>?> ReceiveAsync(
+            IList<ReceiverItem<string>> page,
+            ReceiverArg arg,
+            IDataStreamerReceiverContext context,
+            CancellationToken cancellationToken)
+        {
+            await Task.Yield();
+
+            return page.Select(x => new ReceiverResult<string>(x, 
arg)).ToList();
+        }
+    }
+
+    public record ReceiverItem<T>(Guid Id, T Value);
+
+    public record ReceiverArg(int A, string B);
+
+    public record ReceiverResult<T>(ReceiverItem<T> Item, ReceiverArg Arg);
 }
diff --git 
a/modules/platforms/dotnet/Apache.Ignite.Tests/Table/TestJsonMarshaller.cs 
b/modules/platforms/dotnet/Apache.Ignite.Tests/Table/TestJsonMarshaller.cs
new file mode 100644
index 00000000000..760456ea084
--- /dev/null
+++ b/modules/platforms/dotnet/Apache.Ignite.Tests/Table/TestJsonMarshaller.cs
@@ -0,0 +1,55 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+//      http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+namespace Apache.Ignite.Tests.Table;
+
+using System;
+using System.Buffers;
+using System.Text;
+using Ignite.Marshalling;
+
+/// <summary>
+/// Test JSON marshaller.
+/// </summary>
+/// <typeparam name="T">Element type.</typeparam>
+public class TestJsonMarshaller<T> : IMarshaller<T>
+{
+    private readonly JsonMarshaller<T> _marshaller;
+
+    public TestJsonMarshaller(JsonMarshaller<T> marshaller) => _marshaller = 
marshaller;
+
+    public void Marshal(T obj, IBufferWriter<byte> writer) => 
_marshaller.Marshal(obj, writer);
+
+    public T Unmarshal(ReadOnlySpan<byte> bytes)
+    {
+        T res;
+
+        try
+        {
+            res = _marshaller.Unmarshal(bytes);
+        }
+        catch (Exception e)
+        {
+            throw new InvalidOperationException($"Failed to deserialize JSON: 
'{Encoding.UTF8.GetString(bytes)}'", e);
+        }
+
+        if (res != null && res.ToString()!.Contains("error", 
StringComparison.OrdinalIgnoreCase))
+        {
+            throw new InvalidOperationException("Test marshaller error: " + 
res);
+        }
+
+        return res;
+    }
+}
diff --git 
a/modules/platforms/dotnet/Apache.Ignite/Internal/Table/DataStreamerWithReceiver.cs
 
b/modules/platforms/dotnet/Apache.Ignite/Internal/Table/DataStreamerWithReceiver.cs
index b5a92b01a29..f5e78261b68 100644
--- 
a/modules/platforms/dotnet/Apache.Ignite/Internal/Table/DataStreamerWithReceiver.cs
+++ 
b/modules/platforms/dotnet/Apache.Ignite/Internal/Table/DataStreamerWithReceiver.cs
@@ -33,6 +33,7 @@ using Common;
 using Compute;
 using Ignite.Compute;
 using Ignite.Table;
+using Marshalling;
 using Proto;
 using Serialization;
 
@@ -63,6 +64,9 @@ internal static class DataStreamerWithReceiver
     /// <param name="units">Deployment units. Can be empty.</param>
     /// <param name="receiverClassName">Java class name of the streamer 
receiver to execute on the server.</param>
     /// <param name="receiverExecutionOptions">Receiver options.</param>
+    /// <param name="payloadMarshaller">Payload marshaller.</param>
+    /// <param name="argMarshaller">Argument marshaller.</param>
+    /// <param name="resultMarshaller">Result marshaller.</param>
     /// <param name="receiverArg">Receiver arg.</param>
     /// <param name="cancellationToken">Cancellation token.</param>
     /// <typeparam name="TSource">Source type.</typeparam>
@@ -84,6 +88,9 @@ internal static class DataStreamerWithReceiver
         IEnumerable<DeploymentUnit> units,
         string receiverClassName,
         ReceiverExecutionOptions receiverExecutionOptions,
+        IMarshaller<TPayload>? payloadMarshaller,
+        IMarshaller<TArg>? argMarshaller,
+        IMarshaller<TResult>? resultMarshaller,
         TArg receiverArg,
         CancellationToken cancellationToken)
         where TKey : notnull
@@ -293,14 +300,15 @@ internal static class DataStreamerWithReceiver
 
                 // Wait for the previous batch for this node to preserve item 
order.
                 await oldTask.ConfigureAwait(false);
-                (results, int resultsCount) = await SendBatchAsync<TResult>(
+                (results, int resultsCount) = await SendBatchAsync(
                     table,
                     buf,
                     count,
                     preferredNode,
                     retryPolicy,
                     expectResults: resultChannel != null,
-                    customReceiverExecutionOptions).ConfigureAwait(false);
+                    customReceiverExecutionOptions,
+                    resultMarshaller).ConfigureAwait(false);
 
                 if (results != null && resultChannel != null)
                 {
@@ -371,9 +379,9 @@ internal static class DataStreamerWithReceiver
             }
         }
 
-        void SerializeBatch<T>(
+        void SerializeBatch(
             PooledArrayBuffer buf,
-            ArraySegment<T> items,
+            ArraySegment<TPayload> items,
             int partitionId)
         {
             // T is one of the supported types (numbers, strings, etc).
@@ -386,7 +394,7 @@ internal static class DataStreamerWithReceiver
 
             var expectResults = resultChannel != null;
             w.Write(expectResults);
-            StreamerReceiverSerializer.WriteReceiverInfo(ref w, 
receiverClassName, receiverArg, items);
+            StreamerReceiverSerializer.WriteReceiverInfo(ref w, 
receiverClassName, receiverArg, items, payloadMarshaller, argMarshaller);
 
             w.Write(receiverExecutionOptions.Priority);
             w.Write(receiverExecutionOptions.MaxRetries);
@@ -401,7 +409,8 @@ internal static class DataStreamerWithReceiver
         PreferredNode preferredNode,
         IRetryPolicy retryPolicy,
         bool expectResults,
-        bool customReceiverExecutionOptions)
+        bool customReceiverExecutionOptions,
+        IMarshaller<T>? marshaller)
     {
         var (resBuf, socket) = await table.Socket.DoWithRetryAsync(
                 (buf, customReceiverExecutionOptions),
@@ -431,7 +440,7 @@ internal static class DataStreamerWithReceiver
             Metrics.StreamerItemsSent.Add(count, socket.MetricsContext.Tags);
 
             return expectResults
-                ? 
StreamerReceiverSerializer.ReadReceiverResults<T>(resBuf.GetReader())
+                ? 
StreamerReceiverSerializer.ReadReceiverResults(resBuf.GetReader(), marshaller)
                 : (null, 0);
         }
     }
diff --git 
a/modules/platforms/dotnet/Apache.Ignite/Internal/Table/RecordView.cs 
b/modules/platforms/dotnet/Apache.Ignite/Internal/Table/RecordView.cs
index 25eeac8f12c..fbc5efa279c 100644
--- a/modules/platforms/dotnet/Apache.Ignite/Internal/Table/RecordView.cs
+++ b/modules/platforms/dotnet/Apache.Ignite/Internal/Table/RecordView.cs
@@ -337,7 +337,7 @@ namespace Apache.Ignite.Internal.Table
                 SingleWriter = false
             });
 
-            // Stream in background.
+            // Stream in the background.
             var streamTask = Stream();
 
             // Result async enumerable is returned immediately. It will be 
completed when the streaming completes.
@@ -381,6 +381,9 @@ namespace Apache.Ignite.Internal.Table
                         receiver.DeploymentUnits ?? [],
                         receiver.ReceiverClassName,
                         receiver.Options ?? ReceiverExecutionOptions.Default,
+                        receiver.PayloadMarshaller,
+                        receiver.ArgumentMarshaller,
+                        receiver.ResultMarshaller,
                         receiverArg,
                         cancellationToken).ConfigureAwait(false);
 
@@ -415,6 +418,9 @@ namespace Apache.Ignite.Internal.Table
                 receiver.DeploymentUnits ?? [],
                 receiver.ReceiverClassName,
                 receiver.Options ?? ReceiverExecutionOptions.Default,
+                null,
+                null,
+                null,
                 receiverArg,
                 cancellationToken).ConfigureAwait(false);
         }
diff --git 
a/modules/platforms/dotnet/Apache.Ignite/Internal/Table/Serialization/StreamerReceiverSerializer.cs
 
b/modules/platforms/dotnet/Apache.Ignite/Internal/Table/Serialization/StreamerReceiverSerializer.cs
index 5cb2f58d817..bd16a1d1e31 100644
--- 
a/modules/platforms/dotnet/Apache.Ignite/Internal/Table/Serialization/StreamerReceiverSerializer.cs
+++ 
b/modules/platforms/dotnet/Apache.Ignite/Internal/Table/Serialization/StreamerReceiverSerializer.cs
@@ -27,6 +27,7 @@ using Buffers;
 using Compute;
 using Ignite.Sql;
 using Ignite.Table;
+using Marshalling;
 using Proto.BinaryTuple;
 using Proto.MsgPack;
 
@@ -43,14 +44,19 @@ internal static class StreamerReceiverSerializer
     /// <param name="className">Receiver class name.</param>
     /// <param name="arg">Receiver argument.</param>
     /// <param name="items">Receiver items.</param>
+    /// <param name="payloadMarshaller">Payload marshaller.</param>
+    /// <param name="argMarshaller">Argument marshaller.</param>
     /// <typeparam name="T">Item type.</typeparam>
-    public static void WriteReceiverInfo<T>(
+    /// <typeparam name="TArg">Arg type.</typeparam>
+    public static void WriteReceiverInfo<T, TArg>(
         ref MsgPackWriter w,
         string className,
-        object? arg,
-        ArraySegment<T> items)
+        TArg arg,
+        ArraySegment<T> items,
+        IMarshaller<T>? payloadMarshaller,
+        IMarshaller<TArg>? argMarshaller)
     {
-        using var builder = BuildReceiverInfo(className, arg, items);
+        using var builder = BuildReceiverInfo(className, arg, items, 
payloadMarshaller, argMarshaller);
 
         w.Write(builder.NumElements);
         w.Write(builder.Build().Span);
@@ -62,13 +68,18 @@ internal static class StreamerReceiverSerializer
     /// <param name="className">Receiver class name.</param>
     /// <param name="arg">Receiver argument.</param>
     /// <param name="items">Receiver items.</param>
+    /// <param name="payloadMarshaller">Payload marshaller.</param>
+    /// <param name="argMarshaller">Argument marshaller.</param>
     /// <param name="prefixSize">Builder prefix size.</param>
     /// <typeparam name="T">Item type.</typeparam>
+    /// <typeparam name="TArg">Argument type.</typeparam>
     /// <returns>Binary tuple builder.</returns>
-    public static BinaryTupleBuilder BuildReceiverInfo<T>(
+    public static BinaryTupleBuilder BuildReceiverInfo<T, TArg>(
         string className,
-        object? arg,
+        TArg arg,
         ArraySegment<T> items,
+        IMarshaller<T>? payloadMarshaller,
+        IMarshaller<TArg>? argMarshaller,
         int prefixSize = 0)
     {
         Debug.Assert(items.Count > 0, "items.Count > 0");
@@ -81,7 +92,13 @@ internal static class StreamerReceiverSerializer
         {
             builder.AppendString(className);
 
-            if (arg is IIgniteTuple tupleArg)
+            if (argMarshaller != null)
+            {
+                builder.AppendInt((int)ColumnType.ByteArray);
+                builder.AppendInt(0); // Scale.
+                builder.AppendBytes(static (bufWriter, a) => 
a.argMarshaller.Marshal(a.arg, bufWriter), (arg, argMarshaller));
+            }
+            else if (arg is IIgniteTuple tupleArg)
             {
                 builder.AppendInt(TupleWithSchemaMarshalling.TypeIdTuple);
                 builder.AppendInt(0); // Scale.
@@ -92,7 +109,7 @@ internal static class StreamerReceiverSerializer
                 builder.AppendObjectWithType(arg);
             }
 
-            AppendCollection(builder, items);
+            AppendCollection(builder, items, payloadMarshaller);
 
             return builder;
         }
@@ -108,8 +125,9 @@ internal static class StreamerReceiverSerializer
     /// </summary>
     /// <param name="w">Writer.</param>
     /// <param name="res">Results.</param>
+    /// <param name="marshaller">Marshaller.</param>
     /// <typeparam name="T">Result item type.</typeparam>
-    public static void WriteReceiverResults<T>(MsgPackWriter w, IList<T>? res)
+    public static void WriteReceiverResults<T>(MsgPackWriter w, IList<T>? res, 
IMarshaller<T>? marshaller)
     {
         if (res == null)
         {
@@ -121,7 +139,7 @@ internal static class StreamerReceiverSerializer
 
         // Reserve a 4-byte prefix for resTupleElementCount.
         using var builder = new BinaryTupleBuilder(resTupleElementCount, 
prefixSize: 4);
-        AppendCollection(builder, res);
+        AppendCollection(builder, res, marshaller);
 
         Memory<byte> jobResultTupleMemWithPrefix = builder.Build();
         
BinaryPrimitives.WriteInt32LittleEndian(jobResultTupleMemWithPrefix.Span, 
resTupleElementCount);
@@ -132,9 +150,10 @@ internal static class StreamerReceiverSerializer
     /// Reads receiver execution results. Opposite of <see 
cref="WriteReceiverResults{T}"/>.
     /// </summary>
     /// <param name="reader">Reader.</param>
+    /// <param name="marshaller">Marshaller.</param>
     /// <typeparam name="T">Result element type.</typeparam>
     /// <returns>Pooled array with results and the actual element 
count.</returns>
-    public static (T[]? ResultsPooledArray, int ResultsCount) 
ReadReceiverResults<T>(MsgPackReader reader)
+    public static (T[]? ResultsPooledArray, int ResultsCount) 
ReadReceiverResults<T>(MsgPackReader reader, IMarshaller<T>? marshaller)
     {
         if (reader.TryReadNil())
         {
@@ -148,7 +167,7 @@ internal static class StreamerReceiverSerializer
         }
 
         var tuple = new BinaryTupleReader(reader.ReadBinary(), numElements);
-        if (tuple.GetInt(0) != TupleWithSchemaMarshalling.TypeIdTuple)
+        if (tuple.GetInt(0) != TupleWithSchemaMarshalling.TypeIdTuple && 
marshaller == null)
         {
             return tuple.GetObjectCollectionWithType<T>();
         }
@@ -158,6 +177,16 @@ internal static class StreamerReceiverSerializer
 
         try
         {
+            if (marshaller != null)
+            {
+                for (var i = 0; i < elementCount; i++)
+                {
+                    resultsPooledArr[i] = 
marshaller.Unmarshal(tuple.GetBytesSpan(2 + i));
+                }
+
+                return (resultsPooledArr, elementCount);
+            }
+
             for (var i = 0; i < elementCount; i++)
             {
                 resultsPooledArr[i] = 
(T)(object)TupleWithSchemaMarshalling.Unpack(tuple.GetBytesSpan(2 + i));
@@ -184,28 +213,41 @@ internal static class StreamerReceiverSerializer
     /// Reads the receiver info from the buffer.
     /// </summary>
     /// <param name="buf">Buffer.</param>
+    /// <param name="payloadMarshaller">Payload marshaller.</param>
+    /// <param name="argumentMarshaller">Argument marshaller.</param>
     /// <typeparam name="TItem">Item type.</typeparam>
     /// <typeparam name="TArg">Argument type.</typeparam>
     /// <returns>Receiver info.</returns>
-    public static ReceiverInfo<TItem, TArg> ReadReceiverInfo<TItem, 
TArg>(PooledBuffer buf)
+    public static ReceiverInfo<TItem, TArg> ReadReceiverInfo<TItem, TArg>(
+        PooledBuffer buf,
+        IMarshaller<TItem>? payloadMarshaller,
+        IMarshaller<TArg>? argumentMarshaller)
     {
         BinaryTupleReader receiverInfo = GetReceiverInfoReaderFast(buf);
 
-        var arg = (TArg)ReadReceiverArg(ref receiverInfo, 1)!;
-        List<TItem> items = ReadReceiverPage<TItem>(ref receiverInfo);
+        var arg = (TArg)ReadReceiverArg(ref receiverInfo, 1, 
argumentMarshaller)!;
+        List<TItem> items = ReadReceiverPage(ref receiverInfo, 
payloadMarshaller);
 
         return new(items, arg);
     }
 
     [SuppressMessage("Design", "CA1002:Do not expose generic lists", 
Justification = "Private method.")]
-    private static List<T> ReadReceiverPage<T>(ref BinaryTupleReader 
receiverInfo)
+    private static List<T> ReadReceiverPage<T>(ref BinaryTupleReader 
receiverInfo, IMarshaller<T>? marshaller)
     {
         int itemType = receiverInfo.GetInt(4);
         int itemCount = receiverInfo.GetInt(5);
 
         List<T> items = new List<T>(itemCount);
 
-        if (itemType == TupleWithSchemaMarshalling.TypeIdTuple)
+        if (marshaller != null)
+        {
+            for (int i = 0; i < itemCount; i++)
+            {
+                T item = marshaller.Unmarshal(receiverInfo.GetBytesSpan(i + 
6));
+                items.Add(item);
+            }
+        }
+        else if (itemType == TupleWithSchemaMarshalling.TypeIdTuple)
         {
             for (int i = 0; i < itemCount; i++)
             {
@@ -226,19 +268,25 @@ internal static class StreamerReceiverSerializer
         return items;
     }
 
-    private static object? ReadReceiverArg(ref BinaryTupleReader reader, int 
index)
+    private static TArg? ReadReceiverArg<TArg>(ref BinaryTupleReader reader, 
int index, IMarshaller<TArg>? marshaller)
     {
         if (reader.IsNull(index))
         {
-            return null;
+            return default;
         }
 
         if (reader.GetInt(index) == TupleWithSchemaMarshalling.TypeIdTuple)
         {
-            return TupleWithSchemaMarshalling.Unpack(reader.GetBytesSpan(index 
+ 2));
+            return 
(TArg)(object)TupleWithSchemaMarshalling.Unpack(reader.GetBytesSpan(index + 2));
         }
 
-        return reader.GetObject(index);
+        if (marshaller != null)
+        {
+            ReadOnlySpan<byte> bytes = reader.GetBytesSpan(index + 2);
+            return marshaller.Unmarshal(bytes);
+        }
+
+        return (TArg?)reader.GetObject(index);
     }
 
     private static BinaryTupleReader GetReceiverInfoReaderFast(PooledBuffer 
jobArgBuf)
@@ -270,9 +318,26 @@ internal static class StreamerReceiverSerializer
         }
     }
 
-    private static void AppendCollection<T>(BinaryTupleBuilder builder, 
IList<T> items)
+    private static void AppendMarshalledCollection<T>(BinaryTupleBuilder 
builder, ICollection<T> items, IMarshaller<T> marshaller)
     {
-        if (items.Count > 0 && items[0] is IIgniteTuple)
+        builder.AppendInt((int)ColumnType.ByteArray);
+        builder.AppendInt(items.Count);
+
+        foreach (var item in items)
+        {
+            builder.AppendBytes(
+                static (bufWriter, arg) => arg.marsh.Marshal(arg.obj, 
bufWriter),
+                (marsh: marshaller, obj: item));
+        }
+    }
+
+    private static void AppendCollection<T>(BinaryTupleBuilder builder, 
IList<T> items, IMarshaller<T>? marshaller)
+    {
+        if (marshaller != null)
+        {
+            AppendMarshalledCollection(builder, items, marshaller);
+        }
+        else if (items.Count > 0 && items[0] is IIgniteTuple)
         {
             AppendTupleCollection(builder, items);
         }
diff --git 
a/modules/platforms/dotnet/Apache.Ignite/Internal/Table/StreamerReceiverExecutor/DataStreamerReceiverWrapper.cs
 
b/modules/platforms/dotnet/Apache.Ignite/Internal/Table/StreamerReceiverExecutor/DataStreamerReceiverWrapper.cs
index d48aa8673df..d00b029b462 100644
--- 
a/modules/platforms/dotnet/Apache.Ignite/Internal/Table/StreamerReceiverExecutor/DataStreamerReceiverWrapper.cs
+++ 
b/modules/platforms/dotnet/Apache.Ignite/Internal/Table/StreamerReceiverExecutor/DataStreamerReceiverWrapper.cs
@@ -42,14 +42,16 @@ internal sealed class 
DataStreamerReceiverWrapper<TReceiver, TItem, TArg, TResul
         PooledArrayBuffer responseBuf,
         CancellationToken cancellationToken)
     {
-        var (page, arg) = StreamerReceiverSerializer.ReadReceiverInfo<TItem, 
TArg>(requestBuf);
         TReceiver receiver = new TReceiver();
 
         try
         {
+            var (page, arg) = StreamerReceiverSerializer.ReadReceiverInfo(
+                requestBuf, receiver.PayloadMarshaller, 
receiver.ArgumentMarshaller);
+
             IList<TResult>? res = await receiver.ReceiveAsync(page, arg, 
context, cancellationToken).ConfigureAwait(false);
 
-            
StreamerReceiverSerializer.WriteReceiverResults(responseBuf.MessageWriter, res);
+            
StreamerReceiverSerializer.WriteReceiverResults(responseBuf.MessageWriter, res, 
receiver.ResultMarshaller);
         }
         finally
         {
diff --git 
a/modules/platforms/dotnet/Apache.Ignite/Table/IDataStreamerReceiver.cs 
b/modules/platforms/dotnet/Apache.Ignite/Table/IDataStreamerReceiver.cs
index fcd57f384bd..69b513c87e2 100644
--- a/modules/platforms/dotnet/Apache.Ignite/Table/IDataStreamerReceiver.cs
+++ b/modules/platforms/dotnet/Apache.Ignite/Table/IDataStreamerReceiver.cs
@@ -21,6 +21,7 @@ using System.Collections.Generic;
 using System.Diagnostics.CodeAnalysis;
 using System.Threading;
 using System.Threading.Tasks;
+using Marshalling;
 
 /// <summary>
 /// Data streamer receiver.
@@ -31,6 +32,21 @@ using System.Threading.Tasks;
 [SuppressMessage("ReSharper", "TypeParameterCanBeVariant", Justification = "Won't be possible later with marshallers.")]
 public interface IDataStreamerReceiver<TItem, TArg, TResult>
 {
+    /// <summary>
+    /// Gets the custom marshaller for the receiver payload.
+    /// </summary>
+    IMarshaller<TItem>? PayloadMarshaller => null;
+
+    /// <summary>
+    /// Gets the custom marshaller for the receiver input argument.
+    /// </summary>
+    IMarshaller<TArg>? ArgumentMarshaller => null;
+
+    /// <summary>
+    /// Gets the custom marshaller for the receiver result.
+    /// </summary>
+    IMarshaller<TResult>? ResultMarshaller => null;
+
     /// <summary>
     /// Receives a page of items from the data streamer.
     /// <para />
diff --git a/modules/platforms/dotnet/Apache.Ignite/Table/ReceiverDescriptor.cs b/modules/platforms/dotnet/Apache.Ignite/Table/ReceiverDescriptor.cs
index cfbaeb423bb..0b07f93810e 100644
--- a/modules/platforms/dotnet/Apache.Ignite/Table/ReceiverDescriptor.cs
+++ b/modules/platforms/dotnet/Apache.Ignite/Table/ReceiverDescriptor.cs
@@ -21,6 +21,7 @@ using System;
 using System.Collections.Generic;
 using System.Diagnostics.CodeAnalysis;
 using Compute;
+using Marshalling;
 
 /// <summary>
 /// Stream receiver descriptor without results. If the specified receiver returns results, they will be discarded on the server.
@@ -75,6 +76,9 @@ public sealed record ReceiverDescriptor<TArg, TResult>(
 /// <param name="ReceiverClassName">Name of the streamer receiver class to execute.</param>
 /// <param name="DeploymentUnits">Deployment units.</param>
 /// <param name="Options">Execution options.</param>
+/// <param name="PayloadMarshaller">Payload marshaller.</param>
+/// <param name="ArgumentMarshaller">Argument marshaller.</param>
+/// <param name="ResultMarshaller">Result marshaller.</param>
 /// <typeparam name="TItem">Streamer item type.</typeparam>
 /// <typeparam name="TArg">Argument type.</typeparam>
 /// <typeparam name="TResult">Result type.</typeparam>
@@ -82,7 +86,10 @@ public sealed record ReceiverDescriptor<TArg, TResult>(
 public sealed record ReceiverDescriptor<TItem, TArg, TResult>(
     string ReceiverClassName,
     IEnumerable<DeploymentUnit>? DeploymentUnits = null,
-    ReceiverExecutionOptions? Options = null)
+    ReceiverExecutionOptions? Options = null,
+    IMarshaller<TItem>? PayloadMarshaller = null,
+    IMarshaller<TArg>? ArgumentMarshaller = null,
+    IMarshaller<TResult>? ResultMarshaller = null)
 {
     /// <summary>
     /// Initializes a new instance of the <see cref="ReceiverDescriptor{TItem,TArg,TResult}"/> class.
@@ -90,14 +97,23 @@ public sealed record ReceiverDescriptor<TItem, TArg, TResult>(
     /// <param name="type">Receiver type.</param>
     /// <param name="deploymentUnits">Deployment units.</param>
     /// <param name="options">Options.</param>
+    /// <param name="payloadMarshaller">Payload marshaller.</param>
+    /// <param name="argumentMarshaller">Argument marshaller.</param>
+    /// <param name="resultMarshaller">Result marshaller.</param>
     public ReceiverDescriptor(
         Type type,
         IEnumerable<DeploymentUnit>? deploymentUnits = null,
-        ReceiverExecutionOptions? options = null)
+        ReceiverExecutionOptions? options = null,
+        IMarshaller<TItem>? payloadMarshaller = null,
+        IMarshaller<TArg>? argumentMarshaller = null,
+        IMarshaller<TResult>? resultMarshaller = null)
         : this(
             type.AssemblyQualifiedName ?? throw new ArgumentException("Type has null AssemblyQualifiedName: " + type),
             deploymentUnits,
-            ReceiverDescriptor.EnsureDotNetExecutor(options))
+            ReceiverDescriptor.EnsureDotNetExecutor(options),
+            payloadMarshaller,
+            argumentMarshaller,
+            resultMarshaller)
     {
         // No-op.
     }
@@ -117,7 +133,11 @@ public static class ReceiverDescriptor
     /// <typeparam name="TResult">Result type.</typeparam>
     /// <returns>Receiver descriptor.</returns>
     public static ReceiverDescriptor<TItem, TArg, TResult> Of<TItem, TArg, TResult>(IDataStreamerReceiver<TItem, TArg, TResult> receiver) =>
-        new(receiver.GetType());
+        new(
+            receiver.GetType(),
+            payloadMarshaller: receiver.PayloadMarshaller,
+            argumentMarshaller: receiver.ArgumentMarshaller,
+            resultMarshaller: receiver.ResultMarshaller);
 
     /// <summary>
     /// Ensures that the provided <see cref="ReceiverExecutionOptions"/> is set to use the .NET executor.
diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/PlatformTestNodeRunner.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/PlatformTestNodeRunner.java
index 5b63c15b7fa..ee741f8f9aa 100644
--- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/PlatformTestNodeRunner.java
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/PlatformTestNodeRunner.java
@@ -858,6 +858,39 @@ public class PlatformTestNodeRunner {
         }
     }
 
+    @SuppressWarnings("unused") // Used by platform tests.
+    private static class MarshallerReceiver implements DataStreamerReceiver<Nested, MyArg, MyResult> {
+        @Override
+        public @Nullable Marshaller<Nested, byte[]> payloadMarshaller() {
+            return new ToStringMarshaller();
+        }
+
+        @Override
+        public @Nullable Marshaller<MyArg, byte[]> argumentMarshaller() {
+            return new JsonMarshaller<>(MyArg.class);
+        }
+
+        @Override
+        public @Nullable Marshaller<MyResult, byte[]> resultMarshaller() {
+            return new JsonMarshaller<>(MyResult.class);
+        }
+
+        @Override
+        public CompletableFuture<List<MyResult>> receive(List<Nested> page, DataStreamerReceiverContext ctx, MyArg arg) {
+            List<MyResult> results = new ArrayList<>(page.size());
+
+            for (Nested item : page) {
+                MyResult res = new MyResult();
+                res.data = arg.name + "_" + arg.id;
+                res.nested = item;
+
+                results.add(res);
+            }
+
+            return completedFuture(results);
+        }
+    }
+
     @SuppressWarnings("unused") // Used by platform tests.
     private static class PartitionJob implements ComputeJob<Long, Integer> {
         @Override


Reply via email to