This is an automated email from the ASF dual-hosted git repository.
ptupitsyn pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new ff177e75b9 IGNITE-22847 Add TaskDescriptor to Compute API (#4153)
ff177e75b9 is described below
commit ff177e75b986ef1054d2f746267b083675d0d0f7
Author: Pavel Tupitsyn <[email protected]>
AuthorDate: Tue Jul 30 11:33:56 2024 +0300
IGNITE-22847 Add TaskDescriptor to Compute API (#4153)
Similar to JobDescriptor, add TaskDescriptor for MapReduce method family,
to keep the API consistent.
---
.../org/apache/ignite/compute/IgniteCompute.java | 20 ++--
.../org/apache/ignite/compute/TaskDescriptor.java | 129 +++++++++++++++++++++
.../apache/ignite/compute/task/MapReduceTask.java | 4 +-
.../internal/client/proto/ClientMessagePacker.java | 2 +-
.../ClientComputeExecuteMapReduceRequest.java | 4 +-
.../apache/ignite/client/ClientOperationType.java | 3 +-
.../internal/client/compute/ClientCompute.java | 20 ++--
.../apache/ignite/client/ClientComputeTest.java | 11 +-
.../apache/ignite/client/fakes/FakeCompute.java | 7 +-
.../ignite/internal/compute/ItComputeBaseTest.java | 17 ++-
.../ignite/internal/compute/ItMapReduceTest.java | 24 ++--
.../internal/compute/AntiHijackIgniteCompute.java | 12 +-
.../ignite/internal/compute/IgniteComputeImpl.java | 13 ++-
.../client/ItThinClientComputeMarshallingTest.java | 5 +-
.../runner/app/client/ItThinClientComputeTest.java | 16 ++-
15 files changed, 221 insertions(+), 66 deletions(-)
diff --git
a/modules/api/src/main/java/org/apache/ignite/compute/IgniteCompute.java
b/modules/api/src/main/java/org/apache/ignite/compute/IgniteCompute.java
index f5569d4056..32e3cddd57 100644
--- a/modules/api/src/main/java/org/apache/ignite/compute/IgniteCompute.java
+++ b/modules/api/src/main/java/org/apache/ignite/compute/IgniteCompute.java
@@ -22,14 +22,12 @@ import static java.util.function.Function.identity;
import static java.util.stream.Collectors.toMap;
import java.util.HashMap;
-import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import org.apache.ignite.compute.task.MapReduceTask;
import org.apache.ignite.compute.task.TaskExecution;
-import org.apache.ignite.deployment.DeploymentUnit;
import org.apache.ignite.network.ClusterNode;
import org.jetbrains.annotations.Nullable;
@@ -169,25 +167,23 @@ public interface IgniteCompute {
*
* @param <T> Job argument (T)ype.
* @param <R> Job (R)esult type.
- * @param units Deployment units.
- * @param taskClassName Map reduce task class name.
+ * @param taskDescriptor Map reduce task descriptor.
* @param arg Task argument.
* @return Task execution interface.
*/
- <T, R> TaskExecution<R> submitMapReduce(List<DeploymentUnit> units, String
taskClassName, @Nullable T arg);
+ <T, R> TaskExecution<R> submitMapReduce(TaskDescriptor<T, R>
taskDescriptor, @Nullable T arg);
/**
* Submits a {@link MapReduceTask} of the given class for an execution. A
shortcut for {@code submitMapReduce(...).resultAsync()}.
*
* @param <T> Job argument (T)ype.
* @param <R> Job (R)esult type.
- * @param units Deployment units.
- * @param taskClassName Map reduce task class name.
+ * @param taskDescriptor Map reduce task descriptor.
* @param arg Task argument.
* @return Task result future.
*/
- default <T, R> CompletableFuture<R>
executeMapReduceAsync(List<DeploymentUnit> units, String taskClassName,
@Nullable T arg) {
- return this.<T, R>submitMapReduce(units, taskClassName,
arg).resultAsync();
+ default <T, R> CompletableFuture<R>
executeMapReduceAsync(TaskDescriptor<T, R> taskDescriptor, @Nullable T arg) {
+ return submitMapReduce(taskDescriptor, arg).resultAsync();
}
/**
@@ -195,11 +191,11 @@ public interface IgniteCompute {
*
* @param <T> Job argument (T)ype.
* @param <R> Job (R)esult type.
- * @param units Deployment units.
- * @param taskClassName Map reduce task class name.
+ * @param taskDescriptor Map reduce task descriptor.
* @param arg Task argument.
* @return Task result.
* @throws ComputeException If there is any problem executing the task.
*/
- <T, R> R executeMapReduce(List<DeploymentUnit> units, String
taskClassName, @Nullable T arg);
+ <T, R> R executeMapReduce(TaskDescriptor<T, R> taskDescriptor, @Nullable T
arg);
+
}
diff --git
a/modules/api/src/main/java/org/apache/ignite/compute/TaskDescriptor.java
b/modules/api/src/main/java/org/apache/ignite/compute/TaskDescriptor.java
new file mode 100644
index 0000000000..511cd0312f
--- /dev/null
+++ b/modules/api/src/main/java/org/apache/ignite/compute/TaskDescriptor.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.compute;
+
+import java.util.List;
+import java.util.Objects;
+import org.apache.ignite.compute.task.MapReduceTask;
+import org.apache.ignite.deployment.DeploymentUnit;
+
+/**
+ * Compute task descriptor.
+ */
+public class TaskDescriptor<T, R> {
+ private final String taskClassName;
+
+ private final List<DeploymentUnit> units;
+
+ private TaskDescriptor(
+ String taskClassName,
+ List<DeploymentUnit> units
+ ) {
+ this.taskClassName = taskClassName;
+ this.units = units;
+ }
+
+ /**
+ * Task class name.
+ *
+ * @return Task class name.
+ */
+ public String taskClassName() {
+ return taskClassName;
+ }
+
+ /**
+ * Deployment units.
+ *
+ * @return Deployment units.
+ */
+ public List<DeploymentUnit> units() {
+ return units;
+ }
+
+ /**
+ * Create a new builder.
+ *
+ * @return Task descriptor builder.
+ */
+ public static <T, R> Builder<T, R> builder(String taskClassName) {
+ Objects.requireNonNull(taskClassName);
+
+ return new Builder<>(taskClassName);
+ }
+
+ /**
+ * Create a new builder.
+ *
+ * @return Task descriptor builder.
+ */
+ public static <I, M, T, R> Builder<I, R> builder(Class<? extends
MapReduceTask<I, M, T, R>> taskClass) {
+ Objects.requireNonNull(taskClass);
+
+ return new Builder<>(taskClass.getName());
+ }
+
+ /**
+ * Builder.
+ */
+ public static class Builder<T, R> {
+ private final String taskClassName;
+ private List<DeploymentUnit> units;
+
+ private Builder(String taskClassName) {
+ Objects.requireNonNull(taskClassName);
+
+ this.taskClassName = taskClassName;
+ }
+
+ /**
+ * Sets the deployment units.
+ *
+ * @param units Deployment units.
+ * @return This builder.
+ */
+ public Builder<T, R> units(List<DeploymentUnit> units) {
+ this.units = units;
+ return this;
+ }
+
+ /**
+ * Sets the deployment units.
+ *
+ * @param units Deployment units.
+ * @return This builder.
+ */
+ public Builder<T, R> units(DeploymentUnit... units) {
+ this.units = List.of(units);
+ return this;
+ }
+
+
+ /**
+ * Builds the task descriptor.
+ *
+ * @return Task descriptor.
+ */
+ public TaskDescriptor<T, R> build() {
+ return new TaskDescriptor<>(
+ taskClassName,
+ units == null ? List.of() : units
+ );
+ }
+ }
+}
diff --git
a/modules/api/src/main/java/org/apache/ignite/compute/task/MapReduceTask.java
b/modules/api/src/main/java/org/apache/ignite/compute/task/MapReduceTask.java
index 1f379ac030..69fea09a8b 100644
---
a/modules/api/src/main/java/org/apache/ignite/compute/task/MapReduceTask.java
+++
b/modules/api/src/main/java/org/apache/ignite/compute/task/MapReduceTask.java
@@ -21,12 +21,12 @@ import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.compute.TaskDescriptor;
import org.jetbrains.annotations.Nullable;
/**
* A map reduce task interface. Implement this interface and pass a name of
the implemented class to the
- * {@link org.apache.ignite.compute.IgniteCompute#submitMapReduce(List,
String, Object) IgniteCompute#submitMapReduce} method to run this
- * task.
+ * {@link
org.apache.ignite.compute.IgniteCompute#submitMapReduce(TaskDescriptor,
Object)} method to run this task.
*
* @param <I> Split task (I)nput type.
* @param <M> (M)ap job input type.
diff --git
a/modules/client-common/src/main/java/org/apache/ignite/internal/client/proto/ClientMessagePacker.java
b/modules/client-common/src/main/java/org/apache/ignite/internal/client/proto/ClientMessagePacker.java
index 807901423a..3a93bb98e4 100644
---
a/modules/client-common/src/main/java/org/apache/ignite/internal/client/proto/ClientMessagePacker.java
+++
b/modules/client-common/src/main/java/org/apache/ignite/internal/client/proto/ClientMessagePacker.java
@@ -663,7 +663,7 @@ public class ClientMessagePacker implements AutoCloseable {
*
* @param val Object array.
*/
- public <T> void packObjectAsBinaryTuple(T val, @Nullable Marshaller<T,
byte[]> marshaller) {
+ public <T> void packObjectAsBinaryTuple(@Nullable T val, @Nullable
Marshaller<T, byte[]> marshaller) {
assert !closed : "Packer is closed";
if (val == null) {
diff --git
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/compute/ClientComputeExecuteMapReduceRequest.java
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/compute/ClientComputeExecuteMapReduceRequest.java
index c79b867fc4..7869f3b3f0 100644
---
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/compute/ClientComputeExecuteMapReduceRequest.java
+++
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/compute/ClientComputeExecuteMapReduceRequest.java
@@ -28,6 +28,7 @@ import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import org.apache.ignite.client.handler.NotificationSender;
import org.apache.ignite.compute.JobState;
+import org.apache.ignite.compute.TaskDescriptor;
import org.apache.ignite.compute.task.TaskExecution;
import org.apache.ignite.deployment.DeploymentUnit;
import org.apache.ignite.internal.client.proto.ClientMessagePacker;
@@ -56,7 +57,8 @@ public class ClientComputeExecuteMapReduceRequest {
String taskClassName = in.unpackString();
Object args = unpackPayload(in);
- TaskExecution<Object> execution =
compute.submitMapReduce(deploymentUnits, taskClassName, args);
+ TaskExecution<Object> execution = compute.submitMapReduce(
+
TaskDescriptor.builder(taskClassName).units(deploymentUnits).build(), args);
sendTaskResult(execution, notificationSender);
var idsAsync = execution.idsAsync()
diff --git
a/modules/client/src/main/java/org/apache/ignite/client/ClientOperationType.java
b/modules/client/src/main/java/org/apache/ignite/client/ClientOperationType.java
index b8c97c8dda..3f12c60052 100644
---
a/modules/client/src/main/java/org/apache/ignite/client/ClientOperationType.java
+++
b/modules/client/src/main/java/org/apache/ignite/client/ClientOperationType.java
@@ -18,7 +18,6 @@
package org.apache.ignite.client;
import java.util.Collection;
-import java.util.List;
import org.apache.ignite.compute.JobDescriptor;
import org.apache.ignite.compute.JobTarget;
import org.apache.ignite.sql.BatchedArguments;
@@ -140,7 +139,7 @@ public enum ClientOperationType {
COMPUTE_EXECUTE,
/**
- * Compute Execute MapReduce ({@link
org.apache.ignite.compute.IgniteCompute#submitMapReduce(List, String, Object)}).
+ * Compute Execute MapReduce ({@link
org.apache.ignite.compute.IgniteCompute#submitMapReduce}).
*/
COMPUTE_EXECUTE_MAPREDUCE,
diff --git
a/modules/client/src/main/java/org/apache/ignite/internal/client/compute/ClientCompute.java
b/modules/client/src/main/java/org/apache/ignite/internal/client/compute/ClientCompute.java
index c147d0a62e..36d7b49196 100644
---
a/modules/client/src/main/java/org/apache/ignite/internal/client/compute/ClientCompute.java
+++
b/modules/client/src/main/java/org/apache/ignite/internal/client/compute/ClientCompute.java
@@ -42,6 +42,7 @@ import org.apache.ignite.compute.JobDescriptor;
import org.apache.ignite.compute.JobExecution;
import org.apache.ignite.compute.JobExecutionOptions;
import org.apache.ignite.compute.JobTarget;
+import org.apache.ignite.compute.TaskDescriptor;
import org.apache.ignite.compute.task.TaskExecution;
import org.apache.ignite.deployment.DeploymentUnit;
import org.apache.ignite.internal.client.PayloadInputChannel;
@@ -223,26 +224,25 @@ public class ClientCompute implements IgniteCompute {
}
@Override
- public <T, R> TaskExecution<R> submitMapReduce(List<DeploymentUnit> units,
String taskClassName, T args) {
- Objects.requireNonNull(units);
- Objects.requireNonNull(taskClassName);
+ public <T, R> TaskExecution<R> submitMapReduce(TaskDescriptor<T, R>
taskDescriptor, @Nullable T arg) {
+ Objects.requireNonNull(taskDescriptor);
- return new ClientTaskExecution<>(ch, doExecuteMapReduceAsync(units,
taskClassName, args, null));
+ return new ClientTaskExecution<>(ch,
doExecuteMapReduceAsync(taskDescriptor.units(), taskDescriptor.taskClassName(),
arg, null));
}
@Override
- public <T, R> R executeMapReduce(List<DeploymentUnit> units, String
taskClassName, T args) {
- return sync(executeMapReduceAsync(units, taskClassName, args));
+ public <T, R> R executeMapReduce(TaskDescriptor<T, R> taskDescriptor,
@Nullable T arg) {
+ return sync(executeMapReduceAsync(taskDescriptor, arg));
}
private <T> CompletableFuture<SubmitTaskResult> doExecuteMapReduceAsync(
List<DeploymentUnit> units,
String taskClassName,
- T args,
+ @Nullable T arg,
@Nullable Marshaller<Object, byte[]> marshaller) {
return ch.serviceAsync(
ClientOp.COMPUTE_EXECUTE_MAPREDUCE,
- w -> packTask(w.out(), units, taskClassName, args, marshaller),
+ w -> packTask(w.out(), units, taskClassName, arg, marshaller),
ClientCompute::unpackSubmitTaskResult,
null,
null,
@@ -427,11 +427,11 @@ public class ClientCompute implements IgniteCompute {
private static void packTask(ClientMessagePacker w,
List<DeploymentUnit> units,
String taskClassName,
- Object args,
+ @Nullable Object arg,
@Nullable Marshaller<Object, byte[]> marshaller) {
w.packDeploymentUnits(units);
w.packString(taskClassName);
- w.packObjectAsBinaryTuple(args, marshaller);
+ w.packObjectAsBinaryTuple(arg, marshaller);
}
/**
diff --git
a/modules/client/src/test/java/org/apache/ignite/client/ClientComputeTest.java
b/modules/client/src/test/java/org/apache/ignite/client/ClientComputeTest.java
index b3e8abf937..8421fb9205 100644
---
a/modules/client/src/test/java/org/apache/ignite/client/ClientComputeTest.java
+++
b/modules/client/src/test/java/org/apache/ignite/client/ClientComputeTest.java
@@ -51,6 +51,7 @@ import org.apache.ignite.compute.IgniteCompute;
import org.apache.ignite.compute.JobDescriptor;
import org.apache.ignite.compute.JobExecution;
import org.apache.ignite.compute.JobTarget;
+import org.apache.ignite.compute.TaskDescriptor;
import org.apache.ignite.compute.TaskStatus;
import org.apache.ignite.compute.task.TaskExecution;
import org.apache.ignite.deployment.DeploymentUnit;
@@ -257,7 +258,7 @@ public class ClientComputeTest extends
BaseIgniteAbstractTest {
try (var client = getClient(server1)) {
Object args = "arg1";
- String res1 = client.compute().executeMapReduce(List.of(), "job",
args);
+ String res1 =
client.compute().executeMapReduce(TaskDescriptor.<Object,
String>builder("job").build(), args);
assertEquals("s1", res1);
}
}
@@ -267,7 +268,9 @@ public class ClientComputeTest extends
BaseIgniteAbstractTest {
initServers(reqId -> false);
try (var client = getClient(server1)) {
- TaskExecution<Object> task =
client.compute().submitMapReduce(List.of(), "job", null);
+ IgniteCompute igniteCompute = client.compute();
+ TaskExecution<Object> task = igniteCompute.submitMapReduce(
+ TaskDescriptor.builder("job").build(), null);
assertThat(task.resultAsync(), willBe("s1"));
@@ -286,7 +289,9 @@ public class ClientComputeTest extends
BaseIgniteAbstractTest {
try (var client = getClient(server1)) {
FakeCompute.future = CompletableFuture.failedFuture(new
RuntimeException("job failed"));
- TaskExecution<Object> execution =
client.compute().submitMapReduce(List.of(), "job", null);
+ IgniteCompute igniteCompute = client.compute();
+ TaskExecution<Object> execution = igniteCompute.submitMapReduce(
+ TaskDescriptor.builder("job").build(), null);
assertThat(execution.resultAsync(),
willThrowFast(IgniteException.class));
assertThat(execution.stateAsync(),
willBe(taskStateWithStatus(TaskStatus.FAILED)));
diff --git
a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeCompute.java
b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeCompute.java
index 4024675d58..9fc69ca097 100644
---
a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeCompute.java
+++
b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeCompute.java
@@ -49,6 +49,7 @@ import org.apache.ignite.compute.JobExecutionOptions;
import org.apache.ignite.compute.JobState;
import org.apache.ignite.compute.JobStatus;
import org.apache.ignite.compute.JobTarget;
+import org.apache.ignite.compute.TaskDescriptor;
import org.apache.ignite.compute.TaskState;
import org.apache.ignite.compute.task.TaskExecution;
import org.apache.ignite.deployment.DeploymentUnit;
@@ -170,13 +171,13 @@ public class FakeCompute implements IgniteComputeInternal
{
}
@Override
- public <T, R> TaskExecution<R> submitMapReduce(List<DeploymentUnit> units,
String taskClassName, T args) {
+ public <T, R> TaskExecution<R> submitMapReduce(TaskDescriptor<T, R>
taskDescriptor, @Nullable T arg) {
return taskExecution(future != null ? future : completedFuture((R)
nodeName));
}
@Override
- public <T, R> R executeMapReduce(List<DeploymentUnit> units, String
taskClassName, T args) {
- return sync(executeMapReduceAsync(units, taskClassName, args));
+ public <T, R> R executeMapReduce(TaskDescriptor<T, R> taskDescriptor,
@Nullable T arg) {
+ return sync(executeMapReduceAsync(taskDescriptor, arg));
}
private <R> JobExecution<R> completedExecution(R result) {
diff --git
a/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItComputeBaseTest.java
b/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItComputeBaseTest.java
index 7ef6cb31c4..db1b458737 100644
---
a/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItComputeBaseTest.java
+++
b/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItComputeBaseTest.java
@@ -47,9 +47,11 @@ import java.util.stream.IntStream;
import org.apache.ignite.Ignite;
import org.apache.ignite.client.IgniteClient;
import org.apache.ignite.compute.ComputeException;
+import org.apache.ignite.compute.IgniteCompute;
import org.apache.ignite.compute.JobDescriptor;
import org.apache.ignite.compute.JobExecution;
import org.apache.ignite.compute.JobTarget;
+import org.apache.ignite.compute.TaskDescriptor;
import org.apache.ignite.compute.task.TaskExecution;
import org.apache.ignite.deployment.DeploymentUnit;
import org.apache.ignite.internal.ClusterPerClassIntegrationTest;
@@ -61,6 +63,7 @@ import org.apache.ignite.marshalling.ByteArrayMarshaller;
import org.apache.ignite.network.ClusterNode;
import org.apache.ignite.table.Tuple;
import org.apache.ignite.table.mapper.Mapper;
+import org.jetbrains.annotations.Nullable;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
@@ -423,7 +426,11 @@ public abstract class ItComputeBaseTest extends
ClusterPerClassIntegrationTest {
void submitMapReduce() {
IgniteImpl entryNode = node(0);
- TaskExecution<Integer> taskExecution =
entryNode.compute().submitMapReduce(units(), mapReduceTaskClassName(), units());
+ IgniteCompute igniteCompute = entryNode.compute();
+ List<DeploymentUnit> units = units();
+ @Nullable List<DeploymentUnit> arg = units();
+ TaskExecution<Integer> taskExecution = igniteCompute.submitMapReduce(
+ TaskDescriptor.<List<DeploymentUnit>,
Integer>builder(mapReduceTaskClassName()).units(units).build(), arg);
int sumOfNodeNamesLengths =
CLUSTER.runningNodes().map(IgniteImpl::name).map(String::length).reduce(Integer::sum).orElseThrow();
assertThat(taskExecution.resultAsync(), willBe(sumOfNodeNamesLengths));
@@ -440,7 +447,9 @@ public abstract class ItComputeBaseTest extends
ClusterPerClassIntegrationTest {
void executeMapReduceAsync() {
IgniteImpl entryNode = node(0);
- CompletableFuture<Integer> future =
entryNode.compute().executeMapReduceAsync(units(), mapReduceTaskClassName(),
units());
+ CompletableFuture<Integer> future =
entryNode.compute().executeMapReduceAsync(
+ TaskDescriptor.<List<DeploymentUnit>,
Integer>builder(mapReduceTaskClassName()).units(units()).build(),
+ units());
int sumOfNodeNamesLengths =
CLUSTER.runningNodes().map(IgniteImpl::name).map(String::length).reduce(Integer::sum).orElseThrow();
assertThat(future, willBe(sumOfNodeNamesLengths));
@@ -450,7 +459,9 @@ public abstract class ItComputeBaseTest extends
ClusterPerClassIntegrationTest {
void executeMapReduce() {
IgniteImpl entryNode = node(0);
- int result = entryNode.compute().executeMapReduce(units(),
mapReduceTaskClassName(), units());
+ int result = entryNode.compute().executeMapReduce(
+ TaskDescriptor.<List<DeploymentUnit>,
Integer>builder(mapReduceTaskClassName()).units(units()).build(),
+ units());
int sumOfNodeNamesLengths =
CLUSTER.runningNodes().map(IgniteImpl::name).map(String::length).reduce(Integer::sum).orElseThrow();
assertThat(result, is(sumOfNodeNamesLengths));
diff --git
a/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItMapReduceTest.java
b/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItMapReduceTest.java
index 5ecaa96c02..c18e4d173a 100644
---
a/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItMapReduceTest.java
+++
b/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItMapReduceTest.java
@@ -36,7 +36,9 @@ import static org.hamcrest.Matchers.nullValue;
import java.time.Instant;
import java.util.List;
+import org.apache.ignite.compute.IgniteCompute;
import org.apache.ignite.compute.JobStatus;
+import org.apache.ignite.compute.TaskDescriptor;
import org.apache.ignite.compute.TaskState;
import org.apache.ignite.compute.TaskStatus;
import org.apache.ignite.compute.task.TaskExecution;
@@ -68,9 +70,9 @@ class ItMapReduceTest extends ClusterPerClassIntegrationTest {
IgniteImpl entryNode = CLUSTER.node(0);
// Given running task.
- TaskExecution<List<String>> taskExecution =
entryNode.compute().submitMapReduce(
- List.of(), InteractiveTasks.GlobalApi.name(), null
- );
+ IgniteCompute igniteCompute = entryNode.compute();
+ TaskExecution<List<String>> taskExecution =
igniteCompute.submitMapReduce(
+ TaskDescriptor.<Object,
List<String>>builder(InteractiveTasks.GlobalApi.name()).build(), null);
TestingJobExecution<List<String>> testExecution = new
TestingJobExecution<>(new TaskToJobExecutionWrapper<>(taskExecution));
testExecution.assertExecuting();
InteractiveTasks.GlobalApi.assertAlive();
@@ -181,9 +183,9 @@ class ItMapReduceTest extends
ClusterPerClassIntegrationTest {
IgniteImpl entryNode = CLUSTER.node(0);
// Given running task.
- TaskExecution<List<String>> taskExecution =
entryNode.compute().submitMapReduce(
- List.of(), InteractiveTasks.GlobalApi.name(), null
- );
+ IgniteCompute igniteCompute = entryNode.compute();
+ TaskExecution<List<String>> taskExecution =
igniteCompute.submitMapReduce(
+ TaskDescriptor.<Object,
List<String>>builder(InteractiveTasks.GlobalApi.name()).build(), null);
TestingJobExecution<List<String>> testExecution = new
TestingJobExecution<>(new TaskToJobExecutionWrapper<>(taskExecution));
testExecution.assertExecuting();
InteractiveTasks.GlobalApi.assertAlive();
@@ -240,9 +242,9 @@ class ItMapReduceTest extends
ClusterPerClassIntegrationTest {
// Given running task.
String arg = cooperativeCancel ? "NO_INTERRUPT" : null;
- TaskExecution<List<String>> taskExecution =
entryNode.compute().submitMapReduce(
- List.of(), InteractiveTasks.GlobalApi.name(), arg
- );
+ IgniteCompute igniteCompute = entryNode.compute();
+ TaskExecution<List<String>> taskExecution =
igniteCompute.submitMapReduce(
+ TaskDescriptor.<String,
List<String>>builder(InteractiveTasks.GlobalApi.name()).build(), arg);
TestingJobExecution<List<String>> testExecution = new
TestingJobExecution<>(new TaskToJobExecutionWrapper<>(taskExecution));
testExecution.assertExecuting();
InteractiveTasks.GlobalApi.assertAlive();
@@ -273,7 +275,9 @@ class ItMapReduceTest extends
ClusterPerClassIntegrationTest {
}
private static TaskExecution<List<String>> startTask(IgniteImpl entryNode,
String args) throws InterruptedException {
- TaskExecution<List<String>> taskExecution =
entryNode.compute().submitMapReduce(List.of(),
InteractiveTasks.GlobalApi.name(), args);
+ IgniteCompute igniteCompute = entryNode.compute();
+ TaskExecution<List<String>> taskExecution =
igniteCompute.submitMapReduce(
+ TaskDescriptor.<String,
List<String>>builder(InteractiveTasks.GlobalApi.name()).build(), args);
new TestingJobExecution<>(new
TaskToJobExecutionWrapper<>(taskExecution)).assertExecuting();
InteractiveTasks.GlobalApi.assertAlive();
return taskExecution;
diff --git
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/AntiHijackIgniteCompute.java
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/AntiHijackIgniteCompute.java
index 7d50e506d8..ec3197a6ab 100644
---
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/AntiHijackIgniteCompute.java
+++
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/AntiHijackIgniteCompute.java
@@ -19,7 +19,6 @@ package org.apache.ignite.internal.compute;
import static java.util.stream.Collectors.toMap;
-import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
@@ -28,11 +27,12 @@ import org.apache.ignite.compute.IgniteCompute;
import org.apache.ignite.compute.JobDescriptor;
import org.apache.ignite.compute.JobExecution;
import org.apache.ignite.compute.JobTarget;
+import org.apache.ignite.compute.TaskDescriptor;
import org.apache.ignite.compute.task.TaskExecution;
-import org.apache.ignite.deployment.DeploymentUnit;
import org.apache.ignite.internal.compute.task.AntiHijackTaskExecution;
import org.apache.ignite.internal.wrapper.Wrapper;
import org.apache.ignite.network.ClusterNode;
+import org.jetbrains.annotations.Nullable;
/**
* Wrapper around {@link IgniteCompute} that adds protection against thread
hijacking by users.
@@ -72,13 +72,13 @@ public class AntiHijackIgniteCompute implements
IgniteCompute, Wrapper {
}
@Override
- public <T, R> TaskExecution<R> submitMapReduce(List<DeploymentUnit> units,
String taskClassName, T args) {
- return new AntiHijackTaskExecution<>(compute.submitMapReduce(units,
taskClassName, args), asyncContinuationExecutor);
+ public <T, R> TaskExecution<R> submitMapReduce(TaskDescriptor<T, R>
taskDescriptor, @Nullable T arg) {
+ return new
AntiHijackTaskExecution<>(compute.submitMapReduce(taskDescriptor, arg),
asyncContinuationExecutor);
}
@Override
- public <T, R> R executeMapReduce(List<DeploymentUnit> units, String
taskClassName, T args) {
- return compute.executeMapReduce(units, taskClassName, args);
+ public <T, R> R executeMapReduce(TaskDescriptor<T, R> taskDescriptor,
@Nullable T arg) {
+ return compute.executeMapReduce(taskDescriptor, arg);
}
private <T, R> JobExecution<R> preventThreadHijack(JobExecution<R>
execution) {
diff --git
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/IgniteComputeImpl.java
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/IgniteComputeImpl.java
index 9e874cda3d..b552337d21 100644
---
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/IgniteComputeImpl.java
+++
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/IgniteComputeImpl.java
@@ -50,6 +50,7 @@ import org.apache.ignite.compute.JobExecutionOptions;
import org.apache.ignite.compute.JobState;
import org.apache.ignite.compute.JobTarget;
import org.apache.ignite.compute.NodeNotFoundException;
+import org.apache.ignite.compute.TaskDescriptor;
import org.apache.ignite.compute.task.MapReduceJob;
import org.apache.ignite.compute.task.TaskExecution;
import org.apache.ignite.deployment.DeploymentUnit;
@@ -359,16 +360,16 @@ public class IgniteComputeImpl implements
IgniteComputeInternal, StreamerReceive
}
@Override
- public <T, R> TaskExecution<R> submitMapReduce(List<DeploymentUnit> units,
String taskClassName, T args) {
- Objects.requireNonNull(units);
- Objects.requireNonNull(taskClassName);
+ public <T, R> TaskExecution<R> submitMapReduce(TaskDescriptor<T, R>
taskDescriptor, @Nullable T arg) {
+ Objects.requireNonNull(taskDescriptor);
- return new
TaskExecutionWrapper<>(computeComponent.executeTask(this::submitJob, units,
taskClassName, args));
+ return new TaskExecutionWrapper<>(
+ computeComponent.executeTask(this::submitJob,
taskDescriptor.units(), taskDescriptor.taskClassName(), arg));
}
@Override
- public <T, R> R executeMapReduce(List<DeploymentUnit> units, String
taskClassName, T args) {
- return sync(executeMapReduceAsync(units, taskClassName, args));
+ public <T, R> R executeMapReduce(TaskDescriptor<T, R> taskDescriptor,
@Nullable T arg) {
+ return sync(executeMapReduceAsync(taskDescriptor, arg));
}
private <M, T> JobExecution<T> submitJob(MapReduceJob<M, T> runner) {
diff --git
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItThinClientComputeMarshallingTest.java
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItThinClientComputeMarshallingTest.java
index 779fd9bbe9..ed005eed42 100644
---
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItThinClientComputeMarshallingTest.java
+++
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItThinClientComputeMarshallingTest.java
@@ -33,6 +33,7 @@ import org.apache.ignite.client.IgniteClient;
import org.apache.ignite.compute.IgniteCompute;
import org.apache.ignite.compute.JobDescriptor;
import org.apache.ignite.compute.JobTarget;
+import org.apache.ignite.compute.TaskDescriptor;
import org.apache.ignite.internal.runner.app.Jobs.ArgMarshallingJob;
import
org.apache.ignite.internal.runner.app.Jobs.ArgumentAndResultMarshallingJob;
import org.apache.ignite.internal.runner.app.Jobs.ArgumentStringMarshaller;
@@ -250,8 +251,8 @@ public class ItThinClientComputeMarshallingTest extends
ItAbstractThinClientTest
// When run job with custom marshaller for string argument.
var compute = computeClientOn(node);
String result = compute.executeMapReduce(
- List.of(), MapReduce.class.getName(), List.of("Input_0",
"Input_1")
- );
+ TaskDescriptor.builder(MapReduce.class).build(),
+ List.of("Input_0", "Input_1"));
// Then both client and server marshaller were called.
assertEquals("Input"
diff --git
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItThinClientComputeTest.java
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItThinClientComputeTest.java
index 240da82af5..ca71ac8202 100644
---
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItThinClientComputeTest.java
+++
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItThinClientComputeTest.java
@@ -78,6 +78,7 @@ import org.apache.ignite.compute.JobDescriptor;
import org.apache.ignite.compute.JobExecution;
import org.apache.ignite.compute.JobExecutionContext;
import org.apache.ignite.compute.JobTarget;
+import org.apache.ignite.compute.TaskDescriptor;
import org.apache.ignite.compute.TaskStatus;
import org.apache.ignite.compute.task.MapReduceJob;
import org.apache.ignite.compute.task.MapReduceTask;
@@ -719,7 +720,9 @@ public class ItThinClientComputeTest extends
ItAbstractThinClientTest {
@Test
void testExecuteMapReduce() throws Exception {
- TaskExecution<String> execution =
client().compute().submitMapReduce(List.of(),
MapReduceNodeNameTask.class.getName(), null);
+ IgniteCompute igniteCompute = client().compute();
+ TaskDescriptor<String, String> taskDescriptor =
TaskDescriptor.builder(MapReduceNodeNameTask.class).build();
+ TaskExecution<String> execution =
igniteCompute.submitMapReduce(taskDescriptor, null);
List<Matcher<? super String>> nodeNames = sortedNodes().stream()
.map(ClusterNode::name)
@@ -736,8 +739,9 @@ public class ItThinClientComputeTest extends
ItAbstractThinClientTest {
@Test
void testExecuteMapReduceWithArgs() {
- TaskExecution<String> execution = client().compute()
- .submitMapReduce(List.of(), MapReduceArgsTask.class.getName(),
"1:2:3.3");
+ IgniteCompute igniteCompute = client().compute();
+ TaskDescriptor<String, String> taskDescriptor =
TaskDescriptor.builder(MapReduceArgsTask.class).build();
+ TaskExecution<String> execution =
igniteCompute.submitMapReduce(taskDescriptor, "1:2:3.3");
assertThat(execution.resultAsync(), willBe(containsString("1_2_3.3")));
assertThat(execution.stateAsync(),
willBe(taskStateWithStatus(TaskStatus.COMPLETED)));
@@ -746,9 +750,11 @@ public class ItThinClientComputeTest extends
ItAbstractThinClientTest {
@ParameterizedTest
@ValueSource(classes = {MapReduceExceptionOnSplitTask.class,
MapReduceExceptionOnReduceTask.class})
@Disabled("https://issues.apache.org/jira/browse/IGNITE-22596")
- void testExecuteMapReduceExceptionPropagation(Class<?> taskClass) {
+ <I, M, T> void testExecuteMapReduceExceptionPropagation(Class<? extends
MapReduceTask<I, M, T, String>> taskClass) {
+ IgniteCompute igniteCompute = client().compute();
+ TaskDescriptor<I, String> taskDescriptor =
TaskDescriptor.builder(taskClass).build();
IgniteException cause = getExceptionInJobExecutionAsync(new
TaskToJobExecutionWrapper<>(
- client().compute().submitMapReduce(List.of(),
taskClass.getName(), null))
+ igniteCompute.submitMapReduce(taskDescriptor, null))
);
assertThat(cause.getMessage(), containsString("Custom job error"));