This is an automated email from the ASF dual-hosted git repository.
ptupitsyn pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 8dcf85a5943 IGNITE-25344 .NET: Add custom marshallers to data streamer
(#6242)
8dcf85a5943 is described below
commit 8dcf85a59432aa06761fdb6895fdd1b1a520077e
Author: Pavel Tupitsyn <[email protected]>
AuthorDate: Tue Jul 15 12:20:45 2025 +0300
IGNITE-25344 .NET: Add custom marshallers to data streamer (#6242)
Add payload, argument and result marshallers to data streamer APIs,
including .NET receiver.
---
.../Apache.Ignite.Tests/Compute/ComputeTests.cs | 8 +-
.../Compute/Executor/JobLoadContextTests.cs | 4 +-
.../Table/DataStreamerPlatformReceiverTests.cs | 35 +++++++
.../Apache.Ignite.Tests/Table/DataStreamerTests.cs | 27 +++++
.../Apache.Ignite.Tests/Table/DotNetReceivers.cs | 31 ++++++
.../Table/TestJsonMarshaller.cs | 55 ++++++++++
.../Internal/Table/DataStreamerWithReceiver.cs | 23 +++--
.../Apache.Ignite/Internal/Table/RecordView.cs | 8 +-
.../Serialization/StreamerReceiverSerializer.cs | 111 ++++++++++++++++-----
.../DataStreamerReceiverWrapper.cs | 6 +-
.../Apache.Ignite/Table/IDataStreamerReceiver.cs | 16 +++
.../Apache.Ignite/Table/ReceiverDescriptor.cs | 28 +++++-
.../runner/app/PlatformTestNodeRunner.java | 33 ++++++
13 files changed, 343 insertions(+), 42 deletions(-)
diff --git
a/modules/platforms/dotnet/Apache.Ignite.Tests/Compute/ComputeTests.cs
b/modules/platforms/dotnet/Apache.Ignite.Tests/Compute/ComputeTests.cs
index 7ba73bce1d6..055ea5e7f98 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Tests/Compute/ComputeTests.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Tests/Compute/ComputeTests.cs
@@ -1097,14 +1097,14 @@ namespace Apache.Ignite.Tests.Compute
JobTarget.Node(
(await Client.GetClusterNodesAsync()).OrderBy(n =>
n.Name).Skip(index).First());
- private record Nested(Guid Id, decimal Price);
+ internal record Nested(Guid Id, decimal Price);
[SuppressMessage("ReSharper", "NotAccessedPositionalProperty.Local",
Justification = "Tests.")]
- private record MyArg(int Id, string Name, Nested Nested);
+ internal record MyArg(int Id, string Name, Nested Nested);
- private record MyResult(string Data, Nested Nested);
+ internal record MyResult(string Data, Nested Nested);
- private class ToStringMarshaller : IMarshaller<Nested>
+ internal class ToStringMarshaller : IMarshaller<Nested>
{
public void Marshal(Nested obj, IBufferWriter<byte> writer)
{
diff --git
a/modules/platforms/dotnet/Apache.Ignite.Tests/Compute/Executor/JobLoadContextTests.cs
b/modules/platforms/dotnet/Apache.Ignite.Tests/Compute/Executor/JobLoadContextTests.cs
index 44aea22eebf..b594a414ec5 100644
---
a/modules/platforms/dotnet/Apache.Ignite.Tests/Compute/Executor/JobLoadContextTests.cs
+++
b/modules/platforms/dotnet/Apache.Ignite.Tests/Compute/Executor/JobLoadContextTests.cs
@@ -131,7 +131,9 @@ public class JobLoadContextTests
static PooledBuffer WriteReceiverInfo(string typeName, object arg)
{
var items = new object[] { "hello" };
- using var receiverInfoBuilder =
StreamerReceiverSerializer.BuildReceiverInfo<object>(typeName, arg, items,
prefixSize: 4);
+ using var receiverInfoBuilder =
StreamerReceiverSerializer.BuildReceiverInfo<object, object>(
+ typeName, arg, items, null, null, prefixSize: 4);
+
Memory<byte> receiverInfoMem = receiverInfoBuilder.Build();
BinaryPrimitives.WriteInt32LittleEndian(receiverInfoMem.Span,
receiverInfoBuilder.NumElements);
diff --git
a/modules/platforms/dotnet/Apache.Ignite.Tests/Table/DataStreamerPlatformReceiverTests.cs
b/modules/platforms/dotnet/Apache.Ignite.Tests/Table/DataStreamerPlatformReceiverTests.cs
index c66430151c3..9b226454241 100644
---
a/modules/platforms/dotnet/Apache.Ignite.Tests/Table/DataStreamerPlatformReceiverTests.cs
+++
b/modules/platforms/dotnet/Apache.Ignite.Tests/Table/DataStreamerPlatformReceiverTests.cs
@@ -17,6 +17,7 @@
namespace Apache.Ignite.Tests.Table;
+using System;
using System.Collections.Generic;
using System.Diagnostics.CodeAnalysis;
using System.Linq;
@@ -233,6 +234,40 @@ public class DataStreamerPlatformReceiverTests :
IgniteTestsBase
Assert.AreEqual(1, ex.FailedItems.Count);
}
+ [Test]
+ public async Task TestMarshallerReceiver()
+ {
+ var receiverItem = new
DotNetReceivers.ReceiverItem<string>(Guid.NewGuid(), "hello");
+ var receiverArg = new DotNetReceivers.ReceiverArg(123, "345");
+
+ DotNetReceivers.ReceiverResult<string> res = await
PocoView.StreamDataAsync(
+ new object[] { "unused" }.ToAsyncEnumerable(),
+ DotNetReceivers.Marshaller with { DeploymentUnits =
[_defaultTestUnit] },
+ keySelector: _ => new Poco(),
+ payloadSelector: _ => receiverItem,
+ receiverArg: receiverArg).FirstAsync();
+
+ Assert.AreEqual(receiverArg, res.Arg);
+ Assert.AreEqual(receiverItem, res.Item);
+ }
+
+ [Test]
+ public void TestErrorInMarshaller()
+ {
+ var ex = Assert.ThrowsAsync<DataStreamerException>(async () => await
PocoView.StreamDataAsync(
+ new object[] { "unused" }.ToAsyncEnumerable(),
+ DotNetReceivers.Marshaller with { DeploymentUnits =
[_defaultTestUnit] },
+ keySelector: _ => new Poco(),
+ payloadSelector: _ => new
DotNetReceivers.ReceiverItem<string>(Guid.Empty, "error!"),
+ receiverArg: new DotNetReceivers.ReceiverArg(1,
"1")).FirstAsync());
+
+ Assert.AreEqual(
+ ".NET job failed: Test marshaller error: ReceiverItem { Id =
00000000-0000-0000-0000-000000000000, Value = error! }",
+ ex.Message);
+
+ Assert.AreEqual(1, ex.FailedItems.Count);
+ }
+
private async Task<object> RunEchoArgReceiver(object arg,
IRecordView<Poco>? view = null)
{
view ??= PocoView;
diff --git
a/modules/platforms/dotnet/Apache.Ignite.Tests/Table/DataStreamerTests.cs
b/modules/platforms/dotnet/Apache.Ignite.Tests/Table/DataStreamerTests.cs
index 3561af812f3..266ae08060c 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Tests/Table/DataStreamerTests.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Tests/Table/DataStreamerTests.cs
@@ -25,6 +25,7 @@ using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;
using Compute;
+using Ignite.Marshalling;
using Ignite.Table;
using Internal.Proto;
using Microsoft.Extensions.Logging;
@@ -46,6 +47,8 @@ public class DataStreamerTests : IgniteTestsBase
private const string UpsertElementTypeNameReceiverClassName =
ComputeTests.PlatformTestNodeRunner + "$UpsertElementTypeNameReceiver";
+ private const string MarshallerReceiverClassName =
ComputeTests.PlatformTestNodeRunner + "$MarshallerReceiver";
+
private const int Count = 100;
private const int UpdatedKey = Count / 2;
@@ -63,6 +66,13 @@ public class DataStreamerTests : IgniteTestsBase
private static readonly ReceiverDescriptor<object, object, object>
EchoArgsReceiver = new(EchoArgsReceiverClassName);
+ private static readonly ReceiverDescriptor<ComputeTests.Nested,
ComputeTests.MyArg, ComputeTests.MyResult> MarshallerReceiver
+ = new(
+ MarshallerReceiverClassName,
+ PayloadMarshaller: new ComputeTests.ToStringMarshaller(),
+ ArgumentMarshaller: new JsonMarshaller<ComputeTests.MyArg>(),
+ ResultMarshaller: new JsonMarshaller<ComputeTests.MyResult>());
+
private static int _unknownKey = 333000;
[SetUp]
@@ -899,6 +909,23 @@ public class DataStreamerTests : IgniteTestsBase
Assert.AreEqual(arg, res);
}
+ [Test]
+ public async Task TestMarshallerReceiver()
+ {
+ var payload = new ComputeTests.Nested(Guid.NewGuid(), 1.23m);
+ var arg = new ComputeTests.MyArg(1, "foo", new
ComputeTests.Nested(Guid.NewGuid(), 2.2m));
+
+ ComputeTests.MyResult res = await PocoView.StreamDataAsync(
+ new[] { payload }.ToAsyncEnumerable(),
+ MarshallerReceiver,
+ keySelector: _ => new Poco(),
+ payloadSelector: x => x,
+ receiverArg: arg).FirstAsync();
+
+ Assert.AreEqual("foo_1", res.Data);
+ Assert.AreEqual(payload, res.Nested);
+ }
+
[Test]
public async Task TestResultConsumerEarlyExit()
{
diff --git
a/modules/platforms/dotnet/Apache.Ignite.Tests/Table/DotNetReceivers.cs
b/modules/platforms/dotnet/Apache.Ignite.Tests/Table/DotNetReceivers.cs
index c7cd2ae394a..5dc78e6011b 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Tests/Table/DotNetReceivers.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Tests/Table/DotNetReceivers.cs
@@ -20,8 +20,10 @@ namespace Apache.Ignite.Tests.Table;
using System;
using System.Collections.Generic;
using System.Diagnostics.CodeAnalysis;
+using System.Linq;
using System.Threading;
using System.Threading.Tasks;
+using Ignite.Marshalling;
using Ignite.Table;
/// <summary>
@@ -41,6 +43,9 @@ public static class DotNetReceivers
public static readonly ReceiverDescriptor<IIgniteTuple, object?,
IIgniteTuple> UpdateTuple = ReceiverDescriptor.Of(new UpdateTupleReceiver());
+ public static readonly ReceiverDescriptor<ReceiverItem<string>,
ReceiverArg, ReceiverResult<string>> Marshaller =
+ ReceiverDescriptor.Of(new MarshallerReceiver());
+
public class EchoReceiver : IDataStreamerReceiver<object, object, object>
{
public ValueTask<IList<object>?> ReceiveAsync(
@@ -124,4 +129,30 @@ public static class DotNetReceivers
return ValueTask.FromResult<IList<IIgniteTuple>?>(page);
}
}
+
+ public class MarshallerReceiver :
IDataStreamerReceiver<ReceiverItem<string>, ReceiverArg, ReceiverResult<string>>
+ {
+ public IMarshaller<ReceiverItem<string>> PayloadMarshaller => new
TestJsonMarshaller<ReceiverItem<string>>(new());
+
+ public IMarshaller<ReceiverArg> ArgumentMarshaller => new
TestJsonMarshaller<ReceiverArg>(new());
+
+ public IMarshaller<ReceiverResult<string>> ResultMarshaller => new
TestJsonMarshaller<ReceiverResult<string>>(new());
+
+ public async ValueTask<IList<ReceiverResult<string>>?> ReceiveAsync(
+ IList<ReceiverItem<string>> page,
+ ReceiverArg arg,
+ IDataStreamerReceiverContext context,
+ CancellationToken cancellationToken)
+ {
+ await Task.Yield();
+
+ return page.Select(x => new ReceiverResult<string>(x,
arg)).ToList();
+ }
+ }
+
+ public record ReceiverItem<T>(Guid Id, T Value);
+
+ public record ReceiverArg(int A, string B);
+
+ public record ReceiverResult<T>(ReceiverItem<T> Item, ReceiverArg Arg);
}
diff --git
a/modules/platforms/dotnet/Apache.Ignite.Tests/Table/TestJsonMarshaller.cs
b/modules/platforms/dotnet/Apache.Ignite.Tests/Table/TestJsonMarshaller.cs
new file mode 100644
index 00000000000..760456ea084
--- /dev/null
+++ b/modules/platforms/dotnet/Apache.Ignite.Tests/Table/TestJsonMarshaller.cs
@@ -0,0 +1,55 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+namespace Apache.Ignite.Tests.Table;
+
+using System;
+using System.Buffers;
+using System.Text;
+using Ignite.Marshalling;
+
+/// <summary>
+/// Test JSON marshaller.
+/// </summary>
+/// <typeparam name="T">Element type.</typeparam>
+public class TestJsonMarshaller<T> : IMarshaller<T>
+{
+ private readonly JsonMarshaller<T> _marshaller;
+
+ public TestJsonMarshaller(JsonMarshaller<T> marshaller) => _marshaller =
marshaller;
+
+ public void Marshal(T obj, IBufferWriter<byte> writer) =>
_marshaller.Marshal(obj, writer);
+
+ public T Unmarshal(ReadOnlySpan<byte> bytes)
+ {
+ T res;
+
+ try
+ {
+ res = _marshaller.Unmarshal(bytes);
+ }
+ catch (Exception e)
+ {
+ throw new InvalidOperationException($"Failed to deserialize JSON:
'{Encoding.UTF8.GetString(bytes)}'", e);
+ }
+
+ if (res != null && res.ToString()!.Contains("error",
StringComparison.OrdinalIgnoreCase))
+ {
+ throw new InvalidOperationException("Test marshaller error: " +
res);
+ }
+
+ return res;
+ }
+}
diff --git
a/modules/platforms/dotnet/Apache.Ignite/Internal/Table/DataStreamerWithReceiver.cs
b/modules/platforms/dotnet/Apache.Ignite/Internal/Table/DataStreamerWithReceiver.cs
index b5a92b01a29..f5e78261b68 100644
---
a/modules/platforms/dotnet/Apache.Ignite/Internal/Table/DataStreamerWithReceiver.cs
+++
b/modules/platforms/dotnet/Apache.Ignite/Internal/Table/DataStreamerWithReceiver.cs
@@ -33,6 +33,7 @@ using Common;
using Compute;
using Ignite.Compute;
using Ignite.Table;
+using Marshalling;
using Proto;
using Serialization;
@@ -63,6 +64,9 @@ internal static class DataStreamerWithReceiver
/// <param name="units">Deployment units. Can be empty.</param>
/// <param name="receiverClassName">Java class name of the streamer
receiver to execute on the server.</param>
/// <param name="receiverExecutionOptions">Receiver options.</param>
+ /// <param name="payloadMarshaller">Payload marshaller.</param>
+ /// <param name="argMarshaller">Argument marshaller.</param>
+ /// <param name="resultMarshaller">Result marshaller.</param>
/// <param name="receiverArg">Receiver arg.</param>
/// <param name="cancellationToken">Cancellation token.</param>
/// <typeparam name="TSource">Source type.</typeparam>
@@ -84,6 +88,9 @@ internal static class DataStreamerWithReceiver
IEnumerable<DeploymentUnit> units,
string receiverClassName,
ReceiverExecutionOptions receiverExecutionOptions,
+ IMarshaller<TPayload>? payloadMarshaller,
+ IMarshaller<TArg>? argMarshaller,
+ IMarshaller<TResult>? resultMarshaller,
TArg receiverArg,
CancellationToken cancellationToken)
where TKey : notnull
@@ -293,14 +300,15 @@ internal static class DataStreamerWithReceiver
// Wait for the previous batch for this node to preserve item
order.
await oldTask.ConfigureAwait(false);
- (results, int resultsCount) = await SendBatchAsync<TResult>(
+ (results, int resultsCount) = await SendBatchAsync(
table,
buf,
count,
preferredNode,
retryPolicy,
expectResults: resultChannel != null,
- customReceiverExecutionOptions).ConfigureAwait(false);
+ customReceiverExecutionOptions,
+ resultMarshaller).ConfigureAwait(false);
if (results != null && resultChannel != null)
{
@@ -371,9 +379,9 @@ internal static class DataStreamerWithReceiver
}
}
- void SerializeBatch<T>(
+ void SerializeBatch(
PooledArrayBuffer buf,
- ArraySegment<T> items,
+ ArraySegment<TPayload> items,
int partitionId)
{
// T is one of the supported types (numbers, strings, etc).
@@ -386,7 +394,7 @@ internal static class DataStreamerWithReceiver
var expectResults = resultChannel != null;
w.Write(expectResults);
- StreamerReceiverSerializer.WriteReceiverInfo(ref w,
receiverClassName, receiverArg, items);
+ StreamerReceiverSerializer.WriteReceiverInfo(ref w,
receiverClassName, receiverArg, items, payloadMarshaller, argMarshaller);
w.Write(receiverExecutionOptions.Priority);
w.Write(receiverExecutionOptions.MaxRetries);
@@ -401,7 +409,8 @@ internal static class DataStreamerWithReceiver
PreferredNode preferredNode,
IRetryPolicy retryPolicy,
bool expectResults,
- bool customReceiverExecutionOptions)
+ bool customReceiverExecutionOptions,
+ IMarshaller<T>? marshaller)
{
var (resBuf, socket) = await table.Socket.DoWithRetryAsync(
(buf, customReceiverExecutionOptions),
@@ -431,7 +440,7 @@ internal static class DataStreamerWithReceiver
Metrics.StreamerItemsSent.Add(count, socket.MetricsContext.Tags);
return expectResults
- ?
StreamerReceiverSerializer.ReadReceiverResults<T>(resBuf.GetReader())
+ ?
StreamerReceiverSerializer.ReadReceiverResults(resBuf.GetReader(), marshaller)
: (null, 0);
}
}
diff --git
a/modules/platforms/dotnet/Apache.Ignite/Internal/Table/RecordView.cs
b/modules/platforms/dotnet/Apache.Ignite/Internal/Table/RecordView.cs
index 25eeac8f12c..fbc5efa279c 100644
--- a/modules/platforms/dotnet/Apache.Ignite/Internal/Table/RecordView.cs
+++ b/modules/platforms/dotnet/Apache.Ignite/Internal/Table/RecordView.cs
@@ -337,7 +337,7 @@ namespace Apache.Ignite.Internal.Table
SingleWriter = false
});
- // Stream in background.
+ // Stream in the background.
var streamTask = Stream();
// Result async enumerable is returned immediately. It will be
completed when the streaming completes.
@@ -381,6 +381,9 @@ namespace Apache.Ignite.Internal.Table
receiver.DeploymentUnits ?? [],
receiver.ReceiverClassName,
receiver.Options ?? ReceiverExecutionOptions.Default,
+ receiver.PayloadMarshaller,
+ receiver.ArgumentMarshaller,
+ receiver.ResultMarshaller,
receiverArg,
cancellationToken).ConfigureAwait(false);
@@ -415,6 +418,9 @@ namespace Apache.Ignite.Internal.Table
receiver.DeploymentUnits ?? [],
receiver.ReceiverClassName,
receiver.Options ?? ReceiverExecutionOptions.Default,
+ null,
+ null,
+ null,
receiverArg,
cancellationToken).ConfigureAwait(false);
}
diff --git
a/modules/platforms/dotnet/Apache.Ignite/Internal/Table/Serialization/StreamerReceiverSerializer.cs
b/modules/platforms/dotnet/Apache.Ignite/Internal/Table/Serialization/StreamerReceiverSerializer.cs
index 5cb2f58d817..bd16a1d1e31 100644
---
a/modules/platforms/dotnet/Apache.Ignite/Internal/Table/Serialization/StreamerReceiverSerializer.cs
+++
b/modules/platforms/dotnet/Apache.Ignite/Internal/Table/Serialization/StreamerReceiverSerializer.cs
@@ -27,6 +27,7 @@ using Buffers;
using Compute;
using Ignite.Sql;
using Ignite.Table;
+using Marshalling;
using Proto.BinaryTuple;
using Proto.MsgPack;
@@ -43,14 +44,19 @@ internal static class StreamerReceiverSerializer
/// <param name="className">Receiver class name.</param>
/// <param name="arg">Receiver argument.</param>
/// <param name="items">Receiver items.</param>
+ /// <param name="payloadMarshaller">Payload marshaller.</param>
+ /// <param name="argMarshaller">Argument marshaller.</param>
/// <typeparam name="T">Item type.</typeparam>
- public static void WriteReceiverInfo<T>(
+ /// <typeparam name="TArg">Arg type.</typeparam>
+ public static void WriteReceiverInfo<T, TArg>(
ref MsgPackWriter w,
string className,
- object? arg,
- ArraySegment<T> items)
+ TArg arg,
+ ArraySegment<T> items,
+ IMarshaller<T>? payloadMarshaller,
+ IMarshaller<TArg>? argMarshaller)
{
- using var builder = BuildReceiverInfo(className, arg, items);
+ using var builder = BuildReceiverInfo(className, arg, items,
payloadMarshaller, argMarshaller);
w.Write(builder.NumElements);
w.Write(builder.Build().Span);
@@ -62,13 +68,18 @@ internal static class StreamerReceiverSerializer
/// <param name="className">Receiver class name.</param>
/// <param name="arg">Receiver argument.</param>
/// <param name="items">Receiver items.</param>
+ /// <param name="payloadMarshaller">Payload marshaller.</param>
+ /// <param name="argMarshaller">Argument marshaller.</param>
/// <param name="prefixSize">Builder prefix size.</param>
/// <typeparam name="T">Item type.</typeparam>
+ /// <typeparam name="TArg">Argument type.</typeparam>
/// <returns>Binary tuple builder.</returns>
- public static BinaryTupleBuilder BuildReceiverInfo<T>(
+ public static BinaryTupleBuilder BuildReceiverInfo<T, TArg>(
string className,
- object? arg,
+ TArg arg,
ArraySegment<T> items,
+ IMarshaller<T>? payloadMarshaller,
+ IMarshaller<TArg>? argMarshaller,
int prefixSize = 0)
{
Debug.Assert(items.Count > 0, "items.Count > 0");
@@ -81,7 +92,13 @@ internal static class StreamerReceiverSerializer
{
builder.AppendString(className);
- if (arg is IIgniteTuple tupleArg)
+ if (argMarshaller != null)
+ {
+ builder.AppendInt((int)ColumnType.ByteArray);
+ builder.AppendInt(0); // Scale.
+ builder.AppendBytes(static (bufWriter, a) =>
a.argMarshaller.Marshal(a.arg, bufWriter), (arg, argMarshaller));
+ }
+ else if (arg is IIgniteTuple tupleArg)
{
builder.AppendInt(TupleWithSchemaMarshalling.TypeIdTuple);
builder.AppendInt(0); // Scale.
@@ -92,7 +109,7 @@ internal static class StreamerReceiverSerializer
builder.AppendObjectWithType(arg);
}
- AppendCollection(builder, items);
+ AppendCollection(builder, items, payloadMarshaller);
return builder;
}
@@ -108,8 +125,9 @@ internal static class StreamerReceiverSerializer
/// </summary>
/// <param name="w">Writer.</param>
/// <param name="res">Results.</param>
+ /// <param name="marshaller">Marshaller.</param>
/// <typeparam name="T">Result item type.</typeparam>
- public static void WriteReceiverResults<T>(MsgPackWriter w, IList<T>? res)
+ public static void WriteReceiverResults<T>(MsgPackWriter w, IList<T>? res,
IMarshaller<T>? marshaller)
{
if (res == null)
{
@@ -121,7 +139,7 @@ internal static class StreamerReceiverSerializer
// Reserve a 4-byte prefix for resTupleElementCount.
using var builder = new BinaryTupleBuilder(resTupleElementCount,
prefixSize: 4);
- AppendCollection(builder, res);
+ AppendCollection(builder, res, marshaller);
Memory<byte> jobResultTupleMemWithPrefix = builder.Build();
BinaryPrimitives.WriteInt32LittleEndian(jobResultTupleMemWithPrefix.Span,
resTupleElementCount);
@@ -132,9 +150,10 @@ internal static class StreamerReceiverSerializer
/// Reads receiver execution results. Opposite of <see
cref="WriteReceiverResults{T}"/>.
/// </summary>
/// <param name="reader">Reader.</param>
+ /// <param name="marshaller">Marshaller.</param>
/// <typeparam name="T">Result element type.</typeparam>
/// <returns>Pooled array with results and the actual element
count.</returns>
- public static (T[]? ResultsPooledArray, int ResultsCount)
ReadReceiverResults<T>(MsgPackReader reader)
+ public static (T[]? ResultsPooledArray, int ResultsCount)
ReadReceiverResults<T>(MsgPackReader reader, IMarshaller<T>? marshaller)
{
if (reader.TryReadNil())
{
@@ -148,7 +167,7 @@ internal static class StreamerReceiverSerializer
}
var tuple = new BinaryTupleReader(reader.ReadBinary(), numElements);
- if (tuple.GetInt(0) != TupleWithSchemaMarshalling.TypeIdTuple)
+ if (tuple.GetInt(0) != TupleWithSchemaMarshalling.TypeIdTuple &&
marshaller == null)
{
return tuple.GetObjectCollectionWithType<T>();
}
@@ -158,6 +177,16 @@ internal static class StreamerReceiverSerializer
try
{
+ if (marshaller != null)
+ {
+ for (var i = 0; i < elementCount; i++)
+ {
+ resultsPooledArr[i] =
marshaller.Unmarshal(tuple.GetBytesSpan(2 + i));
+ }
+
+ return (resultsPooledArr, elementCount);
+ }
+
for (var i = 0; i < elementCount; i++)
{
resultsPooledArr[i] =
(T)(object)TupleWithSchemaMarshalling.Unpack(tuple.GetBytesSpan(2 + i));
@@ -184,28 +213,41 @@ internal static class StreamerReceiverSerializer
/// Reads the receiver info from the buffer.
/// </summary>
/// <param name="buf">Buffer.</param>
+ /// <param name="payloadMarshaller">Payload marshaller.</param>
+ /// <param name="argumentMarshaller">Argument marshaller.</param>
/// <typeparam name="TItem">Item type.</typeparam>
/// <typeparam name="TArg">Argument type.</typeparam>
/// <returns>Receiver info.</returns>
- public static ReceiverInfo<TItem, TArg> ReadReceiverInfo<TItem,
TArg>(PooledBuffer buf)
+ public static ReceiverInfo<TItem, TArg> ReadReceiverInfo<TItem, TArg>(
+ PooledBuffer buf,
+ IMarshaller<TItem>? payloadMarshaller,
+ IMarshaller<TArg>? argumentMarshaller)
{
BinaryTupleReader receiverInfo = GetReceiverInfoReaderFast(buf);
- var arg = (TArg)ReadReceiverArg(ref receiverInfo, 1)!;
- List<TItem> items = ReadReceiverPage<TItem>(ref receiverInfo);
+ var arg = (TArg)ReadReceiverArg(ref receiverInfo, 1,
argumentMarshaller)!;
+ List<TItem> items = ReadReceiverPage(ref receiverInfo,
payloadMarshaller);
return new(items, arg);
}
[SuppressMessage("Design", "CA1002:Do not expose generic lists",
Justification = "Private method.")]
- private static List<T> ReadReceiverPage<T>(ref BinaryTupleReader
receiverInfo)
+ private static List<T> ReadReceiverPage<T>(ref BinaryTupleReader
receiverInfo, IMarshaller<T>? marshaller)
{
int itemType = receiverInfo.GetInt(4);
int itemCount = receiverInfo.GetInt(5);
List<T> items = new List<T>(itemCount);
- if (itemType == TupleWithSchemaMarshalling.TypeIdTuple)
+ if (marshaller != null)
+ {
+ for (int i = 0; i < itemCount; i++)
+ {
+ T item = marshaller.Unmarshal(receiverInfo.GetBytesSpan(i +
6));
+ items.Add(item);
+ }
+ }
+ else if (itemType == TupleWithSchemaMarshalling.TypeIdTuple)
{
for (int i = 0; i < itemCount; i++)
{
@@ -226,19 +268,25 @@ internal static class StreamerReceiverSerializer
return items;
}
- private static object? ReadReceiverArg(ref BinaryTupleReader reader, int
index)
+ private static TArg? ReadReceiverArg<TArg>(ref BinaryTupleReader reader,
int index, IMarshaller<TArg>? marshaller)
{
if (reader.IsNull(index))
{
- return null;
+ return default;
}
if (reader.GetInt(index) == TupleWithSchemaMarshalling.TypeIdTuple)
{
- return TupleWithSchemaMarshalling.Unpack(reader.GetBytesSpan(index
+ 2));
+ return
(TArg)(object)TupleWithSchemaMarshalling.Unpack(reader.GetBytesSpan(index + 2));
}
- return reader.GetObject(index);
+ if (marshaller != null)
+ {
+ ReadOnlySpan<byte> bytes = reader.GetBytesSpan(index + 2);
+ return marshaller.Unmarshal(bytes);
+ }
+
+ return (TArg?)reader.GetObject(index);
}
private static BinaryTupleReader GetReceiverInfoReaderFast(PooledBuffer
jobArgBuf)
@@ -270,9 +318,26 @@ internal static class StreamerReceiverSerializer
}
}
- private static void AppendCollection<T>(BinaryTupleBuilder builder,
IList<T> items)
+ private static void AppendMarshalledCollection<T>(BinaryTupleBuilder
builder, ICollection<T> items, IMarshaller<T> marshaller)
{
- if (items.Count > 0 && items[0] is IIgniteTuple)
+ builder.AppendInt((int)ColumnType.ByteArray);
+ builder.AppendInt(items.Count);
+
+ foreach (var item in items)
+ {
+ builder.AppendBytes(
+ static (bufWriter, arg) => arg.marsh.Marshal(arg.obj,
bufWriter),
+ (marsh: marshaller, obj: item));
+ }
+ }
+
+ private static void AppendCollection<T>(BinaryTupleBuilder builder,
IList<T> items, IMarshaller<T>? marshaller)
+ {
+ if (marshaller != null)
+ {
+ AppendMarshalledCollection(builder, items, marshaller);
+ }
+ else if (items.Count > 0 && items[0] is IIgniteTuple)
{
AppendTupleCollection(builder, items);
}
diff --git
a/modules/platforms/dotnet/Apache.Ignite/Internal/Table/StreamerReceiverExecutor/DataStreamerReceiverWrapper.cs
b/modules/platforms/dotnet/Apache.Ignite/Internal/Table/StreamerReceiverExecutor/DataStreamerReceiverWrapper.cs
index d48aa8673df..d00b029b462 100644
---
a/modules/platforms/dotnet/Apache.Ignite/Internal/Table/StreamerReceiverExecutor/DataStreamerReceiverWrapper.cs
+++
b/modules/platforms/dotnet/Apache.Ignite/Internal/Table/StreamerReceiverExecutor/DataStreamerReceiverWrapper.cs
@@ -42,14 +42,16 @@ internal sealed class
DataStreamerReceiverWrapper<TReceiver, TItem, TArg, TResul
PooledArrayBuffer responseBuf,
CancellationToken cancellationToken)
{
- var (page, arg) = StreamerReceiverSerializer.ReadReceiverInfo<TItem,
TArg>(requestBuf);
TReceiver receiver = new TReceiver();
try
{
+ var (page, arg) = StreamerReceiverSerializer.ReadReceiverInfo(
+ requestBuf, receiver.PayloadMarshaller,
receiver.ArgumentMarshaller);
+
IList<TResult>? res = await receiver.ReceiveAsync(page, arg,
context, cancellationToken).ConfigureAwait(false);
-
StreamerReceiverSerializer.WriteReceiverResults(responseBuf.MessageWriter, res);
+
StreamerReceiverSerializer.WriteReceiverResults(responseBuf.MessageWriter, res,
receiver.ResultMarshaller);
}
finally
{
diff --git
a/modules/platforms/dotnet/Apache.Ignite/Table/IDataStreamerReceiver.cs
b/modules/platforms/dotnet/Apache.Ignite/Table/IDataStreamerReceiver.cs
index fcd57f384bd..69b513c87e2 100644
--- a/modules/platforms/dotnet/Apache.Ignite/Table/IDataStreamerReceiver.cs
+++ b/modules/platforms/dotnet/Apache.Ignite/Table/IDataStreamerReceiver.cs
@@ -21,6 +21,7 @@ using System.Collections.Generic;
using System.Diagnostics.CodeAnalysis;
using System.Threading;
using System.Threading.Tasks;
+using Marshalling;
/// <summary>
/// Data streamer receiver.
@@ -31,6 +32,21 @@ using System.Threading.Tasks;
[SuppressMessage("ReSharper", "TypeParameterCanBeVariant", Justification =
"Won't be possible later with marshallers.")]
public interface IDataStreamerReceiver<TItem, TArg, TResult>
{
+ /// <summary>
+ /// Gets the custom marshaller for the receiver payload.
+ /// </summary>
+ IMarshaller<TItem>? PayloadMarshaller => null;
+
+ /// <summary>
+ /// Gets the custom marshaller for the receiver input argument.
+ /// </summary>
+ IMarshaller<TArg>? ArgumentMarshaller => null;
+
+ /// <summary>
+ /// Gets the custom marshaller for the receiver result.
+ /// </summary>
+ IMarshaller<TResult>? ResultMarshaller => null;
+
/// <summary>
/// Receives a page of items from the data streamer.
/// <para />
diff --git a/modules/platforms/dotnet/Apache.Ignite/Table/ReceiverDescriptor.cs
b/modules/platforms/dotnet/Apache.Ignite/Table/ReceiverDescriptor.cs
index cfbaeb423bb..0b07f93810e 100644
--- a/modules/platforms/dotnet/Apache.Ignite/Table/ReceiverDescriptor.cs
+++ b/modules/platforms/dotnet/Apache.Ignite/Table/ReceiverDescriptor.cs
@@ -21,6 +21,7 @@ using System;
using System.Collections.Generic;
using System.Diagnostics.CodeAnalysis;
using Compute;
+using Marshalling;
/// <summary>
/// Stream receiver descriptor without results. If the specified receiver
returns results, they will be discarded on the server.
@@ -75,6 +76,9 @@ public sealed record ReceiverDescriptor<TArg, TResult>(
/// <param name="ReceiverClassName">Name of the streamer receiver class to
execute.</param>
/// <param name="DeploymentUnits">Deployment units.</param>
/// <param name="Options">Execution options.</param>
+/// <param name="PayloadMarshaller">Payload marshaller.</param>
+/// <param name="ArgumentMarshaller">Argument marshaller.</param>
+/// <param name="ResultMarshaller">Result marshaller.</param>
/// <typeparam name="TItem">Streamer item type.</typeparam>
/// <typeparam name="TArg">Argument type.</typeparam>
/// <typeparam name="TResult">Result type.</typeparam>
@@ -82,7 +86,10 @@ public sealed record ReceiverDescriptor<TArg, TResult>(
public sealed record ReceiverDescriptor<TItem, TArg, TResult>(
string ReceiverClassName,
IEnumerable<DeploymentUnit>? DeploymentUnits = null,
- ReceiverExecutionOptions? Options = null)
+ ReceiverExecutionOptions? Options = null,
+ IMarshaller<TItem>? PayloadMarshaller = null,
+ IMarshaller<TArg>? ArgumentMarshaller = null,
+ IMarshaller<TResult>? ResultMarshaller = null)
{
/// <summary>
/// Initializes a new instance of the <see
cref="ReceiverDescriptor{TItem,TArg,TResult}"/> class.
@@ -90,14 +97,23 @@ public sealed record ReceiverDescriptor<TItem, TArg,
TResult>(
/// <param name="type">Receiver type.</param>
/// <param name="deploymentUnits">Deployment units.</param>
/// <param name="options">Options.</param>
+ /// <param name="payloadMarshaller">Payload marshaller.</param>
+ /// <param name="argumentMarshaller">Argument marshaller.</param>
+ /// <param name="resultMarshaller">Result marshaller.</param>
public ReceiverDescriptor(
Type type,
IEnumerable<DeploymentUnit>? deploymentUnits = null,
- ReceiverExecutionOptions? options = null)
+ ReceiverExecutionOptions? options = null,
+ IMarshaller<TItem>? payloadMarshaller = null,
+ IMarshaller<TArg>? argumentMarshaller = null,
+ IMarshaller<TResult>? resultMarshaller = null)
: this(
type.AssemblyQualifiedName ?? throw new ArgumentException("Type
has null AssemblyQualifiedName: " + type),
deploymentUnits,
- ReceiverDescriptor.EnsureDotNetExecutor(options))
+ ReceiverDescriptor.EnsureDotNetExecutor(options),
+ payloadMarshaller,
+ argumentMarshaller,
+ resultMarshaller)
{
// No-op.
}
@@ -117,7 +133,11 @@ public static class ReceiverDescriptor
/// <typeparam name="TResult">Result type.</typeparam>
/// <returns>Receiver descriptor.</returns>
public static ReceiverDescriptor<TItem, TArg, TResult> Of<TItem, TArg,
TResult>(IDataStreamerReceiver<TItem, TArg, TResult> receiver) =>
- new(receiver.GetType());
+ new(
+ receiver.GetType(),
+ payloadMarshaller: receiver.PayloadMarshaller,
+ argumentMarshaller: receiver.ArgumentMarshaller,
+ resultMarshaller: receiver.ResultMarshaller);
/// <summary>
/// Ensures that the provided <see cref="ReceiverExecutionOptions"/> is
set to use the .NET executor.
diff --git
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/PlatformTestNodeRunner.java
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/PlatformTestNodeRunner.java
index 5b63c15b7fa..ee741f8f9aa 100644
---
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/PlatformTestNodeRunner.java
+++
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/PlatformTestNodeRunner.java
@@ -858,6 +858,39 @@ public class PlatformTestNodeRunner {
}
}
+ @SuppressWarnings("unused") // Used by platform tests.
+ private static class MarshallerReceiver implements
DataStreamerReceiver<Nested, MyArg, MyResult> {
+ @Override
+ public @Nullable Marshaller<Nested, byte[]> payloadMarshaller() {
+ return new ToStringMarshaller();
+ }
+
+ @Override
+ public @Nullable Marshaller<MyArg, byte[]> argumentMarshaller() {
+ return new JsonMarshaller<>(MyArg.class);
+ }
+
+ @Override
+ public @Nullable Marshaller<MyResult, byte[]> resultMarshaller() {
+ return new JsonMarshaller<>(MyResult.class);
+ }
+
+ @Override
+ public CompletableFuture<List<MyResult>> receive(List<Nested> page,
DataStreamerReceiverContext ctx, MyArg arg) {
+ List<MyResult> results = new ArrayList<>(page.size());
+
+ for (Nested item : page) {
+ MyResult res = new MyResult();
+ res.data = arg.name + "_" + arg.id;
+ res.nested = item;
+
+ results.add(res);
+ }
+
+ return completedFuture(results);
+ }
+ }
+
@SuppressWarnings("unused") // Used by platform tests.
private static class PartitionJob implements ComputeJob<Long, Integer> {
@Override