This is an automated email from the ASF dual-hosted git repository.

apkhmv pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new fcf2fade6a IGNITE-22643 Support Tuple with schema in Compute API 
(#4142)
fcf2fade6a is described below

commit fcf2fade6a4b7630b96364e96feea07e0e4dd750
Author: Aleksandr Pakhomov <[email protected]>
AuthorDate: Thu Aug 22 18:29:39 2024 +0300

    IGNITE-22643 Support Tuple with schema in Compute API (#4142)
    
    To support Tuples in compute API I've refactored some parts:
    
    - ClientBinaryTupleUtils does not accept Marshaller when append the object.
    - ClientComputeJobPacker and ClientComputeJobUnpacker are introduced.
    They are supposed to encapsulate all logic about object marshalling in 
Compute.
---
 gradle/libs.versions.toml                          |   1 +
 .../java/org/apache/ignite/lang/ErrorGroups.java   |   3 +
 .../org/apache/ignite/marshalling/Marshaller.java  |  60 +++-
 ...gException.java => UnmarshallingException.java} |  22 +-
 .../UnsupportedObjectTypeMarshallingException.java |  13 +-
 .../marshalling/ByteArrayMarshallerTest.java       |   5 +-
 modules/binary-tuple/build.gradle                  |   5 +-
 .../inlineschema/TupleWithSchemaMarshalling.java   | 373 +++++++++++++++++++++
 .../TupleWithSchemaMarshallingTest.java            | 132 ++++++++
 .../client/proto/ClientBinaryTupleUtils.java       |   6 +-
 .../client/proto/ClientComputeJobPacker.java       |  91 +++++
 .../client/proto/ClientComputeJobUnpacker.java     | 107 ++++++
 .../internal/client/proto/ClientMessagePacker.java |  28 +-
 .../internal/client/proto/ComputeJobType.java      |  73 ++++
 .../client/proto/StreamerReceiverSerializer.java   |   4 +-
 .../proto/event/JdbcBatchPreparedStmntRequest.java |   2 +-
 .../jdbc/proto/event/JdbcQueryExecuteRequest.java  |   2 +-
 .../proto/ClientComputeJobPackerUnpackerTest.java  | 247 ++++++++++++++
 .../proto/ClientMessagePackerUnpackerTest.java     |   8 +-
 .../internal/client/proto/ComputeJobTypeTest.java  |  60 ++++
 .../ClientComputeExecuteColocatedRequest.java      |   2 -
 .../ClientComputeExecuteMapReduceRequest.java      |   2 +-
 .../compute/ClientComputeExecuteRequest.java       |  22 +-
 .../internal/client/compute/ClientCompute.java     |   5 +-
 .../client/compute/ClientJobExecution.java         |   7 +-
 .../ignite/internal/client/sql/ClientSql.java      |   6 +-
 .../apache/ignite/client/fakes/FakeCompute.java    |  59 ++--
 .../internal/compute/DelegatingJobExecution.java   |  13 +-
 .../ignite/internal/compute/ExecutionManager.java  |   7 +
 .../internal/compute/FailSafeJobExecution.java     |  13 +-
 .../ignite/internal/compute/IgniteComputeImpl.java |  78 ++---
 .../internal/compute/IgniteComputeInternal.java    |  11 +-
 .../internal/compute/JobExecutionWrapper.java      |  12 +-
 .../internal/compute/MarshallerProvider.java}      |  20 +-
 .../compute/ResultMarshallingJobExecution.java     |  18 +-
 ...n.java => ResultUnmarshallingJobExecution.java} |  22 +-
 .../compute/executor/ComputeExecutorImpl.java      |  24 +-
 .../compute/executor/JobExecutionInternal.java     |  14 +-
 .../ignite/client/detail/compute/compute_impl.cpp  |  71 +++-
 modules/platforms/cpp/ignite/common/error_codes.h  |   3 +-
 modules/platforms/cpp/ignite/odbc/common_types.cpp |   1 +
 .../dotnet/Apache.Ignite.Tests/FakeServer.cs       |   2 +
 .../platforms/dotnet/Apache.Ignite/ErrorCodes.g.cs |   3 +
 .../Apache.Ignite/Internal/Compute/Compute.cs      |   6 +-
 .../Internal/Compute/ComputePacker.cs              | 122 +++++++
 .../Internal/Proto/MsgPack/MsgPackReader.cs        |  34 --
 .../Internal/Proto/MsgPack/MsgPackWriter.cs        |  50 ++-
 .../client/ItThinClientComputeMarshallingTest.java |  38 ++-
 ...tThinClientComputeTypeCheckMarshallingTest.java |   7 +-
 .../ItThinClientTupleComputeMarshallingTest.java   | 191 +++++++++++
 .../app/compute/ItEmbeddedMarshallingTest.java     |  67 +++-
 51 files changed, 1881 insertions(+), 291 deletions(-)

diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml
index a66d0b66ae..03ceb99766 100644
--- a/gradle/libs.versions.toml
+++ b/gradle/libs.versions.toml
@@ -168,6 +168,7 @@ jansi-core = { module = "org.fusesource.jansi:jansi", 
version.ref = "jansi" }
 jackson-core = { module = "com.fasterxml.jackson.core:jackson-core", 
version.ref = "jackson" }
 jackson-databind = { module = "com.fasterxml.jackson.core:jackson-databind", 
version.ref = "jackson" }
 jackson-annotations = { module = 
"com.fasterxml.jackson.core:jackson-annotations", version.ref = "jackson" }
+jackson-datatype-jsr310 = { module = 
"com.fasterxml.jackson.datatype:jackson-datatype-jsr310", version.ref = 
"jackson" }
 
 typesafe-config = { module = "com.typesafe:config", version.ref = "typesafe" }
 
diff --git a/modules/api/src/main/java/org/apache/ignite/lang/ErrorGroups.java 
b/modules/api/src/main/java/org/apache/ignite/lang/ErrorGroups.java
index c0eb3e42f8..4a173d618b 100755
--- a/modules/api/src/main/java/org/apache/ignite/lang/ErrorGroups.java
+++ b/modules/api/src/main/java/org/apache/ignite/lang/ErrorGroups.java
@@ -671,5 +671,8 @@ public class ErrorGroups {
 
         /** Unsupported object type error. */
         public static final int UNSUPPORTED_OBJECT_TYPE_ERR = 
MARSHALLING_ERR_GROUP.registerErrorCode((short) 2);
+
+        /** There format of bytes is wrong. */
+        public static final int UNMARSHALLING_ERR = 
MARSHALLING_ERR_GROUP.registerErrorCode((short) 3);
     }
 }
diff --git 
a/modules/api/src/main/java/org/apache/ignite/marshalling/Marshaller.java 
b/modules/api/src/main/java/org/apache/ignite/marshalling/Marshaller.java
index 6f802f8740..3a9b65cef2 100644
--- a/modules/api/src/main/java/org/apache/ignite/marshalling/Marshaller.java
+++ b/modules/api/src/main/java/org/apache/ignite/marshalling/Marshaller.java
@@ -21,37 +21,75 @@ package org.apache.ignite.marshalling;
 import org.jetbrains.annotations.Nullable;
 
 /**
- * Object marshaller interface that is used in every Ignite API that requires
- * serialization/deserialization of user objects. If you want to define the way
- * your objects are serialized/deserialized, you can implement this interface
- * and pass it to the API that requires it.
+ * Object marshaller interface that is used in every Ignite API that requires 
serialization/deserialization of user objects. If you want to
+ * define the way your objects are serialized/deserialized, you can implement 
this interface and pass it to the API that requires it.
  *
  * <p>NOTE: The marshaller itself are not sent over the wire. This means that 
if you
- * define a custom marshaller on the client, you must also define the 
marshaller
- * on the server as well.
+ * define a custom marshaller on the client, you must also define the 
marshaller on the server as well.
  *
  * @param <T> The object (T)ype.
- * @param <R> The (R)aw type, for example, {@code byte[]} or {@link 
org.apache.ignite.table.Tuple}.
+ * @param <R> The (R)aw type, for example, {@code byte[]}.
  */
 public interface Marshaller<T, R> {
     /**
      * Marshal the object into raw type.
      *
      * @param object object to marshal.
-     *
      * @return raw presentation of object.
      * @throws UnsupportedObjectTypeMarshallingException if the given type can 
not be marshalled with current instance.
      */
-    @Nullable R marshal(@Nullable T object) throws 
UnsupportedObjectTypeMarshallingException;
+    @Nullable
+    R marshal(@Nullable T object) throws 
UnsupportedObjectTypeMarshallingException;
 
     /**
      * Unmarshal the raw type into object.
      *
      * @param raw raw presentation of object.
-     *
      * @return object.
      * @throws UnsupportedObjectTypeMarshallingException if the given type can 
not be unmarshalled with current instance.
      */
-    @Nullable T unmarshal(@Nullable R raw) throws 
UnsupportedObjectTypeMarshallingException;
+    @Nullable
+    T unmarshal(@Nullable R raw) throws 
UnsupportedObjectTypeMarshallingException;
+
+    /**
+     * Try to marshal given object if marshaller if not null, else the object 
is casted directly to the target type.
+     *
+     * @param self the marshaller instance.
+     * @param object to marshal.
+     * @param <T> The object (T)ype.
+     * @param <R> The (R)aw type, for example.
+     */
+    static <T, R> @Nullable R tryMarshalOrCast(@Nullable Marshaller<T, R> 
self, @Nullable Object object) {
+        if (self != null) {
+            try {
+                T castedObj = (T) object;
+                return self.marshal(castedObj);
+            } catch (ClassCastException ignored) {
+                // ignore.
+            }
+        }
+
+        return (R) object;
+    }
+
+    /**
+     * Try to unmarshal given object if marshaller if not null, else the 
object is casted directly to the target type.
+     *
+     * @param self the marshaller instance.
+     * @param <T> The object (T)ype.
+     * @param <R> The (R)aw type, for example, {@code byte[]}.
+     */
+    static <T, R> @Nullable T tryUnmarshalOrCast(@Nullable Marshaller<T, R> 
self, @Nullable Object raw) {
+        if (self != null) {
+            try {
+                R rawType = (R) raw;
+                return self.unmarshal(rawType);
+            } catch (ClassCastException ignored) {
+                // ignore.
+            }
+        }
+
+        return (T) raw;
+    }
 }
 
diff --git 
a/modules/api/src/main/java/org/apache/ignite/marshalling/UnsupportedObjectTypeMarshallingException.java
 
b/modules/api/src/main/java/org/apache/ignite/marshalling/UnmarshallingException.java
similarity index 66%
copy from 
modules/api/src/main/java/org/apache/ignite/marshalling/UnsupportedObjectTypeMarshallingException.java
copy to 
modules/api/src/main/java/org/apache/ignite/marshalling/UnmarshallingException.java
index bb8b4ba636..5edd341007 100644
--- 
a/modules/api/src/main/java/org/apache/ignite/marshalling/UnsupportedObjectTypeMarshallingException.java
+++ 
b/modules/api/src/main/java/org/apache/ignite/marshalling/UnmarshallingException.java
@@ -21,22 +21,14 @@ import java.util.UUID;
 import org.apache.ignite.lang.ErrorGroups.Marshalling;
 import org.apache.ignite.lang.IgniteException;
 
