This is an automated email from the ASF dual-hosted git repository.

ptupitsyn pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new ff177e75b9 IGNITE-22847 Add TaskDescriptor to Сompute API (#4153)
ff177e75b9 is described below

commit ff177e75b986ef1054d2f746267b083675d0d0f7
Author: Pavel Tupitsyn <[email protected]>
AuthorDate: Tue Jul 30 11:33:56 2024 +0300

    IGNITE-22847 Add TaskDescriptor to Сompute API (#4153)
    
    Similar to JobDescriptor, add TaskDescriptor for MapReduce method family, 
to keep the API consistent.
---
 .../org/apache/ignite/compute/IgniteCompute.java   |  20 ++--
 .../org/apache/ignite/compute/TaskDescriptor.java  | 129 +++++++++++++++++++++
 .../apache/ignite/compute/task/MapReduceTask.java  |   4 +-
 .../internal/client/proto/ClientMessagePacker.java |   2 +-
 .../ClientComputeExecuteMapReduceRequest.java      |   4 +-
 .../apache/ignite/client/ClientOperationType.java  |   3 +-
 .../internal/client/compute/ClientCompute.java     |  20 ++--
 .../apache/ignite/client/ClientComputeTest.java    |  11 +-
 .../apache/ignite/client/fakes/FakeCompute.java    |   7 +-
 .../ignite/internal/compute/ItComputeBaseTest.java |  17 ++-
 .../ignite/internal/compute/ItMapReduceTest.java   |  24 ++--
 .../internal/compute/AntiHijackIgniteCompute.java  |  12 +-
 .../ignite/internal/compute/IgniteComputeImpl.java |  13 ++-
 .../client/ItThinClientComputeMarshallingTest.java |   5 +-
 .../runner/app/client/ItThinClientComputeTest.java |  16 ++-
 15 files changed, 221 insertions(+), 66 deletions(-)

diff --git 
a/modules/api/src/main/java/org/apache/ignite/compute/IgniteCompute.java 
b/modules/api/src/main/java/org/apache/ignite/compute/IgniteCompute.java
index f5569d4056..32e3cddd57 100644
--- a/modules/api/src/main/java/org/apache/ignite/compute/IgniteCompute.java
+++ b/modules/api/src/main/java/org/apache/ignite/compute/IgniteCompute.java
@@ -22,14 +22,12 @@ import static java.util.function.Function.identity;
 import static java.util.stream.Collectors.toMap;
 
 import java.util.HashMap;
-import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import org.apache.ignite.compute.task.MapReduceTask;
 import org.apache.ignite.compute.task.TaskExecution;
-import org.apache.ignite.deployment.DeploymentUnit;
 import org.apache.ignite.network.ClusterNode;
 import org.jetbrains.annotations.Nullable;
 
@@ -169,25 +167,23 @@ public interface IgniteCompute {
      *
      * @param <T> Job argument (T)ype.
      * @param <R> Job (R)esult type.
-     * @param units Deployment units.
-     * @param taskClassName Map reduce task class name.
+     * @param taskDescriptor Map reduce task descriptor.
      * @param arg Task argument.
      * @return Task execution interface.
      */
-    <T, R> TaskExecution<R> submitMapReduce(List<DeploymentUnit> units, String 
taskClassName, @Nullable T arg);
+    <T, R> TaskExecution<R> submitMapReduce(TaskDescriptor<T, R> 
taskDescriptor, @Nullable T arg);
 
     /**
      * Submits a {@link MapReduceTask} of the given class for an execution. A 
shortcut for {@code submitMapReduce(...).resultAsync()}.
      *
      * @param <T> Job argument (T)ype.
      * @param <R> Job (R)esult type.
-     * @param units Deployment units.
-     * @param taskClassName Map reduce task class name.
+     * @param taskDescriptor Map reduce task descriptor.
      * @param arg Task argument.
      * @return Task result future.
      */
-    default <T, R> CompletableFuture<R> 
executeMapReduceAsync(List<DeploymentUnit> units, String taskClassName, 
@Nullable T arg) {
-        return this.<T, R>submitMapReduce(units, taskClassName, 
arg).resultAsync();
+    default <T, R> CompletableFuture<R> 
executeMapReduceAsync(TaskDescriptor<T, R> taskDescriptor, @Nullable T arg) {
+        return submitMapReduce(taskDescriptor, arg).resultAsync();
     }
 
     /**
@@ -195,11 +191,11 @@ public interface IgniteCompute {
      *
      * @param <T> Job argument (T)ype.
      * @param <R> Job (R)esult type.
-     * @param units Deployment units.
-     * @param taskClassName Map reduce task class name.
+     * @param taskDescriptor Map reduce task descriptor.
      * @param arg Task argument.
      * @return Task result.
      * @throws ComputeException If there is any problem executing the task.
      */
-    <T, R> R executeMapReduce(List<DeploymentUnit> units, String 
taskClassName, @Nullable T arg);
+    <T, R> R executeMapReduce(TaskDescriptor<T, R> taskDescriptor, @Nullable T 
arg);
+
 }
diff --git 
a/modules/api/src/main/java/org/apache/ignite/compute/TaskDescriptor.java 
b/modules/api/src/main/java/org/apache/ignite/compute/TaskDescriptor.java
new file mode 100644
index 0000000000..511cd0312f
--- /dev/null
+++ b/modules/api/src/main/java/org/apache/ignite/compute/TaskDescriptor.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.compute;
+
+import java.util.List;
+import java.util.Objects;
+import org.apache.ignite.compute.task.MapReduceTask;
+import org.apache.ignite.deployment.DeploymentUnit;
+
+/**
+ * Compute task descriptor.
+ */
+public class TaskDescriptor<T, R> {
+    private final String taskClassName;
+
+    private final List<DeploymentUnit> units;
+
+    private TaskDescriptor(
+            String taskClassName,
+            List<DeploymentUnit> units
+    ) {
+        this.taskClassName = taskClassName;
+        this.units = units;
+    }
+
+    /**
+     * Task class name.
+     *
+     * @return Task class name.
+     */
+    public String taskClassName() {
+        return taskClassName;
+    }
+
+    /**
+     * Deployment units.
+     *
+     * @return Deployment units.
+     */
+    public List<DeploymentUnit> units() {
+        return units;
+    }
+
+    /**
+     * Create a new builder.
+     *
+     * @return Task descriptor builder.
+     */
+    public static <T, R> Builder<T, R> builder(String taskClassName) {
+        Objects.requireNonNull(taskClassName);
+
+        return new Builder<>(taskClassName);
+    }
+
+    /**
+     * Create a new builder.
+     *
+     * @return Task descriptor builder.
+     */
+    public static <I, M, T, R> Builder<I, R> builder(Class<? extends 
MapReduceTask<I, M, T, R>> taskClass) {
+        Objects.requireNonNull(taskClass);
+
+        return new Builder<>(taskClass.getName());
+    }
+
+    /**
+     * Builder.
+     */
+    public static class Builder<T, R> {
+        private final String taskClassName;
+        private List<DeploymentUnit> units;
+
+        private Builder(String taskClassName) {
+            Objects.requireNonNull(taskClassName);
+
+            this.taskClassName = taskClassName;
+        }
+
+        /**
+         * Sets the deployment units.
+         *
+         * @param units Deployment units.
+         * @return This builder.
+         */
+        public Builder<T, R> units(List<DeploymentUnit> units) {
+            this.units = units;
+            return this;
+        }
+
+        /**
+         * Sets the deployment units.
+         *
+         * @param units Deployment units.
+         * @return This builder.
+         */
+        public Builder<T, R> units(DeploymentUnit... units) {
+            this.units = List.of(units);
+            return this;
+        }
+
+
+        /**
+         * Builds the task descriptor.
+         *
+         * @return Task descriptor.
+         */
+        public TaskDescriptor<T, R> build() {
+            return new TaskDescriptor<>(
+                    taskClassName,
+                    units == null ? List.of() : units
+            );
+        }
+    }
+}
diff --git 
a/modules/api/src/main/java/org/apache/ignite/compute/task/MapReduceTask.java 
b/modules/api/src/main/java/org/apache/ignite/compute/task/MapReduceTask.java
index 1f379ac030..69fea09a8b 100644
--- 
a/modules/api/src/main/java/org/apache/ignite/compute/task/MapReduceTask.java
+++ 
b/modules/api/src/main/java/org/apache/ignite/compute/task/MapReduceTask.java
@@ -21,12 +21,12 @@ import java.util.List;
 import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.compute.TaskDescriptor;
 import org.jetbrains.annotations.Nullable;
 
 /**
  * A map reduce task interface. Implement this interface and pass a name of 
the implemented class to the
- * {@link org.apache.ignite.compute.IgniteCompute#submitMapReduce(List, 
String, Object) IgniteCompute#submitMapReduce} method to run this
- * task.
+ * {@link 
org.apache.ignite.compute.IgniteCompute#submitMapReduce(TaskDescriptor, 
Object)} method to run this task.
  *
  * @param <I> Split task (I)nput type.
  * @param <M> (M)ap job input type.
diff --git 
a/modules/client-common/src/main/java/org/apache/ignite/internal/client/proto/ClientMessagePacker.java
 
b/modules/client-common/src/main/java/org/apache/ignite/internal/client/proto/ClientMessagePacker.java
index 807901423a..3a93bb98e4 100644
--- 
a/modules/client-common/src/main/java/org/apache/ignite/internal/client/proto/ClientMessagePacker.java
+++ 
b/modules/client-common/src/main/java/org/apache/ignite/internal/client/proto/ClientMessagePacker.java
@@ -663,7 +663,7 @@ public class ClientMessagePacker implements AutoCloseable {
      *
      * @param val Object array.
      */
-    public <T> void packObjectAsBinaryTuple(T val, @Nullable Marshaller<T, 
byte[]> marshaller) {
+    public <T> void packObjectAsBinaryTuple(@Nullable T val, @Nullable 
Marshaller<T, byte[]> marshaller) {
         assert !closed : "Packer is closed";
 
         if (val == null) {
diff --git 
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/compute/ClientComputeExecuteMapReduceRequest.java
 
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/compute/ClientComputeExecuteMapReduceRequest.java
index c79b867fc4..7869f3b3f0 100644
--- 
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/compute/ClientComputeExecuteMapReduceRequest.java
+++ 
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/compute/ClientComputeExecuteMapReduceRequest.java
@@ -28,6 +28,7 @@ import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import org.apache.ignite.client.handler.NotificationSender;
 import org.apache.ignite.compute.JobState;
+import org.apache.ignite.compute.TaskDescriptor;
 import org.apache.ignite.compute.task.TaskExecution;
 import org.apache.ignite.deployment.DeploymentUnit;
 import org.apache.ignite.internal.client.proto.ClientMessagePacker;
@@ -56,7 +57,8 @@ public class ClientComputeExecuteMapReduceRequest {
         String taskClassName = in.unpackString();
         Object args = unpackPayload(in);
 
-        TaskExecution<Object> execution = 
compute.submitMapReduce(deploymentUnits, taskClassName, args);
+        TaskExecution<Object> execution = compute.submitMapReduce(
+                
TaskDescriptor.builder(taskClassName).units(deploymentUnits).build(), args);
         sendTaskResult(execution, notificationSender);
 
         var idsAsync = execution.idsAsync()
diff --git 
a/modules/client/src/main/java/org/apache/ignite/client/ClientOperationType.java
 
b/modules/client/src/main/java/org/apache/ignite/client/ClientOperationType.java
index b8c97c8dda..3f12c60052 100644
--- 
a/modules/client/src/main/java/org/apache/ignite/client/ClientOperationType.java
+++ 
b/modules/client/src/main/java/org/apache/ignite/client/ClientOperationType.java
@@ -18,7 +18,6 @@
 package org.apache.ignite.client;
 
 import java.util.Collection;
-import java.util.List;
 import org.apache.ignite.compute.JobDescriptor;
 import org.apache.ignite.compute.JobTarget;
 import org.apache.ignite.sql.BatchedArguments;
@@ -140,7 +139,7 @@ public enum ClientOperationType {
     COMPUTE_EXECUTE,
 
     /**
-     * Compute Execute MapReduce ({@link 
org.apache.ignite.compute.IgniteCompute#submitMapReduce(List, String, Object)}).
+     * Compute Execute MapReduce ({@link 
org.apache.ignite.compute.IgniteCompute#submitMapReduce}).
      */
     COMPUTE_EXECUTE_MAPREDUCE,
 
diff --git 
a/modules/client/src/main/java/org/apache/ignite/internal/client/compute/ClientCompute.java
 
b/modules/client/src/main/java/org/apache/ignite/internal/client/compute/ClientCompute.java
index c147d0a62e..36d7b49196 100644
--- 
a/modules/client/src/main/java/org/apache/ignite/internal/client/compute/ClientCompute.java
+++ 
b/modules/client/src/main/java/org/apache/ignite/internal/client/compute/ClientCompute.java
@@ -42,6 +42,7 @@ import org.apache.ignite.compute.JobDescriptor;
 import org.apache.ignite.compute.JobExecution;
 import org.apache.ignite.compute.JobExecutionOptions;
 import org.apache.ignite.compute.JobTarget;
+import org.apache.ignite.compute.TaskDescriptor;
 import org.apache.ignite.compute.task.TaskExecution;
 import org.apache.ignite.deployment.DeploymentUnit;
 import org.apache.ignite.internal.client.PayloadInputChannel;
@@ -223,26 +224,25 @@ public class ClientCompute implements IgniteCompute {
     }
 
     @Override
-    public <T, R> TaskExecution<R> submitMapReduce(List<DeploymentUnit> units, 
String taskClassName, T args) {
-        Objects.requireNonNull(units);
-        Objects.requireNonNull(taskClassName);
+    public <T, R> TaskExecution<R> submitMapReduce(TaskDescriptor<T, R> 
taskDescriptor, @Nullable T arg) {
+        Objects.requireNonNull(taskDescriptor);
 
-        return new ClientTaskExecution<>(ch, doExecuteMapReduceAsync(units, 
taskClassName, args, null));
+        return new ClientTaskExecution<>(ch, 
doExecuteMapReduceAsync(taskDescriptor.units(), taskDescriptor.taskClassName(), 
arg, null));
     }
 
     @Override
-    public <T, R> R executeMapReduce(List<DeploymentUnit> units, String 
taskClassName, T args) {
-        return sync(executeMapReduceAsync(units, taskClassName, args));
+    public <T, R> R executeMapReduce(TaskDescriptor<T, R> taskDescriptor, 
@Nullable T arg) {
+        return sync(executeMapReduceAsync(taskDescriptor, arg));
     }
 
     private <T> CompletableFuture<SubmitTaskResult> doExecuteMapReduceAsync(
             List<DeploymentUnit> units,
             String taskClassName,
-            T args,
+            @Nullable T arg,
             @Nullable Marshaller<Object, byte[]> marshaller) {
         return ch.serviceAsync(
                 ClientOp.COMPUTE_EXECUTE_MAPREDUCE,
-                w -> packTask(w.out(), units, taskClassName, args, marshaller),
+                w -> packTask(w.out(), units, taskClassName, arg, marshaller),
                 ClientCompute::unpackSubmitTaskResult,
                 null,
                 null,
@@ -427,11 +427,11 @@ public class ClientCompute implements IgniteCompute {
     private static void packTask(ClientMessagePacker w,
             List<DeploymentUnit> units,
             String taskClassName,
-            Object args,
+            @Nullable Object arg,
             @Nullable Marshaller<Object, byte[]> marshaller) {
         w.packDeploymentUnits(units);
         w.packString(taskClassName);
-        w.packObjectAsBinaryTuple(args, marshaller);
+        w.packObjectAsBinaryTuple(arg, marshaller);
     }
 
     /**
diff --git 
a/modules/client/src/test/java/org/apache/ignite/client/ClientComputeTest.java 
b/modules/client/src/test/java/org/apache/ignite/client/ClientComputeTest.java
index b3e8abf937..8421fb9205 100644
--- 
a/modules/client/src/test/java/org/apache/ignite/client/ClientComputeTest.java
+++ 
b/modules/client/src/test/java/org/apache/ignite/client/ClientComputeTest.java
@@ -51,6 +51,7 @@ import org.apache.ignite.compute.IgniteCompute;
 import org.apache.ignite.compute.JobDescriptor;
 import org.apache.ignite.compute.JobExecution;
 import org.apache.ignite.compute.JobTarget;
+import org.apache.ignite.compute.TaskDescriptor;
 import org.apache.ignite.compute.TaskStatus;
 import org.apache.ignite.compute.task.TaskExecution;
 import org.apache.ignite.deployment.DeploymentUnit;
@@ -257,7 +258,7 @@ public class ClientComputeTest extends 
BaseIgniteAbstractTest {
 
         try (var client = getClient(server1)) {
             Object args = "arg1";
-            String res1 = client.compute().executeMapReduce(List.of(), "job", 
args);
+            String res1 = 
client.compute().executeMapReduce(TaskDescriptor.<Object, 
String>builder("job").build(), args);
             assertEquals("s1", res1);
         }
     }
@@ -267,7 +268,9 @@ public class ClientComputeTest extends 
BaseIgniteAbstractTest {
         initServers(reqId -> false);
 
         try (var client = getClient(server1)) {
-            TaskExecution<Object> task = 
client.compute().submitMapReduce(List.of(), "job", null);
+            IgniteCompute igniteCompute = client.compute();
+            TaskExecution<Object> task = igniteCompute.submitMapReduce(
+                    TaskDescriptor.builder("job").build(), null);
 
             assertThat(task.resultAsync(), willBe("s1"));
 
@@ -286,7 +289,9 @@ public class ClientComputeTest extends 
BaseIgniteAbstractTest {
         try (var client = getClient(server1)) {
             FakeCompute.future = CompletableFuture.failedFuture(new 
RuntimeException("job failed"));
 
-            TaskExecution<Object> execution = 
client.compute().submitMapReduce(List.of(), "job", null);
+            IgniteCompute igniteCompute = client.compute();
+            TaskExecution<Object> execution = igniteCompute.submitMapReduce(
+                    TaskDescriptor.builder("job").build(), null);
 
             assertThat(execution.resultAsync(), 
willThrowFast(IgniteException.class));
             assertThat(execution.stateAsync(), 
willBe(taskStateWithStatus(TaskStatus.FAILED)));
diff --git 
a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeCompute.java 
b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeCompute.java
index 4024675d58..9fc69ca097 100644
--- 
a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeCompute.java
+++ 
b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeCompute.java
@@ -49,6 +49,7 @@ import org.apache.ignite.compute.JobExecutionOptions;
 import org.apache.ignite.compute.JobState;
 import org.apache.ignite.compute.JobStatus;
 import org.apache.ignite.compute.JobTarget;
+import org.apache.ignite.compute.TaskDescriptor;
 import org.apache.ignite.compute.TaskState;
 import org.apache.ignite.compute.task.TaskExecution;
 import org.apache.ignite.deployment.DeploymentUnit;
@@ -170,13 +171,13 @@ public class FakeCompute implements IgniteComputeInternal 
{
     }
 
     @Override
-    public <T, R> TaskExecution<R> submitMapReduce(List<DeploymentUnit> units, 
String taskClassName, T args) {
+    public <T, R> TaskExecution<R> submitMapReduce(TaskDescriptor<T, R> 
taskDescriptor, @Nullable T arg) {
         return taskExecution(future != null ? future : completedFuture((R) 
nodeName));
     }
 
     @Override
-    public <T, R> R executeMapReduce(List<DeploymentUnit> units, String 
taskClassName, T args) {
-        return sync(executeMapReduceAsync(units, taskClassName, args));
+    public <T, R> R executeMapReduce(TaskDescriptor<T, R> taskDescriptor, 
@Nullable T arg) {
+        return sync(executeMapReduceAsync(taskDescriptor, arg));
     }
 
     private <R> JobExecution<R> completedExecution(R result) {
diff --git 
a/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItComputeBaseTest.java
 
b/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItComputeBaseTest.java
index 7ef6cb31c4..db1b458737 100644
--- 
a/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItComputeBaseTest.java
+++ 
b/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItComputeBaseTest.java
@@ -47,9 +47,11 @@ import java.util.stream.IntStream;
 import org.apache.ignite.Ignite;
 import org.apache.ignite.client.IgniteClient;
 import org.apache.ignite.compute.ComputeException;
+import org.apache.ignite.compute.IgniteCompute;
 import org.apache.ignite.compute.JobDescriptor;
 import org.apache.ignite.compute.JobExecution;
 import org.apache.ignite.compute.JobTarget;
+import org.apache.ignite.compute.TaskDescriptor;
 import org.apache.ignite.compute.task.TaskExecution;
 import org.apache.ignite.deployment.DeploymentUnit;
 import org.apache.ignite.internal.ClusterPerClassIntegrationTest;
@@ -61,6 +63,7 @@ import org.apache.ignite.marshalling.ByteArrayMarshaller;
 import org.apache.ignite.network.ClusterNode;
 import org.apache.ignite.table.Tuple;
 import org.apache.ignite.table.mapper.Mapper;
+import org.jetbrains.annotations.Nullable;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.Arguments;
@@ -423,7 +426,11 @@ public abstract class ItComputeBaseTest extends 
ClusterPerClassIntegrationTest {
     void submitMapReduce() {
         IgniteImpl entryNode = node(0);
 
-        TaskExecution<Integer> taskExecution = 
entryNode.compute().submitMapReduce(units(), mapReduceTaskClassName(), units());
+        IgniteCompute igniteCompute = entryNode.compute();
+        List<DeploymentUnit> units = units();
+        @Nullable List<DeploymentUnit> arg = units();
+        TaskExecution<Integer> taskExecution = igniteCompute.submitMapReduce(
+                TaskDescriptor.<List<DeploymentUnit>, 
Integer>builder(mapReduceTaskClassName()).units(units).build(), arg);
 
         int sumOfNodeNamesLengths = 
CLUSTER.runningNodes().map(IgniteImpl::name).map(String::length).reduce(Integer::sum).orElseThrow();
         assertThat(taskExecution.resultAsync(), willBe(sumOfNodeNamesLengths));
@@ -440,7 +447,9 @@ public abstract class ItComputeBaseTest extends 
ClusterPerClassIntegrationTest {
     void executeMapReduceAsync() {
         IgniteImpl entryNode = node(0);
 
-        CompletableFuture<Integer> future = 
entryNode.compute().executeMapReduceAsync(units(), mapReduceTaskClassName(), 
units());
+        CompletableFuture<Integer> future = 
entryNode.compute().executeMapReduceAsync(
+                TaskDescriptor.<List<DeploymentUnit>, 
Integer>builder(mapReduceTaskClassName()).units(units()).build(),
+                units());
 
         int sumOfNodeNamesLengths = 
CLUSTER.runningNodes().map(IgniteImpl::name).map(String::length).reduce(Integer::sum).orElseThrow();
         assertThat(future, willBe(sumOfNodeNamesLengths));
@@ -450,7 +459,9 @@ public abstract class ItComputeBaseTest extends 
ClusterPerClassIntegrationTest {
     void executeMapReduce() {
         IgniteImpl entryNode = node(0);
 
-        int result = entryNode.compute().executeMapReduce(units(), 
mapReduceTaskClassName(), units());
+        int result = entryNode.compute().executeMapReduce(
+                TaskDescriptor.<List<DeploymentUnit>, 
Integer>builder(mapReduceTaskClassName()).units(units()).build(),
+                units());
 
         int sumOfNodeNamesLengths = 
CLUSTER.runningNodes().map(IgniteImpl::name).map(String::length).reduce(Integer::sum).orElseThrow();
         assertThat(result, is(sumOfNodeNamesLengths));
diff --git 
a/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItMapReduceTest.java
 
b/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItMapReduceTest.java
index 5ecaa96c02..c18e4d173a 100644
--- 
a/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItMapReduceTest.java
+++ 
b/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItMapReduceTest.java
@@ -36,7 +36,9 @@ import static org.hamcrest.Matchers.nullValue;
 
 import java.time.Instant;
 import java.util.List;
+import org.apache.ignite.compute.IgniteCompute;
 import org.apache.ignite.compute.JobStatus;
+import org.apache.ignite.compute.TaskDescriptor;
 import org.apache.ignite.compute.TaskState;
 import org.apache.ignite.compute.TaskStatus;
 import org.apache.ignite.compute.task.TaskExecution;
@@ -68,9 +70,9 @@ class ItMapReduceTest extends ClusterPerClassIntegrationTest {
         IgniteImpl entryNode = CLUSTER.node(0);
 
         // Given running task.
-        TaskExecution<List<String>> taskExecution = 
entryNode.compute().submitMapReduce(
-                List.of(), InteractiveTasks.GlobalApi.name(), null
-        );
+        IgniteCompute igniteCompute = entryNode.compute();
+        TaskExecution<List<String>> taskExecution = 
igniteCompute.submitMapReduce(
+                TaskDescriptor.<Object, 
List<String>>builder(InteractiveTasks.GlobalApi.name()).build(), null);
         TestingJobExecution<List<String>> testExecution = new 
TestingJobExecution<>(new TaskToJobExecutionWrapper<>(taskExecution));
         testExecution.assertExecuting();
         InteractiveTasks.GlobalApi.assertAlive();
@@ -181,9 +183,9 @@ class ItMapReduceTest extends 
ClusterPerClassIntegrationTest {
         IgniteImpl entryNode = CLUSTER.node(0);
 
         // Given running task.
-        TaskExecution<List<String>> taskExecution = 
entryNode.compute().submitMapReduce(
-                List.of(), InteractiveTasks.GlobalApi.name(), null
-        );
+        IgniteCompute igniteCompute = entryNode.compute();
+        TaskExecution<List<String>> taskExecution = 
igniteCompute.submitMapReduce(
+                TaskDescriptor.<Object, 
List<String>>builder(InteractiveTasks.GlobalApi.name()).build(), null);
         TestingJobExecution<List<String>> testExecution = new 
TestingJobExecution<>(new TaskToJobExecutionWrapper<>(taskExecution));
         testExecution.assertExecuting();
         InteractiveTasks.GlobalApi.assertAlive();
@@ -240,9 +242,9 @@ class ItMapReduceTest extends 
ClusterPerClassIntegrationTest {
 
         // Given running task.
         String arg = cooperativeCancel ? "NO_INTERRUPT" : null;
-        TaskExecution<List<String>> taskExecution = 
entryNode.compute().submitMapReduce(
-                List.of(), InteractiveTasks.GlobalApi.name(), arg
-        );
+        IgniteCompute igniteCompute = entryNode.compute();
+        TaskExecution<List<String>> taskExecution = 
igniteCompute.submitMapReduce(
+                TaskDescriptor.<String, 
List<String>>builder(InteractiveTasks.GlobalApi.name()).build(), arg);
         TestingJobExecution<List<String>> testExecution = new 
TestingJobExecution<>(new TaskToJobExecutionWrapper<>(taskExecution));
         testExecution.assertExecuting();
         InteractiveTasks.GlobalApi.assertAlive();
@@ -273,7 +275,9 @@ class ItMapReduceTest extends 
ClusterPerClassIntegrationTest {
     }
 
     private static TaskExecution<List<String>> startTask(IgniteImpl entryNode, 
String args) throws InterruptedException {
-        TaskExecution<List<String>> taskExecution = 
entryNode.compute().submitMapReduce(List.of(), 
InteractiveTasks.GlobalApi.name(), args);
+        IgniteCompute igniteCompute = entryNode.compute();
+        TaskExecution<List<String>> taskExecution = 
igniteCompute.submitMapReduce(
+                TaskDescriptor.<String, 
List<String>>builder(InteractiveTasks.GlobalApi.name()).build(), args);
         new TestingJobExecution<>(new 
TaskToJobExecutionWrapper<>(taskExecution)).assertExecuting();
         InteractiveTasks.GlobalApi.assertAlive();
         return taskExecution;
diff --git 
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/AntiHijackIgniteCompute.java
 
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/AntiHijackIgniteCompute.java
index 7d50e506d8..ec3197a6ab 100644
--- 
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/AntiHijackIgniteCompute.java
+++ 
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/AntiHijackIgniteCompute.java
@@ -19,7 +19,6 @@ package org.apache.ignite.internal.compute;
 
 import static java.util.stream.Collectors.toMap;
 
-import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
@@ -28,11 +27,12 @@ import org.apache.ignite.compute.IgniteCompute;
 import org.apache.ignite.compute.JobDescriptor;
 import org.apache.ignite.compute.JobExecution;
 import org.apache.ignite.compute.JobTarget;
+import org.apache.ignite.compute.TaskDescriptor;
 import org.apache.ignite.compute.task.TaskExecution;
-import org.apache.ignite.deployment.DeploymentUnit;
 import org.apache.ignite.internal.compute.task.AntiHijackTaskExecution;
 import org.apache.ignite.internal.wrapper.Wrapper;
 import org.apache.ignite.network.ClusterNode;
+import org.jetbrains.annotations.Nullable;
 
 /**
  * Wrapper around {@link IgniteCompute} that adds protection against thread 
hijacking by users.
@@ -72,13 +72,13 @@ public class AntiHijackIgniteCompute implements 
IgniteCompute, Wrapper {
     }
 
     @Override
-    public <T, R> TaskExecution<R> submitMapReduce(List<DeploymentUnit> units, 
String taskClassName, T args) {
-        return new AntiHijackTaskExecution<>(compute.submitMapReduce(units, 
taskClassName, args), asyncContinuationExecutor);
+    public <T, R> TaskExecution<R> submitMapReduce(TaskDescriptor<T, R> 
taskDescriptor, @Nullable T arg) {
+        return new 
AntiHijackTaskExecution<>(compute.submitMapReduce(taskDescriptor, arg), 
asyncContinuationExecutor);
     }
 
     @Override
-    public <T, R> R executeMapReduce(List<DeploymentUnit> units, String 
taskClassName, T args) {
-        return compute.executeMapReduce(units, taskClassName, args);
+    public <T, R> R executeMapReduce(TaskDescriptor<T, R> taskDescriptor, 
@Nullable T arg) {
+        return compute.executeMapReduce(taskDescriptor, arg);
     }
 
     private <T, R> JobExecution<R> preventThreadHijack(JobExecution<R> 
execution) {
diff --git 
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/IgniteComputeImpl.java
 
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/IgniteComputeImpl.java
index 9e874cda3d..b552337d21 100644
--- 
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/IgniteComputeImpl.java
+++ 
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/IgniteComputeImpl.java
@@ -50,6 +50,7 @@ import org.apache.ignite.compute.JobExecutionOptions;
 import org.apache.ignite.compute.JobState;
 import org.apache.ignite.compute.JobTarget;
 import org.apache.ignite.compute.NodeNotFoundException;
+import org.apache.ignite.compute.TaskDescriptor;
 import org.apache.ignite.compute.task.MapReduceJob;
 import org.apache.ignite.compute.task.TaskExecution;
 import org.apache.ignite.deployment.DeploymentUnit;
@@ -359,16 +360,16 @@ public class IgniteComputeImpl implements 
IgniteComputeInternal, StreamerReceive
     }
 
     @Override
-    public <T, R> TaskExecution<R> submitMapReduce(List<DeploymentUnit> units, 
String taskClassName, T args) {
-        Objects.requireNonNull(units);
-        Objects.requireNonNull(taskClassName);
+    public <T, R> TaskExecution<R> submitMapReduce(TaskDescriptor<T, R> 
taskDescriptor, @Nullable T arg) {
+        Objects.requireNonNull(taskDescriptor);
 
-        return new 
TaskExecutionWrapper<>(computeComponent.executeTask(this::submitJob, units, 
taskClassName, args));
+        return new TaskExecutionWrapper<>(
+                computeComponent.executeTask(this::submitJob, 
taskDescriptor.units(), taskDescriptor.taskClassName(), arg));
     }
 
     @Override
-    public <T, R> R executeMapReduce(List<DeploymentUnit> units, String 
taskClassName, T args) {
-        return sync(executeMapReduceAsync(units, taskClassName, args));
+    public <T, R> R executeMapReduce(TaskDescriptor<T, R> taskDescriptor, 
@Nullable T arg) {
+        return sync(executeMapReduceAsync(taskDescriptor, arg));
     }
 
     private <M, T> JobExecution<T> submitJob(MapReduceJob<M, T> runner) {
diff --git 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItThinClientComputeMarshallingTest.java
 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItThinClientComputeMarshallingTest.java
index 779fd9bbe9..ed005eed42 100644
--- 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItThinClientComputeMarshallingTest.java
+++ 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItThinClientComputeMarshallingTest.java
@@ -33,6 +33,7 @@ import org.apache.ignite.client.IgniteClient;
 import org.apache.ignite.compute.IgniteCompute;
 import org.apache.ignite.compute.JobDescriptor;
 import org.apache.ignite.compute.JobTarget;
+import org.apache.ignite.compute.TaskDescriptor;
 import org.apache.ignite.internal.runner.app.Jobs.ArgMarshallingJob;
 import 
org.apache.ignite.internal.runner.app.Jobs.ArgumentAndResultMarshallingJob;
 import org.apache.ignite.internal.runner.app.Jobs.ArgumentStringMarshaller;
@@ -250,8 +251,8 @@ public class ItThinClientComputeMarshallingTest extends 
ItAbstractThinClientTest
         // When run job with custom marshaller for string argument.
         var compute = computeClientOn(node);
         String result = compute.executeMapReduce(
-                List.of(), MapReduce.class.getName(), List.of("Input_0", 
"Input_1")
-        );
+                TaskDescriptor.builder(MapReduce.class).build(),
+                List.of("Input_0", "Input_1"));
 
         // Then both client and server marshaller were called.
         assertEquals("Input"
diff --git 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItThinClientComputeTest.java
 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItThinClientComputeTest.java
index 240da82af5..ca71ac8202 100644
--- 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItThinClientComputeTest.java
+++ 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItThinClientComputeTest.java
@@ -78,6 +78,7 @@ import org.apache.ignite.compute.JobDescriptor;
 import org.apache.ignite.compute.JobExecution;
 import org.apache.ignite.compute.JobExecutionContext;
 import org.apache.ignite.compute.JobTarget;
+import org.apache.ignite.compute.TaskDescriptor;
 import org.apache.ignite.compute.TaskStatus;
 import org.apache.ignite.compute.task.MapReduceJob;
 import org.apache.ignite.compute.task.MapReduceTask;
@@ -719,7 +720,9 @@ public class ItThinClientComputeTest extends 
ItAbstractThinClientTest {
 
     @Test
     void testExecuteMapReduce() throws Exception {
-        TaskExecution<String> execution = 
client().compute().submitMapReduce(List.of(), 
MapReduceNodeNameTask.class.getName(), null);
+        IgniteCompute igniteCompute = client().compute();
+        TaskDescriptor<String, String> taskDescriptor = 
TaskDescriptor.builder(MapReduceNodeNameTask.class).build();
+        TaskExecution<String> execution = 
igniteCompute.submitMapReduce(taskDescriptor, null);
 
         List<Matcher<? super String>> nodeNames = sortedNodes().stream()
                 .map(ClusterNode::name)
@@ -736,8 +739,9 @@ public class ItThinClientComputeTest extends 
ItAbstractThinClientTest {
 
     @Test
     void testExecuteMapReduceWithArgs() {
-        TaskExecution<String> execution = client().compute()
-                .submitMapReduce(List.of(), MapReduceArgsTask.class.getName(), 
"1:2:3.3");
+        IgniteCompute igniteCompute = client().compute();
+        TaskDescriptor<String, String> taskDescriptor = 
TaskDescriptor.builder(MapReduceArgsTask.class).build();
+        TaskExecution<String> execution = 
igniteCompute.submitMapReduce(taskDescriptor, "1:2:3.3");
 
         assertThat(execution.resultAsync(), willBe(containsString("1_2_3.3")));
         assertThat(execution.stateAsync(), 
willBe(taskStateWithStatus(TaskStatus.COMPLETED)));
@@ -746,9 +750,11 @@ public class ItThinClientComputeTest extends 
ItAbstractThinClientTest {
     @ParameterizedTest
     @ValueSource(classes = {MapReduceExceptionOnSplitTask.class, 
MapReduceExceptionOnReduceTask.class})
     @Disabled("https://issues.apache.org/jira/browse/IGNITE-22596";)
-    void testExecuteMapReduceExceptionPropagation(Class<?> taskClass) {
+    <I, M, T> void testExecuteMapReduceExceptionPropagation(Class<? extends 
MapReduceTask<I, M, T, String>> taskClass) {
+        IgniteCompute igniteCompute = client().compute();
+        TaskDescriptor<I, String> taskDescriptor = 
TaskDescriptor.builder(taskClass).build();
         IgniteException cause = getExceptionInJobExecutionAsync(new 
TaskToJobExecutionWrapper<>(
-                client().compute().submitMapReduce(List.of(), 
taskClass.getName(), null))
+                igniteCompute.submitMapReduce(taskDescriptor, null))
         );
 
         assertThat(cause.getMessage(), containsString("Custom job error"));


Reply via email to