This is an automated email from the ASF dual-hosted git repository.

apkhmv pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 01b2306c26 IGNITE-22787 Support Marshallers in MapReduce API (#4260)
01b2306c26 is described below

commit 01b2306c26cda6d5f40dcd03f186b9da18e90214
Author: Aleksandr Pakhomov <[email protected]>
AuthorDate: Thu Aug 29 21:53:40 2024 +0300

    IGNITE-22787 Support Marshallers in MapReduce API (#4260)
---
 .../org/apache/ignite/compute/TaskDescriptor.java  | 57 +++++++++++++++-
 .../apache/ignite/compute/task/MapReduceTask.java  | 11 ++++
 .../ClientComputeExecuteMapReduceRequest.java      |  9 ++-
 .../internal/client/compute/ClientCompute.java     | 32 +++++----
 .../client/compute/ClientTaskExecution.java        |  6 +-
 .../apache/ignite/client/fakes/FakeCompute.java    | 76 ++++++++++++++--------
 .../internal/compute/TaskExecutionWrapper.java     | 12 +++-
 .../compute/task/DelegatingTaskExecution.java      | 11 +++-
 .../compute/task/TaskExecutionInternal.java        | 16 ++++-
 .../Apache.Ignite/Internal/Compute/Compute.cs      |  3 +-
 .../apache/ignite/internal/runner/app/Jobs.java    | 62 +++++++++++++++++-
 .../client/ItThinClientComputeMarshallingTest.java | 55 ++++++++++++----
 12 files changed, 284 insertions(+), 66 deletions(-)

diff --git 
a/modules/api/src/main/java/org/apache/ignite/compute/TaskDescriptor.java 
b/modules/api/src/main/java/org/apache/ignite/compute/TaskDescriptor.java
index 511cd0312f..19e4c1f304 100644
--- a/modules/api/src/main/java/org/apache/ignite/compute/TaskDescriptor.java
+++ b/modules/api/src/main/java/org/apache/ignite/compute/TaskDescriptor.java
@@ -21,6 +21,7 @@ import java.util.List;
 import java.util.Objects;
 import org.apache.ignite.compute.task.MapReduceTask;
 import org.apache.ignite.deployment.DeploymentUnit;
+import org.apache.ignite.marshalling.Marshaller;
 
 /**
  * Compute task descriptor.
@@ -30,12 +31,20 @@ public class TaskDescriptor<T, R> {
 
     private final List<DeploymentUnit> units;
 
+    private final Marshaller<T, byte[]> splitJobArgumentMarshaller;
+
+    private final Marshaller<R, byte[]> reduceJobResultMarshaller;
+
     private TaskDescriptor(
             String taskClassName,
-            List<DeploymentUnit> units
+            List<DeploymentUnit> units,
+            Marshaller<T, byte[]> splitJobArgumentMarshaller,
+            Marshaller<R, byte[]> reduceJobResultMarshaller
     ) {
         this.taskClassName = taskClassName;
         this.units = units;
+        this.splitJobArgumentMarshaller = splitJobArgumentMarshaller;
+        this.reduceJobResultMarshaller = reduceJobResultMarshaller;
     }
 
     /**
@@ -56,6 +65,24 @@ public class TaskDescriptor<T, R> {
         return units;
     }
 
+    /**
+     * Marshaller for split job argument.
+     *
+     * @return Marshaller for split job argument.
+     */
+    public Marshaller<T, byte[]> splitJobArgumentMarshaller() {
+        return splitJobArgumentMarshaller;
+    }
+
+    /**
+     * Marshaller for reduce job result.
+     *
+     * @return Marshaller for reduce job result.
+     */
+    public Marshaller<R, byte[]> reduceJobResultMarshaller() {
+        return reduceJobResultMarshaller;
+    }
+
     /**
      * Create a new builder.
      *
@@ -84,6 +111,8 @@ public class TaskDescriptor<T, R> {
     public static class Builder<T, R> {
         private final String taskClassName;
         private List<DeploymentUnit> units;
+        private Marshaller<T, byte[]> splitJobArgumentMarshaller;
+        private Marshaller<R, byte[]> reduceJobResultMarshaller;
 
         private Builder(String taskClassName) {
             Objects.requireNonNull(taskClassName);
@@ -113,6 +142,27 @@ public class TaskDescriptor<T, R> {
             return this;
         }
 
+        /**
+         * Sets the marshaller for split job argument.
+         *
+         * @param splitJobArgumentMarshaller Marshaller for split job argument.
+         * @return This builder.
+         */
+        public Builder<T, R> splitJobArgumentMarshaller(Marshaller<T, byte[]> 
splitJobArgumentMarshaller) {
+            this.splitJobArgumentMarshaller = splitJobArgumentMarshaller;
+            return this;
+        }
+
+        /**
+         * Sets the marshaller for reduce job result.
+         *
+         * @param reduceJobResultMarshaller Marshaller for reduce job result.
+         * @return This builder.
+         */
+        public Builder<T, R> reduceJobArgumentMarshaller(Marshaller<R, byte[]> 
reduceJobResultMarshaller) {
+            this.reduceJobResultMarshaller = reduceJobResultMarshaller;
+            return this;
+        }
 
         /**
          * Builds the task descriptor.
@@ -122,8 +172,11 @@ public class TaskDescriptor<T, R> {
         public TaskDescriptor<T, R> build() {
             return new TaskDescriptor<>(
                     taskClassName,
-                    units == null ? List.of() : units
+                    units == null ? List.of() : units,
+                    splitJobArgumentMarshaller,
+                    reduceJobResultMarshaller
             );
         }
+
     }
 }
diff --git 
a/modules/api/src/main/java/org/apache/ignite/compute/task/MapReduceTask.java 
b/modules/api/src/main/java/org/apache/ignite/compute/task/MapReduceTask.java
index 69fea09a8b..c824ba6505 100644
--- 
a/modules/api/src/main/java/org/apache/ignite/compute/task/MapReduceTask.java
+++ 
b/modules/api/src/main/java/org/apache/ignite/compute/task/MapReduceTask.java
@@ -22,6 +22,7 @@ import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import org.apache.ignite.compute.TaskDescriptor;
+import org.apache.ignite.marshalling.Marshaller;
 import org.jetbrains.annotations.Nullable;
 
 /**
@@ -53,4 +54,14 @@ public interface MapReduceTask<I, M, T, R> {
      * @return Final task result future.
      */
     CompletableFuture<R> reduceAsync(TaskExecutionContext taskContext, 
Map<UUID, T> results);
+
+    /** The marshaller that is called to unmarshal split job argument if not 
null. */
+    default @Nullable Marshaller<I, byte[]> splitJobInputMarshaller() {
+        return null;
+    }
+
+    /** The marshaller that is called to marshal reduce job result if not 
null. */
+    default @Nullable Marshaller<R, byte[]> reduceJobResultMarshaller() {
+        return null;
+    }
 }
diff --git 
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/compute/ClientComputeExecuteMapReduceRequest.java
 
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/compute/ClientComputeExecuteMapReduceRequest.java
index b7bf2d4090..306e5ac1db 100644
--- 
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/compute/ClientComputeExecuteMapReduceRequest.java
+++ 
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/compute/ClientComputeExecuteMapReduceRequest.java
@@ -31,9 +31,12 @@ import org.apache.ignite.compute.JobState;
 import org.apache.ignite.compute.TaskDescriptor;
 import org.apache.ignite.compute.task.TaskExecution;
 import org.apache.ignite.deployment.DeploymentUnit;
+import org.apache.ignite.internal.client.proto.ClientComputeJobPacker;
 import org.apache.ignite.internal.client.proto.ClientMessagePacker;
 import org.apache.ignite.internal.client.proto.ClientMessageUnpacker;
 import org.apache.ignite.internal.compute.IgniteComputeInternal;
+import org.apache.ignite.internal.compute.MarshallerProvider;
+import org.apache.ignite.marshalling.Marshaller;
 
 /**
  * Compute MapReduce request.
@@ -82,11 +85,13 @@ public class ClientComputeExecuteMapReduceRequest {
     }
 
     static CompletableFuture<Object> sendTaskResult(TaskExecution<Object> 
execution, NotificationSender notificationSender) {
+        TaskExecution<Object> t = execution;
         return execution.resultAsync().whenComplete((val, err) ->
-                execution.stateAsync().whenComplete((state, errState) ->
+                t.stateAsync().whenComplete((state, errState) ->
                         execution.statesAsync().whenComplete((states, 
errStates) ->
                                 notificationSender.sendNotification(w -> {
-                                    w.packObjectAsBinaryTuple(val);
+                                    Marshaller<Object, byte[]> 
resultMarshaller = ((MarshallerProvider<Object>) t).resultMarshaller();
+                                    ClientComputeJobPacker.packJobResult(val, 
resultMarshaller, w);
                                     packTaskState(w, state);
                                     packJobStates(w, states);
                                 }, firstNotNull(err, errState, errStates)))
diff --git 
a/modules/client/src/main/java/org/apache/ignite/internal/client/compute/ClientCompute.java
 
b/modules/client/src/main/java/org/apache/ignite/internal/client/compute/ClientCompute.java
index 9396584a90..f030f63503 100644
--- 
a/modules/client/src/main/java/org/apache/ignite/internal/client/compute/ClientCompute.java
+++ 
b/modules/client/src/main/java/org/apache/ignite/internal/client/compute/ClientCompute.java
@@ -122,14 +122,14 @@ public class ClientCompute implements IgniteCompute {
                 return new ClientJobExecution<>(
                         ch,
                         doExecuteColocatedAsync(
-                        colocatedTarget.tableName(),
-                        colocatedTarget.key(),
-                        mapper,
-                        descriptor.units(),
-                        descriptor.jobClassName(),
-                        descriptor.options(),
-                        descriptor.argumentMarshaller(),
-                        arg
+                                colocatedTarget.tableName(),
+                                colocatedTarget.key(),
+                                mapper,
+                                descriptor.units(),
+                                descriptor.jobClassName(),
+                                descriptor.options(),
+                                descriptor.argumentMarshaller(),
+                                arg
                         ),
                         descriptor.resultMarshaller());
             } else {
@@ -228,7 +228,14 @@ public class ClientCompute implements IgniteCompute {
     public <T, R> TaskExecution<R> submitMapReduce(TaskDescriptor<T, R> 
taskDescriptor, @Nullable T arg) {
         Objects.requireNonNull(taskDescriptor);
 
-        return new ClientTaskExecution<>(ch, 
doExecuteMapReduceAsync(taskDescriptor.units(), taskDescriptor.taskClassName(), 
arg, null));
+        return new ClientTaskExecution<>(
+                ch,
+                doExecuteMapReduceAsync(
+                        taskDescriptor.units(), 
taskDescriptor.taskClassName(), arg,
+                        taskDescriptor.splitJobArgumentMarshaller()
+                ),
+                taskDescriptor.reduceJobResultMarshaller()
+        );
     }
 
     @Override
@@ -236,14 +243,15 @@ public class ClientCompute implements IgniteCompute {
         return sync(executeMapReduceAsync(taskDescriptor, arg));
     }
 
-    private <T> CompletableFuture<SubmitTaskResult> doExecuteMapReduceAsync(
+    private <T, R> CompletableFuture<SubmitTaskResult> doExecuteMapReduceAsync(
             List<DeploymentUnit> units,
             String taskClassName,
             @Nullable T arg,
-            @Nullable Marshaller<Object, byte[]> marshaller) {
+            @Nullable Marshaller<T, byte[]> argumentMarshaller
+    ) {
         return ch.serviceAsync(
                 ClientOp.COMPUTE_EXECUTE_MAPREDUCE,
-                w -> packTask(w.out(), units, taskClassName, arg, marshaller),
+                w -> packTask(w.out(), units, taskClassName, arg, 
(Marshaller<Object, byte[]>) argumentMarshaller),
                 ClientCompute::unpackSubmitTaskResult,
                 null,
                 null,
diff --git 
a/modules/client/src/main/java/org/apache/ignite/internal/client/compute/ClientTaskExecution.java
 
b/modules/client/src/main/java/org/apache/ignite/internal/client/compute/ClientTaskExecution.java
index 84b1270557..65e8364296 100644
--- 
a/modules/client/src/main/java/org/apache/ignite/internal/client/compute/ClientTaskExecution.java
+++ 
b/modules/client/src/main/java/org/apache/ignite/internal/client/compute/ClientTaskExecution.java
@@ -37,6 +37,8 @@ import org.apache.ignite.compute.TaskState;
 import org.apache.ignite.compute.task.TaskExecution;
 import org.apache.ignite.internal.client.PayloadInputChannel;
 import org.apache.ignite.internal.client.ReliableChannel;
+import org.apache.ignite.internal.client.proto.ClientComputeJobUnpacker;
+import org.apache.ignite.marshalling.Marshaller;
 import org.jetbrains.annotations.Nullable;
 
 /**
@@ -59,7 +61,7 @@ class ClientTaskExecution<R> implements TaskExecution<R> {
     // Local states cache
     private final CompletableFuture<List<@Nullable JobState>> statesFutures = 
new CompletableFuture<>();
 
-    ClientTaskExecution(ReliableChannel ch, 
CompletableFuture<SubmitTaskResult> reqFuture) {
+    ClientTaskExecution(ReliableChannel ch, 
CompletableFuture<SubmitTaskResult> reqFuture, Marshaller<R, byte[]> 
resultMarshaller) {
         this.ch = ch;
 
         jobIdFuture = reqFuture.thenApply(SubmitTaskResult::jobId);
@@ -70,7 +72,7 @@ class ClientTaskExecution<R> implements TaskExecution<R> {
                 .thenApply(payloadInputChannel -> {
                     // Notifications require explicit input close.
                     try (payloadInputChannel) {
-                        R result = (R) 
payloadInputChannel.in().unpackObjectFromBinaryTuple();
+                        R result = (R) 
ClientComputeJobUnpacker.unpackJobResult(resultMarshaller, 
payloadInputChannel.in());
                         
stateFuture.complete(unpackTaskState(payloadInputChannel));
                         
statesFutures.complete(unpackJobStates(payloadInputChannel));
                         return result;
diff --git 
a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeCompute.java 
b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeCompute.java
index 2a323a24b8..a37b44c249 100644
--- 
a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeCompute.java
+++ 
b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeCompute.java
@@ -128,7 +128,7 @@ public class FakeCompute implements IgniteComputeInternal {
 
     /** {@inheritDoc} */
     @Override
-        public <R> CompletableFuture<JobExecution<R>> submitColocatedInternal(
+    public <R> CompletableFuture<JobExecution<R>> submitColocatedInternal(
             TableViewInternal table,
             Tuple key,
             List<DeploymentUnit> units,
@@ -261,32 +261,54 @@ public class FakeCompute implements IgniteComputeInternal 
{
             jobStates.put(subJobId2, toState.apply(subJobId2, status));
         });
 
-        return new TaskExecution<>() {
-            @Override
-            public CompletableFuture<R> resultAsync() {
-                return result;
-            }
-
-            @Override
-            public CompletableFuture<@Nullable TaskState> stateAsync() {
-                return 
completedFuture(TaskStateImpl.toBuilder(jobStates.get(jobId)).build());
-            }
-
-            @Override
-            public CompletableFuture<List<@Nullable JobState>> statesAsync() {
-                return completedFuture(List.of(jobStates.get(subJobId1), 
jobStates.get(subJobId2)));
-            }
-
-            @Override
-            public CompletableFuture<@Nullable Boolean> cancelAsync() {
-                return trueCompletedFuture();
-            }
-
-            @Override
-            public CompletableFuture<@Nullable Boolean> 
changePriorityAsync(int newPriority) {
-                return trueCompletedFuture();
-            }
-        };
+        return new FakeTaskExecution<>(result, jobId, subJobId1, subJobId2, 
null);
+
+    }
+
+    class FakeTaskExecution<R> implements TaskExecution<R>, 
MarshallerProvider<R> {
+        private final CompletableFuture<R> result;
+        private final UUID jobId;
+        private final UUID subJobId1;
+        private final UUID subJobId2;
+        private final Marshaller<R, byte[]> marshaller;
+
+        FakeTaskExecution(CompletableFuture<R> result, UUID jobId, UUID 
subJobId1, UUID subJobId2, Marshaller<R, byte[]> marshaller) {
+            this.result = result;
+            this.jobId = jobId;
+            this.subJobId1 = subJobId1;
+            this.subJobId2 = subJobId2;
+            this.marshaller = marshaller;
+        }
+
+        @Override
+        public CompletableFuture<R> resultAsync() {
+            return result;
+        }
+
+        @Override
+        public CompletableFuture<@Nullable TaskState> stateAsync() {
+            return 
completedFuture(TaskStateImpl.toBuilder(jobStates.get(jobId)).build());
+        }
+
+        @Override
+        public CompletableFuture<List<@Nullable JobState>> statesAsync() {
+            return completedFuture(List.of(jobStates.get(subJobId1), 
jobStates.get(subJobId2)));
+        }
+
+        @Override
+        public CompletableFuture<@Nullable Boolean> cancelAsync() {
+            return trueCompletedFuture();
+        }
+
+        @Override
+        public CompletableFuture<@Nullable Boolean> changePriorityAsync(int 
newPriority) {
+            return trueCompletedFuture();
+        }
+
+        @Override
+        public @Nullable Marshaller<R, byte[]> resultMarshaller() {
+            return marshaller;
+        }
     }
 
     @Override
diff --git 
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/TaskExecutionWrapper.java
 
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/TaskExecutionWrapper.java
index 9c2c3bbe81..90365ed2bf 100644
--- 
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/TaskExecutionWrapper.java
+++ 
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/TaskExecutionWrapper.java
@@ -25,6 +25,7 @@ import java.util.concurrent.CompletableFuture;
 import org.apache.ignite.compute.JobState;
 import org.apache.ignite.compute.TaskState;
 import org.apache.ignite.compute.task.TaskExecution;
+import org.apache.ignite.marshalling.Marshaller;
 import org.jetbrains.annotations.Nullable;
 
 /**
@@ -32,7 +33,7 @@ import org.jetbrains.annotations.Nullable;
  *
  * @param <R> Result type.
  */
-class TaskExecutionWrapper<R> implements TaskExecution<R> {
+class TaskExecutionWrapper<R> implements TaskExecution<R>, 
MarshallerProvider<R> {
     private final TaskExecution<R> delegate;
 
     TaskExecutionWrapper(TaskExecution<R> delegate) {
@@ -63,4 +64,13 @@ class TaskExecutionWrapper<R> implements TaskExecution<R> {
     public CompletableFuture<@Nullable Boolean> changePriorityAsync(int 
newPriority) {
         return 
convertToPublicFuture(delegate.changePriorityAsync(newPriority));
     }
+
+    @Override
+    public @Nullable Marshaller<R, byte[]> resultMarshaller() {
+        if (delegate instanceof MarshallerProvider) {
+            return ((MarshallerProvider<R>) delegate).resultMarshaller();
+        }
+
+        return null;
+    }
 }
diff --git 
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/task/DelegatingTaskExecution.java
 
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/task/DelegatingTaskExecution.java
index fc2c67ad10..fdf35eafb9 100644
--- 
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/task/DelegatingTaskExecution.java
+++ 
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/task/DelegatingTaskExecution.java
@@ -22,6 +22,8 @@ import java.util.concurrent.CompletableFuture;
 import org.apache.ignite.compute.JobState;
 import org.apache.ignite.compute.TaskState;
 import org.apache.ignite.compute.task.TaskExecution;
+import org.apache.ignite.internal.compute.MarshallerProvider;
+import org.apache.ignite.marshalling.Marshaller;
 import org.jetbrains.annotations.Nullable;
 
 /**
@@ -29,7 +31,7 @@ import org.jetbrains.annotations.Nullable;
  *
  * @param <R> Result type.
  */
-public class DelegatingTaskExecution<I, M, T, R> implements TaskExecution<R> {
+public class DelegatingTaskExecution<I, M, T, R> implements TaskExecution<R>, 
MarshallerProvider<R> {
     private final CompletableFuture<TaskExecutionInternal<I, M, T, R>> 
delegate;
 
     public DelegatingTaskExecution(CompletableFuture<TaskExecutionInternal<I, 
M, T, R>> delegate) {
@@ -60,4 +62,11 @@ public class DelegatingTaskExecution<I, M, T, R> implements 
TaskExecution<R> {
     public CompletableFuture<@Nullable Boolean> changePriorityAsync(int 
newPriority) {
         return delegate.thenCompose(execution -> 
execution.changePriorityAsync(newPriority));
     }
+
+    @Override
+    public @Nullable Marshaller<R, byte[]> resultMarshaller() {
+        assert delegate.isDone() : "Task execution is supposed to be done 
before calling `resultMarshaller()`";
+
+        return delegate.join().resultMarshaller();
+    }
 }
diff --git 
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/task/TaskExecutionInternal.java
 
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/task/TaskExecutionInternal.java
index 5ce5ae124e..2a359b3ac8 100644
--- 
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/task/TaskExecutionInternal.java
+++ 
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/task/TaskExecutionInternal.java
@@ -30,6 +30,7 @@ import static 
org.apache.ignite.internal.util.CompletableFutures.allOfToList;
 import static 
org.apache.ignite.internal.util.CompletableFutures.falseCompletedFuture;
 import static 
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
 import static 
org.apache.ignite.internal.util.CompletableFutures.trueCompletedFuture;
+import static org.apache.ignite.marshalling.Marshaller.tryUnmarshalOrCast;
 
 import java.time.Instant;
 import java.util.Arrays;
@@ -50,11 +51,13 @@ import org.apache.ignite.compute.task.MapReduceJob;
 import org.apache.ignite.compute.task.MapReduceTask;
 import org.apache.ignite.compute.task.TaskExecution;
 import org.apache.ignite.compute.task.TaskExecutionContext;
+import org.apache.ignite.internal.compute.MarshallerProvider;
 import org.apache.ignite.internal.compute.TaskStateImpl;
 import org.apache.ignite.internal.compute.queue.PriorityQueueExecutor;
 import org.apache.ignite.internal.compute.queue.QueueExecution;
 import org.apache.ignite.internal.logger.IgniteLogger;
 import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.marshalling.Marshaller;
 import org.jetbrains.annotations.Nullable;
 
 /**
@@ -66,7 +69,7 @@ import org.jetbrains.annotations.Nullable;
  * @param <R> Task result type.
  */
 @SuppressWarnings("unchecked")
-public class TaskExecutionInternal<I, M, T, R> implements TaskExecution<R> {
+public class TaskExecutionInternal<I, M, T, R> implements TaskExecution<R>, 
MarshallerProvider<R> {
     private static final IgniteLogger LOG = 
Loggers.forClass(TaskExecutionInternal.class);
 
     private final QueueExecution<SplitResult<I, M, T, R>> splitExecution;
@@ -81,6 +84,8 @@ public class TaskExecutionInternal<I, M, T, R> implements 
TaskExecution<R> {
 
     private final AtomicBoolean isCancelled;
 
+    private volatile @Nullable Marshaller<R, byte[]> reduceResultMarshallerRef;
+
     /**
      * Construct an execution object and starts executing.
      *
@@ -105,7 +110,9 @@ public class TaskExecutionInternal<I, M, T, R> implements 
TaskExecution<R> {
                 () -> {
                     MapReduceTask<I, M, T, R> task = 
instantiateTask(taskClass);
 
-                    return task.splitAsync(context, arg)
+                    reduceResultMarshallerRef = 
task.reduceJobResultMarshaller();
+
+                    return task.splitAsync(context, 
tryUnmarshalOrCast(task.splitJobInputMarshaller(), arg))
                             .thenApply(jobs -> new SplitResult<>(task, jobs));
                 },
 
@@ -298,6 +305,11 @@ public class TaskExecutionInternal<I, M, T, R> implements 
TaskExecution<R> {
                 .collect(toList());
     }
 
+    @Override
+    public @Nullable Marshaller<R, byte[]> resultMarshaller() {
+        return reduceResultMarshallerRef;
+    }
+
     private static class SplitResult<I, M, T, R> {
         private final MapReduceTask<I, M, T, R> task;
 
diff --git a/modules/platforms/dotnet/Apache.Ignite/Internal/Compute/Compute.cs 
b/modules/platforms/dotnet/Apache.Ignite/Internal/Compute/Compute.cs
index ed988ff82e..9b9f9545d5 100644
--- a/modules/platforms/dotnet/Apache.Ignite/Internal/Compute/Compute.cs
+++ b/modules/platforms/dotnet/Apache.Ignite/Internal/Compute/Compute.cs
@@ -358,7 +358,8 @@ namespace Apache.Ignite.Internal.Compute
 
             static (T, TaskState) Read(MsgPackReader reader)
             {
-                var res = (T)reader.ReadObjectFromBinaryTuple()!;
+                // TODO IGNITE-23074 .NET: Thin 3.0: Support marshallers in 
MapReduce
+                var res = ComputePacker.UnpackResult<T>(ref reader, null);
                 var state = ReadTaskState(reader);
 
                 return (res, state);
diff --git 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/Jobs.java
 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/Jobs.java
index ac205d37a6..14cef44c83 100644
--- 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/Jobs.java
+++ 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/Jobs.java
@@ -36,6 +36,7 @@ import org.apache.ignite.compute.task.TaskExecutionContext;
 import org.apache.ignite.marshalling.ByteArrayMarshaller;
 import org.apache.ignite.marshalling.Marshaller;
 import org.apache.ignite.network.ClusterNode;
+import org.apache.ignite.table.Tuple;
 import org.jetbrains.annotations.Nullable;
 
 /** Jobs and marhallers definitions that are used in tests. */
@@ -276,7 +277,7 @@ public class Jobs {
     }
 
     /** MapReduce task that splits input list into two parts and sends them to 
different nodes. */
-    public static class MapReduce implements MapReduceTask<List<String>, 
String, String, String> {
+    public static class MapReduce implements MapReduceTask<List<String>, 
String, String, List<String>> {
         @Override
         public CompletableFuture<List<MapReduceJob<String, String>>> 
splitAsync(
                 TaskExecutionContext taskContext,
@@ -293,17 +294,72 @@ public class Jobs {
                     MapReduceJob.<String, String>builder()
                             .jobDescriptor(mapJobDescriptor)
                             .node(nodes.get(0))
+                            .args(input.get(0))
                             .build(),
                     MapReduceJob.<String, String>builder()
                             .jobDescriptor(mapJobDescriptor)
                             .node(nodes.get(1))
+                            .args(input.get(1))
                             .build()
             ));
         }
 
         @Override
-        public CompletableFuture<String> reduceAsync(TaskExecutionContext 
taskContext, Map<UUID, String> results) {
-            return null;
+        public CompletableFuture<List<String>> 
reduceAsync(TaskExecutionContext taskContext, Map<UUID, String> results) {
+            return completedFuture(new ArrayList<>(results.values()));
+        }
+
+        @Override
+        public @Nullable Marshaller<List<String>, byte[]> 
splitJobInputMarshaller() {
+            return ByteArrayMarshaller.create();
+        }
+
+        @Override
+        public @Nullable Marshaller<List<String>, byte[]> 
reduceJobResultMarshaller() {
+            return ByteArrayMarshaller.create();
+        }
+    }
+
+    /** MapReduce that adds a column to the tuple on each step. */
+    public static class MapReduceTuples implements MapReduceTask<Tuple, Tuple, 
Tuple, Tuple> {
+        @Override
+        public CompletableFuture<List<MapReduceJob<Tuple, Tuple>>> 
splitAsync(TaskExecutionContext taskContext, @Nullable Tuple input) {
+            List<ClusterNode> nodes = new 
ArrayList<>(taskContext.ignite().clusterNodes());
+            Tuple jobsInput = Tuple.copy(input);
+            jobsInput.set("split", "call");
+
+            var mapJobDescriptor = 
JobDescriptor.builder(EchoTupleJob.class).build();
+
+            return completedFuture(List.of(
+                    MapReduceJob.<Tuple, Tuple>builder()
+                            .jobDescriptor(mapJobDescriptor)
+                            .node(nodes.get(0))
+                            .args(jobsInput)
+                            .build(),
+                    MapReduceJob.<Tuple, Tuple>builder()
+                            .jobDescriptor(mapJobDescriptor)
+                            .node(nodes.get(1))
+                            .args(jobsInput)
+                            .build()
+            ));
+        }
+
+        @Override
+        public CompletableFuture<Tuple> reduceAsync(TaskExecutionContext 
taskContext, Map<UUID, Tuple> results) {
+            Tuple reduceResult = Tuple.copy((Tuple) 
results.values().toArray()[0]);
+            reduceResult.set("reduce", "call");
+
+            return completedFuture(reduceResult);
+        }
+    }
+
+    static class EchoTupleJob implements ComputeJob<Tuple, Tuple> {
+        @Override
+        public @Nullable CompletableFuture<Tuple> 
executeAsync(JobExecutionContext context, @Nullable Tuple arg) {
+            var tuple = Tuple.copy(arg);
+            tuple.set("echo", "echo");
+
+            return completedFuture(tuple);
         }
     }
 }
diff --git 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItThinClientComputeMarshallingTest.java
 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItThinClientComputeMarshallingTest.java
index 17a13d56d7..512cd659e8 100644
--- 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItThinClientComputeMarshallingTest.java
+++ 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItThinClientComputeMarshallingTest.java
@@ -18,6 +18,10 @@
 package org.apache.ignite.internal.runner.app.client;
 
 import static org.apache.ignite.catalog.definitions.ColumnDefinition.column;
+import static org.hamcrest.CoreMatchers.containsString;
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.CoreMatchers.hasItem;
+import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
@@ -39,14 +43,15 @@ import 
org.apache.ignite.internal.runner.app.Jobs.ArgumentAndResultMarshallingJo
 import org.apache.ignite.internal.runner.app.Jobs.ArgumentStringMarshaller;
 import org.apache.ignite.internal.runner.app.Jobs.JsonMarshaller;
 import org.apache.ignite.internal.runner.app.Jobs.MapReduce;
+import org.apache.ignite.internal.runner.app.Jobs.MapReduceTuples;
 import org.apache.ignite.internal.runner.app.Jobs.PojoArg;
 import org.apache.ignite.internal.runner.app.Jobs.PojoJob;
 import org.apache.ignite.internal.runner.app.Jobs.PojoResult;
 import org.apache.ignite.internal.runner.app.Jobs.ResultMarshallingJob;
 import org.apache.ignite.internal.runner.app.Jobs.ResultStringUnMarshaller;
+import org.apache.ignite.marshalling.ByteArrayMarshaller;
 import org.apache.ignite.network.ClusterNode;
 import org.apache.ignite.table.Tuple;
-import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.ValueSource;
@@ -196,7 +201,6 @@ public class ItThinClientComputeMarshallingTest extends 
ItAbstractThinClientTest
     }
 
 
-
     @Test
     void submitBroadcast() {
         // When.
@@ -264,20 +268,45 @@ public class ItThinClientComputeMarshallingTest extends 
ItAbstractThinClientTest
     }
 
     @Test
-    @Disabled("https://issues.apache.org/jira/browse/IGNITE-22787";)
     void mapReduce() {
-        // When run job with custom marshaller for string argument.
-        String result = client().compute().executeMapReduce(
-                TaskDescriptor.builder(MapReduce.class).build(),
-                List.of("Input_0", "Input_1"));
+        // When.
+        List<String> result = client().compute().executeMapReduce(
+                TaskDescriptor.builder(MapReduce.class)
+                        
.splitJobArgumentMarshaller(ByteArrayMarshaller.create())
+                        
.reduceJobArgumentMarshaller(ByteArrayMarshaller.create())
+                        .build(),
+                // input_O goes to 0 node and input_1 goes to 1 node
+                List.of("Input_0", "Input_1")
+        );
 
-        // Then both client and server marshaller were called.
-        assertEquals("Input"
-                        + ":marshalledOnClient"
-                        + ":unmarshalledOnServer"
-                        + ":processedOnServer",
-                result
+        // Then.
+        assertThat(
+                result,
+                
hasItem(containsString("Input_0:marshalledOnClient:unmarshalledOnServer:processedOnServer"))
+        );
+        // And
+        assertThat(
+                result,
+                
hasItem(containsString("Input_1:marshalledOnClient:unmarshalledOnServer:processedOnServer"))
+        );
+    }
+
+    @Test
+    void mapReduceTuples() {
+        // When.
+        Tuple result = client().compute().executeMapReduce(
+                TaskDescriptor.builder(MapReduceTuples.class).build(),
+                Tuple.create().set("from", "client")
         );
+
+        // Then.
+        Tuple expectedTuple = Tuple.create();
+        expectedTuple.set("from", "client");
+        expectedTuple.set("split", "call");
+        expectedTuple.set("reduce", "call");
+        expectedTuple.set("echo", "echo");
+
+        assertThat(result, equalTo(expectedTuple));
     }
 
     private ClusterNode node(int idx) {

Reply via email to