-/**
- * Exception thrown when an object type is not supported by the marshaller.
- */
-public class UnsupportedObjectTypeMarshallingException extends IgniteException 
{
+/** Exception thrown when unmarshalling failed. */
+public class UnmarshallingException extends IgniteException {
     private static final long serialVersionUID = -8131613381875542450L;
 
-    /**
-     * Creates an exception with the given unsupported type.
-     *
-     * @param unsupportedType Unsupported type.
-     */
-    public UnsupportedObjectTypeMarshallingException(Class<?> unsupportedType) 
{
-        super(
-                Marshalling.UNSUPPORTED_OBJECT_TYPE_ERR,
-                "Unsupported object type: " + unsupportedType.getName() + ". 
Please, define the marshaller that can handle this type."
-        );
+
+    /** Constructor. */
+    public UnmarshallingException(String msg) {
+        super(Marshalling.UNMARSHALLING_ERR, msg);
     }
 
     /**
@@ -47,7 +39,7 @@ public class UnsupportedObjectTypeMarshallingException 
extends IgniteException {
      * @param message Detailed message.
      * @param cause Optional nested exception (can be {@code null}).
      */
-    public UnsupportedObjectTypeMarshallingException(UUID traceId, int code, 
String message, Throwable cause) {
+    public UnmarshallingException(UUID traceId, int code, String message, 
Throwable cause) {
         super(traceId, code, message, cause);
     }
 }
diff --git 
a/modules/api/src/main/java/org/apache/ignite/marshalling/UnsupportedObjectTypeMarshallingException.java
 
b/modules/api/src/main/java/org/apache/ignite/marshalling/UnsupportedObjectTypeMarshallingException.java
index bb8b4ba636..90ab7b4fa2 100644
--- 
a/modules/api/src/main/java/org/apache/ignite/marshalling/UnsupportedObjectTypeMarshallingException.java
+++ 
b/modules/api/src/main/java/org/apache/ignite/marshalling/UnsupportedObjectTypeMarshallingException.java
@@ -27,16 +27,21 @@ import org.apache.ignite.lang.IgniteException;
 public class UnsupportedObjectTypeMarshallingException extends IgniteException 
{
     private static final long serialVersionUID = -8131613381875542450L;
 
+
     /**
      * Creates an exception with the given unsupported type.
      *
      * @param unsupportedType Unsupported type.
      */
     public UnsupportedObjectTypeMarshallingException(Class<?> unsupportedType) 
{
-        super(
-                Marshalling.UNSUPPORTED_OBJECT_TYPE_ERR,
-                "Unsupported object type: " + unsupportedType.getName() + ". 
Please, define the marshaller that can handle this type."
-        );
+        this("Unsupported object type: " + unsupportedType.getName() + ". 
Please, define a marshaller that can handle this type.");
+    }
+
+    /**
+     * Creates an exception with the given unsupported type.
+     */
+    public UnsupportedObjectTypeMarshallingException(String msg) {
+        super(Marshalling.UNSUPPORTED_OBJECT_TYPE_ERR, msg);
     }
 
     /**
diff --git 
a/modules/api/src/test/java/org/apache/ignite/marshalling/ByteArrayMarshallerTest.java
 
b/modules/api/src/test/java/org/apache/ignite/marshalling/ByteArrayMarshallerTest.java
index fab85e3a17..dbdd9f7ec5 100644
--- 
a/modules/api/src/test/java/org/apache/ignite/marshalling/ByteArrayMarshallerTest.java
+++ 
b/modules/api/src/test/java/org/apache/ignite/marshalling/ByteArrayMarshallerTest.java
@@ -64,8 +64,7 @@ class ByteArrayMarshallerTest {
     private static Stream<Arguments> cornerValues() {
         return Stream.of(
                "", ".*456+576+$%^&*()_+{}|:<>?`~", "1", "0",
-                -1, 0,
-                new Object()
+                -1, 0
         ).map(Arguments::of);
     }
 
@@ -86,7 +85,9 @@ class ByteArrayMarshallerTest {
     @ParameterizedTest
     @MethodSource("cornerValues")
     void cornerValuesMarshalling(Object obj) {
+        ByteArrayMarshaller<Object> marshaller = ByteArrayMarshaller.create();
 
+        assertEquals(obj, marshaller.unmarshal(marshaller.marshal(obj)));
     }
 
 
diff --git a/modules/binary-tuple/build.gradle 
b/modules/binary-tuple/build.gradle
index 8200416793..21a695a331 100644
--- a/modules/binary-tuple/build.gradle
+++ b/modules/binary-tuple/build.gradle
@@ -26,5 +26,8 @@ dependencies {
     implementation project(':ignite-api')
     implementation project(':ignite-core')
     implementation libs.jetbrains.annotations
-}
 
+    testImplementation libs.jackson.core
+    testImplementation libs.jackson.databind
+    testImplementation libs.jackson.datatype.jsr310
+}
diff --git 
a/modules/binary-tuple/src/main/java/org/apache/ignite/internal/binarytuple/inlineschema/TupleWithSchemaMarshalling.java
 
b/modules/binary-tuple/src/main/java/org/apache/ignite/internal/binarytuple/inlineschema/TupleWithSchemaMarshalling.java
new file mode 100644
index 0000000000..ace65ac1e1
--- /dev/null
+++ 
b/modules/binary-tuple/src/main/java/org/apache/ignite/internal/binarytuple/inlineschema/TupleWithSchemaMarshalling.java
@@ -0,0 +1,373 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.binarytuple.inlineschema;
+
+
+import static 
org.apache.ignite.lang.ErrorGroups.Marshalling.UNSUPPORTED_OBJECT_TYPE_ERR;
+
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.time.Duration;
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import java.time.Period;
+import java.util.UUID;
+import org.apache.ignite.internal.binarytuple.BinaryTupleBuilder;
+import org.apache.ignite.internal.binarytuple.BinaryTupleReader;
+import org.apache.ignite.lang.IgniteException;
+import org.apache.ignite.marshalling.UnmarshallingException;
+import org.apache.ignite.marshalling.UnsupportedObjectTypeMarshallingException;
+import org.apache.ignite.sql.ColumnType;
+import org.apache.ignite.table.Tuple;
+import org.jetbrains.annotations.Nullable;
+
+/** Tuple with schema marshalling. */
+public final class TupleWithSchemaMarshalling {
+    private static final ByteOrder BYTE_ORDER = ByteOrder.LITTLE_ENDIAN;
+
+    /**
+     * Marshal tuple in the following format (LITTLE_ENDIAN).
+     *
+     * <pre>
+     * marshalledTuple := | size | valuePosition | binaryTupleWithSchema |
+     *
+     * size            := int32
+     * valuePosition   := int32
+     * binaryTupleWithSchema := | schemaBinaryTuple | valueBinaryTuple |
+     * schemaBinaryTuple := | column1 | ... | columnN |
+     * column              := | columnName | columnType |
+     * columnName          := string
+     * columnType          := int32
+     * valueBinaryTuple := | value1 | ... | valueN |.
+     * </pre>
+     */
+    public static byte @Nullable [] marshal(@Nullable Tuple tuple) {
+        if (tuple == null) {
+            return null;
+        }
+
+        // Allocate all the memory we need upfront.
+        int size = tuple.columnCount();
+        Object[] values = new Object[size];
+        String[] columns = new String[size];
+        ColumnType[] types = new ColumnType[size];
+
+        // Fill in the values, column names, and types.
+        for (int i = 0; i < size; i++) {
+            var value = tuple.value(i);
+            values[i] = value;
+            columns[i] = tuple.columnName(i);
+            types[i] = inferType(value);
+        }
+
+        ByteBuffer schemaBuff = schemaBuilder(columns, types).build();
+        ByteBuffer valueBuff = valueBuilder(columns, types, values).build();
+
+        int schemaBuffLen = schemaBuff.remaining();
+        int valueBuffLen = valueBuff.remaining();
+
+        // Size: int32 (tuple size), int32 (value offset), schema, value.
+        byte[] result = new byte[4 + 4 + schemaBuffLen + valueBuffLen];
+        ByteBuffer buff = ByteBuffer.wrap(result).order(BYTE_ORDER);
+
+        // Put the size of the schema in the first 4 bytes.
+        buff.putInt(size);
+
+        // Put the value offset in the second 4 bytes.
+        int offset = schemaBuffLen + 8;
+
+        buff.putInt(offset);
+
+        buff.put(schemaBuff);
+        buff.put(valueBuff);
+
+        return result;
+    }
+
+    /**
+     * Unmarshal tuple (LITTLE_ENDIAN).
+     *
+     * @param raw byte[] bytes that are marshaled by {@link #marshal(Tuple)}.
+     */
+    public static @Nullable Tuple unmarshal(byte @Nullable [] raw) {
+        if (raw == null) {
+            return null;
+        }
+        if (raw.length < 8) {
+            throw new UnmarshallingException("byte[] length can not be less 
than 8");
+        }
+
+        // Read first int32.
+        ByteBuffer buff = ByteBuffer.wrap(raw).order(BYTE_ORDER);
+        int size = buff.getInt(0);
+        if (size < 0) {
+            throw new UnmarshallingException("Size of the tuple can not be 
less than zero");
+        }
+
+        // Read second int32.
+        int valueOffset = buff.getInt(4);
+        if (valueOffset < 0) {
+            throw new UnmarshallingException("valueOffset can not be less than 
zero");
+        }
+        if (valueOffset > raw.length) {
+            throw new UnmarshallingException(
+                    "valueOffset can not be greater than byte[] length, 
valueOffset: "
+                            + valueOffset + ", length: " + raw.length
+            );
+        }
+
+        ByteBuffer schemaBuff = buff
+                .position(8).limit(valueOffset)
+                .slice().order(BYTE_ORDER);
+        ByteBuffer valueBuff = buff
+                .position(valueOffset).limit(raw.length)
+                .slice().order(BYTE_ORDER);
+
+        BinaryTupleReader schemaReader = new BinaryTupleReader(size * 2, 
schemaBuff);
+        BinaryTupleReader valueReader = new BinaryTupleReader(size, valueBuff);
+
+        Tuple tup = Tuple.create(size);
+
+        for (int i = 0; i < size; i++) {
+            String colName = schemaReader.stringValue(i * 2);
+            int colTypeId = schemaReader.intValue(i * 2 + 1);
+
+            setColumnValue(valueReader, tup, colName, 
ColumnType.getById(colTypeId), i);
+        }
+
+        return tup;
+    }
+
+    private static BinaryTupleBuilder schemaBuilder(String[] columns, 
ColumnType[] types) {
+        BinaryTupleBuilder builder = new BinaryTupleBuilder(columns.length * 
2);
+
+        for (int i = 0; i < columns.length; i++) {
+            builder.appendString(columns[i]);
+            builder.appendInt(types[i].id());
+        }
+
+        return builder;
+    }
+
+    private static BinaryTupleBuilder valueBuilder(String[] columnNames, 
ColumnType[] types, Object[] values) {
+        BinaryTupleBuilder builder = new BinaryTupleBuilder(values.length);
+
+        for (int i = 0; i < values.length; i++) {
+            ColumnType type = types[i];
+            Object v = values[i];
+
+            append(type, columnNames[i], builder, v);
+        }
+
+        return builder;
+    }
+
+    private static void append(ColumnType type, String name, 
BinaryTupleBuilder builder, Object value) {
+        try {
+            switch (type) {
+                case NULL:
+                    builder.appendNull();
+                    return;
+                case BOOLEAN:
+                    builder.appendBoolean((Boolean) value);
+                    return;
+                case INT8:
+                    builder.appendByte((Byte) value);
+                    return;
+                case INT16:
+                    builder.appendShort((Short) value);
+                    return;
+                case INT32:
+                    builder.appendInt((Integer) value);
+                    return;
+                case INT64:
+                    builder.appendLong((Long) value);
+                    return;
+                case FLOAT:
+                    builder.appendFloat((Float) value);
+                    return;
+                case DOUBLE:
+                    builder.appendDouble((Double) value);
+                    return;
+                case STRING:
+                    builder.appendString((String) value);
+                    return;
+                case DECIMAL:
+                    BigDecimal d = (BigDecimal) value;
+                    builder.appendDecimal(d, d.scale());
+                    return;
+                case DATE:
+                    builder.appendDate((LocalDate) value);
+                    return;
+                case TIME:
+                    builder.appendTime((LocalTime) value);
+                    return;
+                case DATETIME:
+                    builder.appendDateTime((LocalDateTime) value);
+                    return;
+                case TIMESTAMP:
+                    builder.appendTimestamp((Instant) value);
+                    return;
+                case UUID:
+                    builder.appendUuid((UUID) value);
+                    return;
+                case BYTE_ARRAY:
+                    builder.appendBytes((byte[]) value);
+                    return;
+                case PERIOD:
+                    builder.appendPeriod((Period) value);
+                    return;
+                case DURATION:
+                    builder.appendDuration((Duration) value);
+                    return;
+                default:
+                    throw new IllegalArgumentException("Unsupported type: " + 
type);
+            }
+        } catch (ClassCastException e) {
+            throw new IgniteException(UNSUPPORTED_OBJECT_TYPE_ERR, "Column's 
type mismatch ["
+                    + "column=" + name
+                    + ", expectedType=" + type.javaClass(),
+                    e
+            );
+        }
+    }
+
+    private static ColumnType inferType(@Nullable Object value) {
+        if (value == null) {
+            return ColumnType.NULL;
+        }
+        if (value instanceof Boolean) {
+            return ColumnType.BOOLEAN;
+        }
+        if (value instanceof Byte) {
+            return ColumnType.INT8;
+        }
+        if (value instanceof Short) {
+            return ColumnType.INT16;
+        }
+        if (value instanceof Integer) {
+            return ColumnType.INT32;
+        }
+        if (value instanceof Long) {
+            return ColumnType.INT64;
+        }
+        if (value instanceof Float) {
+            return ColumnType.FLOAT;
+        }
+        if (value instanceof Double) {
+            return ColumnType.DOUBLE;
+        }
+        if (value instanceof String) {
+            return ColumnType.STRING;
+        }
+        if (value instanceof BigDecimal) {
+            return ColumnType.DECIMAL;
+        }
+        if (value instanceof LocalDate) {
+            return ColumnType.DATE;
+        }
+        if (value instanceof LocalTime) {
+            return ColumnType.TIME;
+        }
+        if (value instanceof LocalDateTime) {
+            return ColumnType.DATETIME;
+        }
+        if (value instanceof Instant) {
+            return ColumnType.TIMESTAMP;
+        }
+        if (value instanceof UUID) {
+            return ColumnType.UUID;
+        }
+        if (value instanceof byte[]) {
+            return ColumnType.BYTE_ARRAY;
+        }
+        if (value instanceof Period) {
+            return ColumnType.PERIOD;
+        }
+        if (value instanceof Duration) {
+            return ColumnType.DURATION;
+        }
+        throw new UnsupportedObjectTypeMarshallingException("Tuple field is of 
unsupported type: " + value.getClass());
+    }
+
+
+    private static void setColumnValue(BinaryTupleReader reader, Tuple tuple, 
String colName, ColumnType colType, int i) {
+        switch (colType) {
+            case NULL:
+                tuple.set(colName, null);
+                break;
+            case BOOLEAN:
+                tuple.set(colName, reader.booleanValue(i));
+                break;
+            case INT8:
+                tuple.set(colName, reader.byteValue(i));
+                break;
+            case INT16:
+                tuple.set(colName, reader.shortValue(i));
+                break;
+            case INT32:
+                tuple.set(colName, reader.intValue(i));
+                break;
+            case INT64:
+                tuple.set(colName, reader.longValue(i));
+                break;
+            case FLOAT:
+                tuple.set(colName, reader.floatValue(i));
+                break;
+            case DOUBLE:
+                tuple.set(colName, reader.doubleValue(i));
+                break;
+            case STRING:
+                tuple.set(colName, reader.stringValue(i));
+                break;
+            case DECIMAL:
+                BigDecimal decimal = reader.decimalValue(i, Integer.MIN_VALUE);
+                tuple.set(colName, decimal);
+                break;
+            case DATE:
+                tuple.set(colName, reader.dateValue(i));
+                break;
+            case TIME:
+                tuple.set(colName, reader.timeValue(i));
+                break;
+            case DATETIME:
+                tuple.set(colName, reader.dateTimeValue(i));
+                break;
+            case TIMESTAMP:
+                tuple.set(colName, reader.timestampValue(i));
+                break;
+            case UUID:
+                tuple.set(colName, reader.uuidValue(i));
+                break;
+            case BYTE_ARRAY:
+                tuple.set(colName, reader.bytesValue(i));
+                break;
+            case PERIOD:
+                tuple.set(colName, reader.periodValue(i));
+                break;
+            case DURATION:
+                tuple.set(colName, reader.durationValue(i));
+                break;
+            default:
+                throw new IllegalArgumentException("Unsupported type: " + 
colType);
+        }
+    }
+}
diff --git 
a/modules/binary-tuple/src/test/java/org/apache/ignite/internal/binarytuple/inlineschema/TupleWithSchemaMarshallingTest.java
 
b/modules/binary-tuple/src/test/java/org/apache/ignite/internal/binarytuple/inlineschema/TupleWithSchemaMarshallingTest.java
new file mode 100644
index 0000000000..687ada6bd1
--- /dev/null
+++ 
b/modules/binary-tuple/src/test/java/org/apache/ignite/internal/binarytuple/inlineschema/TupleWithSchemaMarshallingTest.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.binarytuple.inlineschema;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+import java.math.BigDecimal;
+import java.time.Duration;
+import java.time.LocalDate;
+import java.time.LocalTime;
+import java.time.Period;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.UUID;
+import java.util.stream.Stream;
+import org.apache.ignite.marshalling.UnmarshallingException;
+import org.apache.ignite.marshalling.UnsupportedObjectTypeMarshallingException;
+import org.apache.ignite.table.Tuple;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+
+class TupleWithSchemaMarshallingTest {
+    private static Stream<Arguments> oneFieldTuple() {
+        return Stream.of(
+                Tuple.create().set("col", 1),
+                Tuple.create().set("col2", true),
+                Tuple.create().set("col3", (byte) 1),
+                Tuple.create().set("col4", (short) 2),
+                Tuple.create().set("col5", 3),
+                Tuple.create().set("col6", 4L),
+                Tuple.create().set("col7", 5.0f),
+                Tuple.create().set("col8", 6.0),
+                Tuple.create().set("col9", new BigDecimal("7.1")),
+                Tuple.create().set("col10", LocalDate.of(2024, 1, 1)),
+                Tuple.create().set("col11", LocalTime.of(12, 0)),
+                Tuple.create().set("col12", LocalDate.of(2024, 1, 
1).atTime(LocalTime.of(12, 0))),
+                Tuple.create().set("col13", 
UUID.fromString("123e4567-e89b-12d3-a456-426614174000")),
+                Tuple.create().set("col14", "string"),
+                Tuple.create().set("col15", new byte[]{1, 2, 3}),
+                Tuple.create().set("col16", Period.ofDays(10)),
+                Tuple.create().set("col17", Duration.ofDays(10))
+        ).map(Arguments::of);
+    }
+
+    private static Stream<Arguments> unsupportedTypes() {
+        return Stream.of(
+                Tuple.create().set("col", new Object()),
+                Tuple.create().set("col1", 1).set("col2", new Object()),
+                Tuple.create().set("col", new ArrayList<>()),
+                Tuple.create().set("col", new HashMap<>()),
+                Tuple.create().set("col", new HashMap<>()),
+                Tuple.create().set("col", Tuple.create())
+        ).map(Arguments::of);
+    }
+
+    private static Stream<Arguments> wrongByteLayout() {
+        return Stream.of(
+                new byte[]{},
+                new byte[]{1},
+                new byte[]{1, 2, 3, 4, 5, 6, 7, 8, 9}
+        ).map(Arguments::of);
+    }
+
+    @ParameterizedTest
+    @MethodSource("oneFieldTuple")
+    void serDeOneFieldTuple(Tuple tuple) {
+        byte[] marshalled = TupleWithSchemaMarshalling.marshal(tuple);
+        assertEquals(tuple, TupleWithSchemaMarshalling.unmarshal(marshalled));
+    }
+
+    @Test
+    void allFieldsTupleTest() {
+        Tuple tuple = Tuple.create()
+                .set("col1", null)
+                .set("col2", true)
+                .set("col3", (byte) 1)
+                .set("col4", (short) 2)
+                .set("col5", 3)
+                .set("col6", 4L)
+                .set("col7", 5.0f)
+                .set("col8", 6.0)
+                .set("col9", new BigDecimal("7.11"))
+                .set("col10", LocalDate.of(2024, 1, 1))
+                .set("col11", LocalTime.of(12, 0))
+                .set("col12", LocalDate.of(2024, 1, 1).atTime(LocalTime.of(12, 
0)))
+                .set("col13", 
UUID.fromString("123e4567-e89b-12d3-a456-426614174000"))
+                .set("col14", "string")
+                .set("col15", new byte[]{1, 2, 3})
+                .set("col16", Period.ofDays(10))
+                .set("col17", Duration.ofDays(10));
+
+        byte[] marshalled = TupleWithSchemaMarshalling.marshal(tuple);
+        assertEquals(tuple, TupleWithSchemaMarshalling.unmarshal(marshalled));
+    }
+
+    @Test
+    void nullTuple() {
+        byte[] marshalled = TupleWithSchemaMarshalling.marshal(null);
+        assertNull(TupleWithSchemaMarshalling.unmarshal(marshalled));
+    }
+
+    @MethodSource("unsupportedTypes")
+    @ParameterizedTest
+    void unsupportedTypesMarshal(Tuple tup) {
+        assertThrows(UnsupportedObjectTypeMarshallingException.class, () -> 
TupleWithSchemaMarshalling.marshal(tup));
+    }
+
+    @MethodSource("wrongByteLayout")
+    @ParameterizedTest
+    void unsupportedTypesUnmarshal(byte[] obj) {
+        assertThrows(UnmarshallingException.class, () -> 
TupleWithSchemaMarshalling.unmarshal(obj));
+    }
+}
diff --git 
a/modules/client-common/src/main/java/org/apache/ignite/internal/client/proto/ClientBinaryTupleUtils.java
 
b/modules/client-common/src/main/java/org/apache/ignite/internal/client/proto/ClientBinaryTupleUtils.java
index abd3582c7e..5b30458977 100644
--- 
a/modules/client-common/src/main/java/org/apache/ignite/internal/client/proto/ClientBinaryTupleUtils.java
+++ 
b/modules/client-common/src/main/java/org/apache/ignite/internal/client/proto/ClientBinaryTupleUtils.java
@@ -40,7 +40,6 @@ import org.apache.ignite.internal.type.NativeType;
 import org.apache.ignite.internal.type.NativeTypeSpec;
 import org.apache.ignite.internal.type.NativeTypes;
 import org.apache.ignite.lang.IgniteException;
-import org.apache.ignite.marshalling.Marshaller;
 import org.apache.ignite.sql.ColumnType;
 import org.jetbrains.annotations.Nullable;
 
@@ -185,14 +184,11 @@ public class ClientBinaryTupleUtils {
      * @param builder Builder.
      * @param obj Object.
      */
-    public static <T> void appendObject(BinaryTupleBuilder builder, @Nullable 
T obj, @Nullable Marshaller<T, byte[]> marshaller) {
+    public static <T> void appendObject(BinaryTupleBuilder builder, @Nullable 
T obj) {
         if (obj == null) {
             builder.appendNull(); // Type.
             builder.appendNull(); // Scale.
             builder.appendNull(); // Value.
-        } else if (marshaller != null) {
-            appendTypeAndScale(builder, ColumnType.BYTE_ARRAY);
-            builder.appendBytes(marshaller.marshal(obj));
         } else if (obj instanceof Boolean) {
             appendTypeAndScale(builder, ColumnType.BOOLEAN);
             builder.appendBoolean((Boolean) obj);
diff --git 
a/modules/client-common/src/main/java/org/apache/ignite/internal/client/proto/ClientComputeJobPacker.java
 
b/modules/client-common/src/main/java/org/apache/ignite/internal/client/proto/ClientComputeJobPacker.java
new file mode 100644
index 0000000000..0636f80c5e
--- /dev/null
+++ 
b/modules/client-common/src/main/java/org/apache/ignite/internal/client/proto/ClientComputeJobPacker.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.client.proto;
+
+import 
org.apache.ignite.internal.binarytuple.inlineschema.TupleWithSchemaMarshalling;
+import org.apache.ignite.marshalling.Marshaller;
+import org.apache.ignite.table.Tuple;
+import org.jetbrains.annotations.Nullable;
+
+/** Packs job arguments and results. */
+public final class ClientComputeJobPacker {
+    /**
+     * Packs compute job argument. If the marshaller is provided, it will be 
used to marshal the argument. If the marshaller is not provided
+     * and the argument is a native column type or a tuple, it will be packed 
accordingly.
+     *
+     * @param arg Argument.
+     * @param marshaller Marshaller.
+     * @param packer Packer.
+     * @param <T> Argument type.
+     */
+    public static <T> void packJobArgument(@Nullable T arg, @Nullable 
Marshaller<T, byte[]> marshaller, ClientMessagePacker packer) {
+        pack(arg, marshaller, packer);
+    }
+
+    /**
+     * Packs compute job result. If the marshaller is provided, it will be 
used to marshal the result. If the marshaller is not provided and
+     * the result is a native column type or a tuple, it will be packed 
accordingly.
+     *
+     * @param res Result.
+     * @param marshaller Marshaller.
+     * @param packer Packer.
+     * @param <T> Result type.
+     */
+    public static <T> void packJobResult(@Nullable T res, @Nullable 
Marshaller<T, byte[]> marshaller, ClientMessagePacker packer) {
+        pack(res, marshaller, packer);
+    }
+
+    /** Packs object in the format: | typeId | value |. */
+    private static <T> void pack(@Nullable T obj, @Nullable Marshaller<T, 
byte[]> marshaller, ClientMessagePacker packer) {
+        if (marshaller != null) {
+            packer.packInt(ComputeJobType.MARSHALLED_OBJECT_ID);
+            byte[] marshalled = marshaller.marshal(obj);
+
+            if (marshalled == null) {
+                packer.packNil();
+                return;
+            }
+
+            packer.packBinary(marshalled);
+            return;
+        }
+
+        if (obj instanceof Tuple) {
+            byte[] marshalledTuple = 
TupleWithSchemaMarshalling.marshal((Tuple) obj);
+
+            packer.packInt(ComputeJobType.MARSHALLED_TUPLE_ID);
+
+            if (marshalledTuple == null) {
+                packer.packNil();
+                return;
+            }
+
+            packer.packBinary(marshalledTuple);
+            return;
+        }
+
+        if (obj == null) {
+            packer.packNil();
+            return;
+        }
+
+        packer.packInt(ComputeJobType.NATIVE_ID);
+
+        packer.packObjectAsBinaryTuple(obj);
+    }
+}
diff --git 
a/modules/client-common/src/main/java/org/apache/ignite/internal/client/proto/ClientComputeJobUnpacker.java
 
b/modules/client-common/src/main/java/org/apache/ignite/internal/client/proto/ClientComputeJobUnpacker.java
new file mode 100644
index 0000000000..0501ce9d29
--- /dev/null
+++ 
b/modules/client-common/src/main/java/org/apache/ignite/internal/client/proto/ClientComputeJobUnpacker.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.client.proto;
+
+import static org.apache.ignite.marshalling.Marshaller.tryUnmarshalOrCast;
+
+import 
org.apache.ignite.internal.binarytuple.inlineschema.TupleWithSchemaMarshalling;
+import org.apache.ignite.marshalling.Marshaller;
+import org.apache.ignite.marshalling.UnmarshallingException;
+import org.jetbrains.annotations.Nullable;
+
+/** Unpacks job arguments and results. */
+public final class ClientComputeJobUnpacker {
+    /**
+     * Unpacks compute job argument. If the marshaller is provided, it will be 
used to unmarshal the argument. If the marshaller is not
+     * provided and the argument is a native column type or a tuple, it will 
be unpacked accordingly.
+     *
+     * @param marshaller Marshaller.
+     * @param unpacker Unpacker.
+     * @return Unpacked argument.
+     */
+    public static @Nullable Object unpackJobArgument(@Nullable Marshaller<?, 
byte[]> marshaller, ClientMessageUnpacker unpacker) {
+        return unpack(marshaller, unpacker);
+    }
+
+    /**
+     * Unpacks compute job result. If the marshaller is provided, it will be 
used to unmarshal the result. If the marshaller is not provided
+     * and the result is a native column type or a tuple, it will be unpacked 
accordingly.
+     *
+     * @param marshaller Marshaller.
+     * @param unpacker Unpacker.
+     * @return Unpacked result.
+     */
+    public static @Nullable Object unpackJobResult(@Nullable Marshaller<?, 
byte[]> marshaller, ClientMessageUnpacker unpacker) {
+        return unpack(marshaller, unpacker);
+    }
+
+    /** Underlying byte array expected to be in the following format: | typeId 
| value |. */
+    private static @Nullable Object unpack(@Nullable Marshaller<?, byte[]> 
marshaller, ClientMessageUnpacker unpacker) {
+        if (unpacker.tryUnpackNil()) {
+            return null;
+        }
+
+        int typeId = unpacker.unpackInt();
+        var type = ComputeJobType.Type.fromId(typeId);
+
+        switch (type) {
+            case NATIVE:
+                if (marshaller != null) {
+                    throw new UnmarshallingException(
+                            "Can not unpack object because the marshaller is 
provided but the object was packed without marshaller."
+                    );
+                }
+
+                return unpacker.unpackObjectFromBinaryTuple();
+            case MARSHALLED_TUPLE:
+                return 
TupleWithSchemaMarshalling.unmarshal(unpacker.readBinary());
+
+            case MARSHALLED_OBJECT:
+                if (marshaller == null) {
+                    throw new UnmarshallingException(
+                            "Can not unpack object because the marshaller is 
not provided but the object was packed with marshaller."
+                    );
+                }
+                return tryUnmarshalOrCast(marshaller, unpacker.readBinary());
+
+            default:
+                throw new UnmarshallingException("Unsupported compute job type 
id: " + typeId);
+        }
+    }
+
+    /** Unpacks compute job argument without marshaller. */
+    public static Object 
unpackJobArgumentWithoutMarshaller(ClientMessageUnpacker unpacker) {
+        if (unpacker.tryUnpackNil()) {
+            return null;
+        }
+
+        int typeId = unpacker.unpackInt();
+        var type = ComputeJobType.Type.fromId(typeId);
+
+        switch (type) {
+            case NATIVE:
+                return unpacker.unpackObjectFromBinaryTuple();
+            case MARSHALLED_TUPLE:
+                return 
TupleWithSchemaMarshalling.unmarshal(unpacker.readBinary());
+            case MARSHALLED_OBJECT:
+                return unpacker.readBinary();
+            default:
+                throw new UnmarshallingException("Unsupported compute job type 
id: " + typeId);
+        }
+    }
+}
diff --git 
a/modules/client-common/src/main/java/org/apache/ignite/internal/client/proto/ClientMessagePacker.java
 
b/modules/client-common/src/main/java/org/apache/ignite/internal/client/proto/ClientMessagePacker.java
index 3a93bb98e4..deb3be15f2 100644
--- 
a/modules/client-common/src/main/java/org/apache/ignite/internal/client/proto/ClientMessagePacker.java
+++ 
b/modules/client-common/src/main/java/org/apache/ignite/internal/client/proto/ClientMessagePacker.java
@@ -30,13 +30,11 @@ import java.util.UUID;
 import org.apache.ignite.deployment.DeploymentUnit;
 import org.apache.ignite.internal.binarytuple.BinaryTupleBuilder;
 import org.apache.ignite.internal.binarytuple.BinaryTupleParser;
-import org.apache.ignite.marshalling.Marshaller;
 import org.apache.ignite.sql.BatchedArguments;
 import org.jetbrains.annotations.Nullable;
 
 /**
- * ByteBuf-based MsgPack implementation. Replaces {@link 
org.msgpack.core.MessagePacker} to avoid
- * extra buffers and indirection.
+ * ByteBuf-based MsgPack implementation. Replaces {@link 
org.msgpack.core.MessagePacker} to avoid extra buffers and indirection.
  *
  * <p>Releases wrapped buffer on {@link #close()} .
  */
@@ -379,7 +377,7 @@ public class ClientMessagePacker implements AutoCloseable {
      *
      * <p>Should be followed by {@link #writePayload(byte[])} method to write 
the extension body.
      *
-     * @param extType    the extension type tag to be written.
+     * @param extType the extension type tag to be written.
      * @param payloadLen number of bytes of a payload binary to be written.
      */
     public void packExtensionTypeHeader(byte extType, int payloadLen) {
@@ -599,7 +597,7 @@ public class ClientMessagePacker implements AutoCloseable {
      *
      * @param vals Object array.
      */
-    public void packObjectArrayAsBinaryTuple(Object @Nullable [] vals, 
@Nullable Marshaller<Object, byte[]> marshaller) {
+    public void packObjectArrayAsBinaryTuple(Object @Nullable [] vals) {
         assert !closed : "Packer is closed";
 
         if (vals == null) {
@@ -619,7 +617,7 @@ public class ClientMessagePacker implements AutoCloseable {
         var builder = new BinaryTupleBuilder(vals.length * 3);
 
         for (Object arg : vals) {
-            ClientBinaryTupleUtils.appendObject(builder, arg, marshaller);
+            ClientBinaryTupleUtils.appendObject(builder, arg);
         }
 
         packBinaryTuple(builder);
@@ -651,7 +649,7 @@ public class ClientMessagePacker implements AutoCloseable {
             var builder = new BinaryTupleBuilder(rowLen * 3);
 
             for (Object value : values) {
-                ClientBinaryTupleUtils.appendObject(builder, value, null);
+                ClientBinaryTupleUtils.appendObject(builder, value);
             }
 
             packBinaryTuple(builder);
@@ -663,7 +661,7 @@ public class ClientMessagePacker implements AutoCloseable {
      *
      * @param val Object array.
      */
-    public <T> void packObjectAsBinaryTuple(@Nullable T val, @Nullable 
Marshaller<T, byte[]> marshaller) {
+    public <T> void packObjectAsBinaryTuple(@Nullable T val) {
         assert !closed : "Packer is closed";
 
         if (val == null) {
@@ -675,7 +673,7 @@ public class ClientMessagePacker implements AutoCloseable {
         // Builder with inline schema.
         // Value is represented by 3 tuple elements: type, scale, value.
         var builder = new BinaryTupleBuilder(3, 3, false);
-        ClientBinaryTupleUtils.appendObject(builder, val, marshaller);
+        ClientBinaryTupleUtils.appendObject(builder, val);
 
         packBinaryTuple(builder);
     }
@@ -718,6 +716,18 @@ public class ClientMessagePacker implements AutoCloseable {
         packByteBuffer(buf);
     }
 
+    /**
+     * Pack binary.
+     *
+     * @param buf Byte array.
+     */
+    public void packBinary(byte[] buf) {
+        assert !closed : "Packer is closed";
+
+        packBinaryHeader(buf.length);
+        writePayload(buf);
+    }
+
     /**
      * Pack ByteBuffer.
      *
diff --git 
a/modules/client-common/src/main/java/org/apache/ignite/internal/client/proto/ComputeJobType.java
 
b/modules/client-common/src/main/java/org/apache/ignite/internal/client/proto/ComputeJobType.java
new file mode 100644
index 0000000000..4cd5847af1
--- /dev/null
+++ 
b/modules/client-common/src/main/java/org/apache/ignite/internal/client/proto/ComputeJobType.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.client.proto;
+
+import org.apache.ignite.sql.ColumnType;
+
+/**
+ * The type of the object that can be passed/returned to/from the compute job. 
In can be a native type that is represented by
+ * {@link ColumnType} or a marshalled object/tuple.
+ */
+public class ComputeJobType {
+    static final int NATIVE_ID = 0;
+    static final int MARSHALLED_TUPLE_ID = 1;
+    static final int MARSHALLED_OBJECT_ID = 2;
+
+    /**
+     * [0, .., Integer.MAX_VALUE] - native types. The id is the same as in 
{@link ColumnType}. (0, .., Integer.MIN_VALUE] - marshalled
+     * types.
+     */
+    private final int id;
+    private final Type type;
+
+    ComputeJobType(int id) {
+        this.id = id;
+        this.type = Type.fromId(id);
+    }
+
+    /** Return the id of the type. */
+    public int id() {
+        return id;
+    }
+
+    /** Return the type of the object. */
+    public Type type() {
+        return type;
+    }
+
+    /** The type of the object. */
+    public enum Type {
+        MARSHALLED_TUPLE, MARSHALLED_OBJECT, NATIVE;
+
+        static Type fromId(int id) {
+            if (id == MARSHALLED_TUPLE_ID) {
+                return MARSHALLED_TUPLE;
+            }
+
+            if (id == MARSHALLED_OBJECT_ID) {
+                return MARSHALLED_OBJECT;
+            }
+
+            if (id == NATIVE_ID) {
+                return NATIVE;
+            }
+
+            throw new IllegalArgumentException("Unsupported type id: " + id);
+        }
+    }
+}
diff --git 
a/modules/client-common/src/main/java/org/apache/ignite/internal/client/proto/StreamerReceiverSerializer.java
 
b/modules/client-common/src/main/java/org/apache/ignite/internal/client/proto/StreamerReceiverSerializer.java
index 3701c6ff6c..2541816c67 100644
--- 
a/modules/client-common/src/main/java/org/apache/ignite/internal/client/proto/StreamerReceiverSerializer.java
+++ 
b/modules/client-common/src/main/java/org/apache/ignite/internal/client/proto/StreamerReceiverSerializer.java
@@ -55,7 +55,7 @@ public class StreamerReceiverSerializer {
         var builder = new BinaryTupleBuilder(binaryTupleSize);
         builder.appendString(receiverClassName);
 
-        ClientBinaryTupleUtils.appendObject(builder, receiverArgs, 
receiverArgsMarshaller);
+        ClientBinaryTupleUtils.appendObject(builder, receiverArgs);
 
         ClientBinaryTupleUtils.appendCollectionToBinaryTuple(builder, items);
 
@@ -79,7 +79,7 @@ public class StreamerReceiverSerializer {
         var builder = new BinaryTupleBuilder(binaryTupleSize);
         builder.appendString(receiver.receiverClassName());
 
-        ClientBinaryTupleUtils.appendObject(builder, receiverArgs, 
receiver.argumentMarshaller());
+        ClientBinaryTupleUtils.appendObject(builder, receiverArgs);
 
         ClientBinaryTupleUtils.appendCollectionToBinaryTuple(builder, items);
 
diff --git 
a/modules/client-common/src/main/java/org/apache/ignite/internal/jdbc/proto/event/JdbcBatchPreparedStmntRequest.java
 
b/modules/client-common/src/main/java/org/apache/ignite/internal/jdbc/proto/event/JdbcBatchPreparedStmntRequest.java
index 5aa27342f2..46f5509ff8 100644
--- 
a/modules/client-common/src/main/java/org/apache/ignite/internal/jdbc/proto/event/JdbcBatchPreparedStmntRequest.java
+++ 
b/modules/client-common/src/main/java/org/apache/ignite/internal/jdbc/proto/event/JdbcBatchPreparedStmntRequest.java
@@ -132,7 +132,7 @@ public class JdbcBatchPreparedStmntRequest implements 
ClientMessage {
         packer.packInt(args.size());
 
         for (Object[] arg : args) {
-            packer.packObjectArrayAsBinaryTuple(arg, null);
+            packer.packObjectArrayAsBinaryTuple(arg);
         }
 
         packer.packLong(queryTimeoutMillis);
diff --git 
a/modules/client-common/src/main/java/org/apache/ignite/internal/jdbc/proto/event/JdbcQueryExecuteRequest.java
 
b/modules/client-common/src/main/java/org/apache/ignite/internal/jdbc/proto/event/JdbcQueryExecuteRequest.java
index 56159b5c94..99eb4545a7 100644
--- 
a/modules/client-common/src/main/java/org/apache/ignite/internal/jdbc/proto/event/JdbcQueryExecuteRequest.java
+++ 
b/modules/client-common/src/main/java/org/apache/ignite/internal/jdbc/proto/event/JdbcQueryExecuteRequest.java
@@ -188,7 +188,7 @@ public class JdbcQueryExecuteRequest implements 
ClientMessage {
         packer.packString(sqlQry);
         packer.packBoolean(multiStatement);
 
-        packer.packObjectArrayAsBinaryTuple(args, null);
+        packer.packObjectArrayAsBinaryTuple(args);
         packer.packLong(queryTimeoutMillis);
     }
 
diff --git 
a/modules/client-common/src/test/java/org/apache/ignite/internal/client/proto/ClientComputeJobPackerUnpackerTest.java
 
b/modules/client-common/src/test/java/org/apache/ignite/internal/client/proto/ClientComputeJobPackerUnpackerTest.java
new file mode 100644
index 0000000000..bd64cf933e
--- /dev/null
+++ 
b/modules/client-common/src/test/java/org/apache/ignite/internal/client/proto/ClientComputeJobPackerUnpackerTest.java
@@ -0,0 +1,247 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.client.proto;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+import io.netty.buffer.ByteBufUtil;
+import io.netty.buffer.PooledByteBufAllocator;
+import io.netty.buffer.Unpooled;
+import java.math.BigDecimal;
+import java.time.Duration;
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import java.time.Period;
+import java.time.temporal.ChronoUnit;
+import java.util.UUID;
+import java.util.stream.Stream;
+import org.apache.ignite.marshalling.Marshaller;
+import org.apache.ignite.marshalling.UnmarshallingException;
+import org.apache.ignite.table.Tuple;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+
+class ClientComputeJobPackerUnpackerTest {
+    private static Stream<Arguments> nativeTypes() {
+        return Stream.of(
+                (byte) 4, (short) 8, 15, 16L, 23.0f, 42.0d, "TEST_STRING", 
null, UUID.randomUUID(),
+                LocalTime.now(), LocalDate.now(), LocalDateTime.now(), 
Instant.now(), Period.of(1, 2, 3),
+                Duration.of(1, ChronoUnit.DAYS)
+        ).map(Arguments::of);
+    }
+
+    private static Stream<Arguments> tuples() {
+        return Stream.of(
+                null,
+                Tuple.create(),
+                Tuple.create().set("key", 1),
+                Tuple.create().set("key", "value"),
+                Tuple.create().set("col1", null).set("col2", true).set("col3", 
(byte) 1).set("col4", (short) 2).set("col5", 3)
+                        .set("col6", 4L).set("col7", 5.0f).set("col8", 
6.0).set("col9", new BigDecimal("7.11"))
+                        .set("col10", LocalDate.of(2024, 1, 1)).set("col11", 
LocalTime.of(12, 0))
+                        .set("col12", LocalDate.of(2024, 1, 
1).atTime(LocalTime.of(12, 0)))
+                        .set("col13", 
UUID.fromString("123e4567-e89b-12d3-a456-426614174000")).set("col14", "string")
+                        .set("col15", new byte[]{1, 2, 3}).set("col16", 
Period.ofDays(10)).set("col17", Duration.ofDays(10))
+        ).map(Arguments::of);
+    }
+
+    private ClientMessagePacker messagePacker;
+
+    @BeforeEach
+    void setUp() {
+        messagePacker = new 
ClientMessagePacker(PooledByteBufAllocator.DEFAULT.directBuffer());
+    }
+
+    @AfterEach
+    void tearDown() {
+        messagePacker.close();
+    }
+
+    @MethodSource({"tuples", "nativeTypes"})
+    @ParameterizedTest
+    void packUnpackNoMarshalling_jobArgument(Object arg) {
+        // When pack job argument without marshaller.
+        ClientComputeJobPacker.packJobArgument(arg, null, messagePacker);
+        byte[] data = ByteBufUtil.getBytes(messagePacker.getBuffer());
+
+        // And unpack job argument without marshaller.
+        try (var messageUnpacker = messageUnpacker(data)) {
+            var res = ClientComputeJobUnpacker.unpackJobArgument(null, 
messageUnpacker);
+
+            // Then.
+            assertEquals(arg, res);
+        }
+    }
+
+    private ClientMessageUnpacker messageUnpacker(byte[] data) {
+        return new ClientMessageUnpacker(Unpooled.wrappedBuffer(data, 4, 
data.length - 4));
+    }
+
+    @MethodSource({"tuples", "nativeTypes"})
+    @ParameterizedTest
+    void packUnpackNoMarshalling_jobResult(Object arg) {
+        // When pack job result without marshaller.
+        ClientComputeJobPacker.packJobResult(arg, null, messagePacker);
+        byte[] data = ByteBufUtil.getBytes(messagePacker.getBuffer());
+
+        // And unpack job result without marshaller.
+        try (var messageUnpacker = messageUnpacker(data)) {
+            var res = ClientComputeJobUnpacker.unpackJobResult(null, 
messageUnpacker);
+
+            // Then.
+            assertEquals(arg, res);
+        }
+    }
+
+    @Test
+    void marshallingPackUnpack_jobResult() {
+        // Given.
+        Marshaller<String, byte[]> marshaller = new TestStringMarshaller();
+        var str = "Hi, marshal me!";
+
+        // When pack job result with marshaller.
+        ClientComputeJobPacker.packJobResult(str, marshaller, messagePacker);
+        byte[] data = ByteBufUtil.getBytes(messagePacker.getBuffer());
+
+        // And unpack job result with marshaller.
+        try (var messageUnpacker = messageUnpacker(data)) {
+            Object res = ClientComputeJobUnpacker.unpackJobResult(marshaller, 
messageUnpacker);
+
+            // Then.
+            assertEquals(str, res);
+        }
+    }
+
+    @Test
+    void marshallingPackUnpack_jobArgument() {
+        // Given.
+        Marshaller<String, byte[]> marshaller = new TestStringMarshaller();
+        var str = "Hi, marshal me!";
+
+        // When pack job argument with marshaller.
+        ClientComputeJobPacker.packJobArgument(str, marshaller, messagePacker);
+        byte[] data = ByteBufUtil.getBytes(messagePacker.getBuffer());
+
+        // And unpack job argument with marshaller.
+        try (var messageUnpacker = messageUnpacker(data)) {
+            Object res = 
ClientComputeJobUnpacker.unpackJobArgument(marshaller, messageUnpacker);
+
+            // Then.
+            assertEquals(str, res);
+        }
+    }
+
+    @Test
+    void packWithMarshallerUnpackWithout_jobResult() {
+        // Given.
+        Marshaller<String, byte[]> marshaller = new TestStringMarshaller();
+        var str = "Hi, marshal me!";
+
+        // When pack job result with marshaller.
+        ClientComputeJobPacker.packJobResult(str, marshaller, messagePacker);
+        byte[] data = ByteBufUtil.getBytes(messagePacker.getBuffer());
+
+        // And unpack job result without marshaller.
+        try (var messageUnpacker = messageUnpacker(data)) {
+            // Then the exception is thrown because it is not allowed unpack 
the marshalled object without marshaller.
+            assertThrows(
+                    UnmarshallingException.class,
+                    () -> ClientComputeJobUnpacker.unpackJobResult(null, 
messageUnpacker)
+            );
+        }
+    }
+
+    @Test
+    void packWithMarshallerUnpackWithout_jobArgument() {
+        // Given.
+        Marshaller<String, byte[]> marshaller = new TestStringMarshaller();
+        var str = "Hi, marshal me!";
+
+        // When pack job argument with marshaller.
+        ClientComputeJobPacker.packJobArgument(str, marshaller, messagePacker);
+        byte[] data = ByteBufUtil.getBytes(messagePacker.getBuffer());
+
+        // And unpack job argument without marshaller.
+        try (var messageUnpacker = messageUnpacker(data)) {
+            // Then the exception is thrown because it is not allowed unpack 
the marshalled object without marshaller.
+            assertThrows(
+                    UnmarshallingException.class,
+                    () -> ClientComputeJobUnpacker.unpackJobArgument(null, 
messageUnpacker)
+            );
+        }
+    }
+
+    @Test
+    void packByteArrayUnpackStringWithMarshaller_jobResult() {
+        // Given.
+        var str = "Hi, marshal me!";
+        var bytes = str.getBytes();
+
+        // When pack job result without marshaller.
+        ClientComputeJobPacker.packJobResult(bytes, null, messagePacker);
+        byte[] data = ByteBufUtil.getBytes(messagePacker.getBuffer());
+
+        // And unpack job result with marshaller.
+        try (var messageUnpacker = messageUnpacker(data)) {
+            // Then the exception is thrown because it is not allowed to 
define the marshaller only for the result.
+            assertThrows(
+                    UnmarshallingException.class,
+                    () -> ClientComputeJobUnpacker.unpackJobResult(new 
TestStringMarshaller(), messageUnpacker)
+            );
+        }
+    }
+
+    @Test
+    void packByteArrayUnpackStringWithMarshaller_jobArgument() {
+        // Given.
+        var str = "Hi, marshal me!";
+        byte[] bytes = str.getBytes();
+
+        // When pack job argument without marshaller.
+        ClientComputeJobPacker.packJobArgument(bytes, null, messagePacker);
+        byte[] data = ByteBufUtil.getBytes(messagePacker.getBuffer());
+
+        // And unpack job argument with marshaller.
+        try (var messageUnpacker = messageUnpacker(data)) {
+            // Then the exception is thrown because it is not allowed to 
define the marshaller only for the result.
+            assertThrows(
+                    UnmarshallingException.class,
+                    () -> ClientComputeJobUnpacker.unpackJobArgument(new 
TestStringMarshaller(), messageUnpacker)
+            );
+        }
+    }
+
+    private static class TestStringMarshaller implements Marshaller<String, 
byte[]> {
+        @Override
+        public byte[] marshal(String obj) {
+            return obj.getBytes();
+        }
+
+        @Override
+        public String unmarshal(byte[] bytes) {
+            return new String(bytes);
+        }
+    }
+}
diff --git 
a/modules/client-common/src/test/java/org/apache/ignite/internal/client/proto/ClientMessagePackerUnpackerTest.java
 
b/modules/client-common/src/test/java/org/apache/ignite/internal/client/proto/ClientMessagePackerUnpackerTest.java
index be9301c4a3..9a9ccd497d 100644
--- 
a/modules/client-common/src/test/java/org/apache/ignite/internal/client/proto/ClientMessagePackerUnpackerTest.java
+++ 
b/modules/client-common/src/test/java/org/apache/ignite/internal/client/proto/ClientMessagePackerUnpackerTest.java
@@ -162,9 +162,9 @@ public class ClientMessagePackerUnpackerTest {
     @Test
     public void testObjectArrayAsBinaryTuple() {
         try (var packer = new 
ClientMessagePacker(PooledByteBufAllocator.DEFAULT.directBuffer())) {
-            packer.packObjectArrayAsBinaryTuple(argsAllTypes, null);
-            packer.packObjectArrayAsBinaryTuple(null, null);
-            packer.packObjectArrayAsBinaryTuple(new Object[0], null);
+            packer.packObjectArrayAsBinaryTuple(argsAllTypes);
+            packer.packObjectArrayAsBinaryTuple(null);
+            packer.packObjectArrayAsBinaryTuple(new Object[0]);
 
             byte[] data = ByteBufUtil.getBytes(packer.getBuffer());
 
@@ -184,7 +184,7 @@ public class ClientMessagePackerUnpackerTest {
     public void testObjectAsBinaryTuple() {
         try (var packer = new 
ClientMessagePacker(PooledByteBufAllocator.DEFAULT.directBuffer())) {
             for (Object arg : argsAllTypes) {
-                packer.packObjectAsBinaryTuple(arg, null);
+                packer.packObjectAsBinaryTuple(arg);
             }
 
             byte[] data = ByteBufUtil.getBytes(packer.getBuffer());
diff --git 
a/modules/client-common/src/test/java/org/apache/ignite/internal/client/proto/ComputeJobTypeTest.java
 
b/modules/client-common/src/test/java/org/apache/ignite/internal/client/proto/ComputeJobTypeTest.java
new file mode 100644
index 0000000000..ed73399710
--- /dev/null
+++ 
b/modules/client-common/src/test/java/org/apache/ignite/internal/client/proto/ComputeJobTypeTest.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.client.proto;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+import java.util.stream.Stream;
+import org.apache.ignite.internal.client.proto.ComputeJobType.Type;
+import org.junit.jupiter.api.TestInstance;
+import org.junit.jupiter.api.TestInstance.Lifecycle;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+
+@TestInstance(Lifecycle.PER_CLASS)
+class ComputeJobTypeTest {
+    private static Stream<Arguments> marshalledTypes() {
+        return Stream.of(
+                Arguments.of(0, Type.NATIVE, 0),
+                Arguments.of(1, Type.MARSHALLED_TUPLE, 1),
+                Arguments.of(2, Type.MARSHALLED_OBJECT, 2)
+        );
+    }
+
+    @MethodSource({"marshalledTypes"})
+    @ParameterizedTest
+    void supportedTypes(int givenId, ComputeJobType.Type expectedType, int 
expectedId) {
+        ComputeJobType computeJobType = new ComputeJobType(givenId);
+
+        assertEquals(expectedType, computeJobType.type());
+        assertEquals(expectedId, computeJobType.id());
+    }
+
+    @ParameterizedTest
+    @MethodSource("unsupportedTypesArgs")
+    void unsupportedTypes(int givenId) {
+        IllegalArgumentException e = 
assertThrows(IllegalArgumentException.class, () -> new ComputeJobType(givenId));
+        assertEquals("Unsupported type id: " + givenId, e.getMessage());
+    }
+
+    private Stream<Arguments> unsupportedTypesArgs() {
+        return Stream.of(Integer.MIN_VALUE, 
Integer.MAX_VALUE).map(Arguments::of);
+    }
+}
diff --git 
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/compute/ClientComputeExecuteColocatedRequest.java
 
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/compute/ClientComputeExecuteColocatedRequest.java
index a3eccdf9e9..2fe1a7a237 100644
--- 
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/compute/ClientComputeExecuteColocatedRequest.java
+++ 
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/compute/ClientComputeExecuteColocatedRequest.java
@@ -69,8 +69,6 @@ public class ClientComputeExecuteColocatedRequest {
                     deploymentUnits,
                     jobClassName,
                     options,
-                    null,
-                    null,
                     args);
 
             var jobExecution = compute.wrapJobExecutionFuture(jobExecutionFut);
diff --git 
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/compute/ClientComputeExecuteMapReduceRequest.java
 
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/compute/ClientComputeExecuteMapReduceRequest.java
index 7869f3b3f0..b7bf2d4090 100644
--- 
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/compute/ClientComputeExecuteMapReduceRequest.java
+++ 
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/compute/ClientComputeExecuteMapReduceRequest.java
@@ -86,7 +86,7 @@ public class ClientComputeExecuteMapReduceRequest {
                 execution.stateAsync().whenComplete((state, errState) ->
                         execution.statesAsync().whenComplete((states, 
errStates) ->
                                 notificationSender.sendNotification(w -> {
-                                    w.packObjectAsBinaryTuple(val, null);
+                                    w.packObjectAsBinaryTuple(val);
                                     packTaskState(w, state);
                                     packJobStates(w, states);
                                 }, firstNotNull(err, errState, errStates)))
diff --git 
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/compute/ClientComputeExecuteRequest.java
 
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/compute/ClientComputeExecuteRequest.java
index 7ee2a78d0c..4307b8c5a3 100644
--- 
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/compute/ClientComputeExecuteRequest.java
+++ 
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/compute/ClientComputeExecuteRequest.java
@@ -28,11 +28,16 @@ import org.apache.ignite.compute.JobExecution;
 import org.apache.ignite.compute.JobExecutionOptions;
 import org.apache.ignite.compute.NodeNotFoundException;
 import org.apache.ignite.deployment.DeploymentUnit;
+import org.apache.ignite.internal.client.proto.ClientComputeJobPacker;
+import org.apache.ignite.internal.client.proto.ClientComputeJobUnpacker;
 import org.apache.ignite.internal.client.proto.ClientMessagePacker;
 import org.apache.ignite.internal.client.proto.ClientMessageUnpacker;
 import org.apache.ignite.internal.compute.IgniteComputeInternal;
+import org.apache.ignite.internal.compute.MarshallerProvider;
 import org.apache.ignite.internal.network.ClusterService;
+import org.apache.ignite.marshalling.Marshaller;
 import org.apache.ignite.network.ClusterNode;
+import org.jetbrains.annotations.Nullable;
 
 /**
  * Compute execute request.
@@ -63,7 +68,7 @@ public class ClientComputeExecuteRequest {
         Object arg = unpackPayload(in);
 
         JobExecution<Object> execution = compute.executeAsyncWithFailover(
-                candidates, deploymentUnits, jobClassName, options, null, 
null, arg
+                candidates, deploymentUnits, jobClassName, options,  arg
         );
         sendResultAndState(execution, notificationSender);
 
@@ -104,18 +109,27 @@ public class ClientComputeExecuteRequest {
         return execution.resultAsync().whenComplete((val, err) ->
                 execution.stateAsync().whenComplete((state, errState) ->
                         notificationSender.sendNotification(w -> {
-                            w.packObjectAsBinaryTuple(val, null);
+                            Marshaller<Object, byte[]> marshaller = 
extractMarshaller(execution);
+                            ClientComputeJobPacker.packJobResult(val, 
marshaller, w);
                             packJobState(w, state);
                         }, err)));
     }
 
+    private static <T> @Nullable Marshaller<T, byte[]> 
extractMarshaller(JobExecution<T> e) {
+        if (e instanceof MarshallerProvider) {
+            return ((MarshallerProvider<T>) e).resultMarshaller();
+        }
+
+        return null;
+    }
+
     /**
      * Unpacks args.
      *
      * @param in Unpacker.
      * @return Args array.
      */
-    static Object unpackPayload(ClientMessageUnpacker in) {
-        return in.unpackObjectFromBinaryTuple();
+    static @Nullable Object unpackPayload(ClientMessageUnpacker in) {
+        return ClientComputeJobUnpacker.unpackJobArgumentWithoutMarshaller(in);
     }
 }
diff --git 
a/modules/client/src/main/java/org/apache/ignite/internal/client/compute/ClientCompute.java
 
b/modules/client/src/main/java/org/apache/ignite/internal/client/compute/ClientCompute.java
index 36d7b49196..9396584a90 100644
--- 
a/modules/client/src/main/java/org/apache/ignite/internal/client/compute/ClientCompute.java
+++ 
b/modules/client/src/main/java/org/apache/ignite/internal/client/compute/ClientCompute.java
@@ -48,6 +48,7 @@ import org.apache.ignite.deployment.DeploymentUnit;
 import org.apache.ignite.internal.client.PayloadInputChannel;
 import org.apache.ignite.internal.client.PayloadOutputChannel;
 import org.apache.ignite.internal.client.ReliableChannel;
+import org.apache.ignite.internal.client.proto.ClientComputeJobPacker;
 import org.apache.ignite.internal.client.proto.ClientMessagePacker;
 import org.apache.ignite.internal.client.proto.ClientOp;
 import org.apache.ignite.internal.client.proto.TuplePart;
@@ -421,7 +422,7 @@ public class ClientCompute implements IgniteCompute {
         w.packString(jobClassName);
         w.packInt(options.priority());
         w.packInt(options.maxRetries());
-        w.packObjectAsBinaryTuple(args, marshaller);
+        ClientComputeJobPacker.packJobArgument(args, marshaller, w);
     }
 
     private static void packTask(ClientMessagePacker w,
@@ -431,7 +432,7 @@ public class ClientCompute implements IgniteCompute {
             @Nullable Marshaller<Object, byte[]> marshaller) {
         w.packDeploymentUnits(units);
         w.packString(taskClassName);
-        w.packObjectAsBinaryTuple(arg, marshaller);
+        ClientComputeJobPacker.packJobArgument(arg, marshaller, w);
     }
 
     /**
diff --git 
a/modules/client/src/main/java/org/apache/ignite/internal/client/compute/ClientJobExecution.java
 
b/modules/client/src/main/java/org/apache/ignite/internal/client/compute/ClientJobExecution.java
index f086c311e8..d31b52a9aa 100644
--- 
a/modules/client/src/main/java/org/apache/ignite/internal/client/compute/ClientJobExecution.java
+++ 
b/modules/client/src/main/java/org/apache/ignite/internal/client/compute/ClientJobExecution.java
@@ -29,6 +29,7 @@ import org.apache.ignite.compute.TaskState;
 import org.apache.ignite.compute.TaskStatus;
 import org.apache.ignite.internal.client.PayloadInputChannel;
 import org.apache.ignite.internal.client.ReliableChannel;
+import org.apache.ignite.internal.client.proto.ClientComputeJobUnpacker;
 import org.apache.ignite.internal.client.proto.ClientMessageUnpacker;
 import org.apache.ignite.internal.client.proto.ClientOp;
 import org.apache.ignite.internal.compute.JobStateImpl;
@@ -65,11 +66,9 @@ class ClientJobExecution<R> implements JobExecution<R> {
                 .thenApply(r -> {
                     // Notifications require explicit input close.
                     try (r) {
-                        Object o = r.in().unpackObjectFromBinaryTuple();
-                        R result = marshaller != null ? 
marshaller.unmarshal((byte[]) o) : (R) o;
-
+                        Object result = 
ClientComputeJobUnpacker.unpackJobResult(marshaller, r.in());
                         stateFuture.complete(unpackJobState(r));
-                        return result;
+                        return (R) result;
                     }
                 });
     }
diff --git 
a/modules/client/src/main/java/org/apache/ignite/internal/client/sql/ClientSql.java
 
b/modules/client/src/main/java/org/apache/ignite/internal/client/sql/ClientSql.java
index b1c1144b26..437b945c4d 100644
--- 
a/modules/client/src/main/java/org/apache/ignite/internal/client/sql/ClientSql.java
+++ 
b/modules/client/src/main/java/org/apache/ignite/internal/client/sql/ClientSql.java
@@ -232,7 +232,7 @@ public class ClientSql implements IgniteSql {
 
             w.out().packString(statement.query());
 
-            w.out().packObjectArrayAsBinaryTuple(arguments, null);
+            w.out().packObjectArrayAsBinaryTuple(arguments);
 
             w.out().packLong(ch.observableTimestamp());
         };
@@ -309,7 +309,7 @@ public class ClientSql implements IgniteSql {
             packProperties(w, null);
 
             w.out().packString(query);
-            w.out().packObjectArrayAsBinaryTuple(arguments, null);
+            w.out().packObjectArrayAsBinaryTuple(arguments);
             w.out().packLong(ch.observableTimestamp());
         };
 
@@ -331,7 +331,7 @@ public class ClientSql implements IgniteSql {
         if (statementProps != null) {
             for (Entry<String, Object> entry : statementProps.entrySet()) {
                 builder.appendString(entry.getKey());
-                ClientBinaryTupleUtils.appendObject(builder, entry.getValue(), 
null);
+                ClientBinaryTupleUtils.appendObject(builder, entry.getValue());
             }
         }
 
diff --git 
a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeCompute.java 
b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeCompute.java
index 9fc69ca097..2a323a24b8 100644
--- 
a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeCompute.java
+++ 
b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeCompute.java
@@ -57,6 +57,7 @@ import org.apache.ignite.internal.compute.ComputeUtils;
 import org.apache.ignite.internal.compute.IgniteComputeInternal;
 import org.apache.ignite.internal.compute.JobExecutionContextImpl;
 import org.apache.ignite.internal.compute.JobStateImpl;
+import org.apache.ignite.internal.compute.MarshallerProvider;
 import org.apache.ignite.internal.compute.TaskStateImpl;
 import org.apache.ignite.internal.table.TableViewInternal;
 import org.apache.ignite.internal.util.ExceptionUtils;
@@ -95,8 +96,6 @@ public class FakeCompute implements IgniteComputeInternal {
             List<DeploymentUnit> units,
             String jobClassName,
             JobExecutionOptions options,
-            @Nullable Marshaller<Object, byte[]> argumentMarshaller,
-            @Nullable Marshaller<R, byte[]> resultMarshaller,
             Object args) {
         if (Objects.equals(jobClassName, GET_UNITS)) {
             String unitString = 
units.stream().map(DeploymentUnit::render).collect(Collectors.joining(","));
@@ -135,8 +134,6 @@ public class FakeCompute implements IgniteComputeInternal {
             List<DeploymentUnit> units,
             String jobClassName,
             JobExecutionOptions options,
-            @Nullable Marshaller<Object, byte[]> argumentMarshaller,
-            @Nullable Marshaller<R, byte[]> resultMarshaller,
             Object args
     ) {
         return completedFuture(jobExecution(future != null ? future : 
completedFuture((R) nodeName)));
@@ -147,7 +144,7 @@ public class FakeCompute implements IgniteComputeInternal {
         if (target instanceof AnyNodeJobTarget) {
             Set<ClusterNode> nodes = ((AnyNodeJobTarget) target).nodes();
             return executeAsyncWithFailover(
-                    nodes, descriptor.units(), descriptor.jobClassName(), 
descriptor.options(), null, null, args
+                    nodes, descriptor.units(), descriptor.jobClassName(), 
descriptor.options(), args
             );
         } else if (target instanceof ColocatedJobTarget) {
             return jobExecution(future != null ? future : completedFuture((R) 
nodeName));
@@ -200,27 +197,43 @@ public class FakeCompute implements IgniteComputeInternal 
{
             JobState newState = 
JobStateImpl.toBuilder(state).status(status).finishTime(Instant.now()).build();
             jobStates.put(jobId, newState);
         });
-        return new JobExecution<>() {
-            @Override
-            public CompletableFuture<R> resultAsync() {
-                return result;
-            }
+        return new FakeJobExecution<>(result, jobId);
+    }
 
-            @Override
-            public CompletableFuture<@Nullable JobState> stateAsync() {
-                return completedFuture(jobStates.get(jobId));
-            }
+    private class FakeJobExecution<R> implements JobExecution<R>, 
MarshallerProvider<R> {
+        private final CompletableFuture<R> result;
+        private final UUID jobId;
 
-            @Override
-            public CompletableFuture<@Nullable Boolean> cancelAsync() {
-                return trueCompletedFuture();
-            }
+        private FakeJobExecution(CompletableFuture<R> result, UUID jobId) {
+            this.result = result;
+            this.jobId = jobId;
+        }
 
-            @Override
-            public CompletableFuture<@Nullable Boolean> 
changePriorityAsync(int newPriority) {
-                return trueCompletedFuture();
-            }
-        };
+        @Override
+        public CompletableFuture<R> resultAsync() {
+            return result;
+        }
+
+        @Override
+        public CompletableFuture<@Nullable JobState> stateAsync() {
+            return completedFuture(jobStates.get(jobId));
+        }
+
+        @Override
+        public CompletableFuture<@Nullable Boolean> cancelAsync() {
+            return trueCompletedFuture();
+        }
+
+        @Override
+        public CompletableFuture<@Nullable Boolean> changePriorityAsync(int 
newPriority) {
+            return trueCompletedFuture();
+        }
+
+
+        @Override
+        public Marshaller<R, byte[]> resultMarshaller() {
+            return null;
+        }
     }
 
     private <R> TaskExecution<R> taskExecution(CompletableFuture<R> result) {
diff --git 
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/DelegatingJobExecution.java
 
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/DelegatingJobExecution.java
index 3002b83ab0..37769e6715 100644
--- 
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/DelegatingJobExecution.java
+++ 
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/DelegatingJobExecution.java
@@ -18,9 +18,11 @@
 package org.apache.ignite.internal.compute;
 
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
 import org.apache.ignite.compute.JobExecution;
 import org.apache.ignite.compute.JobState;
 import org.apache.ignite.internal.compute.executor.JobExecutionInternal;
+import org.apache.ignite.marshalling.Marshaller;
 import org.jetbrains.annotations.Nullable;
 
 /**
@@ -28,7 +30,7 @@ import org.jetbrains.annotations.Nullable;
  *
  * @param <R> Result type.
  */
-class DelegatingJobExecution<R> implements JobExecution<R> {
+class DelegatingJobExecution<R> implements JobExecution<R>, 
MarshallerProvider<R> {
     private final CompletableFuture<JobExecutionInternal<R>> delegate;
 
     DelegatingJobExecution(CompletableFuture<JobExecutionInternal<R>> 
delegate) {
@@ -54,4 +56,13 @@ class DelegatingJobExecution<R> implements JobExecution<R> {
     public CompletableFuture<@Nullable Boolean> changePriorityAsync(int 
newPriority) {
         return delegate.thenApply(jobExecutionInternal -> 
jobExecutionInternal.changePriority(newPriority));
     }
+
+    @Override
+    public @Nullable Marshaller<R, byte[]> resultMarshaller() {
+        try {
+            return 
delegate.thenApply(JobExecutionInternal::resultMarshaller).get();
+        } catch (InterruptedException | ExecutionException e) {
+            throw new RuntimeException(e);
+        }
+    }
 }
diff --git 
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/ExecutionManager.java
 
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/ExecutionManager.java
index e8628ef804..a0932444cb 100644
--- 
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/ExecutionManager.java
+++ 
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/ExecutionManager.java
@@ -36,6 +36,7 @@ import org.apache.ignite.compute.JobState;
 import org.apache.ignite.internal.compute.configuration.ComputeConfiguration;
 import org.apache.ignite.internal.compute.messaging.RemoteJobExecution;
 import org.apache.ignite.internal.network.TopologyService;
+import org.apache.ignite.marshalling.Marshaller;
 import org.jetbrains.annotations.Nullable;
 import org.jetbrains.annotations.TestOnly;
 
@@ -92,6 +93,12 @@ public class ExecutionManager {
     public CompletableFuture<?> resultAsync(UUID jobId) {
         JobExecution<?> execution = executions.get(jobId);
         if (execution != null) {
+            if (execution instanceof MarshallerProvider) {
+                Marshaller<Object, byte[]> marshaller = ((MarshallerProvider) 
execution).resultMarshaller();
+                if (marshaller != null) {
+                    return 
execution.resultAsync().thenApply(marshaller::marshal);
+                }
+            }
             return execution.resultAsync();
         }
         return failedFuture(new ComputeException(RESULT_NOT_FOUND_ERR, "Job 
result not found for the job with ID: " + jobId));
diff --git 
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/FailSafeJobExecution.java
 
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/FailSafeJobExecution.java
index 1048c147e3..26cc364721 100644
--- 
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/FailSafeJobExecution.java
+++ 
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/FailSafeJobExecution.java
@@ -27,6 +27,7 @@ import org.apache.ignite.compute.JobState;
 import org.apache.ignite.compute.JobStatus;
 import org.apache.ignite.internal.logger.IgniteLogger;
 import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.marshalling.Marshaller;
 import org.jetbrains.annotations.Nullable;
 
 /**
@@ -40,7 +41,7 @@ import org.jetbrains.annotations.Nullable;
  *
  * @param <T> the type of the job result.
  */
-class FailSafeJobExecution<T> implements JobExecution<T> {
+class FailSafeJobExecution<T> implements JobExecution<T>, 
MarshallerProvider<T> {
     private static final IgniteLogger LOG = 
Loggers.forClass(FailSafeJobExecution.class);
 
     /**
@@ -179,4 +180,14 @@ class FailSafeJobExecution<T> implements JobExecution<T> {
             throw new IllegalStateException("Job is already completed 
exceptionally.");
         }
     }
+
+    @Override
+    public @Nullable Marshaller<T, byte[]> resultMarshaller() {
+        JobExecution<T> exec = runningJobExecution.get();
+        if (exec instanceof MarshallerProvider) {
+            return ((MarshallerProvider<T>) exec).resultMarshaller();
+        }
+
+        return null;
+    }
 }
diff --git 
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/IgniteComputeImpl.java
 
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/IgniteComputeImpl.java
index 5ca872ebc7..3428396b08 100644
--- 
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/IgniteComputeImpl.java
+++ 
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/IgniteComputeImpl.java
@@ -24,6 +24,7 @@ import static 
org.apache.ignite.internal.lang.IgniteExceptionMapperUtil.mapToPub
 import static 
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
 import static org.apache.ignite.internal.util.ExceptionUtils.unwrapCause;
 import static 
org.apache.ignite.lang.ErrorGroups.Compute.COMPUTE_JOB_FAILED_ERR;
+import static org.apache.ignite.marshalling.Marshaller.tryMarshalOrCast;
 
 import java.util.Collection;
 import java.util.HashSet;
@@ -112,8 +113,8 @@ public class IgniteComputeImpl implements 
IgniteComputeInternal, StreamerReceive
         Objects.requireNonNull(target);
         Objects.requireNonNull(descriptor);
 
-        Marshaller<Object, byte[]> argumentMarshaller = (Marshaller<Object, 
byte[]>) descriptor.argumentMarshaller();
-        Marshaller<Object, byte[]> resultMarshaller = (Marshaller<Object, 
byte[]>) descriptor.resultMarshaller();
+        Marshaller<T, byte[]> argumentMarshaller = 
descriptor.argumentMarshaller();
+        Marshaller<R, byte[]> resultMarshaller = descriptor.resultMarshaller();
 
         if (target instanceof AnyNodeJobTarget) {
             Set<ClusterNode> nodes = ((AnyNodeJobTarget) target).nodes();
@@ -121,16 +122,22 @@ public class IgniteComputeImpl implements 
IgniteComputeInternal, StreamerReceive
             if (nodes.size() == 1) {
                 ClusterNode node = nodes.iterator().next();
                 if (node.id().equals(topologyService.localMember().id())) {
-                    return (JobExecution<R>) executeAsyncWithFailover(
-                            nodes, descriptor.units(), 
descriptor.jobClassName(), descriptor.options(),
-                            argumentMarshaller, resultMarshaller, args
+                    return new ResultUnmarshallingJobExecution<>(
+                            executeAsyncWithFailover(
+                                    nodes, descriptor.units(), 
descriptor.jobClassName(), descriptor.options(),
+                                    tryMarshalOrCast(argumentMarshaller, args)
+                            ),
+                            resultMarshaller
                     );
                 }
             }
 
-            return (JobExecution<R>) executeAsyncWithFailover(
-                    nodes, descriptor.units(), descriptor.jobClassName(), 
descriptor.options(),
-                    argumentMarshaller, resultMarshaller, args
+            return new ResultUnmarshallingJobExecution<>(
+                    executeAsyncWithFailover(
+                            nodes, descriptor.units(), 
descriptor.jobClassName(), descriptor.options(),
+                            tryMarshalOrCast(argumentMarshaller, args)
+                    ),
+                    resultMarshaller
             );
         }
 
@@ -144,15 +151,13 @@ public class IgniteComputeImpl implements 
IgniteComputeInternal, StreamerReceive
             if (mapper != null) {
                 jobFut = requiredTable(tableName)
                         .thenCompose(table -> 
primaryReplicaForPartitionByMappedKey(table, key, mapper)
-                                .thenApply(primaryNode -> (JobExecution<R>) 
executeOnOneNodeWithFailover(
+                                .thenApply(primaryNode -> 
executeOnOneNodeWithFailover(
                                         primaryNode,
                                         new 
NextColocatedWorkerSelector<>(placementDriver, topologyService, clock, table, 
key, mapper),
                                         descriptor.units(),
                                         descriptor.jobClassName(),
                                         descriptor.options(),
-                                        argumentMarshaller,
-                                        resultMarshaller,
-                                        args
+                                        tryMarshalOrCast(argumentMarshaller, 
args)
                                 )));
 
             } else {
@@ -163,13 +168,11 @@ public class IgniteComputeImpl implements 
IgniteComputeInternal, StreamerReceive
                                 descriptor.units(),
                                 descriptor.jobClassName(),
                                 descriptor.options(),
-                                argumentMarshaller,
-                                resultMarshaller,
-                                args))
+                                tryMarshalOrCast(argumentMarshaller, args)))
                         .thenApply(job -> (JobExecution<R>) job);
             }
 
-            return new JobExecutionFutureWrapper<>(jobFut);
+            return new ResultUnmarshallingJobExecution<>(new 
JobExecutionFutureWrapper<>(jobFut), resultMarshaller);
         }
 
         throw new IllegalArgumentException("Unsupported job target: " + 
target);
@@ -186,9 +189,7 @@ public class IgniteComputeImpl implements 
IgniteComputeInternal, StreamerReceive
             List<DeploymentUnit> units,
             String jobClassName,
             JobExecutionOptions options,
-            @Nullable Marshaller<Object, byte[]> argumentMarshaller,
-            @Nullable Marshaller<R, byte[]> resultMarshaller,
-            Object args
+            @Nullable Object args
     ) {
         Set<ClusterNode> candidates = new HashSet<>();
         for (ClusterNode node : nodes) {
@@ -213,8 +214,6 @@ public class IgniteComputeImpl implements 
IgniteComputeInternal, StreamerReceive
                         units,
                         jobClassName,
                         options,
-                        argumentMarshaller,
-                        resultMarshaller,
                         args
                 ));
     }
@@ -236,24 +235,15 @@ public class IgniteComputeImpl implements 
IgniteComputeInternal, StreamerReceive
             List<DeploymentUnit> units,
             String jobClassName,
             JobExecutionOptions jobExecutionOptions,
-            @Nullable Marshaller<T, byte[]> argumentMarshaller,
-            @Nullable Marshaller<R, byte[]> resultMarshaller,
             @Nullable T payload
     ) {
         ExecutionOptions options = ExecutionOptions.from(jobExecutionOptions);
-        Object marshalledArg = argumentMarshaller == null ? payload : 
argumentMarshaller.marshal(payload);
 
         if (isLocal(targetNode)) {
-            return new ResultMarshallingJobExecution<>(
-                    computeComponent.executeLocally(options, units, 
jobClassName, marshalledArg),
-                    resultMarshaller
-            );
+            return computeComponent.executeLocally(options, units, 
jobClassName, payload);
         } else {
-            return new ResultMarshallingJobExecution<>(
-                    computeComponent.executeRemotelyWithFailover(
-                            targetNode, nextWorkerSelector, units, 
jobClassName, options, marshalledArg
-                    ),
-                    resultMarshaller
+            return computeComponent.executeRemotelyWithFailover(
+                    targetNode, nextWorkerSelector, units, jobClassName, 
options, payload
             );
         }
     }
@@ -286,14 +276,12 @@ public class IgniteComputeImpl implements 
IgniteComputeInternal, StreamerReceive
             List<DeploymentUnit> units,
             String jobClassName,
             JobExecutionOptions options,
-            @Nullable Marshaller<Object, byte[]> argumentMarshaller,
-            @Nullable Marshaller<R, byte[]> resultMarshaller,
-            Object arg) {
+            @Nullable Object arg) {
         return primaryReplicaForPartitionByTupleKey(table, key)
-                .thenApply(primaryNode -> (JobExecution<R>) 
executeOnOneNodeWithFailover(
+                .thenApply(primaryNode -> executeOnOneNodeWithFailover(
                         primaryNode,
                         new NextColocatedWorkerSelector<>(placementDriver, 
topologyService, clock, table, key),
-                        units, jobClassName, options, argumentMarshaller, 
(Marshaller<Object, byte[]>) resultMarshaller, arg
+                        units, jobClassName, options, arg
                 ));
     }
 
@@ -344,6 +332,8 @@ public class IgniteComputeImpl implements 
IgniteComputeInternal, StreamerReceive
         Objects.requireNonNull(descriptor);
 
         Marshaller<T, byte[]> argumentMarshaller = 
descriptor.argumentMarshaller();
+        Marshaller<R, byte[]> resultMarshaller = descriptor.resultMarshaller();
+
         return nodes.stream()
                 .collect(toUnmodifiableMap(identity(),
                         // No failover nodes for broadcast. We use failover 
here in order to complete futures with exceptions
@@ -352,10 +342,13 @@ public class IgniteComputeImpl implements 
IgniteComputeInternal, StreamerReceive
                             if (topologyService.getByConsistentId(node.name()) 
== null) {
                                 return new FailedExecution<>(new 
NodeNotFoundException(Set.of(node.name())));
                             }
-                            return new JobExecutionWrapper<>((JobExecution<R>) 
executeOnOneNodeWithFailover(
-                                    node, 
CompletableFutures::nullCompletedFuture,
-                                    descriptor.units(), 
descriptor.jobClassName(), descriptor.options(),
-                                    argumentMarshaller, (Marshaller<Object, 
byte[]>) descriptor.resultMarshaller(), args));
+                            return new ResultUnmarshallingJobExecution<>(
+                                    new JobExecutionWrapper<>(
+                                            executeOnOneNodeWithFailover(
+                                                    node, 
CompletableFutures::nullCompletedFuture,
+                                                    descriptor.units(), 
descriptor.jobClassName(),
+                                                    descriptor.options(), 
tryMarshalOrCast(argumentMarshaller, args))),
+                                    resultMarshaller);
                         }));
     }
 
@@ -417,7 +410,6 @@ public class IgniteComputeImpl implements 
IgniteComputeInternal, StreamerReceive
                 deploymentUnits,
                 StreamerReceiverJob.class.getName(),
                 JobExecutionOptions.DEFAULT,
-                null, null,
                 payload);
 
         return jobExecution.resultAsync()
diff --git 
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/IgniteComputeInternal.java
 
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/IgniteComputeInternal.java
index cdc2e12b4f..352b4563d4 100644
--- 
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/IgniteComputeInternal.java
+++ 
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/IgniteComputeInternal.java
@@ -29,7 +29,6 @@ import org.apache.ignite.compute.JobExecutionOptions;
 import org.apache.ignite.compute.JobState;
 import org.apache.ignite.deployment.DeploymentUnit;
 import org.apache.ignite.internal.table.TableViewInternal;
-import org.apache.ignite.marshalling.Marshaller;
 import org.apache.ignite.network.ClusterNode;
 import org.apache.ignite.table.Tuple;
 import org.jetbrains.annotations.Nullable;
@@ -47,8 +46,6 @@ public interface IgniteComputeInternal extends IgniteCompute {
      * @param units Deployment units. Can be empty.
      * @param jobClassName Name of the job class to execute.
      * @param options Job execution options.
-     * @param argumentMarshaller Marshaller for the job argument.
-     * @param resultMarshaller Marshaller for the job result.
      * @param payload Arguments of the job.
      * @return CompletableFuture Job result.
      */
@@ -57,9 +54,7 @@ public interface IgniteComputeInternal extends IgniteCompute {
             List<DeploymentUnit> units,
             String jobClassName,
             JobExecutionOptions options,
-            @Nullable Marshaller<Object, byte[]> argumentMarshaller,
-            @Nullable Marshaller<R, byte[]> resultMarshaller,
-            Object payload
+            @Nullable Object payload
     );
 
     /**
@@ -71,8 +66,6 @@ public interface IgniteComputeInternal extends IgniteCompute {
      * @param units Deployment units. Can be empty.
      * @param jobClassName Name of the job class to execute.
      * @param options job execution options (priority, max retries).
-     * @param argumentMarshaller Marshaller for the job argument.
-     * @param resultMarshaller Marshaller for the job result.
      * @param payload Arguments of the job.
      * @param <R> Job result type.
      * @return Job execution object.
@@ -83,8 +76,6 @@ public interface IgniteComputeInternal extends IgniteCompute {
             List<DeploymentUnit> units,
             String jobClassName,
             JobExecutionOptions options,
-            @Nullable Marshaller<Object, byte[]> argumentMarshaller,
-            @Nullable Marshaller<R, byte[]> resultMarshaller,
             Object payload);
 
     /**
diff --git 
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/JobExecutionWrapper.java
 
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/JobExecutionWrapper.java
index 4fabbbd62c..5e19c48687 100644
--- 
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/JobExecutionWrapper.java
+++ 
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/JobExecutionWrapper.java
@@ -23,6 +23,7 @@ import static 
org.apache.ignite.internal.lang.IgniteExceptionMapperUtil.convertT
 import java.util.concurrent.CompletableFuture;
 import org.apache.ignite.compute.JobExecution;
 import org.apache.ignite.compute.JobState;
+import org.apache.ignite.marshalling.Marshaller;
 import org.jetbrains.annotations.Nullable;
 
 /**
@@ -30,7 +31,7 @@ import org.jetbrains.annotations.Nullable;
  *
  * @param <R> Result type.
  */
-public class JobExecutionWrapper<R> implements JobExecution<R> {
+public class JobExecutionWrapper<R> implements JobExecution<R>, 
MarshallerProvider<R> {
     private final JobExecution<R> delegate;
 
     JobExecutionWrapper(JobExecution<R> delegate) {
@@ -56,4 +57,13 @@ public class JobExecutionWrapper<R> implements 
JobExecution<R> {
     public CompletableFuture<@Nullable Boolean> changePriorityAsync(int 
newPriority) {
         return 
convertToPublicFuture(delegate.changePriorityAsync(newPriority));
     }
+
+    @Override
+    public @Nullable Marshaller<R, byte[]> resultMarshaller() {
+        if (delegate instanceof MarshallerProvider) {
+            return ((MarshallerProvider<R>) delegate).resultMarshaller();
+        }
+
+        return null;
+    }
 }
diff --git a/modules/binary-tuple/build.gradle 
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/MarshallerProvider.java
similarity index 65%
copy from modules/binary-tuple/build.gradle
copy to 
modules/compute/src/main/java/org/apache/ignite/internal/compute/MarshallerProvider.java
index 8200416793..c8570dd1bb 100644
--- a/modules/binary-tuple/build.gradle
+++ 
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/MarshallerProvider.java
@@ -15,16 +15,16 @@
  * limitations under the License.
  */
 
-apply from: "$rootDir/buildscripts/java-core.gradle"
-apply from: "$rootDir/buildscripts/publishing.gradle"
-apply from: "$rootDir/buildscripts/java-junit5.gradle"
+package org.apache.ignite.internal.compute;
 
-description = 'ignite-binary-tuple'
+import org.apache.ignite.marshalling.Marshaller;
+import org.jetbrains.annotations.Nullable;
 
-dependencies {
-    annotationProcessor project(':ignite-configuration-annotation-processor')
-    implementation project(':ignite-api')
-    implementation project(':ignite-core')
-    implementation libs.jetbrains.annotations
+/**
+ * Marshaller provider.
+ */
+@SuppressWarnings("InterfaceMayBeAnnotatedFunctional")
+public interface MarshallerProvider<R> {
+    /** Returns marshaller or null. */
+    @Nullable Marshaller<R, byte[]> resultMarshaller();
 }
-
diff --git 
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/ResultMarshallingJobExecution.java
 
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/ResultMarshallingJobExecution.java
index 134353f4a3..9c3caa6457 100644
--- 
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/ResultMarshallingJobExecution.java
+++ 
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/ResultMarshallingJobExecution.java
@@ -29,25 +29,18 @@ import org.jetbrains.annotations.Nullable;
  *
  * @param <R> Result type.
  */
-class ResultMarshallingJobExecution<R> implements JobExecution<R> {
+class ResultMarshallingJobExecution<R> implements JobExecution<R>, 
MarshallerProvider<R> {
     private final JobExecution<R> delegate;
     private final Marshaller<R, byte[]> resultMarshaller;
 
-    ResultMarshallingJobExecution(JobExecution<R> delegate, Marshaller<R, 
byte[]> resultMarshaller) {
+    ResultMarshallingJobExecution(JobExecution<R> delegate, @Nullable 
Marshaller<R, byte[]> resultMarshaller) {
         this.delegate = delegate;
         this.resultMarshaller = resultMarshaller;
     }
 
     @Override
     public CompletableFuture<R> resultAsync() {
-        return delegate.resultAsync()
-                .thenApply(res -> {
-                    if (resultMarshaller == null) {
-                        return res;
-                    }
-
-                    return resultMarshaller.unmarshal((byte[]) res);
-                });
+        return delegate.resultAsync();
     }
 
     @Override
@@ -64,4 +57,9 @@ class ResultMarshallingJobExecution<R> implements 
JobExecution<R> {
     public CompletableFuture<@Nullable Boolean> changePriorityAsync(int 
newPriority) {
         return delegate.changePriorityAsync(newPriority);
     }
+
+    @Override
+    public Marshaller<R, byte[]> resultMarshaller() {
+        return resultMarshaller;
+    }
 }
diff --git 
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/ResultMarshallingJobExecution.java
 
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/ResultUnmarshallingJobExecution.java
similarity index 70%
copy from 
modules/compute/src/main/java/org/apache/ignite/internal/compute/ResultMarshallingJobExecution.java
copy to 
modules/compute/src/main/java/org/apache/ignite/internal/compute/ResultUnmarshallingJobExecution.java
index 134353f4a3..b6145258a0 100644
--- 
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/ResultMarshallingJobExecution.java
+++ 
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/ResultUnmarshallingJobExecution.java
@@ -17,37 +17,31 @@
 
 package org.apache.ignite.internal.compute;
 
+import static org.apache.ignite.marshalling.Marshaller.tryUnmarshalOrCast;
+
 import java.util.concurrent.CompletableFuture;
 import org.apache.ignite.compute.JobExecution;
 import org.apache.ignite.compute.JobState;
-import org.apache.ignite.internal.compute.executor.JobExecutionInternal;
 import org.apache.ignite.marshalling.Marshaller;
 import org.jetbrains.annotations.Nullable;
 
 /**
- * Delegates {@link JobExecution} to the future of {@link 
JobExecutionInternal}.
+ * Wraps {@link #resultAsync()} with marshalling.
  *
  * @param <R> Result type.
  */
-class ResultMarshallingJobExecution<R> implements JobExecution<R> {
+class ResultUnmarshallingJobExecution<R> implements JobExecution<R> {
     private final JobExecution<R> delegate;
-    private final Marshaller<R, byte[]> resultMarshaller;
+    private final Marshaller<R, byte[]> resultUnmarshaller;
 
-    ResultMarshallingJobExecution(JobExecution<R> delegate, Marshaller<R, 
byte[]> resultMarshaller) {
+    ResultUnmarshallingJobExecution(JobExecution<R> delegate, @Nullable 
Marshaller<R, byte[]> resultUnmarshaller) {
         this.delegate = delegate;
-        this.resultMarshaller = resultMarshaller;
+        this.resultUnmarshaller = resultUnmarshaller;
     }
 
     @Override
     public CompletableFuture<R> resultAsync() {
-        return delegate.resultAsync()
-                .thenApply(res -> {
-                    if (resultMarshaller == null) {
-                        return res;
-                    }
-
-                    return resultMarshaller.unmarshal((byte[]) res);
-                });
+        return delegate.resultAsync().thenApply(r -> 
tryUnmarshalOrCast(resultUnmarshaller, r));
     }
 
     @Override
diff --git 
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/executor/ComputeExecutorImpl.java
 
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/executor/ComputeExecutorImpl.java
index 1d7ba15ca8..a6d2216ad2 100644
--- 
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/executor/ComputeExecutorImpl.java
+++ 
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/executor/ComputeExecutorImpl.java
@@ -94,37 +94,21 @@ public class ComputeExecutorImpl implements ComputeExecutor 
{
         Marshaller<R, byte[]> resultMarshaller = 
jobInstance.resultMarshaller();
 
         QueueExecution<R> execution = executorService.submit(
-                unmarshalExecMarshal(input, jobInstance, context, 
inputMarshaller, resultMarshaller),
+                unmarshalExecMarshal(input, jobInstance, context, 
inputMarshaller),
                 options.priority(),
                 options.maxRetries()
         );
 
-        return new JobExecutionInternal<>(execution, isInterrupted);
+        return new JobExecutionInternal<>(execution, isInterrupted, 
resultMarshaller);
     }
 
     private static <T, R> Callable<CompletableFuture<R>> unmarshalExecMarshal(
             T input,
             ComputeJob<T, R> jobInstance,
             JobExecutionContext context,
-            @Nullable Marshaller<T, byte[]> inputMarshaller,
-            @Nullable Marshaller<R, byte[]> resultMarshaller
+            @Nullable Marshaller<T, byte[]> inputMarshaller
     ) {
-        return () -> {
-            var fut = jobInstance.executeAsync(context, 
unmarshallOrNotIfNull(inputMarshaller, input));
-            if (fut != null) {
-                return (CompletableFuture<R>) fut.thenApply(res -> 
marshallOrNull(res, resultMarshaller));
-            }
-            return null;
-        };
-    }
-
-
-    private static <R> Object marshallOrNull(Object res, @Nullable 
Marshaller<R, byte[]> marshaller) {
-        if (marshaller == null) {
-            return res;
-        }
-
-        return marshaller.marshal((R) res);
+        return () -> jobInstance.executeAsync(context, 
unmarshallOrNotIfNull(inputMarshaller, input));
     }
 
     private static <T> @Nullable T unmarshallOrNotIfNull(@Nullable 
Marshaller<T, byte[]> marshaller, Object input) {
diff --git 
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/executor/JobExecutionInternal.java
 
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/executor/JobExecutionInternal.java
index 219171b372..6b8313dbb1 100644
--- 
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/executor/JobExecutionInternal.java
+++ 
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/executor/JobExecutionInternal.java
@@ -20,7 +20,9 @@ package org.apache.ignite.internal.compute.executor;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.atomic.AtomicBoolean;
 import org.apache.ignite.compute.JobState;
+import org.apache.ignite.internal.compute.MarshallerProvider;
 import org.apache.ignite.internal.compute.queue.QueueExecution;
+import org.apache.ignite.marshalling.Marshaller;
 import org.jetbrains.annotations.Nullable;
 
 /**
@@ -28,20 +30,23 @@ import org.jetbrains.annotations.Nullable;
  *
  * @param <R> Job result type.
  */
-public class JobExecutionInternal<R> {
+public class JobExecutionInternal<R> implements MarshallerProvider<R> {
     private final QueueExecution<R> execution;
 
     private final AtomicBoolean isInterrupted;
 
+    private final Marshaller<R, byte[]> marshaller;
+
     /**
      * Constructor.
      *
      * @param execution Internal execution state.
      * @param isInterrupted Flag which is passed to the execution context so 
that the job can check it for cancellation request.
      */
-    JobExecutionInternal(QueueExecution<R> execution, AtomicBoolean 
isInterrupted) {
+    JobExecutionInternal(QueueExecution<R> execution, AtomicBoolean 
isInterrupted, @Nullable Marshaller<R, byte[]> marshaller) {
         this.execution = execution;
         this.isInterrupted = isInterrupted;
+        this.marshaller = marshaller;
     }
 
     public CompletableFuture<R> resultAsync() {
@@ -72,4 +77,9 @@ public class JobExecutionInternal<R> {
     public boolean changePriority(int newPriority) {
         return execution.changePriority(newPriority);
     }
+
+    @Override
+    public @Nullable Marshaller<R, byte[]> resultMarshaller() {
+        return marshaller;
+    }
 }
diff --git 
a/modules/platforms/cpp/ignite/client/detail/compute/compute_impl.cpp 
b/modules/platforms/cpp/ignite/client/detail/compute/compute_impl.cpp
index 1de9bc3e4e..2cab70d30c 100644
--- a/modules/platforms/cpp/ignite/client/detail/compute/compute_impl.cpp
+++ b/modules/platforms/cpp/ignite/client/detail/compute/compute_impl.cpp
@@ -24,24 +24,72 @@
 
 namespace ignite::detail {
 
+/**
+ * Compute Job Type.
+ */
+enum class compute_job_type {
+    /** Native. */
+    NATIVE = 0,
+
+    /** Marshalled Tuple. */
+    MARSHALLED_TUPLE = 1,
+
+    /** Marshalled Object. */
+    MARSHALLED_OBJECT = 2,
+};
+
+
+/**
+ * Get Compute Job Type from Type ID.
+ * @param type_id Type ID.
+ * @return Compute Job Type.
+ */
+compute_job_type from_type_id(ignite_type type_id) {
+    auto type_conv = compute_job_type(type_id);
+
+    if (type_conv == compute_job_type::MARSHALLED_TUPLE || type_conv == 
compute_job_type::MARSHALLED_OBJECT)
+        return type_conv;
+
+    return compute_job_type::NATIVE;
+}
+
+
 /**
  * Write a collection of primitives as a binary tuple.
  *
  * @param writer Writer to use.
  * @param arg Argument.
  */
-void write_object_as_binary_tuple(protocol::writer &writer, const 
binary_object &arg) {
+void write_object_as_binary_tuple(protocol::writer &writer, const primitive 
&arg) {
     binary_tuple_builder args_builder{3};
 
     args_builder.start();
-    protocol::claim_primitive_with_type(args_builder, arg.get_primitive());
+    protocol::claim_primitive_with_type(args_builder, arg);
     args_builder.layout();
-    protocol::append_primitive_with_type(args_builder, arg.get_primitive());
+    protocol::append_primitive_with_type(args_builder, arg);
 
     auto args_data = args_builder.build();
     writer.write_binary(args_data);
 }
 
+/**
+ * Pack compute argument.
+ *
+ * @param writer Writer.
+ * @param arg Argument.
+ */
+void pack_compute_argument(protocol::writer &writer, const binary_object &arg) 
{
+    auto prim = arg.get_primitive();
+    if (prim.is_null()) {
+        writer.write(std::int32_t(ignite_type::NIL));
+        writer.write_nil();
+        return;
+    }
+
+    writer.write(std::int32_t(compute_job_type::NATIVE));
+    write_object_as_binary_tuple(writer, prim);
+}
+
 /**
  * Read primitive from a stream, which is encoded as a binary tuple.
  *
@@ -58,14 +106,17 @@ primitive 
read_primitive_from_binary_tuple(protocol::reader &reader) {
 }
 
 /**
- * Read primitive from a stream, which is encoded as a binary tuple.
+ * Unpack compute execution result.
  *
  * @param reader Reader.
  * @return Value.
  */
-std::optional<primitive> 
read_primitive_from_binary_tuple_nullable(protocol::reader &reader) {
-    if (reader.try_read_nil())
-        return std::nullopt;
+primitive unpack_compute_result(protocol::reader &reader) {
+    auto type_id = ignite_type(reader.read_int32());
+    auto job_id = from_type_id(type_id);
+
+    if (job_id != compute_job_type::NATIVE)
+        throw ignite_error("Only native compute results are supported 
currently");
 
     return read_primitive_from_binary_tuple(reader);
 }
@@ -190,7 +241,7 @@ public:
             job_state state;
 
             auto read_res = result_of_operation<void>([&]() {
-                res = read_primitive_from_binary_tuple_nullable(reader);
+                res = unpack_compute_result(reader);
                 state = read_job_state(reader);
             });
 
@@ -240,7 +291,7 @@ void compute_impl::submit_to_nodes(const 
std::vector<cluster_node> &nodes, std::
         writer.write(descriptor->get_execution_options().get_priority());
         writer.write(descriptor->get_execution_options().get_max_retries());
 
-        write_object_as_binary_tuple(writer, arg);
+        pack_compute_argument(writer, arg);
     };
 
     auto handler = 
std::make_shared<response_handler_compute>(shared_from_this(), 
std::move(callback), false);
@@ -278,7 +329,7 @@ void compute_impl::submit_colocated_async(const std::string 
&table_name, const i
                     
writer.write(descriptor->get_execution_options().get_priority());
                     
writer.write(descriptor->get_execution_options().get_max_retries());
 
-                    write_object_as_binary_tuple(writer, arg);
+                    pack_compute_argument(writer, arg);
                 };
 
                 auto handler = 
std::make_shared<response_handler_compute>(self, std::move(callback), true);
diff --git a/modules/platforms/cpp/ignite/common/error_codes.h 
b/modules/platforms/cpp/ignite/common/error_codes.h
index 26786ecf72..0f1706c64a 100644
--- a/modules/platforms/cpp/ignite/common/error_codes.h
+++ b/modules/platforms/cpp/ignite/common/error_codes.h
@@ -225,7 +225,8 @@ enum class code : underlying_t {
 
     // Marshalling group. Group code: 22
     COMMON = 0x160001,
-    UNSUPPORTED_OBJECT_TYPE = 0x160002
+    UNSUPPORTED_OBJECT_TYPE = 0x160002,
+    UNMARSHALLING = 0x160003
 };
 
 } // namespace error
diff --git a/modules/platforms/cpp/ignite/odbc/common_types.cpp 
b/modules/platforms/cpp/ignite/odbc/common_types.cpp
index f88e6bdd6a..4d767591d4 100644
--- a/modules/platforms/cpp/ignite/odbc/common_types.cpp
+++ b/modules/platforms/cpp/ignite/odbc/common_types.cpp
@@ -316,6 +316,7 @@ sql_state error_code_to_sql_state(error::code code) {
         case error::code::COMMON:
         case error::code::UNSUPPORTED_OBJECT_TYPE:
         case error::code::MARSHALLING_TYPE_MISMATCH:
+        case error::code::UNMARSHALLING:
             return sql_state::SHY000_GENERAL_ERROR;
     }
 
diff --git a/modules/platforms/dotnet/Apache.Ignite.Tests/FakeServer.cs 
b/modules/platforms/dotnet/Apache.Ignite.Tests/FakeServer.cs
index fccce89d2a..bcf3209dbb 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Tests/FakeServer.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Tests/FakeServer.cs
@@ -30,6 +30,7 @@ namespace Apache.Ignite.Tests
     using Ignite.Sql;
     using Internal.Buffers;
     using Internal.Common;
+    using Internal.Compute;
     using Internal.Network;
     using Internal.Proto;
     using Internal.Proto.BinaryTuple;
@@ -771,6 +772,7 @@ namespace Apache.Ignite.Tests
             var arrayBufferWriter = new PooledArrayBuffer();
             var writer = new MsgPackWriter(arrayBufferWriter);
 
+            writer.Write(ComputePacker.Native); // ComputePacker.Native
             writer.Write(builder.Build().Span);
 
             // Status
diff --git a/modules/platforms/dotnet/Apache.Ignite/ErrorCodes.g.cs 
b/modules/platforms/dotnet/Apache.Ignite/ErrorCodes.g.cs
index 561b740d93..f5e10723d9 100644
--- a/modules/platforms/dotnet/Apache.Ignite/ErrorCodes.g.cs
+++ b/modules/platforms/dotnet/Apache.Ignite/ErrorCodes.g.cs
@@ -639,6 +639,9 @@ namespace Apache.Ignite
 
             /// <summary> UnsupportedObjectType error. </summary>
             public const int UnsupportedObjectType = (GroupCode << 16) | (2 & 
0xFFFF);
+
+            /// <summary> Unmarshalling error. </summary>
+            public const int Unmarshalling = (GroupCode << 16) | (3 & 0xFFFF);
         }
     }
 }
diff --git a/modules/platforms/dotnet/Apache.Ignite/Internal/Compute/Compute.cs 
b/modules/platforms/dotnet/Apache.Ignite/Internal/Compute/Compute.cs
index 71335106fd..ed988ff82e 100644
--- a/modules/platforms/dotnet/Apache.Ignite/Internal/Compute/Compute.cs
+++ b/modules/platforms/dotnet/Apache.Ignite/Internal/Compute/Compute.cs
@@ -126,7 +126,7 @@ namespace Apache.Ignite.Internal.Compute
                 WriteUnits(taskDescriptor.DeploymentUnits, writer);
                 w.Write(taskDescriptor.TaskClassName);
 
-                w.WriteObjectAsBinaryTuple(arg);
+                ComputePacker.PackArg(ref w, arg, null);
             }
         }
 
@@ -325,7 +325,7 @@ namespace Apache.Ignite.Internal.Compute
 
             (T, JobState) Read(MsgPackReader reader)
             {
-                var res = (T)reader.ReadObjectFromBinaryTuple(marshaller)!;
+                var res = ComputePacker.UnpackResult(ref reader, marshaller);
                 var status = ReadJobState(reader);
 
                 return (res, status);
@@ -392,7 +392,7 @@ namespace Apache.Ignite.Internal.Compute
                 w.Write(options.Priority);
                 w.Write(options.MaxRetries);
 
-                w.WriteObjectAsBinaryTuple(arg, jobDescriptor.ArgMarshaller);
+                ComputePacker.PackArg(ref w, arg, jobDescriptor.ArgMarshaller);
             }
         }
 
diff --git 
a/modules/platforms/dotnet/Apache.Ignite/Internal/Compute/ComputePacker.cs 
b/modules/platforms/dotnet/Apache.Ignite/Internal/Compute/ComputePacker.cs
new file mode 100644
index 0000000000..9fe5c5d36a
--- /dev/null
+++ b/modules/platforms/dotnet/Apache.Ignite/Internal/Compute/ComputePacker.cs
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache.Ignite.Internal.Compute;
+
+using System;
+using System.Buffers;
+using Ignite.Table;
+using Marshalling;
+using Proto.MsgPack;
+
+/// <summary>
+/// Compute packer utils.
+/// </summary>
+internal static class ComputePacker
+{
+    /// <summary>
+    /// Natively supported simple type.
+    /// </summary>
+    internal const int Native = 0;
+
+    /// <summary>
+    /// Ignite tuple.
+    /// </summary>
+    private const int Tuple = 1;
+
+    /// <summary>
+    /// User-defined marshaller.
+    /// </summary>
+    private const int MarshallerObject = 2;
+
+    /// <summary>
+    /// Packs compute job arg.
+    /// </summary>
+    /// <param name="w">Packer.</param>
+    /// <param name="obj">Arg.</param>
+    /// <param name="marshaller">Marshaller.</param>
+    /// <typeparam name="T">Arg type.</typeparam>
+    internal static void PackArg<T>(ref MsgPackWriter w, T obj, 
IMarshaller<T>? marshaller)
+    {
+        if (obj == null)
+        {
+            w.WriteNil();
+            return;
+        }
+
+        if (marshaller != null)
+        {
+            w.Write(MarshallerObject);
+            w.Write(
+                static (IBufferWriter<byte> writer, (T Obj, IMarshaller<T> 
Marshaller) arg) => arg.Marshaller.Marshal(arg.Obj, writer),
+                arg: (obj, marshaller));
+
+            return;
+        }
+
+        if (obj is IIgniteTuple)
+        {
+            // TODO: IGNITE-23033 .NET: Thin 3.0: Support tuples with schemas 
in Compute
+            w.Write(Tuple);
+            throw new NotImplementedException("IGNITE-23033");
+        }
+
+        w.Write(Native);
+        w.WriteObjectAsBinaryTuple(obj);
+    }
+
+    /// <summary>
+    /// Unpacks compute job result.
+    /// </summary>
+    /// <param name="r">Reader.</param>
+    /// <param name="marshaller">Optional marshaller.</param>
+    /// <typeparam name="T">Result type.</typeparam>
+    /// <returns>Result.</returns>
+    internal static T UnpackResult<T>(ref MsgPackReader r, IMarshaller<T>? 
marshaller)
+    {
+        if (r.TryReadNil())
+        {
+            return (T)(object)null!;
+        }
+
+        int type = r.ReadInt32();
+
+        return type switch
+        {
+            Tuple => throw new NotImplementedException("IGNITE-23033"),
+            MarshallerObject => Unmarshal(ref r, marshaller),
+            _ => (T)r.ReadObjectFromBinaryTuple()!
+        };
+
+        static T Unmarshal(ref MsgPackReader r, IMarshaller<T>? marshaller)
+        {
+            if (marshaller == null)
+            {
+                throw new ArgumentNullException(nameof(marshaller), "Compute 
job result marshaller is required but not provided.");
+            }
+
+            if (r.TryReadNil())
+            {
+                return (T)(object)null!;
+            }
+
+            var bytes = r.ReadBinary();
+
+            return marshaller.Unmarshal(bytes);
+        }
+    }
+}
diff --git 
a/modules/platforms/dotnet/Apache.Ignite/Internal/Proto/MsgPack/MsgPackReader.cs
 
b/modules/platforms/dotnet/Apache.Ignite/Internal/Proto/MsgPack/MsgPackReader.cs
index 6d0ae3d461..b987248d1b 100644
--- 
a/modules/platforms/dotnet/Apache.Ignite/Internal/Proto/MsgPack/MsgPackReader.cs
+++ 
b/modules/platforms/dotnet/Apache.Ignite/Internal/Proto/MsgPack/MsgPackReader.cs
@@ -366,40 +366,6 @@ internal ref struct MsgPackReader
         }
     }
 
-    /// <summary>
-    /// Reads <see cref="ColumnType"/> and value with optional marshaller.
-    /// </summary>
-    /// <param name="marshaller">Optional marshaller.</param>
-    /// <returns>Value.</returns>
-    /// <typeparam name="T">Type of the value.</typeparam>
-    public T ReadObjectFromBinaryTuple<T>(IMarshaller<T>? marshaller)
-    {
-        if (TryReadNil())
-        {
-            return (T)(object)null!;
-        }
-
-        if (marshaller == null)
-        {
-            return (T)ReadObjectFromBinaryTuple()!;
-        }
-
-        var tuple = new BinaryTupleReader(ReadBinary(), 3);
-        var type = (ColumnType)tuple.GetInt(0);
-
-        if (type != ColumnType.ByteArray)
-        {
-            throw new UnsupportedObjectTypeMarshallingException(
-                Guid.NewGuid(),
-                ErrorGroups.Marshalling.UnsupportedObjectType,
-                "Unsupported object type. Expected byte[], got " + type);
-        }
-
-        var bytes = tuple.GetBytesSpan(2);
-
-        return marshaller.Unmarshal(bytes);
-    }
-
     /// <summary>
     /// Reads <see cref="ColumnType"/> and value.
     /// </summary>
diff --git 
a/modules/platforms/dotnet/Apache.Ignite/Internal/Proto/MsgPack/MsgPackWriter.cs
 
b/modules/platforms/dotnet/Apache.Ignite/Internal/Proto/MsgPack/MsgPackWriter.cs
index 9e92058337..36c1ff5a8f 100644
--- 
a/modules/platforms/dotnet/Apache.Ignite/Internal/Proto/MsgPack/MsgPackWriter.cs
+++ 
b/modules/platforms/dotnet/Apache.Ignite/Internal/Proto/MsgPack/MsgPackWriter.cs
@@ -343,6 +343,26 @@ internal readonly ref struct MsgPackWriter
         span.CopyTo(Buf.GetSpanAndAdvance(span.Length));
     }
 
+    /// <summary>
+    /// Appends bytes using <see cref="IBufferWriter{T}"/> directly to the 
underlying buffer, avoiding extra copying.
+    /// </summary>
+    /// <param name="action">Appender action.</param>
+    /// <param name="arg">Argument.</param>
+    /// <typeparam name="TArg">Argument type.</typeparam>
+    public void Write<TArg>(Action<IBufferWriter<byte>, TArg> action, TArg arg)
+    {
+        // Reserve space for size.
+        var sizeSpan = Buf.GetSpanAndAdvance(5);
+        sizeSpan[0] = MsgPackCode.Bin32;
+
+        var startPos = Buf.Position;
+        action(Buf, arg);
+        var length = Buf.Position - startPos;
+
+        // Write size to the reserved space.
+        BinaryPrimitives.WriteUInt32BigEndian(sizeSpan[1..], (uint)length);
+    }
+
     /// <summary>
     /// Writes a transaction.
     /// </summary>
@@ -359,36 +379,6 @@ internal readonly ref struct MsgPackWriter
         }
     }
 
-    /// <summary>
-    /// Writes an object with type code.
-    /// </summary>
-    /// <param name="obj">Object.</param>
-    /// <param name="marshaller">Marshaller.</param>
-    /// <typeparam name="T">Object type.</typeparam>
-    public void WriteObjectAsBinaryTuple<T>(T obj, IMarshaller<T>? marshaller)
-    {
-        if (obj == null)
-        {
-            WriteNil();
-            return;
-        }
-
-        if (marshaller == null)
-        {
-            WriteObjectAsBinaryTuple(obj);
-            return;
-        }
-
-        using var builder = new BinaryTupleBuilder(3);
-        builder.AppendInt((int)ColumnType.ByteArray);
-        builder.AppendInt(0); // Scale.
-        builder.AppendBytes(
-            static (IBufferWriter<byte> writer, (T Obj, IMarshaller<T> 
Marshaller) arg) => arg.Marshaller.Marshal(arg.Obj, writer),
-            arg: (obj, marshaller));
-
-        Write(builder.Build().Span);
-    }
-
     /// <summary>
     /// Writes an object with type code.
     /// </summary>
diff --git 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItThinClientComputeMarshallingTest.java
 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItThinClientComputeMarshallingTest.java
index a5835951dc..17a13d56d7 100644
--- 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItThinClientComputeMarshallingTest.java
+++ 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItThinClientComputeMarshallingTest.java
@@ -24,11 +24,14 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
 import java.util.Comparator;
 import java.util.List;
 import java.util.Map;
+import java.util.Map.Entry;
 import java.util.Set;
+import java.util.concurrent.ExecutionException;
 import java.util.stream.Collectors;
 import org.apache.ignite.catalog.ColumnType;
 import org.apache.ignite.catalog.definitions.TableDefinition;
 import org.apache.ignite.compute.JobDescriptor;
+import org.apache.ignite.compute.JobExecution;
 import org.apache.ignite.compute.JobTarget;
 import org.apache.ignite.compute.TaskDescriptor;
 import org.apache.ignite.internal.runner.app.Jobs.ArgMarshallingJob;
@@ -172,7 +175,7 @@ public class ItThinClientComputeMarshallingTest extends 
ItAbstractThinClientTest
     }
 
     @Test
-    void broadcast() {
+    void executeBroadcast() {
         // When.
         Map<ClusterNode, String> result = client().compute().executeBroadcast(
                 Set.of(node(0), node(1)),
@@ -192,6 +195,39 @@ public class ItThinClientComputeMarshallingTest extends 
ItAbstractThinClientTest
         assertEquals(resultExpected, result);
     }
 
+
+
+    @Test
+    void submitBroadcast() {
+        // When.
+        Map<ClusterNode, String> result = client().compute().submitBroadcast(
+                Set.of(node(0), node(1)),
+                JobDescriptor.builder(ArgumentAndResultMarshallingJob.class)
+                        .argumentMarshaller(new ArgumentStringMarshaller())
+                        .resultMarshaller(new ResultStringUnMarshaller())
+                        .build(),
+                "Input"
+        ).entrySet().stream().collect(
+                Collectors.toMap(Entry::getKey, 
ItThinClientComputeMarshallingTest::extractResult, (v, i) -> v)
+        );
+
+        // Then.
+        Map<ClusterNode, String> resultExpected = Map.of(
+                node(0), 
"Input:marshalledOnClient:unmarshalledOnServer:processedOnServer:marshalledOnServer:unmarshalledOnClient",
+                node(1), 
"Input:marshalledOnClient:unmarshalledOnServer:processedOnServer:marshalledOnServer:unmarshalledOnClient"
+        );
+
+        assertEquals(resultExpected, result);
+    }
+
+    private static String extractResult(Entry<ClusterNode, 
JobExecution<String>> e) {
+        try {
+            return e.getValue().resultAsync().get();
+        } catch (InterruptedException | ExecutionException ex) {
+            throw new RuntimeException(ex);
+        }
+    }
+
     @Test
     void colocated() {
         // Given entry node.
diff --git 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItThinClientComputeTypeCheckMarshallingTest.java
 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItThinClientComputeTypeCheckMarshallingTest.java
index 017063c709..a12003dc67 100644
--- 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItThinClientComputeTypeCheckMarshallingTest.java
+++ 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItThinClientComputeTypeCheckMarshallingTest.java
@@ -46,6 +46,7 @@ import 
org.apache.ignite.internal.runner.app.Jobs.ResultMarshallingJob;
 import org.apache.ignite.lang.ErrorGroups.Compute;
 import org.apache.ignite.marshalling.ByteArrayMarshaller;
 import org.apache.ignite.marshalling.Marshaller;
+import org.apache.ignite.marshalling.UnmarshallingException;
 import org.apache.ignite.marshalling.UnsupportedObjectTypeMarshallingException;
 import org.apache.ignite.network.ClusterNode;
 import org.jetbrains.annotations.Nullable;
@@ -81,7 +82,7 @@ public class ItThinClientComputeTypeCheckMarshallingTest 
extends ItAbstractThinC
         );
 
         await().untilAsserted(() -> assertStatusCompleted(result));
-        assertThrows(ClassCastException.class, () -> {
+        assertThrows(UnmarshallingException.class, () -> {
             String str = getSafe(result.resultAsync());
         });
     }
@@ -187,8 +188,8 @@ public class ItThinClientComputeTypeCheckMarshallingTest 
extends ItAbstractThinC
             return fut.get(waitSec, TimeUnit.SECONDS);
         } catch (ExecutionException e) {
             var cause = e.getCause();
-            if (cause instanceof ClassCastException) {
-                throw (ClassCastException) cause;
+            if (cause instanceof UnmarshallingException) {
+                throw (UnmarshallingException) cause;
             }
             throw new RuntimeException(e);
         } catch (InterruptedException | TimeoutException e) {
diff --git 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItThinClientTupleComputeMarshallingTest.java
 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItThinClientTupleComputeMarshallingTest.java
new file mode 100644
index 0000000000..d127c35a93
--- /dev/null
+++ 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItThinClientTupleComputeMarshallingTest.java
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.runner.app.client;
+
+import static java.util.concurrent.CompletableFuture.completedFuture;
+import static org.apache.ignite.catalog.definitions.ColumnDefinition.column;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.instanceOf;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.notNullValue;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+import java.util.Comparator;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.stream.Collectors;
+import org.apache.ignite.catalog.ColumnType;
+import org.apache.ignite.catalog.definitions.TableDefinition;
+import org.apache.ignite.client.IgniteClient;
+import org.apache.ignite.compute.ComputeException;
+import org.apache.ignite.compute.ComputeJob;
+import org.apache.ignite.compute.JobDescriptor;
+import org.apache.ignite.compute.JobExecution;
+import org.apache.ignite.compute.JobExecutionContext;
+import org.apache.ignite.compute.JobStatus;
+import org.apache.ignite.compute.JobTarget;
+import org.apache.ignite.network.ClusterNode;
+import org.apache.ignite.table.Tuple;
+import org.jetbrains.annotations.Nullable;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Test the OOTB support for Tuples in compute api.
+ */
+@SuppressWarnings("resource")
+public class ItThinClientTupleComputeMarshallingTest extends 
ItAbstractThinClientTest {
+    static final String TABLE_NAME = "test";
+    IgniteClient client;
+
+    @BeforeEach
+    void setUp() {
+        client = client();
+
+        client.catalog().createTable(
+                TableDefinition.builder(TABLE_NAME)
+                        .columns(
+                                column("key_col", ColumnType.INT32),
+                                column("value_col", ColumnType.VARCHAR)
+                        ).primaryKey("key_col")
+                        .build()
+        );
+
+        client.tables().table(TABLE_NAME).keyValueView().put(
+                null,
+                Tuple.create().set("key_col", 2),
+                Tuple.create().set("value_col", "hi")
+        );
+    }
+
+    @AfterEach
+    void tearDown() {
+        client.catalog().dropTable(TABLE_NAME);
+    }
+
+    @Test
+    void tupleFromTableApiAsArgument() {
+        // Given tuple from the table.
+        var tup = client.tables().table(TABLE_NAME).keyValueView().get(null, 
Tuple.create().set("key_col", 2));
+
+        // When submit job with the tuple as an argument.
+        JobExecution<String> resultJobExec = client.compute().submit(
+                JobTarget.node(node(1)),
+                JobDescriptor.builder(TupleArgJob.class).build(),
+                tup
+        );
+
+        // Then job completes successfully.
+        assertStatusCompleted(resultJobExec);
+        assertThat(
+                getSafe(resultJobExec.resultAsync()),
+                equalTo("hi")
+        );
+    }
+
+    @Test
+    void tupleFromTableReturned() {
+        // Given.
+        var key = 2;
+
+        // When submit job that returns tuple from the table.
+        JobExecution<Tuple> resultJobExec = client.compute().submit(
+                JobTarget.node(node(1)),
+                JobDescriptor.builder(TupleResultJob.class).build(),
+                key
+        );
+
+        // Then tuple is returned.
+        assertStatusCompleted(resultJobExec);
+        assertThat(
+                getSafe(resultJobExec.resultAsync()).stringValue("value_col"),
+                equalTo("hi")
+        );
+    }
+
+
+    static class TupleResultJob implements ComputeJob<Integer, Tuple> {
+        @Override
+        public @Nullable CompletableFuture<Tuple> 
executeAsync(JobExecutionContext context, @Nullable Integer key) {
+            // todo: There is no table for some reason in context.ignite().
+            return completedFuture(Tuple.create().set("value_col", "hi"));
+        }
+    }
+
+    static class TupleArgJob implements ComputeJob<Tuple, String> {
+        @Override
+        public @Nullable CompletableFuture<String> 
executeAsync(JobExecutionContext context, @Nullable Tuple arg) {
+            if (arg == null) {
+                return completedFuture("null");
+            }
+
+            return completedFuture(arg.stringValue("value_col"));
+        }
+    }
+
+    private static void assertResultFailsWithErr(int errCode, JobExecution<?> 
result) {
+        var ex = assertThrows(CompletionException.class, () -> 
result.resultAsync().join());
+        assertThat(ex.getCause(), instanceOf(ComputeException.class));
+        assertThat(((ComputeException) ex.getCause()).code(), 
equalTo(errCode));
+    }
+
+    private static void assertStatusFailed(JobExecution<?> result) {
+        var state = getSafe(result.stateAsync());
+        assertThat(state, is(notNullValue()));
+        assertThat(state.status(), equalTo(JobStatus.FAILED));
+    }
+
+    private static void assertStatusCompleted(JobExecution<?> result) {
+        var state = getSafe(result.stateAsync());
+        assertThat(state, is(notNullValue()));
+        assertThat(state.status(), equalTo(JobStatus.COMPLETED));
+    }
+
+    private static <T> T getSafe(@Nullable CompletableFuture<T> fut) {
+        assertThat(fut, is(notNullValue()));
+
+        try {
+            int waitSec = 5;
+            return fut.get(waitSec, TimeUnit.SECONDS);
+        } catch (ExecutionException e) {
+            var cause = e.getCause();
+            if (cause instanceof ClassCastException) {
+                throw (ClassCastException) cause;
+            }
+            throw new RuntimeException(e);
+        } catch (InterruptedException | TimeoutException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    private ClusterNode node(int idx) {
+        return sortedNodes().get(idx);
+    }
+
+    private List<ClusterNode> sortedNodes() {
+        return client().clusterNodes().stream()
+                .sorted(Comparator.comparing(ClusterNode::name))
+                .collect(Collectors.toList());
+    }
+}
diff --git 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/compute/ItEmbeddedMarshallingTest.java
 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/compute/ItEmbeddedMarshallingTest.java
index e344b162a1..2eef341daf 100644
--- 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/compute/ItEmbeddedMarshallingTest.java
+++ 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/compute/ItEmbeddedMarshallingTest.java
@@ -23,11 +23,14 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
 import java.util.Comparator;
 import java.util.List;
 import java.util.Map;
+import java.util.Map.Entry;
 import java.util.Set;
+import java.util.concurrent.ExecutionException;
 import java.util.stream.Collectors;
 import org.apache.ignite.catalog.ColumnType;
 import org.apache.ignite.catalog.definitions.TableDefinition;
 import org.apache.ignite.compute.JobDescriptor;
+import org.apache.ignite.compute.JobExecution;
 import org.apache.ignite.compute.JobTarget;
 import org.apache.ignite.internal.runner.app.Jobs.ArgMarshallingJob;
 import 
org.apache.ignite.internal.runner.app.Jobs.ArgumentAndResultMarshallingJob;
@@ -48,7 +51,7 @@ import org.junit.jupiter.api.Test;
 @SuppressWarnings("resource")
 public class ItEmbeddedMarshallingTest extends ItAbstractThinClientTest {
     @Test
-    void embeddedOk() {
+    void embeddedExecOnAnotherNode() {
         // Given entry node that are not supposed to execute job.
         var node = server(0);
         // And another target node.
@@ -70,7 +73,29 @@ public class ItEmbeddedMarshallingTest extends 
ItAbstractThinClientTest {
     }
 
     @Test
-    void broadcast() {
+    void embeddedExecOnSame() {
+        // Given entry node.
+        var node = server(0);
+        // And target node.
+        var targetNode = node(0);
+
+        // When run job with custom marshaller for pojo argument and result 
but for embedded.
+        var embeddedCompute = node.compute();
+        PojoResult result = embeddedCompute.execute(
+                JobTarget.node(targetNode),
+                JobDescriptor.builder(PojoJob.class)
+                        .argumentMarshaller(new 
JsonMarshaller<>(PojoArg.class))
+                        .resultMarshaller(new 
JsonMarshaller<>(PojoResult.class))
+                        .build(),
+                new PojoArg().setIntValue(2).setStrValue("1")
+        );
+
+        // Then the job returns the expected result.
+        assertEquals(3L, result.getLongValue());
+    }
+
+    @Test
+    void broadcastExecute() {
         // Given entry node.
         var node = server(0);
 
@@ -86,13 +111,49 @@ public class ItEmbeddedMarshallingTest extends 
ItAbstractThinClientTest {
 
         // Then.
         Map<ClusterNode, String> resultExpected = Map.of(
-                node(0), 
"Input:marshalledOnClient:unmarshalledOnServer:processedOnServer:marshalledOnServer:unmarshalledOnClient",
+                // todo: "https://issues.apache.org/jira/browse/IGNITE-23024";
+                node(0), 
"Input:marshalledOnClient:unmarshalledOnServer:processedOnServer",
+                node(1), 
"Input:marshalledOnClient:unmarshalledOnServer:processedOnServer:marshalledOnServer:unmarshalledOnClient"
+        );
+
+        assertEquals(resultExpected, result);
+    }
+
+    @Test
+    void broadcastSubmit() {
+        // Given entry node.
+        var node = server(0);
+
+        // When.
+        Map<ClusterNode, String> result = node.compute().submitBroadcast(
+                Set.of(node(0), node(1)),
+                JobDescriptor.builder(ArgumentAndResultMarshallingJob.class)
+                        .argumentMarshaller(new ArgumentStringMarshaller())
+                        .resultMarshaller(new ResultStringUnMarshaller())
+                        .build(),
+                "Input"
+        ).entrySet().stream().collect(
+                Collectors.toMap(Entry::getKey, 
ItEmbeddedMarshallingTest::extractResult, (v, i) -> v)
+        );
+
+        // Then.
+        Map<ClusterNode, String> resultExpected = Map.of(
+                // todo: "https://issues.apache.org/jira/browse/IGNITE-23024";
+                node(0), 
"Input:marshalledOnClient:unmarshalledOnServer:processedOnServer",
                 node(1), 
"Input:marshalledOnClient:unmarshalledOnServer:processedOnServer:marshalledOnServer:unmarshalledOnClient"
         );
 
         assertEquals(resultExpected, result);
     }
 
+    private static String extractResult(Entry<ClusterNode, 
JobExecution<String>> e) {
+        try {
+            return e.getValue().resultAsync().get();
+        } catch (InterruptedException | ExecutionException ex) {
+            throw new RuntimeException(ex);
+        }
+    }
+
     @Test
     void colocated() {
         // Given entry node.

Reply via email to