This is an automated email from the ASF dual-hosted git repository.

ptupitsyn pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 39478a01021 IGNITE-25343 Fix marshaller support in Data Streamer API (#5973)
39478a01021 is described below

commit 39478a010216f32c0c3cde5a935cc00a19e228b5
Author: Pavel Tupitsyn <ptupit...@apache.org>
AuthorDate: Thu Jun 5 08:04:10 2025 +0300

    IGNITE-25343 Fix marshaller support in Data Streamer API (#5973)
    
    * Add marshallers to `DataStreamerReceiver`: `payloadMarshaller`, `argumentMarshaller`, `resultMarshaller`
    * Add `DataStreamerReceiverDescriptor<T, A, R>`:
      * Same generic type parameters as `DataStreamerReceiver<T, A, R>`
      * Has `payloadMarshaller`, `argumentMarshaller`, `resultMarshaller`
    * Deprecate `ReceiverDescriptor<A>`
    * Add new `DataStreamerTarget#streamData` with `DataStreamerReceiverDescriptor` and deprecate the old one
---
 .../apache/ignite/table/DataStreamerReceiver.java  |  32 +++-
 ...or.java => DataStreamerReceiverDescriptor.java} | 112 ++++++++++--
 .../apache/ignite/table/DataStreamerTarget.java    |  53 +++++-
 .../apache/ignite/table/ReceiverDescriptor.java    |   3 +
 .../table/DataStreamerReceiverDescriptorTest.java  |  70 ++++++++
 .../client/proto/StreamerReceiverSerializer.java   | 155 ++++++++++++----
 .../internal/client/table/ClientDataStreamer.java  |  12 +-
 .../client/table/ClientKeyValueBinaryView.java     |  11 +-
 .../internal/client/table/ClientKeyValueView.java  |  11 +-
 .../client/table/ClientRecordBinaryView.java       |  11 +-
 .../internal/client/table/ClientRecordView.java    |  10 +-
 .../client/table/api/PublicApiClientViewBase.java  |  16 +-
 .../ignite/client/fakes/FakeInternalTable.java     |  10 +-
 .../ignite/internal/compute/IgniteComputeImpl.java |   9 +-
 .../compute/streamer/StreamerReceiverJob.java      |  16 +-
 .../streamer/ItAbstractDataStreamerTest.java       | 200 +++++++++++++++++++--
 .../internal/restart/RestartProofKeyValueView.java |  13 +-
 .../internal/restart/RestartProofRecordView.java   |  13 +-
 .../table/AsyncApiKeyValueViewAdapter.java         |   9 +-
 .../internal/table/AsyncApiRecordViewAdapter.java  |  10 +-
 .../internal/table/KeyValueBinaryViewImpl.java     |  10 +-
 .../ignite/internal/table/KeyValueViewImpl.java    |  10 +-
 .../internal/table/PublicApiThreadingViewBase.java |  16 +-
 .../internal/table/RecordBinaryViewImpl.java       |  10 +-
 .../ignite/internal/table/RecordViewImpl.java      |  12 +-
 .../internal/table/StreamerReceiverRunner.java     |   4 +-
 26 files changed, 664 insertions(+), 174 deletions(-)

diff --git 
a/modules/api/src/main/java/org/apache/ignite/table/DataStreamerReceiver.java 
b/modules/api/src/main/java/org/apache/ignite/table/DataStreamerReceiver.java
index 899c67ec2ce..443181dd419 100644
--- 
a/modules/api/src/main/java/org/apache/ignite/table/DataStreamerReceiver.java
+++ 
b/modules/api/src/main/java/org/apache/ignite/table/DataStreamerReceiver.java
@@ -22,6 +22,7 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Flow.Publisher;
 import java.util.concurrent.Flow.Subscriber;
 import java.util.function.Function;
+import org.apache.ignite.marshalling.Marshaller;
 import org.jetbrains.annotations.Nullable;
 
 /**
@@ -34,8 +35,8 @@ import org.jetbrains.annotations.Nullable;
 @SuppressWarnings("InterfaceMayBeAnnotatedFunctional")
 public interface DataStreamerReceiver<T, A, R> {
     /**
-     * Receives an item from the data streamer (see {@link 
DataStreamerTarget#streamData(Publisher, Function, Function,
-     * ReceiverDescriptor, Subscriber, DataStreamerOptions, Object)}).
+     * Receives an item from the data streamer (see {@link 
DataStreamerTarget#streamData(Publisher, DataStreamerReceiverDescriptor,
+     * Function, Function, Object, Subscriber, DataStreamerOptions)}).
      *
      * <p>The receiver is called for each page (batch) in the data streamer 
and is responsible for processing the items,
      * updating zero or more tables, and returning a result.
@@ -49,4 +50,31 @@ public interface DataStreamerReceiver<T, A, R> {
             List<T> page,
             DataStreamerReceiverContext ctx,
             @Nullable A arg);
+
+    /**
+     * Marshaller for the receiver payload (batch items).
+     *
+     * @return Payload marshaller.
+     */
+    default @Nullable Marshaller<T, byte[]> payloadMarshaller() {
+        return null;
+    }
+
+    /**
+     * Marshaller for the receiver argument.
+     *
+     * @return Argument marshaller.
+     */
+    default @Nullable Marshaller<A, byte[]> argumentMarshaller() {
+        return null;
+    }
+
+    /**
+     * Marshaller for the receiver result.
+     *
+     * @return Result marshaller.
+     */
+    default @Nullable Marshaller<R, byte[]> resultMarshaller() {
+        return null;
+    }
 }
diff --git 
a/modules/api/src/main/java/org/apache/ignite/table/ReceiverDescriptor.java 
b/modules/api/src/main/java/org/apache/ignite/table/DataStreamerReceiverDescriptor.java
similarity index 53%
copy from 
modules/api/src/main/java/org/apache/ignite/table/ReceiverDescriptor.java
copy to 
modules/api/src/main/java/org/apache/ignite/table/DataStreamerReceiverDescriptor.java
index fb9e4f8821e..ff0077049e3 100644
--- a/modules/api/src/main/java/org/apache/ignite/table/ReceiverDescriptor.java
+++ 
b/modules/api/src/main/java/org/apache/ignite/table/DataStreamerReceiverDescriptor.java
@@ -26,20 +26,26 @@ import org.jetbrains.annotations.Nullable;
 /**
  * Data streamer receiver descriptor.
  */
-public class ReceiverDescriptor<A> {
+public class DataStreamerReceiverDescriptor<T, A, R> {
     private final String receiverClassName;
 
     private final List<DeploymentUnit> units;
 
     private final ReceiverExecutionOptions options;
 
+    private final @Nullable Marshaller<T, byte[]> payloadMarshaller;
+
     private final @Nullable Marshaller<A, byte[]> argumentMarshaller;
 
-    private ReceiverDescriptor(
+    private final @Nullable Marshaller<R, byte[]> resultMarshaller;
+
+    private DataStreamerReceiverDescriptor(
             String receiverClassName,
             List<DeploymentUnit> units,
             ReceiverExecutionOptions options,
-            @Nullable Marshaller<A, byte[]> argumentMarshaller
+            @Nullable Marshaller<T, byte[]> payloadMarshaller,
+            @Nullable Marshaller<A, byte[]> argumentMarshaller,
+            @Nullable Marshaller<R, byte[]> resultMarshaller
     ) {
         Objects.requireNonNull(receiverClassName);
         Objects.requireNonNull(units);
@@ -47,7 +53,9 @@ public class ReceiverDescriptor<A> {
         this.receiverClassName = receiverClassName;
         this.units = units;
         this.options = options;
+        this.payloadMarshaller = payloadMarshaller;
         this.argumentMarshaller = argumentMarshaller;
+        this.resultMarshaller = resultMarshaller;
     }
 
     /**
@@ -77,12 +85,39 @@ public class ReceiverDescriptor<A> {
         return options;
     }
 
+    /**
+     * Payload marshaller.
+     *
+     * @return Payload marshaller.
+     */
+    public @Nullable Marshaller<T, byte[]> payloadMarshaller() {
+        return payloadMarshaller;
+    }
+
+    /**
+     * Argument marshaller.
+     *
+     * @return Argument marshaller.
+     */
+    public @Nullable Marshaller<A, byte[]> argumentMarshaller() {
+        return argumentMarshaller;
+    }
+
+    /**
+     * Result marshaller.
+     *
+     * @return Result marshaller.
+     */
+    public @Nullable Marshaller<R, byte[]> resultMarshaller() {
+        return resultMarshaller;
+    }
+
     /**
      * Create a new builder.
      *
      * @return Receiver descriptor builder.
      */
-    public static <A> Builder<A> builder(String receiverClassName) {
+    public static <T, A, R> Builder<T, A, R> builder(String receiverClassName) 
{
         Objects.requireNonNull(receiverClassName);
 
         return new Builder<>(receiverClassName);
@@ -93,23 +128,36 @@ public class ReceiverDescriptor<A> {
      *
      * @return Receiver descriptor builder.
      */
-    public static <A> Builder<A> builder(Class<? extends 
DataStreamerReceiver<?, A, ?>> receiverClass) {
+    public static <T, A, R> Builder<T, A, R> builder(Class<? extends 
DataStreamerReceiver<T, A, R>> receiverClass) {
         Objects.requireNonNull(receiverClass);
 
         return new Builder<>(receiverClass.getName());
     }
 
-    public @Nullable Marshaller<A, byte[]> argumentMarshaller() {
-        return argumentMarshaller;
+    /**
+     * Create a new builder from a receiver instance.
+     * Populates {@link #payloadMarshaller()}, {@link #argumentMarshaller()}, 
and {@link #resultMarshaller()} from the provided receiver.
+     *
+     * @return Receiver descriptor builder.
+     */
+    public static <T, A, R> Builder<T, A, R> builder(DataStreamerReceiver<T, 
A, R> receiver) {
+        Objects.requireNonNull(receiver);
+
+        return new Builder<T, A, R>(receiver.getClass().getName())
+                .payloadMarshaller(receiver.payloadMarshaller())
+                .argumentMarshaller(receiver.argumentMarshaller())
+                .resultMarshaller(receiver.resultMarshaller());
     }
 
     /**
      * Builder.
      */
-    public static class Builder<A> {
+    public static class Builder<T, A, R> {
         private final String receiverClassName;
         private List<DeploymentUnit> units;
+        private @Nullable Marshaller<T, byte[]> payloadMarshaller;
         private @Nullable Marshaller<A, byte[]> argumentMarshaller;
+        private @Nullable Marshaller<R, byte[]> resultMarshaller;
         private ReceiverExecutionOptions options = 
ReceiverExecutionOptions.DEFAULT;
 
         private Builder(String receiverClassName) {
@@ -124,7 +172,7 @@ public class ReceiverDescriptor<A> {
          * @param units Deployment units.
          * @return This builder.
          */
-        public Builder<A> units(List<DeploymentUnit> units) {
+        public Builder<T, A, R> units(List<DeploymentUnit> units) {
             this.units = units;
             return this;
         }
@@ -135,7 +183,7 @@ public class ReceiverDescriptor<A> {
          * @param units Deployment units.
          * @return This builder.
          */
-        public Builder<A> units(DeploymentUnit... units) {
+        public Builder<T, A, R> units(DeploymentUnit... units) {
             this.units = List.of(units);
             return this;
         }
@@ -146,13 +194,41 @@ public class ReceiverDescriptor<A> {
          * @param options Receiver execution options.
          * @return This builder.
          */
-        public Builder<A> options(ReceiverExecutionOptions options) {
+        public Builder<T, A, R> options(ReceiverExecutionOptions options) {
             this.options = options;
             return this;
         }
 
-        public Builder<A> argumentMarshaller(@Nullable Marshaller<A, byte[]> 
argumentsMarshaller) {
-            this.argumentMarshaller = argumentsMarshaller;
+        /**
+         * Sets the payload marshaller.
+         *
+         * @param payloadMarshaller Payload marshaller.
+         * @return This builder.
+         */
+        public Builder<T, A, R> payloadMarshaller(@Nullable Marshaller<T, 
byte[]> payloadMarshaller) {
+            this.payloadMarshaller = payloadMarshaller;
+            return this;
+        }
+
+        /**
+         * Sets the argument marshaller.
+         *
+         * @param argumentMarshaller Argument marshaller.
+         * @return This builder.
+         */
+        public Builder<T, A, R> argumentMarshaller(@Nullable Marshaller<A, 
byte[]> argumentMarshaller) {
+            this.argumentMarshaller = argumentMarshaller;
+            return this;
+        }
+
+        /**
+         * Sets the result marshaller.
+         *
+         * @param resultMarshaller Result marshaller.
+         * @return This builder.
+         */
+        public Builder<T, A, R> resultMarshaller(@Nullable Marshaller<R, 
byte[]> resultMarshaller) {
+            this.resultMarshaller = resultMarshaller;
             return this;
         }
 
@@ -161,12 +237,14 @@ public class ReceiverDescriptor<A> {
          *
          * @return Receiver descriptor.
          */
-        public ReceiverDescriptor<A> build() {
-            return new ReceiverDescriptor<>(
+        public DataStreamerReceiverDescriptor<T, A, R> build() {
+            return new DataStreamerReceiverDescriptor<>(
                     receiverClassName,
-                    units == null ? List.of() : units,
+                    units != null ? units : List.of(),
                     options,
-                    argumentMarshaller
+                    payloadMarshaller,
+                    argumentMarshaller,
+                    resultMarshaller
             );
         }
     }
diff --git 
a/modules/api/src/main/java/org/apache/ignite/table/DataStreamerTarget.java 
b/modules/api/src/main/java/org/apache/ignite/table/DataStreamerTarget.java
index c7f927965ff..df429e206a3 100644
--- a/modules/api/src/main/java/org/apache/ignite/table/DataStreamerTarget.java
+++ b/modules/api/src/main/java/org/apache/ignite/table/DataStreamerTarget.java
@@ -17,6 +17,7 @@
 
 package org.apache.ignite.table;
 
+import java.util.Objects;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Flow;
 import java.util.function.Function;
@@ -50,18 +51,64 @@ public interface DataStreamerTarget<T> {
      *     from {@link Flow.Subscription#request(long)} calls.
      * @param options Options (can be null).
      * @param receiverArg Receiver arguments.
-     * @return Future that will be completed when the stream is finished.
      * @param <E> Producer item type.
      * @param <V> Payload type.
      * @param <R> Result type.
      * @param <A> Receiver job argument type.
+     * @return Future that will be completed when the stream is finished.
+     * @deprecated Use {@link #streamData(Flow.Publisher, 
DataStreamerReceiverDescriptor, Function, Function, Object, Flow.Subscriber,
+     * DataStreamerOptions)}.
      */
-    <E, V, R, A> CompletableFuture<Void> streamData(
+    @Deprecated
+    default <E, V, R, A> CompletableFuture<Void> streamData(
             Flow.Publisher<E> publisher,
             Function<E, T> keyFunc,
             Function<E, V> payloadFunc,
             ReceiverDescriptor<A> receiver,
             @Nullable Flow.Subscriber<R> resultSubscriber,
             @Nullable DataStreamerOptions options,
-            @Nullable A receiverArg);
+            @Nullable A receiverArg) {
+        Objects.requireNonNull(receiver);
+
+        DataStreamerReceiverDescriptor<V, A, R> desc = 
DataStreamerReceiverDescriptor
+                .<V, A, R>builder(receiver.receiverClassName())
+                .units(receiver.units())
+                .options(receiver.options())
+                .build();
+
+        return streamData(
+                publisher,
+                desc,
+                keyFunc,
+                payloadFunc,
+                receiverArg,
+                resultSubscriber,
+                options);
+    }
+
+    /**
+     * Streams data with receiver. The receiver is responsible for processing 
the data and updating zero or more tables.
+     *
+     * @param publisher Producer.
+     * @param keyFunc Key function. The key is only used locally for 
colocation.
+     * @param payloadFunc Payload function. The payload is sent to the 
receiver.
+     * @param resultSubscriber Optional subscriber for the receiver results.
+     *     NOTE: The result subscriber follows the pace of publisher and 
ignores backpressure
+     *     from {@link Flow.Subscription#request(long)} calls.
+     * @param options Options (can be null).
+     * @param receiverArg Receiver arguments.
+     * @param <E> Producer item type.
+     * @param <V> Payload type.
+     * @param <A> Receiver job argument type.
+     * @param <R> Result type.
+     * @return Future that will be completed when the stream is finished.
+     */
+    <E, V, A, R> CompletableFuture<Void> streamData(
+            Flow.Publisher<E> publisher,
+            DataStreamerReceiverDescriptor<V, A, R> receiver,
+            Function<E, T> keyFunc,
+            Function<E, V> payloadFunc,
+            @Nullable A receiverArg,
+            @Nullable Flow.Subscriber<R> resultSubscriber,
+            @Nullable DataStreamerOptions options);
 }
diff --git 
a/modules/api/src/main/java/org/apache/ignite/table/ReceiverDescriptor.java 
b/modules/api/src/main/java/org/apache/ignite/table/ReceiverDescriptor.java
index fb9e4f8821e..44c6afed64c 100644
--- a/modules/api/src/main/java/org/apache/ignite/table/ReceiverDescriptor.java
+++ b/modules/api/src/main/java/org/apache/ignite/table/ReceiverDescriptor.java
@@ -25,7 +25,10 @@ import org.jetbrains.annotations.Nullable;
 
 /**
  * Data streamer receiver descriptor.
+ *
+ * @deprecated Replaced by {@link DataStreamerReceiverDescriptor}.
  */
+@Deprecated
 public class ReceiverDescriptor<A> {
     private final String receiverClassName;
 
diff --git 
a/modules/api/src/test/java/org/apache/ignite/table/DataStreamerReceiverDescriptorTest.java
 
b/modules/api/src/test/java/org/apache/ignite/table/DataStreamerReceiverDescriptorTest.java
new file mode 100644
index 00000000000..c4701a2b84a
--- /dev/null
+++ 
b/modules/api/src/test/java/org/apache/ignite/table/DataStreamerReceiverDescriptorTest.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.table;
+
+import static org.junit.jupiter.api.Assertions.assertInstanceOf;
+
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.marshalling.ByteArrayMarshaller;
+import org.apache.ignite.marshalling.Marshaller;
+import org.jetbrains.annotations.Nullable;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Data streamer receiver descriptor test.
+ */
+public class DataStreamerReceiverDescriptorTest {
+    @Test
+    void testReceiverDescriptorBuilderPopulatesMarshallers() {
+        DataStreamerReceiverDescriptor<Integer, String, UUID> desc = 
DataStreamerReceiverDescriptor.builder(new TestReceiver()).build();
+
+        assertInstanceOf(IntMarshaller.class, desc.payloadMarshaller());
+        assertInstanceOf(StrMarshaller.class, desc.argumentMarshaller());
+        assertInstanceOf(UuidMarshaller.class, desc.resultMarshaller());
+    }
+
+    private static class IntMarshaller implements ByteArrayMarshaller<Integer> 
{ }
+
+    private static class StrMarshaller implements ByteArrayMarshaller<String> 
{ }
+
+    private static class UuidMarshaller implements ByteArrayMarshaller<UUID> { 
}
+
+    private static class TestReceiver implements DataStreamerReceiver<Integer, 
String, UUID> {
+        @Override
+        public @Nullable CompletableFuture<List<UUID>> receive(List<Integer> 
page, DataStreamerReceiverContext ctx, @Nullable String arg) {
+            return null;
+        }
+
+        @Override
+        public @Nullable Marshaller<Integer, byte[]> payloadMarshaller() {
+            return new IntMarshaller();
+        }
+
+        @Override
+        public @Nullable Marshaller<String, byte[]> argumentMarshaller() {
+            return new StrMarshaller();
+        }
+
+        @Override
+        public @Nullable Marshaller<UUID, byte[]> resultMarshaller() {
+            return new UuidMarshaller();
+        }
+    }
+}
diff --git 
a/modules/client-common/src/main/java/org/apache/ignite/internal/client/proto/StreamerReceiverSerializer.java
 
b/modules/client-common/src/main/java/org/apache/ignite/internal/client/proto/StreamerReceiverSerializer.java
index 621831854dd..771ffc0d1d6 100644
--- 
a/modules/client-common/src/main/java/org/apache/ignite/internal/client/proto/StreamerReceiverSerializer.java
+++ 
b/modules/client-common/src/main/java/org/apache/ignite/internal/client/proto/StreamerReceiverSerializer.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.client.proto;
 
 import static 
org.apache.ignite.internal.client.proto.ClientBinaryTupleUtils.unsupportedTypeException;
 import static org.apache.ignite.lang.ErrorGroups.Client.PROTOCOL_ERR;
+import static 
org.apache.ignite.lang.ErrorGroups.Compute.MARSHALLING_TYPE_MISMATCH_ERR;
 
 import java.math.BigDecimal;
 import java.nio.ByteBuffer;
@@ -40,9 +41,11 @@ import 
org.apache.ignite.internal.binarytuple.BinaryTupleBuilder;
 import org.apache.ignite.internal.binarytuple.BinaryTupleReader;
 import 
org.apache.ignite.internal.binarytuple.inlineschema.TupleWithSchemaMarshalling;
 import org.apache.ignite.lang.IgniteException;
+import org.apache.ignite.lang.MarshallerException;
 import org.apache.ignite.marshalling.Marshaller;
 import org.apache.ignite.sql.ColumnType;
-import org.apache.ignite.table.ReceiverDescriptor;
+import org.apache.ignite.table.DataStreamerReceiver;
+import org.apache.ignite.table.DataStreamerReceiverDescriptor;
 import org.apache.ignite.table.Tuple;
 import org.jetbrains.annotations.Nullable;
 
@@ -64,19 +67,20 @@ public class StreamerReceiverSerializer {
      * @param receiverArg Receiver arguments.
      * @param items Items.
      */
-    public static <A> void serializeReceiverInfoOnClient(
+    public static <T, A> void serializeReceiverInfoOnClient(
             ClientMessagePacker w,
             String receiverClassName,
-            A receiverArg,
-            @Nullable Marshaller<A, byte[]> receiverArgsMarshaller,
-            Collection<?> items) {
+            @Nullable A receiverArg,
+            @Nullable Marshaller<T, byte[]> itemsMarshaller,
+            @Nullable Marshaller<A, byte[]> receiverArgMarshaller,
+            Collection<T> items) {
         // className + arg + items size + item type + items.
         int binaryTupleSize = 1 + 3 + 1 + 1 + items.size();
         var builder = new BinaryTupleBuilder(binaryTupleSize);
         builder.appendString(receiverClassName);
 
-        appendArg(builder, receiverArg);
-        appendCollectionToBinaryTuple(builder, items);
+        appendArg(builder, receiverArg, receiverArgMarshaller);
+        appendCollectionToBinaryTuple(builder, items, itemsMarshaller);
 
         w.packInt(binaryTupleSize);
         w.packBinaryTuple(builder);
@@ -89,17 +93,19 @@ public class StreamerReceiverSerializer {
      * @param receiverArg Receiver arguments.
      * @param items Items.
      */
-    public static <A> byte[] serializeReceiverInfoWithElementCount(
-            ReceiverDescriptor<A> receiver,
+    public static <T, A, R> byte[] serializeReceiverInfoWithElementCount(
+            DataStreamerReceiverDescriptor<T, A, R> receiver,
             @Nullable A receiverArg,
-            Collection<?> items) {
+            @Nullable Marshaller<T, byte[]> itemsMarshaller,
+            @Nullable Marshaller<A, byte[]> receiverArgMarshaller,
+            Collection<T> items) {
         // className + arg + items size + item type + items.
         int binaryTupleSize = 1 + 3 + 1 + 1 + items.size();
         var builder = new BinaryTupleBuilder(binaryTupleSize);
         builder.appendString(receiver.receiverClassName());
 
-        appendArg(builder, receiverArg);
-        appendCollectionToBinaryTuple(builder, items);
+        appendArg(builder, receiverArg, receiverArgMarshaller);
+        appendCollectionToBinaryTuple(builder, items, itemsMarshaller);
 
         ByteBuffer buf = builder.build();
         int bufSize = buf.limit() - buf.position();
@@ -116,9 +122,13 @@ public class StreamerReceiverSerializer {
      *
      * @param bytes Bytes.
      * @param elementCount Number of elements in the binary tuple.
+     * @param receiverFactory Function to create a receiver instance from its 
class name.
      * @return Streamer receiver info.
      */
-    public static SteamerReceiverInfo deserializeReceiverInfo(ByteBuffer 
bytes, int elementCount) {
+    public static SteamerReceiverInfo deserializeReceiverInfo(
+            ByteBuffer bytes,
+            int elementCount,
+            Function<String, DataStreamerReceiver<Object, Object, Object>> 
receiverFactory) {
         var reader = new BinaryTupleReader(elementCount, bytes);
 
         int readerIndex = 0;
@@ -128,13 +138,15 @@ public class StreamerReceiverSerializer {
             throw new IgniteException(PROTOCOL_ERR, "Receiver class name is 
null");
         }
 
-        Object receiverArg = readArg(reader, readerIndex);
+        DataStreamerReceiver<Object, Object, Object> receiver = 
receiverFactory.apply(receiverClassName);
+
+        Object receiverArg = readArg(reader, readerIndex, 
receiver.argumentMarshaller());
 
         readerIndex += 3;
 
-        List<Object> items = readCollectionFromBinaryTuple(reader, 
readerIndex);
+        List<Object> items = readCollectionFromBinaryTuple(reader, 
readerIndex, receiver.payloadMarshaller());
 
-        return new SteamerReceiverInfo(receiverClassName, receiverArg, items);
+        return new SteamerReceiverInfo(receiver, receiverArg, items);
     }
 
     /**
@@ -142,14 +154,16 @@ public class StreamerReceiverSerializer {
      *
      * @param receiverResults Receiver results.
      */
-    public static byte @Nullable [] serializeReceiverJobResults(@Nullable 
List<Object> receiverResults) {
+    public static <T> byte @Nullable [] serializeReceiverJobResults(
+            @Nullable List<T> receiverResults,
+            @Nullable Marshaller<T, byte[]> resultsMarshaller) {
         if (receiverResults == null || receiverResults.isEmpty()) {
             return null;
         }
 
         int numElements = 2 + receiverResults.size();
         var builder = new BinaryTupleBuilder(numElements);
-        appendCollectionToBinaryTuple(builder, receiverResults);
+        appendCollectionToBinaryTuple(builder, receiverResults, 
resultsMarshaller);
 
         ByteBuffer res = builder.build();
 
@@ -172,7 +186,9 @@ public class StreamerReceiverSerializer {
      * @param results Serialized results.
      * @return Deserialized results.
      */
-    public static <R> List<R> deserializeReceiverJobResults(byte[] results) {
+    public static <R> List<R> deserializeReceiverJobResults(
+            byte[] results,
+            @Nullable Marshaller<R, byte[]> resultsMarshaller) {
         if (results == null || results.length == 0) {
             return List.of();
         }
@@ -182,7 +198,7 @@ public class StreamerReceiverSerializer {
 
         var reader = new BinaryTupleReader(numElements, 
buf.slice().order(ByteOrder.LITTLE_ENDIAN));
 
-        return readCollectionFromBinaryTuple(reader, 0);
+        return readCollectionFromBinaryTuple(reader, 0, resultsMarshaller);
     }
 
     /**
@@ -213,7 +229,9 @@ public class StreamerReceiverSerializer {
      * @param r Reader.
      * @return Receiver results.
      */
-    public static @Nullable <R> List<R> 
deserializeReceiverResultsOnClient(ClientMessageUnpacker r) {
+    public static @Nullable <R> List<R> deserializeReceiverResultsOnClient(
+            ClientMessageUnpacker r,
+            @Nullable Marshaller<R, byte[]> resultsMarshaller) {
         if (r.tryUnpackNil()) {
             return null;
         }
@@ -222,7 +240,7 @@ public class StreamerReceiverSerializer {
         byte[] bytes = r.readBinary();
         var reader = new BinaryTupleReader(numElements, bytes);
 
-        return readCollectionFromBinaryTuple(reader, 0);
+        return readCollectionFromBinaryTuple(reader, 0, resultsMarshaller);
     }
 
     /**
@@ -231,11 +249,26 @@ public class StreamerReceiverSerializer {
      * @param builder Target builder.
      * @param items Items.
      */
-    private static <T> void appendCollectionToBinaryTuple(BinaryTupleBuilder 
builder, Collection<T> items) {
+    private static <T> void appendCollectionToBinaryTuple(
+            BinaryTupleBuilder builder,
+            Collection<T> items,
+            @Nullable Marshaller<T, byte[]> itemsMarshaller) {
         assert items != null : "items can't be null";
         assert !items.isEmpty() : "items can't be empty";
         assert builder != null : "builder can't be null";
 
+        if (itemsMarshaller != null) {
+            builder.appendInt(ColumnType.BYTE_ARRAY.id());
+            builder.appendInt(items.size());
+
+            for (T item : items) {
+                byte[] bytes = itemsMarshaller.marshal(item);
+                builder.appendBytes(bytes);
+            }
+
+            return;
+        }
+
         T firstItem = items.iterator().next();
         Objects.requireNonNull(firstItem);
         Class<?> type = firstItem.getClass();
@@ -254,14 +287,23 @@ public class StreamerReceiverSerializer {
         }
     }
 
-    private static <R> List<R> readCollectionFromBinaryTuple(BinaryTupleReader 
reader, int readerIndex) {
+    private static <R> List<R> readCollectionFromBinaryTuple(
+            BinaryTupleReader reader,
+            int readerIndex,
+            @Nullable Marshaller<R, byte[]> itemsMarshaller) {
         int typeId = reader.intValue(readerIndex++);
         Function<Integer, Object> itemReader = readerForType(reader, typeId);
         int itemsCount = reader.intValue(readerIndex++);
 
         List<R> items = new ArrayList<>(itemsCount);
         for (int i = 0; i < itemsCount; i++) {
-            items.add((R) itemReader.apply(readerIndex++));
+            Object itemRaw = itemReader.apply(readerIndex++);
+
+            R item = itemsMarshaller == null
+                    ? (R) itemRaw
+                    : unmarshalBytes(itemsMarshaller, itemRaw);
+
+            items.add(item);
         }
 
         return items;
@@ -399,7 +441,16 @@ public class StreamerReceiverSerializer {
         }
     }
 
-    private static <T> void appendArg(BinaryTupleBuilder builder, @Nullable T 
arg) {
+    private static <T> void appendArg(
+            BinaryTupleBuilder builder,
+            @Nullable T arg,
+            @Nullable Marshaller<T, byte[]> receiverArgMarshaller) {
+        if (receiverArgMarshaller != null) {
+            byte[] bytes = receiverArgMarshaller.marshal(arg);
+            ClientBinaryTupleUtils.appendObject(builder, bytes);
+            return;
+        }
+
         if (arg instanceof Tuple) {
             builder.appendInt(TupleWithSchemaMarshalling.TYPE_ID_TUPLE);
             builder.appendInt(0); // Scale.
@@ -411,18 +462,50 @@ public class StreamerReceiverSerializer {
         ClientBinaryTupleUtils.appendObject(builder, arg);
     }
 
-    private static @Nullable Object readArg(BinaryTupleReader reader, int 
index) {
+    private static @Nullable Object readArg(
+            BinaryTupleReader reader,
+            int index,
+            @Nullable Marshaller<Object, byte[]> receiverArgMarshaller) {
         if (reader.hasNullValue(index)) {
-            return null;
+            return receiverArgMarshaller == null
+                    ? null
+                    : receiverArgMarshaller.unmarshal(null);
         }
 
         if (reader.intValue(index) == 
TupleWithSchemaMarshalling.TYPE_ID_TUPLE) {
             return readTuple(reader, index + 2);
         }
 
-        return ClientBinaryTupleUtils.readObject(reader, index);
+        var obj = ClientBinaryTupleUtils.readObject(reader, index);
+
+        return receiverArgMarshaller == null
+                ? obj
+                : unmarshalBytes(receiverArgMarshaller, obj);
     }
 
+    private static <T> @Nullable T unmarshalBytes(Marshaller<T, byte[]> 
marshaller, @Nullable Object input) {
+        try {
+            if (input instanceof byte[]) {
+                return marshaller.unmarshal((byte[]) input);
+            } else if (input == null) {
+                return marshaller.unmarshal(null);
+            }
+        } catch (Exception ex) {
+            throw new MarshallerException(
+                    UUID.randomUUID(), MARSHALLING_TYPE_MISMATCH_ERR, 
"Exception in user-defined marshaller: " + ex.getMessage(), ex);
+        }
+
+        throw new MarshallerException(
+                UUID.randomUUID(),
+                MARSHALLING_TYPE_MISMATCH_ERR,
+                "Marshaller is defined in the DataStreamerReceiver 
implementation, "
+                        + "expected argument type: `byte[]`, actual: `" + 
input.getClass() + "`. "
+                        + "Ensure that DataStreamerReceiverDescriptor 
marshallers match DataStreamerReceiver marshallers.",
+                null
+        );
+    }
+
+
     private static <T> void appendTuple(BinaryTupleBuilder builder, Tuple arg) 
{
         builder.appendBytes(TupleWithSchemaMarshalling.marshal(arg));
     }
@@ -436,23 +519,23 @@ public class StreamerReceiverSerializer {
      * Streamer receiver info.
      */
     public static class SteamerReceiverInfo {
-        private final String className;
+        private final DataStreamerReceiver<Object, Object, Object> receiver;
         private final @Nullable Object arg;
         private final List<Object> items;
 
-        private SteamerReceiverInfo(String className, @Nullable Object arg, 
List<Object> items) {
-            this.className = className;
+        private SteamerReceiverInfo(DataStreamerReceiver<Object, Object, 
Object> receiver, @Nullable Object arg, List<Object> items) {
+            this.receiver = receiver;
             this.arg = arg;
             this.items = items;
         }
 
         /**
-         * Gets receiver class name.
+         * Get receiver instance.
          *
-         * @return Receiver class name.
+         * @return Receiver instance.
          */
-        public String className() {
-            return className;
+        public DataStreamerReceiver<Object, Object, Object> receiver() {
+            return receiver;
         }
 
         /**
diff --git a/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientDataStreamer.java b/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientDataStreamer.java
index a4300b7f014..b0af543f8df 100644
--- a/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientDataStreamer.java
+++ b/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientDataStreamer.java
@@ -34,7 +34,7 @@ import org.apache.ignite.internal.streamer.StreamerSubscriber;
 import org.apache.ignite.table.DataStreamerItem;
 import org.apache.ignite.table.DataStreamerOperationType;
 import org.apache.ignite.table.DataStreamerOptions;
-import org.apache.ignite.table.ReceiverDescriptor;
+import org.apache.ignite.table.DataStreamerReceiverDescriptor;
 import org.apache.ignite.table.ReceiverExecutionOptions;
 import org.jetbrains.annotations.Nullable;
 
@@ -71,8 +71,8 @@ class ClientDataStreamer {
             StreamerPartitionAwarenessProvider<T, Integer> 
partitionAwarenessProvider,
             ClientTable tbl,
             @Nullable Flow.Subscriber<R> resultSubscriber,
-            ReceiverDescriptor<A> receiverDescriptor,
-            A receiverArgs
+            DataStreamerReceiverDescriptor<V, A, R> receiverDescriptor,
+            @Nullable A receiverArg
     ) {
         var opts = receiverDescriptor.options();
         if (opts != null && !opts.equals(ReceiverExecutionOptions.DEFAULT)) {
@@ -96,12 +96,14 @@ class ClientDataStreamer {
                                     
StreamerReceiverSerializer.serializeReceiverInfoOnClient(
                                             w,
                                             
receiverDescriptor.receiverClassName(),
-                                            receiverArgs,
+                                            receiverArg,
+                                            
receiverDescriptor.payloadMarshaller(),
                                             
receiverDescriptor.argumentMarshaller(),
                                             items);
                                 },
                                 in -> resultSubscriber != null
-                                        ? 
StreamerReceiverSerializer.deserializeReceiverResultsOnClient(in.in())
+                                        ? 
StreamerReceiverSerializer.deserializeReceiverResultsOnClient(
+                                                in.in(), 
receiverDescriptor.resultMarshaller())
                                         : null,
                                 partitionAssignment.get(partitionId),
                                 new 
RetryLimitPolicy().retryLimit(options.retryLimit()),
diff --git a/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientKeyValueBinaryView.java b/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientKeyValueBinaryView.java
index b253a15a14a..799640dfe1d 100644
--- a/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientKeyValueBinaryView.java
+++ b/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientKeyValueBinaryView.java
@@ -44,8 +44,8 @@ import org.apache.ignite.sql.ResultSetMetadata;
 import org.apache.ignite.sql.SqlRow;
 import org.apache.ignite.table.DataStreamerItem;
 import org.apache.ignite.table.DataStreamerOptions;
+import org.apache.ignite.table.DataStreamerReceiverDescriptor;
 import org.apache.ignite.table.KeyValueView;
-import org.apache.ignite.table.ReceiverDescriptor;
 import org.apache.ignite.table.Tuple;
 import org.apache.ignite.tx.Transaction;
 import org.jetbrains.annotations.Nullable;
@@ -538,16 +538,15 @@ public class ClientKeyValueBinaryView extends AbstractClientView<Entry<Tuple, Tu
         return ClientDataStreamer.streamData(publisher, opts, batchSender, 
provider, tbl);
     }
 
-    /** {@inheritDoc} */
     @Override
-    public <E, V, R, A> CompletableFuture<Void> streamData(
+    public <E, V, A, R> CompletableFuture<Void> streamData(
             Publisher<E> publisher,
+            DataStreamerReceiverDescriptor<V, A, R> receiver,
             Function<E, Entry<Tuple, Tuple>> keyFunc,
             Function<E, V> payloadFunc,
-            ReceiverDescriptor<A> receiver,
+            @Nullable A receiverArg,
             @Nullable Flow.Subscriber<R> resultSubscriber,
-            @Nullable DataStreamerOptions options,
-            A receiverArg) {
+            @Nullable DataStreamerOptions options) {
         Objects.requireNonNull(publisher);
         Objects.requireNonNull(keyFunc);
         Objects.requireNonNull(payloadFunc);
diff --git a/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientKeyValueView.java b/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientKeyValueView.java
index 165d8cc1917..81356fd7132 100644
--- a/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientKeyValueView.java
+++ b/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientKeyValueView.java
@@ -62,8 +62,8 @@ import org.apache.ignite.sql.ResultSetMetadata;
 import org.apache.ignite.sql.SqlRow;
 import org.apache.ignite.table.DataStreamerItem;
 import org.apache.ignite.table.DataStreamerOptions;
+import org.apache.ignite.table.DataStreamerReceiverDescriptor;
 import org.apache.ignite.table.KeyValueView;
-import org.apache.ignite.table.ReceiverDescriptor;
 import org.apache.ignite.table.mapper.Mapper;
 import org.apache.ignite.tx.Transaction;
 import org.jetbrains.annotations.Nullable;
@@ -727,16 +727,15 @@ public class ClientKeyValueView<K, V> extends AbstractClientView<Entry<K, V>> im
         return ClientDataStreamer.streamData(publisher, opts, batchSender, 
provider, tbl);
     }
 
-    /** {@inheritDoc} */
     @Override
-    public <E, P, R, A> CompletableFuture<Void> streamData(
+    public <E, P, A, R> CompletableFuture<Void> streamData(
             Publisher<E> publisher,
+            DataStreamerReceiverDescriptor<P, A, R> receiver,
             Function<E, Entry<K, V>> keyFunc,
             Function<E, P> payloadFunc,
-            ReceiverDescriptor<A> receiver,
+            @Nullable A receiverArg,
             @Nullable Flow.Subscriber<R> resultSubscriber,
-            @Nullable DataStreamerOptions options,
-            A receiverArg) {
+            @Nullable DataStreamerOptions options) {
         Objects.requireNonNull(publisher);
         Objects.requireNonNull(keyFunc);
         Objects.requireNonNull(payloadFunc);
diff --git a/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientRecordBinaryView.java b/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientRecordBinaryView.java
index ef7f75c5951..b7de009fb6c 100644
--- a/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientRecordBinaryView.java
+++ b/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientRecordBinaryView.java
@@ -38,7 +38,7 @@ import org.apache.ignite.internal.client.sql.ClientSql;
 import org.apache.ignite.internal.streamer.StreamerBatchSender;
 import org.apache.ignite.table.DataStreamerItem;
 import org.apache.ignite.table.DataStreamerOptions;
-import org.apache.ignite.table.ReceiverDescriptor;
+import org.apache.ignite.table.DataStreamerReceiverDescriptor;
 import org.apache.ignite.table.RecordView;
 import org.apache.ignite.table.Tuple;
 import org.apache.ignite.tx.Transaction;
@@ -453,16 +453,15 @@ public class ClientRecordBinaryView extends AbstractClientView<Tuple> implements
         return ClientDataStreamer.streamData(publisher, opts, batchSender, 
provider, tbl);
     }
 
-    /** {@inheritDoc} */
     @Override
-    public <E, V, R, A> CompletableFuture<Void> streamData(
+    public <E, V, A, R> CompletableFuture<Void> streamData(
             Publisher<E> publisher,
+            DataStreamerReceiverDescriptor<V, A, R> receiver,
             Function<E, Tuple> keyFunc,
             Function<E, V> payloadFunc,
-            ReceiverDescriptor<A> receiver,
+            @Nullable A receiverArg,
             @Nullable Flow.Subscriber<R> resultSubscriber,
-            @Nullable DataStreamerOptions options,
-            A receiverArg) {
+            @Nullable DataStreamerOptions options) {
         Objects.requireNonNull(publisher);
         Objects.requireNonNull(keyFunc);
         Objects.requireNonNull(payloadFunc);
diff --git a/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientRecordView.java b/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientRecordView.java
index c0b0484f965..630e679e410 100644
--- a/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientRecordView.java
+++ b/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientRecordView.java
@@ -44,7 +44,7 @@ import org.apache.ignite.sql.ResultSetMetadata;
 import org.apache.ignite.sql.SqlRow;
 import org.apache.ignite.table.DataStreamerItem;
 import org.apache.ignite.table.DataStreamerOptions;
-import org.apache.ignite.table.ReceiverDescriptor;
+import org.apache.ignite.table.DataStreamerReceiverDescriptor;
 import org.apache.ignite.table.RecordView;
 import org.apache.ignite.table.mapper.Mapper;
 import org.apache.ignite.tx.Transaction;
@@ -457,14 +457,14 @@ public class ClientRecordView<R> extends AbstractClientView<R> implements Record
     }
 
     @Override
-    public <E, V, R1, A> CompletableFuture<Void> streamData(
+    public <E, V, A, R1> CompletableFuture<Void> streamData(
             Publisher<E> publisher,
+            DataStreamerReceiverDescriptor<V, A, R1> receiver,
             Function<E, R> keyFunc,
             Function<E, V> payloadFunc,
-            ReceiverDescriptor<A> receiver,
+            @Nullable A receiverArg,
             @Nullable Flow.Subscriber<R1> resultSubscriber,
-            @Nullable DataStreamerOptions options,
-            A receiverArg) {
+            @Nullable DataStreamerOptions options) {
         Objects.requireNonNull(publisher);
         Objects.requireNonNull(keyFunc);
         Objects.requireNonNull(payloadFunc);
diff --git a/modules/client/src/main/java/org/apache/ignite/internal/client/table/api/PublicApiClientViewBase.java b/modules/client/src/main/java/org/apache/ignite/internal/client/table/api/PublicApiClientViewBase.java
index d9e2052b3bf..97bc391a935 100644
--- a/modules/client/src/main/java/org/apache/ignite/internal/client/table/api/PublicApiClientViewBase.java
+++ b/modules/client/src/main/java/org/apache/ignite/internal/client/table/api/PublicApiClientViewBase.java
@@ -27,8 +27,8 @@ import org.apache.ignite.lang.AsyncCursor;
 import org.apache.ignite.lang.Cursor;
 import org.apache.ignite.table.DataStreamerItem;
 import org.apache.ignite.table.DataStreamerOptions;
+import org.apache.ignite.table.DataStreamerReceiverDescriptor;
 import org.apache.ignite.table.DataStreamerTarget;
-import org.apache.ignite.table.ReceiverDescriptor;
 import org.apache.ignite.table.criteria.Criteria;
 import org.apache.ignite.table.criteria.CriteriaQueryOptions;
 import org.apache.ignite.table.criteria.CriteriaQuerySource;
@@ -53,22 +53,22 @@ abstract class PublicApiClientViewBase<T> implements DataStreamerTarget<T>, Crit
     }
 
     @Override
-    public <E, V, R, A> CompletableFuture<Void> streamData(
+    public <E, V, A, R> CompletableFuture<Void> streamData(
             Publisher<E> publisher,
+            DataStreamerReceiverDescriptor<V, A, R> receiver,
             Function<E, T> keyFunc,
             Function<E, V> payloadFunc,
-            ReceiverDescriptor<A> receiver,
+            @Nullable A receiverArg,
             @Nullable Flow.Subscriber<R> resultSubscriber,
-            @Nullable DataStreamerOptions options,
-            A receiverArg) {
+            @Nullable DataStreamerOptions options) {
         return executeAsyncOp(() -> streamerTarget.streamData(
                 publisher,
+                receiver,
                 keyFunc,
                 payloadFunc,
-                receiver,
+                receiverArg,
                 resultSubscriber,
-                options,
-                receiverArg));
+                options));
     }
 
     @Override
diff --git a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeInternalTable.java b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeInternalTable.java
index e96bb607415..a7c7967fbfa 100644
--- a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeInternalTable.java
+++ b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeInternalTable.java
@@ -61,8 +61,8 @@ import org.apache.ignite.internal.util.PendingComparableValuesTracker;
 import org.apache.ignite.internal.utils.PrimaryReplica;
 import org.apache.ignite.network.ClusterNode;
 import org.apache.ignite.network.NetworkAddress;
+import org.apache.ignite.table.DataStreamerReceiverDescriptor;
 import org.apache.ignite.table.QualifiedName;
-import org.apache.ignite.table.ReceiverDescriptor;
 import org.apache.ignite.table.ReceiverExecutionOptions;
 import org.jetbrains.annotations.Nullable;
 
@@ -551,8 +551,12 @@ public class FakeInternalTable implements InternalTable, StreamerReceiverRunner
     }
 
     @Override
-    public <A, I, R> CompletableFuture<Collection<R>> 
runReceiverAsync(ReceiverDescriptor<A> receiver, @Nullable A receiverArg,
-            Collection<I> items, ClusterNode node, List<DeploymentUnit> 
deploymentUnits) {
+    public <A, I, R> CompletableFuture<Collection<R>> runReceiverAsync(
+            DataStreamerReceiverDescriptor<I, A, R> receiver,
+            @Nullable A receiverArg,
+            Collection<I> items,
+            ClusterNode node,
+            List<DeploymentUnit> deploymentUnits) {
         throw new UnsupportedOperationException("Not implemented");
     }
 
diff --git a/modules/compute/src/main/java/org/apache/ignite/internal/compute/IgniteComputeImpl.java b/modules/compute/src/main/java/org/apache/ignite/internal/compute/IgniteComputeImpl.java
index 8baa97a8209..fbfe2a89423 100644
--- a/modules/compute/src/main/java/org/apache/ignite/internal/compute/IgniteComputeImpl.java
+++ b/modules/compute/src/main/java/org/apache/ignite/internal/compute/IgniteComputeImpl.java
@@ -83,8 +83,8 @@ import org.apache.ignite.lang.ErrorGroups.Compute;
 import org.apache.ignite.lang.IgniteException;
 import org.apache.ignite.lang.TableNotFoundException;
 import org.apache.ignite.network.ClusterNode;
+import org.apache.ignite.table.DataStreamerReceiverDescriptor;
 import org.apache.ignite.table.QualifiedName;
-import org.apache.ignite.table.ReceiverDescriptor;
 import org.apache.ignite.table.ReceiverExecutionOptions;
 import org.apache.ignite.table.Tuple;
 import org.apache.ignite.table.mapper.Mapper;
@@ -587,15 +587,16 @@ public class IgniteComputeImpl implements IgniteComputeInternal, StreamerReceive
 
     @Override
     public <A, I, R> CompletableFuture<Collection<R>> runReceiverAsync(
-            ReceiverDescriptor<A> receiver,
+            DataStreamerReceiverDescriptor<I, A, R> receiver,
             @Nullable A receiverArg,
             Collection<I> items,
             ClusterNode node,
             List<DeploymentUnit> deploymentUnits) {
-        var payload = 
StreamerReceiverSerializer.serializeReceiverInfoWithElementCount(receiver, 
receiverArg, items);
+        var payload = 
StreamerReceiverSerializer.serializeReceiverInfoWithElementCount(
+                receiver, receiverArg, receiver.payloadMarshaller(), 
receiver.argumentMarshaller(), items);
 
         return runReceiverAsync(payload, node, deploymentUnits, 
receiver.options())
-                
.thenApply(StreamerReceiverSerializer::deserializeReceiverJobResults);
+                .thenApply(r -> 
StreamerReceiverSerializer.deserializeReceiverJobResults(r, 
receiver.resultMarshaller()));
     }
 
     @Override
diff --git a/modules/compute/src/main/java/org/apache/ignite/internal/compute/streamer/StreamerReceiverJob.java b/modules/compute/src/main/java/org/apache/ignite/internal/compute/streamer/StreamerReceiverJob.java
index 3c77c8a1406..fb34d8ec4cd 100644
--- a/modules/compute/src/main/java/org/apache/ignite/internal/compute/streamer/StreamerReceiverJob.java
+++ b/modules/compute/src/main/java/org/apache/ignite/internal/compute/streamer/StreamerReceiverJob.java
@@ -44,21 +44,25 @@ public class StreamerReceiverJob implements ComputeJob<byte[], byte[]> {
 
         SteamerReceiverInfo receiverInfo = 
StreamerReceiverSerializer.deserializeReceiverInfo(
                 buf.slice().order(ByteOrder.LITTLE_ENDIAN),
-                payloadElementCount);
+                payloadElementCount,
+                receiverClassName -> {
+                    ClassLoader classLoader = ((JobExecutionContextImpl) 
context).classLoader();
+                    Class<DataStreamerReceiver<Object, Object, Object>> 
receiverClass = ComputeUtils.receiverClass(
+                            classLoader, receiverClassName);
 
-        ClassLoader classLoader = ((JobExecutionContextImpl) 
context).classLoader();
-        Class<DataStreamerReceiver<Object, Object, Object>> receiverClass = 
ComputeUtils.receiverClass(
-                classLoader, receiverInfo.className());
+                    return ComputeUtils.instantiateReceiver(receiverClass);
+                });
 
-        DataStreamerReceiver<Object, Object, Object> receiver = 
ComputeUtils.instantiateReceiver(receiverClass);
         DataStreamerReceiverContext receiverContext = context::ignite;
 
+        DataStreamerReceiver<Object, Object, Object> receiver = 
receiverInfo.receiver();
+
         CompletableFuture<List<Object>> receiverRes = 
receiver.receive(receiverInfo.items(), receiverContext, receiverInfo.arg());
 
         if (receiverRes == null) {
             return CompletableFuture.completedFuture(null);
         }
 
-        return 
receiverRes.thenApply(StreamerReceiverSerializer::serializeReceiverJobResults);
+        return receiverRes.thenApply(r -> 
StreamerReceiverSerializer.serializeReceiverJobResults(r, 
receiver.resultMarshaller()));
     }
 }
diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/streamer/ItAbstractDataStreamerTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/streamer/ItAbstractDataStreamerTest.java
index 76f8373f40c..a5656216a53 100644
--- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/streamer/ItAbstractDataStreamerTest.java
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/streamer/ItAbstractDataStreamerTest.java
@@ -55,6 +55,8 @@ import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 import org.apache.ignite.Ignite;
 import org.apache.ignite.internal.ClusterPerClassIntegrationTest;
+import org.apache.ignite.marshalling.ByteArrayMarshaller;
+import org.apache.ignite.marshalling.Marshaller;
 import org.apache.ignite.network.ClusterNode;
 import org.apache.ignite.raft.jraft.test.TestUtils;
 import org.apache.ignite.sql.IgniteSql;
@@ -64,6 +66,7 @@ import org.apache.ignite.table.DataStreamerOperationType;
 import org.apache.ignite.table.DataStreamerOptions;
 import org.apache.ignite.table.DataStreamerReceiver;
 import org.apache.ignite.table.DataStreamerReceiverContext;
+import org.apache.ignite.table.DataStreamerReceiverDescriptor;
 import org.apache.ignite.table.DataStreamerTarget;
 import org.apache.ignite.table.KeyValueView;
 import org.apache.ignite.table.ReceiverDescriptor;
@@ -418,6 +421,65 @@ public abstract class ItAbstractDataStreamerTest extends ClusterPerClassIntegrat
 
         var resultSubscriber = returnResults ? new TestSubscriber<String>() : 
null;
 
+        try (var publisher = new SubmissionPublisher<Tuple>()) {
+            streamerFut = target.streamData(
+                    publisher,
+                    
DataStreamerReceiverDescriptor.builder(TestReceiver.class).build(),
+                    keyFunc,
+                    t -> t.stringValue(1),
+                    "arg1",
+                    resultSubscriber,
+                    DataStreamerOptions.builder().retryLimit(0).build()
+            );
+
+            // Same ID goes to the same partition.
+            publisher.submit(tuple(1, "val1"));
+            publisher.submit(tuple(1, "val2"));
+            publisher.submit(tuple(1, "val3"));
+        }
+
+        assertThat(streamerFut, willCompleteSuccessfully());
+
+        if (returnResults) {
+            assertEquals(1, resultSubscriber.items.size());
+            assertEquals("Received: 3 items, arg1 arg", 
resultSubscriber.items.iterator().next());
+        }
+    }
+
+    @ParameterizedTest
+    @ValueSource(booleans = {true, false})
+    public void testWithReceiverRecordBinaryViewDeprecated(boolean 
returnResults) {
+        testWithReceiverDeprecated(defaultTable().recordView(), 
Function.identity(), returnResults);
+    }
+
+    @ParameterizedTest
+    @ValueSource(booleans = {true, false})
+    public void testWithReceiverKvBinaryViewDeprecated(boolean returnResults) {
+        testWithReceiverDeprecated(defaultTable().keyValueView(), t -> 
Map.entry(t, t), returnResults);
+    }
+
+    @ParameterizedTest
+    @ValueSource(booleans = {true, false})
+    public void testWithReceiverRecordPojoViewDeprecated(boolean 
returnResults) {
+        RecordView<PersonPojo> view = 
defaultTable().recordView(PersonPojo.class);
+
+        testWithReceiverDeprecated(view, t -> new PersonPojo(t.intValue(0)), 
returnResults);
+    }
+
+    @ParameterizedTest
+    @ValueSource(booleans = {true, false})
+    public void testWithReceiverKvPojoViewDeprecated(boolean returnResults) {
+        KeyValueView<Integer, PersonValPojo> view = 
defaultTable().keyValueView(Mapper.of(Integer.class), 
Mapper.of(PersonValPojo.class));
+
+        testWithReceiverDeprecated(view, t -> Map.entry(t.intValue(0), new 
PersonValPojo()), returnResults);
+    }
+
+    @SuppressWarnings("deprecation")
+    private static <T> void testWithReceiverDeprecated(DataStreamerTarget<T> 
target, Function<Tuple, T> keyFunc, boolean returnResults) {
+        CompletableFuture<Void> streamerFut;
+
+        var resultSubscriber = returnResults ? new TestSubscriber<String>() : 
null;
+
         try (var publisher = new SubmissionPublisher<Tuple>()) {
             streamerFut = target.streamData(
                     publisher,
@@ -456,10 +518,12 @@ public abstract class ItAbstractDataStreamerTest extends ClusterPerClassIntegrat
         try (var publisher = new SubmissionPublisher<Tuple>()) {
             streamerFut = view.streamData(
                     publisher,
+                    
DataStreamerReceiverDescriptor.builder(NodeNameReceiver.class).build(),
                     t -> t,
                     t -> t.intValue(0),
-                    ReceiverDescriptor.builder(NodeNameReceiver.class).build(),
-                    null, null, null
+                    null,
+                    null,
+                    null
             );
 
             for (int i = 0; i < count; i++) {
@@ -482,18 +546,17 @@ public abstract class ItAbstractDataStreamerTest extends ClusterPerClassIntegrat
     public void testReceiverException(boolean async) {
         CompletableFuture<Void> streamerFut;
 
-        Object key = 0;
         Tuple item = tupleKey(1);
 
         try (var publisher = new SubmissionPublisher<Tuple>()) {
             streamerFut = defaultTable().recordView().streamData(
                     publisher,
+                    
DataStreamerReceiverDescriptor.builder(TestReceiver.class).build(),
                     t -> t,
-                    t -> key,
-                    ReceiverDescriptor.builder(TestReceiver.class).build(),
+                    t -> "",
+                    async ? "throw-async" : "throw",
                     null,
-                    
DataStreamerOptions.builder().retryLimit(0).pageSize(1).build(),
-                    async ? "throw-async" : "throw");
+                    
DataStreamerOptions.builder().retryLimit(0).pageSize(1).build());
 
             publisher.submit(item);
         }
@@ -572,12 +635,12 @@ public abstract class ItAbstractDataStreamerTest extends ClusterPerClassIntegrat
 
             streamerFut = defaultTable().recordView().streamData(
                     publisher,
+                    
DataStreamerReceiverDescriptor.builder(TupleReceiver.class).build(),
                     Function.identity(),
                     Function.identity(),
-                    ReceiverDescriptor.builder(TupleReceiver.class).build(),
+                    receiverArg,
                     resultSubscriber,
-                    null,
-                    receiverArg
+                    null
             );
 
             // Tuple payload.
@@ -649,6 +712,79 @@ public abstract class ItAbstractDataStreamerTest extends ClusterPerClassIntegrat
         assertEquals(2, resTupleInner2.intValue("int"));
     }
 
+    @ParameterizedTest
+    @ValueSource(strings = {"arg1", ""})
+    public void testMarshallingReceiver(String arg) {
+        // Check that null arg works.
+        arg = arg.isEmpty() ? null : arg;
+
+        DataStreamerReceiverDescriptor<String, String, String> desc = 
DataStreamerReceiverDescriptor
+                .builder(MarshallingReceiver.class)
+                .payloadMarshaller(new StringSuffixMarshaller())
+                .argumentMarshaller(new StringSuffixMarshaller())
+                .resultMarshaller(new StringSuffixMarshaller())
+                .build();
+
+        CompletableFuture<Void> streamerFut;
+        var resultSubscriber = new TestSubscriber<String>();
+
+        try (var publisher = new SubmissionPublisher<String>()) {
+            streamerFut = defaultTable().recordView().streamData(
+                    publisher,
+                    desc,
+                    x -> Tuple.create().set("id", 1),
+                    Function.identity(),
+                    arg,
+                    resultSubscriber,
+                    null
+            );
+
+            publisher.submit("val1");
+            publisher.submit("val2");
+        }
+
+        assertThat(streamerFut, willCompleteSuccessfully());
+        assertEquals(2, resultSubscriber.items.size());
+
+        String expected = "received[arg=" + arg + 
":beforeMarshal:afterUnmarshal,val=val1:beforeMarshal:afterUnmarshal]"
+                + ":beforeMarshal:afterUnmarshal";
+
+        assertEquals(expected, resultSubscriber.items.get(0));
+    }
+
+    @Test
+    public void testReceiverMarshallerMismatch() {
+        DataStreamerReceiverDescriptor<String, String, String> desc = 
DataStreamerReceiverDescriptor
+                .builder(MarshallingReceiver.class)
+                .build();
+
+        CompletableFuture<Void> streamerFut;
+
+        try (var publisher = new SubmissionPublisher<String>()) {
+            streamerFut = defaultTable().recordView().streamData(
+                    publisher,
+                    desc,
+                    x -> Tuple.create().set("id", 1),
+                    Function.identity(),
+                    "arg",
+                    null,
+                    null
+            );
+
+            publisher.submit("val1");
+        }
+
+        var ex = assertThrows(CompletionException.class, () -> 
streamerFut.orTimeout(10, TimeUnit.SECONDS).join());
+        DataStreamerException dsEx = (DataStreamerException) ex.getCause();
+
+        assertThat(dsEx.getMessage(), containsString(
+                "Marshaller is defined in the DataStreamerReceiver 
implementation, "
+                        + "expected argument type: `byte[]`, actual: `class 
java.lang.String`. "
+                        + "Ensure that DataStreamerReceiverDescriptor 
marshallers match DataStreamerReceiver marshallers."));
+
+        assertEquals("IGN-COMPUTE-13", dsEx.codeAsString());
+    }
+
     private Tuple receiverTupleRoundTrip(Tuple tuple, boolean asArg) {
         CompletableFuture<Void> streamerFut;
         var resultSubscriber = new TestSubscriber<Tuple>();
@@ -658,12 +794,12 @@ public abstract class ItAbstractDataStreamerTest extends ClusterPerClassIntegrat
 
             streamerFut = defaultTable().recordView().streamData(
                     publisher,
+                    
DataStreamerReceiverDescriptor.builder(TupleReceiver.class).build(),
                     Function.identity(),
                     Function.identity(),
-                    ReceiverDescriptor.builder(TupleReceiver.class).build(),
+                    receiverArg,
                     resultSubscriber,
-                    null,
-                    receiverArg
+                    null
             );
 
             publisher.submit(asArg ? Tuple.create() : tuple);
@@ -811,4 +947,42 @@ public abstract class ItAbstractDataStreamerTest extends 
ClusterPerClassIntegrat
             return CompletableFuture.completedFuture(page);
         }
     }
+
+    private static class StringSuffixMarshaller implements 
ByteArrayMarshaller<String> {
+        @Override
+        public byte @Nullable [] marshal(@Nullable String object) {
+            return ByteArrayMarshaller.super.marshal(object + 
":beforeMarshal");
+        }
+
+        @Override
+        public @Nullable String unmarshal(byte @Nullable [] raw) {
+            return ByteArrayMarshaller.super.unmarshal(raw) + 
":afterUnmarshal";
+        }
+    }
+
+    private static class MarshallingReceiver implements 
DataStreamerReceiver<String, String, String> {
+        @Override
+        public @Nullable CompletableFuture<List<String>> receive(List<String> 
page, DataStreamerReceiverContext ctx, @Nullable String arg) {
+            var results = page.stream()
+                    .map(s -> "received[arg=" + arg + ",val=" + s + "]")
+                    .collect(Collectors.toList());
+
+            return CompletableFuture.completedFuture(results);
+        }
+
+        @Override
+        public @Nullable Marshaller<String, byte[]> payloadMarshaller() {
+            return new StringSuffixMarshaller();
+        }
+
+        @Override
+        public @Nullable Marshaller<String, byte[]> argumentMarshaller() {
+            return new StringSuffixMarshaller();
+        }
+
+        @Override
+        public @Nullable Marshaller<String, byte[]> resultMarshaller() {
+            return new StringSuffixMarshaller();
+        }
+    }
 }
diff --git 
a/modules/runner/src/main/java/org/apache/ignite/internal/restart/RestartProofKeyValueView.java
 
b/modules/runner/src/main/java/org/apache/ignite/internal/restart/RestartProofKeyValueView.java
index 41af0bd99eb..9231930b5df 100644
--- 
a/modules/runner/src/main/java/org/apache/ignite/internal/restart/RestartProofKeyValueView.java
+++ 
b/modules/runner/src/main/java/org/apache/ignite/internal/restart/RestartProofKeyValueView.java
@@ -30,8 +30,8 @@ import org.apache.ignite.lang.Cursor;
 import org.apache.ignite.lang.NullableValue;
 import org.apache.ignite.table.DataStreamerItem;
 import org.apache.ignite.table.DataStreamerOptions;
+import org.apache.ignite.table.DataStreamerReceiverDescriptor;
 import org.apache.ignite.table.KeyValueView;
-import org.apache.ignite.table.ReceiverDescriptor;
 import org.apache.ignite.table.criteria.Criteria;
 import org.apache.ignite.table.criteria.CriteriaQueryOptions;
 import org.apache.ignite.tx.Transaction;
@@ -272,16 +272,15 @@ class RestartProofKeyValueView<K, V> extends 
RestartProofApiObject<KeyValueView<
     }
 
     @Override
-    public <E, V1, R, A> CompletableFuture<Void> streamData(
+    public <E, V1, A, R> CompletableFuture<Void> streamData(
             Publisher<E> publisher,
+            DataStreamerReceiverDescriptor<V1, A, R> receiver,
             Function<E, Entry<K, V>> keyFunc,
             Function<E, V1> payloadFunc,
-            ReceiverDescriptor<A> receiver,
+            @Nullable A receiverArg,
             @Nullable Subscriber<R> resultSubscriber,
-            @Nullable DataStreamerOptions options,
-            @Nullable A receiverArg
-    ) {
-        return attachedAsync(view -> view.streamData(publisher, keyFunc, 
payloadFunc, receiver, resultSubscriber, options, receiverArg));
+            @Nullable DataStreamerOptions options) {
+        return attachedAsync(view -> view.streamData(publisher, receiver, 
keyFunc, payloadFunc, receiverArg, resultSubscriber, options));
     }
 
     // TODO: IGNITE-23011 - support cursor transparency?
diff --git 
a/modules/runner/src/main/java/org/apache/ignite/internal/restart/RestartProofRecordView.java
 
b/modules/runner/src/main/java/org/apache/ignite/internal/restart/RestartProofRecordView.java
index 5b98b47bf1c..0fc37d3919b 100644
--- 
a/modules/runner/src/main/java/org/apache/ignite/internal/restart/RestartProofRecordView.java
+++ 
b/modules/runner/src/main/java/org/apache/ignite/internal/restart/RestartProofRecordView.java
@@ -30,7 +30,7 @@ import org.apache.ignite.lang.AsyncCursor;
 import org.apache.ignite.lang.Cursor;
 import org.apache.ignite.table.DataStreamerItem;
 import org.apache.ignite.table.DataStreamerOptions;
-import org.apache.ignite.table.ReceiverDescriptor;
+import org.apache.ignite.table.DataStreamerReceiverDescriptor;
 import org.apache.ignite.table.RecordView;
 import org.apache.ignite.table.criteria.Criteria;
 import org.apache.ignite.table.criteria.CriteriaQueryOptions;
@@ -235,16 +235,15 @@ class RestartProofRecordView<R> extends 
RestartProofApiObject<RecordView<R>> imp
     }
 
     @Override
-    public <E, V, R1, A> CompletableFuture<Void> streamData(
+    public <E, V, A, R1> CompletableFuture<Void> streamData(
             Publisher<E> publisher,
+            DataStreamerReceiverDescriptor<V, A, R1> receiver,
             Function<E, R> keyFunc,
             Function<E, V> payloadFunc,
-            ReceiverDescriptor<A> receiver,
+            @Nullable A receiverArg,
             @Nullable Subscriber<R1> resultSubscriber,
-            @Nullable DataStreamerOptions options,
-            @Nullable A receiverArg
-    ) {
-        return attachedAsync(view -> view.streamData(publisher, keyFunc, 
payloadFunc, receiver, resultSubscriber, options, receiverArg));
+            @Nullable DataStreamerOptions options) {
+        return attachedAsync(view -> view.streamData(publisher, receiver, 
keyFunc, payloadFunc, receiverArg, resultSubscriber, options));
     }
 
     // TODO: IGNITE-23011 - support cursor transparency?
diff --git 
a/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/AsyncApiKeyValueViewAdapter.java
 
b/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/AsyncApiKeyValueViewAdapter.java
index 709315b0f83..e4a40d278c4 100644
--- 
a/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/AsyncApiKeyValueViewAdapter.java
+++ 
b/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/AsyncApiKeyValueViewAdapter.java
@@ -31,9 +31,9 @@ import org.apache.ignite.lang.Cursor;
 import org.apache.ignite.lang.NullableValue;
 import org.apache.ignite.table.DataStreamerItem;
 import org.apache.ignite.table.DataStreamerOptions;
+import org.apache.ignite.table.DataStreamerReceiverDescriptor;
 import org.apache.ignite.table.DataStreamerTarget;
 import org.apache.ignite.table.KeyValueView;
-import org.apache.ignite.table.ReceiverDescriptor;
 import org.apache.ignite.table.criteria.Criteria;
 import org.apache.ignite.table.criteria.CriteriaQueryOptions;
 import org.apache.ignite.table.criteria.CriteriaQuerySource;
@@ -301,9 +301,10 @@ public class AsyncApiKeyValueViewAdapter<K, V> implements 
KeyValueView<K, V> {
     }
 
     @Override
-    public <E, S, R, A> CompletableFuture<Void> streamData(Publisher<E> 
publisher, Function<E, Entry<K, V>> keyFunc,
-            Function<E, S> payloadFunc, ReceiverDescriptor<A> receiver, 
@Nullable Flow.Subscriber<R> resultSubscriber,
-            @Nullable DataStreamerOptions options, @Nullable A receiverArg) {
+    public <E, S, A, R> CompletableFuture<Void> streamData(Publisher<E> 
publisher, DataStreamerReceiverDescriptor<S, A, R> receiver,
+            Function<E, Entry<K, V>> keyFunc, Function<E, S> payloadFunc, 
@Nullable A receiverArg,
+            @Nullable Flow.Subscriber<R> resultSubscriber,
+            @Nullable DataStreamerOptions options) {
         throw new UnsupportedOperationException();
     }
 }
diff --git 
a/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/AsyncApiRecordViewAdapter.java
 
b/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/AsyncApiRecordViewAdapter.java
index 76c873083d8..fd8d971b9d3 100644
--- 
a/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/AsyncApiRecordViewAdapter.java
+++ 
b/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/AsyncApiRecordViewAdapter.java
@@ -29,8 +29,8 @@ import org.apache.ignite.lang.AsyncCursor;
 import org.apache.ignite.lang.Cursor;
 import org.apache.ignite.table.DataStreamerItem;
 import org.apache.ignite.table.DataStreamerOptions;
+import org.apache.ignite.table.DataStreamerReceiverDescriptor;
 import org.apache.ignite.table.DataStreamerTarget;
-import org.apache.ignite.table.ReceiverDescriptor;
 import org.apache.ignite.table.RecordView;
 import org.apache.ignite.table.criteria.Criteria;
 import org.apache.ignite.table.criteria.CriteriaQueryOptions;
@@ -46,7 +46,7 @@ import org.jetbrains.annotations.Nullable;
 public class AsyncApiRecordViewAdapter<V> implements RecordView<V> {
     private final RecordView<V> delegate;
 
-    public AsyncApiRecordViewAdapter(RecordView<V> delegate) {
+    AsyncApiRecordViewAdapter(RecordView<V> delegate) {
         this.delegate = delegate;
     }
 
@@ -268,9 +268,9 @@ public class AsyncApiRecordViewAdapter<V> implements 
RecordView<V> {
     }
 
     @Override
-    public <E, V1, R, A> CompletableFuture<Void> streamData(Publisher<E> 
publisher, Function<E, V> keyFunc, Function<E, V1> payloadFunc,
-            ReceiverDescriptor<A> receiver, @Nullable Subscriber<R> 
resultSubscriber, @Nullable DataStreamerOptions options,
-            @Nullable A receiverArg) {
+    public <E, V1, A, R> CompletableFuture<Void> streamData(Publisher<E> 
publisher, DataStreamerReceiverDescriptor<V1, A, R> receiver,
+            Function<E, V> keyFunc, Function<E, V1> payloadFunc, @Nullable A 
receiverArg, @Nullable Subscriber<R> resultSubscriber,
+            @Nullable DataStreamerOptions options) {
         throw new UnsupportedOperationException();
     }
 }
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/KeyValueBinaryViewImpl.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/KeyValueBinaryViewImpl.java
index 437053fdf77..958c48d3a11 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/KeyValueBinaryViewImpl.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/KeyValueBinaryViewImpl.java
@@ -54,8 +54,8 @@ import org.apache.ignite.sql.ResultSetMetadata;
 import org.apache.ignite.sql.SqlRow;
 import org.apache.ignite.table.DataStreamerItem;
 import org.apache.ignite.table.DataStreamerOptions;
+import org.apache.ignite.table.DataStreamerReceiverDescriptor;
 import org.apache.ignite.table.KeyValueView;
-import org.apache.ignite.table.ReceiverDescriptor;
 import org.apache.ignite.table.Tuple;
 import org.apache.ignite.tx.Transaction;
 import org.jetbrains.annotations.Nullable;
@@ -576,14 +576,14 @@ public class KeyValueBinaryViewImpl extends 
AbstractTableView<Entry<Tuple, Tuple
     }
 
     @Override
-    public <E, V, R, A> CompletableFuture<Void> streamData(
+    public <E, V, A, R> CompletableFuture<Void> streamData(
             Publisher<E> publisher,
+            DataStreamerReceiverDescriptor<V, A, R> receiver,
             Function<E, Entry<Tuple, Tuple>> keyFunc,
             Function<E, V> payloadFunc,
-            ReceiverDescriptor<A> receiver,
+            @Nullable A receiverArg,
             @Nullable Flow.Subscriber<R> resultSubscriber,
-            @Nullable DataStreamerOptions options,
-            @Nullable A receiverArg) {
+            @Nullable DataStreamerOptions options) {
         Objects.requireNonNull(publisher);
         Objects.requireNonNull(keyFunc);
         Objects.requireNonNull(payloadFunc);
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/KeyValueViewImpl.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/KeyValueViewImpl.java
index 9db6fd2f984..8648a08819c 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/KeyValueViewImpl.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/KeyValueViewImpl.java
@@ -64,8 +64,8 @@ import org.apache.ignite.sql.ResultSetMetadata;
 import org.apache.ignite.sql.SqlRow;
 import org.apache.ignite.table.DataStreamerItem;
 import org.apache.ignite.table.DataStreamerOptions;
+import org.apache.ignite.table.DataStreamerReceiverDescriptor;
 import org.apache.ignite.table.KeyValueView;
-import org.apache.ignite.table.ReceiverDescriptor;
 import org.apache.ignite.table.mapper.Mapper;
 import org.apache.ignite.tx.Transaction;
 import org.jetbrains.annotations.Nullable;
@@ -816,14 +816,14 @@ public class KeyValueViewImpl<K, V> extends 
AbstractTableView<Entry<K, V>> imple
     }
 
     @Override
-    public <E, V1, R, A> CompletableFuture<Void> streamData(
+    public <E, V1, A, R> CompletableFuture<Void> streamData(
             Publisher<E> publisher,
+            DataStreamerReceiverDescriptor<V1, A, R> receiver,
             Function<E, Entry<K, V>> keyFunc,
             Function<E, V1> payloadFunc,
-            ReceiverDescriptor<A> receiver,
+            @Nullable A receiverArg,
             @Nullable Flow.Subscriber<R> resultSubscriber,
-            @Nullable DataStreamerOptions options,
-            @Nullable A receiverArg) {
+            @Nullable DataStreamerOptions options) {
         Objects.requireNonNull(publisher);
         Objects.requireNonNull(keyFunc);
         Objects.requireNonNull(payloadFunc);
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/PublicApiThreadingViewBase.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/PublicApiThreadingViewBase.java
index f4cf5e01a47..165ffa8813e 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/PublicApiThreadingViewBase.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/PublicApiThreadingViewBase.java
@@ -28,8 +28,8 @@ import org.apache.ignite.lang.AsyncCursor;
 import org.apache.ignite.lang.Cursor;
 import org.apache.ignite.table.DataStreamerItem;
 import org.apache.ignite.table.DataStreamerOptions;
+import org.apache.ignite.table.DataStreamerReceiverDescriptor;
 import org.apache.ignite.table.DataStreamerTarget;
-import org.apache.ignite.table.ReceiverDescriptor;
 import org.apache.ignite.table.criteria.Criteria;
 import org.apache.ignite.table.criteria.CriteriaQueryOptions;
 import org.apache.ignite.table.criteria.CriteriaQuerySource;
@@ -58,22 +58,22 @@ abstract class PublicApiThreadingViewBase<T> implements 
DataStreamerTarget<T>, C
     }
 
     @Override
-    public <E, V, R, A> CompletableFuture<Void> streamData(
+    public <E, V, A, R> CompletableFuture<Void> streamData(
             Publisher<E> publisher,
+            DataStreamerReceiverDescriptor<V, A, R> receiver,
             Function<E, T> keyFunc,
             Function<E, V> payloadFunc,
-            ReceiverDescriptor<A> receiver,
+            @Nullable A receiverArg,
             @Nullable Flow.Subscriber<R> resultSubscriber,
-            @Nullable DataStreamerOptions options,
-            A receiverArg) {
+            @Nullable DataStreamerOptions options) {
         return executeAsyncOp(() -> streamerTarget.streamData(
                 publisher,
+                receiver,
                 keyFunc,
                 payloadFunc,
-                receiver,
+                receiverArg,
                 resultSubscriber,
-                options,
-                receiverArg));
+                options));
     }
 
     @Override
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/RecordBinaryViewImpl.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/RecordBinaryViewImpl.java
index befa290a258..fdbbe6162a3 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/RecordBinaryViewImpl.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/RecordBinaryViewImpl.java
@@ -48,7 +48,7 @@ import org.apache.ignite.lang.MarshallerException;
 import org.apache.ignite.sql.IgniteSql;
 import org.apache.ignite.table.DataStreamerItem;
 import org.apache.ignite.table.DataStreamerOptions;
-import org.apache.ignite.table.ReceiverDescriptor;
+import org.apache.ignite.table.DataStreamerReceiverDescriptor;
 import org.apache.ignite.table.RecordView;
 import org.apache.ignite.table.Tuple;
 import org.apache.ignite.tx.Transaction;
@@ -575,14 +575,14 @@ public class RecordBinaryViewImpl extends 
AbstractTableView<Tuple> implements Re
     }
 
     @Override
-    public <E, V, R, A> CompletableFuture<Void> streamData(
+    public <E, V, A, R> CompletableFuture<Void> streamData(
             Publisher<E> publisher,
+            DataStreamerReceiverDescriptor<V, A, R> receiver,
             Function<E, Tuple> keyFunc,
             Function<E, V> payloadFunc,
-            ReceiverDescriptor<A> receiver,
+            @Nullable A receiverArg,
             @Nullable Flow.Subscriber<R> resultSubscriber,
-            @Nullable DataStreamerOptions options,
-            @Nullable A receiverArg) {
+            @Nullable DataStreamerOptions options) {
         Objects.requireNonNull(publisher);
         Objects.requireNonNull(keyFunc);
         Objects.requireNonNull(payloadFunc);
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/RecordViewImpl.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/RecordViewImpl.java
index 6c8a827bd0d..5045b9fa271 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/RecordViewImpl.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/RecordViewImpl.java
@@ -56,7 +56,7 @@ import org.apache.ignite.sql.ResultSetMetadata;
 import org.apache.ignite.sql.SqlRow;
 import org.apache.ignite.table.DataStreamerItem;
 import org.apache.ignite.table.DataStreamerOptions;
-import org.apache.ignite.table.ReceiverDescriptor;
+import org.apache.ignite.table.DataStreamerReceiverDescriptor;
 import org.apache.ignite.table.RecordView;
 import org.apache.ignite.table.mapper.Mapper;
 import org.apache.ignite.tx.Transaction;
@@ -606,14 +606,14 @@ public class RecordViewImpl<R> extends 
AbstractTableView<R> implements RecordVie
     }
 
     @Override
-    public <E, V, R1, A> CompletableFuture<Void> streamData(
+    public <E, V, A, R1> CompletableFuture<Void> streamData(
             Publisher<E> publisher,
+            DataStreamerReceiverDescriptor<V, A, R1> receiver,
             Function<E, R> keyFunc,
             Function<E, V> payloadFunc,
-            ReceiverDescriptor<A> receiver,
+            @Nullable A receiverArg,
             @Nullable Flow.Subscriber<R1> resultSubscriber,
-            @Nullable DataStreamerOptions options,
-            @Nullable A receiverArg) {
+            @Nullable DataStreamerOptions options) {
         Objects.requireNonNull(publisher);
         Objects.requireNonNull(keyFunc);
         Objects.requireNonNull(payloadFunc);
@@ -625,7 +625,7 @@ public class RecordViewImpl<R> extends AbstractTableView<R> 
implements RecordVie
                                 .thenCompose(node -> 
tbl.streamerReceiverRunner().runReceiverAsync(
                                         receiver, receiverArg, rows, node, 
receiver.units())));
 
-        CompletableFuture<Void> future = DataStreamer.<R, E, V, R1>streamData(
+        CompletableFuture<Void> future = DataStreamer.streamData(
                 publisher,
                 keyFunc,
                 payloadFunc,
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/StreamerReceiverRunner.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/StreamerReceiverRunner.java
index c15af2e0604..c07e01a43c5 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/StreamerReceiverRunner.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/StreamerReceiverRunner.java
@@ -22,7 +22,7 @@ import java.util.List;
 import java.util.concurrent.CompletableFuture;
 import org.apache.ignite.deployment.DeploymentUnit;
 import org.apache.ignite.network.ClusterNode;
-import org.apache.ignite.table.ReceiverDescriptor;
+import org.apache.ignite.table.DataStreamerReceiverDescriptor;
 import org.apache.ignite.table.ReceiverExecutionOptions;
 import org.jetbrains.annotations.Nullable;
 
@@ -44,7 +44,7 @@ public interface StreamerReceiverRunner {
      * @param <R> Result type.
      */
     <A, I, R> CompletableFuture<Collection<R>> runReceiverAsync(
-            ReceiverDescriptor<A> receiver,
+            DataStreamerReceiverDescriptor<I, A, R> receiver,
             @Nullable A receiverArg,
             Collection<I> items,
             ClusterNode node,

Reply via email to