This is an automated email from the ASF dual-hosted git repository.

zstan pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new ca2f3e14cf IGNITE-23487 Support cancellation tokens in IgniteCompute 
(#4776)
ca2f3e14cf is described below

commit ca2f3e14cfd876104850fe5acbf65825ccb2eda6
Author: Evgeniy Stanilovskiy <[email protected]>
AuthorDate: Mon Dec 2 14:31:59 2024 +0300

    IGNITE-23487 Support cancellation tokens in IgniteCompute (#4776)
---
 .../org/apache/ignite/compute/IgniteCompute.java   | 123 ++++++++++++++++++++-
 .../ClientComputeExecuteColocatedRequest.java      |   1 +
 .../compute/ClientComputeExecuteRequest.java       |   2 +-
 .../internal/client/compute/ClientCompute.java     |  58 ++++++++--
 .../apache/ignite/client/fakes/FakeCompute.java    |  35 ++++--
 .../ignite/internal/compute/ItComputeBaseTest.java |  90 +++++++++++++++
 .../internal/compute/ItComputeTestStandalone.java  |  42 +++++++
 .../internal/compute/InfiniteMapReduceTask.java    |  56 ++++++++++
 .../internal/compute/AntiHijackIgniteCompute.java  |  42 ++++++-
 .../ignite/internal/compute/ComputeComponent.java  |  35 +++++-
 .../internal/compute/ComputeComponentImpl.java     |  26 ++++-
 .../internal/compute/ComputeJobFailover.java       |  14 ++-
 .../ignite/internal/compute/ExecutionOptions.java  |   1 +
 .../ignite/internal/compute/IgniteComputeImpl.java |  64 ++++++++---
 .../internal/compute/IgniteComputeInternal.java    |   5 +
 .../internal/compute/queue/QueueExecutionImpl.java |  15 ++-
 .../internal/compute/ComputeComponentImplTest.java |  27 +++++
 .../internal/compute/IgniteComputeImplTest.java    |  42 ++++++-
 .../org/apache/ignite/lang/CancelHandleHelper.java |  17 +++
 .../runner/app/client/ItThinClientComputeTest.java | 120 ++++++++++++++++++++
 .../restart/RestartProofIgniteCompute.java         |  20 ++--
 21 files changed, 765 insertions(+), 70 deletions(-)

diff --git 
a/modules/api/src/main/java/org/apache/ignite/compute/IgniteCompute.java 
b/modules/api/src/main/java/org/apache/ignite/compute/IgniteCompute.java
index 32e3cddd57..cdb5da8788 100644
--- a/modules/api/src/main/java/org/apache/ignite/compute/IgniteCompute.java
+++ b/modules/api/src/main/java/org/apache/ignite/compute/IgniteCompute.java
@@ -28,6 +28,7 @@ import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import org.apache.ignite.compute.task.MapReduceTask;
 import org.apache.ignite.compute.task.TaskExecution;
+import org.apache.ignite.lang.CancellationToken;
 import org.apache.ignite.network.ClusterNode;
 import org.jetbrains.annotations.Nullable;
 
@@ -70,7 +71,45 @@ public interface IgniteCompute {
             JobDescriptor<T, R> descriptor,
             @Nullable T arg
     ) {
-        return submit(target, descriptor, arg).resultAsync();
+        return executeAsync(target, descriptor, null, arg);
+    }
+
+    /**
+     * Submits a {@link ComputeJob} of the given class for an execution on a 
single node from a set of candidate nodes. A shortcut for
+     * {@code submit(...).resultAsync()}.
+     *
+     * @param <T> Job argument (T)ype.
+     * @param <R> Job (R)esult type.
+     * @param target Execution target.
+     * @param descriptor Job descriptor.
+     * @param cancellationToken Cancellation token or {@code null}.
+     * @param arg Argument of the job.
+     * @return Job result future.
+     */
+    <T, R> CompletableFuture<R> executeAsync(
+            JobTarget target,
+            JobDescriptor<T, R> descriptor,
+            @Nullable CancellationToken cancellationToken,
+            @Nullable T arg
+    );
+
+    /**
+     * Executes a {@link ComputeJob} of the given class on a single node from 
a set of candidate nodes.
+     *
+     * @param <T> Job argument (T)ype.
+     * @param <R> Job (R)esult type.
+     * @param target Execution target.
+     * @param descriptor Job descriptor.
+     * @param arg Argument of the job.
+     * @return Job result.
+     * @throws ComputeException If there is any problem executing the job.
+     */
+    default <T, R> R execute(
+            JobTarget target,
+            JobDescriptor<T, R> descriptor,
+            @Nullable T arg
+    ) {
+        return execute(target, descriptor, null, arg);
     }
 
     /**
@@ -80,6 +119,7 @@ public interface IgniteCompute {
      * @param <R> Job (R)esult type.
      * @param target Execution target.
      * @param descriptor Job descriptor.
+     * @param cancellationToken Cancellation token or {@code null}.
      * @param arg Argument of the job.
      * @return Job result.
      * @throws ComputeException If there is any problem executing the job.
@@ -87,6 +127,7 @@ public interface IgniteCompute {
     <T, R> R execute(
             JobTarget target,
             JobDescriptor<T, R> descriptor,
+            @Nullable CancellationToken cancellationToken,
             @Nullable T arg
     );
 
@@ -120,9 +161,29 @@ public interface IgniteCompute {
             Set<ClusterNode> nodes,
             JobDescriptor<T, R> descriptor,
             @Nullable T arg
+    ) {
+        return executeBroadcastAsync(nodes, descriptor, null, arg);
+    }
+
+    /**
+     * Executes a {@link ComputeJob} of the given class on all nodes in the 
given node set.
+     *
+     * @param <T> Job argument (T)ype.
+     * @param <R> Job (R)esult type.
+     * @param nodes Nodes to execute the job on.
+     * @param descriptor Job descriptor.
+     * @param cancellationToken Cancellation token or {@code null}.
+     * @param arg Argument of the job.
+     * @return Map from node to job result.
+     */
+    default <T, R> CompletableFuture<Map<ClusterNode, R>> 
executeBroadcastAsync(
+            Set<ClusterNode> nodes,
+            JobDescriptor<T, R> descriptor,
+            @Nullable CancellationToken cancellationToken,
+            @Nullable T arg
     ) {
         Map<ClusterNode, CompletableFuture<R>> futures = nodes.stream()
-                .collect(toMap(identity(), node -> 
executeAsync(JobTarget.node(node), descriptor, arg)));
+                .collect(toMap(identity(), node -> 
executeAsync(JobTarget.node(node), descriptor, cancellationToken, arg)));
 
         return allOf(futures.values().toArray(CompletableFuture[]::new))
                 .thenApply(ignored -> {
@@ -152,11 +213,32 @@ public interface IgniteCompute {
             Set<ClusterNode> nodes,
             JobDescriptor<T, R> descriptor,
             @Nullable T arg
+    ) {
+        return executeBroadcast(nodes, descriptor, null, arg);
+    }
+
+    /**
+     * Executes a {@link ComputeJob} of the given class on all nodes in the 
given node set.
+     *
+     * @param <T> Job argument (T)ype.
+     * @param <R> Job (R)esult type.
+     * @param nodes Nodes to execute the job on.
+     * @param descriptor Job descriptor.
+     * @param cancellationToken Cancellation token or {@code null}.
+     * @param arg Argument of the job.
+     * @return Map from node to job result.
+     * @throws ComputeException If there is any problem executing the job.
+     */
+    default <T, R> Map<ClusterNode, R> executeBroadcast(
+            Set<ClusterNode> nodes,
+            JobDescriptor<T, R> descriptor,
+            @Nullable CancellationToken cancellationToken,
+            @Nullable T arg
     ) {
         Map<ClusterNode, R> map = new HashMap<>();
 
         for (ClusterNode node : nodes) {
-            map.put(node, execute(JobTarget.node(node), descriptor, arg));
+            map.put(node, execute(JobTarget.node(node), descriptor, 
cancellationToken, arg));
         }
 
         return map;
@@ -183,7 +265,37 @@ public interface IgniteCompute {
      * @return Task result future.
      */
     default <T, R> CompletableFuture<R> 
executeMapReduceAsync(TaskDescriptor<T, R> taskDescriptor, @Nullable T arg) {
-        return submitMapReduce(taskDescriptor, arg).resultAsync();
+        return executeMapReduceAsync(taskDescriptor, null, arg);
+    }
+
+    /**
+     * Submits a {@link MapReduceTask} of the given class for an execution. A 
shortcut for {@code submitMapReduce(...).resultAsync()}.
+     *
+     * @param <T> Job argument (T)ype.
+     * @param <R> Job (R)esult type.
+     * @param taskDescriptor Map reduce task descriptor.
+     * @param cancellationToken Cancellation token or {@code null}.
+     * @param arg Task argument.
+     * @return Task result future.
+     */
+    <T, R> CompletableFuture<R> executeMapReduceAsync(
+            TaskDescriptor<T, R> taskDescriptor,
+            @Nullable CancellationToken cancellationToken,
+            @Nullable T arg
+    );
+
+    /**
+     * Executes a {@link MapReduceTask} of the given class.
+     *
+     * @param <T> Job argument (T)ype.
+     * @param <R> Job (R)esult type.
+     * @param taskDescriptor Map reduce task descriptor.
+     * @param arg Task argument.
+     * @return Task result.
+     * @throws ComputeException If there is any problem executing the task.
+     */
+    default <T, R> R executeMapReduce(TaskDescriptor<T, R> taskDescriptor, 
@Nullable T arg) {
+        return executeMapReduce(taskDescriptor, null, arg);
     }
 
     /**
@@ -192,10 +304,11 @@ public interface IgniteCompute {
      * @param <T> Job argument (T)ype.
      * @param <R> Job (R)esult type.
      * @param taskDescriptor Map reduce task descriptor.
+     * @param cancellationToken Cancellation token or {@code null}.
      * @param arg Task argument.
      * @return Task result.
      * @throws ComputeException If there is any problem executing the task.
      */
-    <T, R> R executeMapReduce(TaskDescriptor<T, R> taskDescriptor, @Nullable T 
arg);
+    <T, R> R executeMapReduce(TaskDescriptor<T, R> taskDescriptor, @Nullable 
CancellationToken cancellationToken, @Nullable T arg);
 
 }
diff --git 
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/compute/ClientComputeExecuteColocatedRequest.java
 
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/compute/ClientComputeExecuteColocatedRequest.java
index f965b9f8b6..f468889bf1 100644
--- 
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/compute/ClientComputeExecuteColocatedRequest.java
+++ 
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/compute/ClientComputeExecuteColocatedRequest.java
@@ -69,6 +69,7 @@ public class ClientComputeExecuteColocatedRequest {
                     deploymentUnits,
                     jobClassName,
                     options,
+                    null,
                     args);
 
             var jobExecution = compute.wrapJobExecutionFuture(jobExecutionFut);
diff --git 
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/compute/ClientComputeExecuteRequest.java
 
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/compute/ClientComputeExecuteRequest.java
index bf3950be79..bf6a621a95 100644
--- 
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/compute/ClientComputeExecuteRequest.java
+++ 
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/compute/ClientComputeExecuteRequest.java
@@ -68,7 +68,7 @@ public class ClientComputeExecuteRequest {
         Object arg = unpackJobArgumentWithoutMarshaller(in);
 
         JobExecution<Object> execution = compute.executeAsyncWithFailover(
-                candidates, deploymentUnits, jobClassName, options,  arg
+                candidates, deploymentUnits, jobClassName, options, null, arg
         );
         sendResultAndState(execution, notificationSender);
 
diff --git 
a/modules/client/src/main/java/org/apache/ignite/internal/client/compute/ClientCompute.java
 
b/modules/client/src/main/java/org/apache/ignite/internal/client/compute/ClientCompute.java
index 9037cabfc8..c35f502d90 100644
--- 
a/modules/client/src/main/java/org/apache/ignite/internal/client/compute/ClientCompute.java
+++ 
b/modules/client/src/main/java/org/apache/ignite/internal/client/compute/ClientCompute.java
@@ -59,6 +59,8 @@ import 
org.apache.ignite.internal.client.table.PartitionAwarenessProvider;
 import org.apache.ignite.internal.sql.SqlCommon;
 import org.apache.ignite.internal.util.ExceptionUtils;
 import org.apache.ignite.internal.util.ViewUtils;
+import org.apache.ignite.lang.CancelHandleHelper;
+import org.apache.ignite.lang.CancellationToken;
 import org.apache.ignite.lang.IgniteException;
 import org.apache.ignite.lang.TableNotFoundException;
 import org.apache.ignite.network.ClusterNode;
@@ -90,12 +92,24 @@ public class ClientCompute implements IgniteCompute {
         this.tables = tables;
     }
 
-    @Override
-    public <T, R> JobExecution<R> submit(JobTarget target, JobDescriptor<T, R> 
descriptor, T arg) {
+    private <T, R> JobExecution<R> submit(JobTarget target, JobDescriptor<T, 
R> descriptor, @Nullable CancellationToken cancellationToken,
+            @Nullable T arg) {
         Objects.requireNonNull(target);
         Objects.requireNonNull(descriptor);
 
-        return new ClientJobExecution<>(ch, submit0(target, descriptor, arg), 
descriptor.resultMarshaller(), descriptor.resultClass());
+        ClientJobExecution<R> execution = new ClientJobExecution<>(ch, 
submit0(target, descriptor, arg), descriptor.resultMarshaller(),
+                descriptor.resultClass());
+
+        if (cancellationToken != null) {
+            CancelHandleHelper.addCancelAction(cancellationToken, 
execution::cancelAsync, execution.resultAsync());
+        }
+
+        return execution;
+    }
+
+    @Override
+    public <T, R> JobExecution<R> submit(JobTarget target, JobDescriptor<T, R> 
descriptor, @Nullable T arg) {
+        return submit(target, descriptor, null, arg);
     }
 
     private <T, R> CompletableFuture<SubmitResult> submit0(JobTarget target, 
JobDescriptor<T, R> descriptor, T arg) {
@@ -120,8 +134,15 @@ public class ClientCompute implements IgniteCompute {
     }
 
     @Override
-    public <T, R> R execute(JobTarget target, JobDescriptor<T, R> descriptor, 
T args) {
-        return sync(executeAsync(target, descriptor, args));
+    public <T, R> CompletableFuture<R> executeAsync(JobTarget target, 
JobDescriptor<T, R> descriptor,
+            @Nullable CancellationToken cancellationToken, @Nullable T arg) {
+        return submit(target, descriptor, cancellationToken, 
arg).resultAsync();
+    }
+
+    @Override
+    public <T, R> R execute(JobTarget target, JobDescriptor<T, R> descriptor, 
@Nullable CancellationToken cancellationToken,
+            @Nullable T arg) {
+        return sync(executeAsync(target, descriptor, cancellationToken, arg));
     }
 
     private <T, R> CompletableFuture<SubmitResult> doExecuteColocatedAsync(
@@ -159,9 +180,9 @@ public class ClientCompute implements IgniteCompute {
                 .thenCompose(Function.identity());
     }
 
-    /** {@inheritDoc} */
     @Override
-    public <T, R> Map<ClusterNode, JobExecution<R>> 
submitBroadcast(Set<ClusterNode> nodes, JobDescriptor<T, R> descriptor, T arg) {
+    public <T, R> Map<ClusterNode, JobExecution<R>> 
submitBroadcast(Set<ClusterNode> nodes, JobDescriptor<T, R> descriptor,
+            @Nullable T arg) {
         Objects.requireNonNull(nodes);
         Objects.requireNonNull(descriptor);
 
@@ -182,20 +203,37 @@ public class ClientCompute implements IgniteCompute {
         return map;
     }
 
+    @Override
+    public <T, R> CompletableFuture<R> executeMapReduceAsync(TaskDescriptor<T, 
R> taskDescriptor,
+            @Nullable CancellationToken cancellationToken, @Nullable T arg) {
+        return submitMapReduce(taskDescriptor, cancellationToken, 
arg).resultAsync();
+    }
+
     @Override
     public <T, R> TaskExecution<R> submitMapReduce(TaskDescriptor<T, R> 
taskDescriptor, @Nullable T arg) {
+        return submitMapReduce(taskDescriptor, null, arg);
+    }
+
+    private <T, R> TaskExecution<R> submitMapReduce(TaskDescriptor<T, R> 
taskDescriptor, @Nullable CancellationToken cancellationToken,
+            @Nullable T arg) {
         Objects.requireNonNull(taskDescriptor);
 
-        return new ClientTaskExecution<>(ch,
+        ClientTaskExecution<R> clientExecution = new ClientTaskExecution<>(ch,
                 doExecuteMapReduceAsync(taskDescriptor, arg),
                 taskDescriptor.reduceJobResultMarshaller(),
                 taskDescriptor.reduceJobResultClass()
         );
+
+        if (cancellationToken != null) {
+            CancelHandleHelper.addCancelAction(cancellationToken, 
clientExecution::cancelAsync, clientExecution.resultAsync());
+        }
+
+        return clientExecution;
     }
 
     @Override
-    public <T, R> R executeMapReduce(TaskDescriptor<T, R> taskDescriptor, 
@Nullable T arg) {
-        return sync(executeMapReduceAsync(taskDescriptor, arg));
+    public <T, R> R executeMapReduce(TaskDescriptor<T, R> taskDescriptor, 
@Nullable CancellationToken cancellationToken, @Nullable T arg) {
+        return sync(executeMapReduceAsync(taskDescriptor, cancellationToken, 
arg));
     }
 
     private <T, R> CompletableFuture<SubmitTaskResult> 
doExecuteMapReduceAsync(TaskDescriptor<T, R> taskDescriptor, @Nullable T arg) {
diff --git 
a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeCompute.java 
b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeCompute.java
index 626753249d..91e5c05578 100644
--- 
a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeCompute.java
+++ 
b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeCompute.java
@@ -63,6 +63,7 @@ import org.apache.ignite.internal.compute.TaskStateImpl;
 import org.apache.ignite.internal.compute.loader.JobClassLoader;
 import org.apache.ignite.internal.table.TableViewInternal;
 import org.apache.ignite.internal.util.ExceptionUtils;
+import org.apache.ignite.lang.CancellationToken;
 import org.apache.ignite.marshalling.Marshaller;
 import org.apache.ignite.network.ClusterNode;
 import org.apache.ignite.table.Tuple;
@@ -98,6 +99,7 @@ public class FakeCompute implements IgniteComputeInternal {
             List<DeploymentUnit> units,
             String jobClassName,
             JobExecutionOptions options,
+            @Nullable CancellationToken cancellationToken,
             Object args) {
         if (Objects.equals(jobClassName, GET_UNITS)) {
             String unitString = 
units.stream().map(DeploymentUnit::render).collect(Collectors.joining(","));
@@ -137,17 +139,18 @@ public class FakeCompute implements IgniteComputeInternal 
{
             List<DeploymentUnit> units,
             String jobClassName,
             JobExecutionOptions options,
+            @Nullable CancellationToken cancellationToken,
             Object args
     ) {
         return completedFuture(jobExecution(future != null ? future : 
completedFuture((R) nodeName)));
     }
 
-    @Override
-    public <T, R> JobExecution<R> submit(JobTarget target, JobDescriptor<T, R> 
descriptor, T args) {
+    private <T, R> JobExecution<R> submit(JobTarget target, JobDescriptor<T, 
R> descriptor, @Nullable CancellationToken cancellationToken,
+            @Nullable T args) {
         if (target instanceof AnyNodeJobTarget) {
             Set<ClusterNode> nodes = ((AnyNodeJobTarget) target).nodes();
             return executeAsyncWithFailover(
-                    nodes, descriptor.units(), descriptor.jobClassName(), 
descriptor.options(), args
+                    nodes, descriptor.units(), descriptor.jobClassName(), 
descriptor.options(), cancellationToken, args
             );
         } else if (target instanceof ColocatedJobTarget) {
             return jobExecution(future != null ? future : completedFuture((R) 
nodeName));
@@ -157,8 +160,20 @@ public class FakeCompute implements IgniteComputeInternal {
     }
 
     @Override
-    public <T, R> R execute(JobTarget target, JobDescriptor<T, R> descriptor, 
T args) {
-        return sync(executeAsync(target, descriptor, args));
+    public <T, R> JobExecution<R> submit(JobTarget target, JobDescriptor<T, R> 
descriptor, @Nullable T arg) {
+        return submit(target, descriptor, null, arg);
+    }
+
+    @Override
+    public <T, R> CompletableFuture<R> executeAsync(JobTarget target, 
JobDescriptor<T, R> descriptor,
+            @Nullable CancellationToken cancellationToken, @Nullable T arg) {
+        return submit(target, descriptor, cancellationToken, 
arg).resultAsync();
+    }
+
+    @Override
+    public <T, R> R execute(JobTarget target, JobDescriptor<T, R> descriptor, 
@Nullable CancellationToken cancellationToken,
+            @Nullable T args) {
+        return sync(executeAsync(target, descriptor, cancellationToken, args));
     }
 
     @Override
@@ -176,8 +191,14 @@ public class FakeCompute implements IgniteComputeInternal {
     }
 
     @Override
-    public <T, R> R executeMapReduce(TaskDescriptor<T, R> taskDescriptor, 
@Nullable T arg) {
-        return sync(executeMapReduceAsync(taskDescriptor, arg));
+    public <T, R> CompletableFuture<R> executeMapReduceAsync(TaskDescriptor<T, 
R> taskDescriptor,
+            @Nullable CancellationToken cancellationToken, @Nullable T arg) {
+        return submitMapReduce(taskDescriptor, arg).resultAsync();
+    }
+
+    @Override
+    public <T, R> R executeMapReduce(TaskDescriptor<T, R> taskDescriptor, 
@Nullable CancellationToken cancellationToken, @Nullable T arg) {
+        return sync(executeMapReduceAsync(taskDescriptor, cancellationToken, 
arg));
     }
 
     private <R> JobExecution<R> completedExecution(R result) {
diff --git 
a/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItComputeBaseTest.java
 
b/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItComputeBaseTest.java
index 30e4a31922..4c01e22d91 100644
--- 
a/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItComputeBaseTest.java
+++ 
b/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItComputeBaseTest.java
@@ -60,7 +60,9 @@ import org.apache.ignite.compute.TaskDescriptor;
 import org.apache.ignite.compute.task.TaskExecution;
 import org.apache.ignite.deployment.DeploymentUnit;
 import org.apache.ignite.internal.ClusterPerClassIntegrationTest;
+import org.apache.ignite.internal.testframework.IgniteTestUtils;
 import org.apache.ignite.internal.util.ExceptionUtils;
+import org.apache.ignite.lang.CancelHandle;
 import org.apache.ignite.lang.IgniteException;
 import org.apache.ignite.lang.TableNotFoundException;
 import org.apache.ignite.network.ClusterNode;
@@ -386,6 +388,94 @@ public abstract class ItComputeBaseTest extends 
ClusterPerClassIntegrationTest {
         assertThat(ex.getCause().getMessage(), containsString("The table does 
not exist [name=\"PUBLIC\".\"bad-table\"]"));
     }
 
+    @ParameterizedTest(name = "local: {0}")
+    @ValueSource(booleans = {true, false})
+    void cancelComputeExecuteAsyncWithCancelHandle(boolean local) {
+        Ignite entryNode = node(0);
+        Ignite executeNode = local ? node(0) : node(1);
+
+        CancelHandle cancelHandle = CancelHandle.create();
+
+        JobDescriptor<Long, Void> job = 
JobDescriptor.builder(SilentSleepJob.class).units(units()).build();
+
+        CompletableFuture<Void> execution = entryNode.compute()
+                .executeAsync(JobTarget.node(clusterNode(executeNode)), job, 
cancelHandle.token(), 100L);
+
+        cancelHandle.cancel();
+
+        assertThrows(ExecutionException.class, () -> execution.get(10, 
TimeUnit.SECONDS));
+    }
+
+    @ParameterizedTest(name = "local: {0}")
+    @ValueSource(booleans = {true, false})
+    void cancelComputeExecuteWithCancelHandle(boolean local) {
+        Ignite entryNode = node(0);
+        Ignite executeNode = local ? node(0) : node(1);
+
+        CancelHandle cancelHandle = CancelHandle.create();
+
+        JobDescriptor<Long, Void> job = 
JobDescriptor.builder(SilentSleepJob.class).units(units()).build();
+
+        CompletableFuture<Void> runFut = IgniteTestUtils.runAsync(() -> 
entryNode.compute()
+                .execute(JobTarget.node(clusterNode(executeNode)), job, 
cancelHandle.token(), 100L));
+
+        cancelHandle.cancel();
+
+        assertThrows(ExecutionException.class, () -> runFut.get(10, 
TimeUnit.SECONDS));
+    }
+
+    @ParameterizedTest(name = "withLocal: {0}")
+    @ValueSource(booleans = {true, false})
+    void cancelComputeExecuteBroadcastAsyncWithCancelHandle(boolean local) {
+        Ignite entryNode = node(0);
+        Set<ClusterNode> executeNodes =
+                local ? Set.of(clusterNode(entryNode), clusterNode(node(2))) : 
Set.of(clusterNode(node(1)), clusterNode(node(2)));
+
+        CancelHandle cancelHandle = CancelHandle.create();
+
+        CompletableFuture<Map<ClusterNode, Void>> executions = 
entryNode.compute().executeBroadcastAsync(
+                executeNodes,
+                
JobDescriptor.builder(SilentSleepJob.class).units(units()).build(), 
cancelHandle.token(), 100L
+        );
+
+        cancelHandle.cancel();
+
+        assertThrows(ExecutionException.class, () -> executions.get(10, 
TimeUnit.SECONDS));
+    }
+
+    @ParameterizedTest(name = "local: {0}")
+    @ValueSource(booleans = {true, false})
+    void cancelComputeExecuteBroadcastWithCancelHandle(boolean local) {
+        Ignite entryNode = node(0);
+        Set<ClusterNode> executeNodes =
+                local ? Set.of(clusterNode(entryNode), clusterNode(node(2))) : 
Set.of(clusterNode(node(1)), clusterNode(node(2)));
+
+        CancelHandle cancelHandle = CancelHandle.create();
+
+        CompletableFuture<Map<ClusterNode, Void>> runFut = 
IgniteTestUtils.runAsync(() -> entryNode.compute().executeBroadcast(
+                executeNodes,
+                
JobDescriptor.builder(SilentSleepJob.class).units(units()).build(), 
cancelHandle.token(), 100L
+        ));
+
+        cancelHandle.cancel();
+
+        assertThrows(ExecutionException.class, () -> runFut.get(10, 
TimeUnit.SECONDS));
+    }
+
+    @Test
+    void cancelComputeExecuteMapReduceAsyncWithCancelHandle() {
+        Ignite entryNode = node(0);
+
+        CancelHandle cancelHandle = CancelHandle.create();
+
+        CompletableFuture<Void> execution = entryNode.compute()
+                
.executeMapReduceAsync(TaskDescriptor.builder(InfiniteMapReduceTask.class).build(),
 cancelHandle.token(), null);
+
+        cancelHandle.cancel();
+
+        assertThrows(ExecutionException.class, () -> execution.get(10, 
TimeUnit.SECONDS));
+    }
+
     static void createTestTableWithOneRow() {
         sql("DROP TABLE IF EXISTS test");
         sql("CREATE TABLE test (k int, v int, CONSTRAINT PK PRIMARY KEY (k))");
diff --git 
a/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItComputeTestStandalone.java
 
b/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItComputeTestStandalone.java
index 5620c49db5..57fc304542 100644
--- 
a/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItComputeTestStandalone.java
+++ 
b/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItComputeTestStandalone.java
@@ -45,7 +45,10 @@ import org.apache.ignite.internal.deployunit.NodesToDeploy;
 import org.apache.ignite.internal.testframework.IgniteTestUtils;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
 
 /**
  * Integration tests for Compute functionality in standalone Ignite node.
@@ -81,6 +84,45 @@ class ItComputeTestStandalone extends ItComputeBaseTest {
         return units;
     }
 
+    @Override
+    @Disabled("Remove after 
https://issues.apache.org/jira/browse/IGNITE-23731";)
+    @ParameterizedTest(name = "local: {0}")
+    @ValueSource(booleans = {true, false})
+    void cancelComputeExecuteAsyncWithCancelHandle(boolean local) {
+        super.cancelComputeExecuteAsyncWithCancelHandle(local);
+    }
+
+    @Override
+    @Disabled("Remove after 
https://issues.apache.org/jira/browse/IGNITE-23731";)
+    @ParameterizedTest(name = "local: {0}")
+    @ValueSource(booleans = {true, false})
+    void cancelComputeExecuteWithCancelHandle(boolean local) {
+        super.cancelComputeExecuteWithCancelHandle(local);
+    }
+
+    @Override
+    @Disabled("Remove after 
https://issues.apache.org/jira/browse/IGNITE-23731";)
+    @ParameterizedTest(name = "withLocal: {0}")
+    @ValueSource(booleans = {true, false})
+    void cancelComputeExecuteBroadcastAsyncWithCancelHandle(boolean local) {
+        super.cancelComputeExecuteBroadcastAsyncWithCancelHandle(local);
+    }
+
+    @Override
+    @Disabled("Remove after 
https://issues.apache.org/jira/browse/IGNITE-23731";)
+    @ParameterizedTest(name = "local: {0}")
+    @ValueSource(booleans = {true, false})
+    void cancelComputeExecuteBroadcastWithCancelHandle(boolean local) {
+        super.cancelComputeExecuteBroadcastWithCancelHandle(local);
+    }
+
+    @Override
+    @Disabled("Remove after 
https://issues.apache.org/jira/browse/IGNITE-23731";)
+    @Test
+    void cancelComputeExecuteMapReduceAsyncWithCancelHandle() {
+        super.cancelComputeExecuteMapReduceAsyncWithCancelHandle();
+    }
+
     @Test
     void executesJobWithNonExistingUnit() {
         Ignite entryNode = node(0);
diff --git 
a/modules/compute/src/jobs/java/org/apache/ignite/internal/compute/InfiniteMapReduceTask.java
 
b/modules/compute/src/jobs/java/org/apache/ignite/internal/compute/InfiniteMapReduceTask.java
new file mode 100644
index 0000000000..720ef67624
--- /dev/null
+++ 
b/modules/compute/src/jobs/java/org/apache/ignite/internal/compute/InfiniteMapReduceTask.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.compute;
+
+import static java.util.concurrent.CompletableFuture.completedFuture;
+
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.compute.ComputeJob;
+import org.apache.ignite.compute.JobDescriptor;
+import org.apache.ignite.compute.JobExecutionContext;
+import org.apache.ignite.compute.task.MapReduceJob;
+import org.apache.ignite.compute.task.MapReduceTask;
+import org.apache.ignite.compute.task.TaskExecutionContext;
+
+/** Infinite time execution map reduce task, useful for task cancellation 
check. */
+public class InfiniteMapReduceTask implements MapReduceTask<Void, Void, Void, 
Void> {
+    @Override
+    public CompletableFuture<List<MapReduceJob<Void, Void>>> 
splitAsync(TaskExecutionContext taskContext, Void input) {
+        return completedFuture(List.of(
+                MapReduceJob.<Void, Void>builder()
+                        
.jobDescriptor(JobDescriptor.builder(InfiniteMapReduceJob.class).build())
+                        .nodes(taskContext.ignite().clusterNodes())
+                        .build()
+        ));
+    }
+
+    @Override
+    public CompletableFuture<Void> reduceAsync(TaskExecutionContext 
taskContext, Map<UUID, Void> results) {
+        return completedFuture(null);
+    }
+
+    private static class InfiniteMapReduceJob implements ComputeJob<Void, 
Void> {
+        @Override
+        public CompletableFuture<Void> executeAsync(JobExecutionContext 
context, Void input) {
+            return new CompletableFuture<>();
+        }
+    }
+}
diff --git 
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/AntiHijackIgniteCompute.java
 
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/AntiHijackIgniteCompute.java
index ec3197a6ab..995329093a 100644
--- 
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/AntiHijackIgniteCompute.java
+++ 
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/AntiHijackIgniteCompute.java
@@ -22,6 +22,7 @@ import static java.util.stream.Collectors.toMap;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Executor;
 import org.apache.ignite.compute.IgniteCompute;
 import org.apache.ignite.compute.JobDescriptor;
@@ -31,6 +32,7 @@ import org.apache.ignite.compute.TaskDescriptor;
 import org.apache.ignite.compute.task.TaskExecution;
 import org.apache.ignite.internal.compute.task.AntiHijackTaskExecution;
 import org.apache.ignite.internal.wrapper.Wrapper;
+import org.apache.ignite.lang.CancellationToken;
 import org.apache.ignite.network.ClusterNode;
 import org.jetbrains.annotations.Nullable;
 
@@ -50,13 +52,28 @@ public class AntiHijackIgniteCompute implements 
IgniteCompute, Wrapper {
     }
 
     @Override
-    public <T, R> JobExecution<R> submit(JobTarget target, JobDescriptor<T, R> 
descriptor, T args) {
-        return preventThreadHijack(compute.submit(target, descriptor, args));
+    public <T, R> JobExecution<R> submit(JobTarget target, JobDescriptor<T, R> 
descriptor,
+            @Nullable T arg) {
+        return preventThreadHijack(compute.submit(target, descriptor, arg));
+    }
+
+    private <T, R> JobExecution<R> submit(JobTarget target, JobDescriptor<T, 
R> descriptor,
+            @Nullable CancellationToken cancellationToken, @Nullable T arg) {
+        IgniteComputeImpl compute0 = unwrap(IgniteComputeImpl.class);
+
+        return preventThreadHijack(compute0.submit(target, descriptor, 
cancellationToken, arg));
+    }
+
+    @Override
+    public <T, R> CompletableFuture<R> executeAsync(JobTarget target, 
JobDescriptor<T, R> descriptor,
+            @Nullable CancellationToken cancellationToken, @Nullable T arg) {
+        return submit(target, descriptor, cancellationToken, 
arg).resultAsync();
     }
 
     @Override
-    public <T, R> R execute(JobTarget target, JobDescriptor<T, R> descriptor, 
T args) {
-        return compute.execute(target, descriptor, args);
+    public <T, R> R execute(JobTarget target, JobDescriptor<T, R> descriptor, 
@Nullable CancellationToken cancellationToken,
+            @Nullable T arg) {
+        return compute.execute(target, descriptor, cancellationToken, arg);
     }
 
     @Override
@@ -71,14 +88,27 @@ public class AntiHijackIgniteCompute implements 
IgniteCompute, Wrapper {
                 .collect(toMap(Entry::getKey, entry -> 
preventThreadHijack(entry.getValue())));
     }
 
+    private <T, R> TaskExecution<R> submitMapReduce(TaskDescriptor<T, R> 
taskDescriptor, @Nullable CancellationToken cancellationToken,
+            @Nullable T arg) {
+        IgniteComputeImpl compute0 = unwrap(IgniteComputeImpl.class);
+
+        return new 
AntiHijackTaskExecution<>(compute0.submitMapReduce(taskDescriptor, 
cancellationToken, arg), asyncContinuationExecutor);
+    }
+
     @Override
     public <T, R> TaskExecution<R> submitMapReduce(TaskDescriptor<T, R> 
taskDescriptor, @Nullable T arg) {
         return new 
AntiHijackTaskExecution<>(compute.submitMapReduce(taskDescriptor, arg), 
asyncContinuationExecutor);
     }
 
     @Override
-    public <T, R> R executeMapReduce(TaskDescriptor<T, R> taskDescriptor, 
@Nullable T arg) {
-        return compute.executeMapReduce(taskDescriptor, arg);
+    public <T, R> CompletableFuture<R> executeMapReduceAsync(TaskDescriptor<T, 
R> taskDescriptor,
+            @Nullable CancellationToken cancellationToken, @Nullable T arg) {
+        return submitMapReduce(taskDescriptor, cancellationToken, 
arg).resultAsync();
+    }
+
+    @Override
+    public <T, R> R executeMapReduce(TaskDescriptor<T, R> taskDescriptor, 
@Nullable CancellationToken cancellationToken, @Nullable T arg) {
+        return compute.executeMapReduce(taskDescriptor, cancellationToken, 
arg);
     }
 
     private <T, R> JobExecution<R> preventThreadHijack(JobExecution<R> 
execution) {
diff --git 
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeComponent.java
 
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeComponent.java
index bdff6c0164..c9942991d1 100644
--- 
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeComponent.java
+++ 
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeComponent.java
@@ -27,6 +27,7 @@ import org.apache.ignite.compute.task.TaskExecution;
 import org.apache.ignite.deployment.DeploymentUnit;
 import org.apache.ignite.internal.compute.task.JobSubmitter;
 import org.apache.ignite.internal.manager.IgniteComponent;
+import org.apache.ignite.lang.CancellationToken;
 import org.apache.ignite.network.ClusterNode;
 import org.jetbrains.annotations.Nullable;
 
@@ -41,6 +42,7 @@ public interface ComputeComponent extends IgniteComponent {
      * @param options Job execution options.
      * @param units Deployment units which will be loaded for execution.
      * @param jobClassName Name of the job class.
+     * @param cancellationToken Cancellation token or {@code null}.
      * @param arg Job args.
      * @param <R> Job result type.
      * @return Job execution object.
@@ -49,9 +51,30 @@ public interface ComputeComponent extends IgniteComponent {
             ExecutionOptions options,
             List<DeploymentUnit> units,
             String jobClassName,
-            T arg
+            @Nullable CancellationToken cancellationToken,
+            @Nullable T arg
     );
 
+    /**
+     * Executes a job of the given class on the current node.
+     *
+     *
+     * @param options Job execution options.
+     * @param units Deployment units which will be loaded for execution.
+     * @param jobClassName Name of the job class.
+     * @param arg Job args.
+     * @param <R> Job result type.
+     * @return Job execution object.
+     */
+    default <T, R> JobExecution<R> executeLocally(
+            ExecutionOptions options,
+            List<DeploymentUnit> units,
+            String jobClassName,
+            @Nullable T arg
+    ) {
+        return executeLocally(ExecutionOptions.DEFAULT, units, jobClassName, 
null, arg);
+    }
+
     /**
      * Executes a job of the given class on the current node with default 
execution options {@link ExecutionOptions#DEFAULT}.
      *
@@ -66,7 +89,7 @@ public interface ComputeComponent extends IgniteComponent {
             String jobClassName,
             T arg
     ) {
-        return executeLocally(ExecutionOptions.DEFAULT, units, jobClassName, 
arg);
+        return executeLocally(ExecutionOptions.DEFAULT, units, jobClassName, 
null, arg);
     }
 
     /**
@@ -76,6 +99,7 @@ public interface ComputeComponent extends IgniteComponent {
      * @param remoteNode Remote node name.
      * @param units Deployment units which will be loaded for execution.
      * @param jobClassName Name of the job class.
+     * @param cancellationToken Cancellation token or {@code null}.
      * @param arg Job args.
      * @param <R> Job result type.
      * @return Job execution object.
@@ -85,6 +109,7 @@ public interface ComputeComponent extends IgniteComponent {
             ClusterNode remoteNode,
             List<DeploymentUnit> units,
             String jobClassName,
+            @Nullable CancellationToken cancellationToken,
             T arg
     );
 
@@ -104,7 +129,7 @@ public interface ComputeComponent extends IgniteComponent {
             String jobClassName,
             T arg
     ) {
-        return executeRemotely(ExecutionOptions.DEFAULT, remoteNode, units, 
jobClassName, arg);
+        return executeRemotely(ExecutionOptions.DEFAULT, remoteNode, units, 
jobClassName, null, arg);
     }
 
     /**
@@ -116,6 +141,7 @@ public interface ComputeComponent extends IgniteComponent {
      * @param options Job execution options.
      * @param units Deployment units which will be loaded for execution.
      * @param jobClassName Name of the job class.
+     * @param cancellationToken Cancellation token or {@code null}.
      * @param arg Job args.
      * @param <R> Job result type.
      * @return Job execution object.
@@ -126,7 +152,8 @@ public interface ComputeComponent extends IgniteComponent {
             List<DeploymentUnit> units,
             String jobClassName,
             ExecutionOptions options,
-            T arg
+            @Nullable CancellationToken cancellationToken,
+            @Nullable T arg
     );
 
     /**
diff --git 
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeComponentImpl.java
 
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeComponentImpl.java
index 2435642a63..c68056c66e 100644
--- 
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeComponentImpl.java
+++ 
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeComponentImpl.java
@@ -61,6 +61,8 @@ import 
org.apache.ignite.internal.systemview.api.SystemViewProvider;
 import org.apache.ignite.internal.thread.NamedThreadFactory;
 import org.apache.ignite.internal.util.IgniteSpinBusyLock;
 import org.apache.ignite.internal.util.IgniteUtils;
+import org.apache.ignite.lang.CancelHandleHelper;
+import org.apache.ignite.lang.CancellationToken;
 import org.apache.ignite.network.ClusterNode;
 import org.jetbrains.annotations.Nullable;
 import org.jetbrains.annotations.TestOnly;
@@ -124,6 +126,7 @@ public class ComputeComponentImpl implements 
ComputeComponent, SystemViewProvide
             ExecutionOptions options,
             List<DeploymentUnit> units,
             String jobClassName,
+            @Nullable CancellationToken cancellationToken,
             I arg
     ) {
         if (!busyLock.enterBusy()) {
@@ -133,8 +136,10 @@ public class ComputeComponentImpl implements 
ComputeComponent, SystemViewProvide
         }
 
         try {
+            CompletableFuture<JobContext> classLoaderFut = 
jobContextManager.acquireClassLoader(units);
+
             CompletableFuture<JobExecutionInternal<R>> future =
-                    
mapClassLoaderExceptions(jobContextManager.acquireClassLoader(units), 
jobClassName)
+                    mapClassLoaderExceptions(classLoaderFut, jobClassName)
                             .thenApply(context -> {
                                 JobExecutionInternal<R> execution = 
execJob(context, options, jobClassName, arg);
                                 execution.resultAsync().whenComplete((result, 
e) -> context.close());
@@ -143,8 +148,16 @@ public class ComputeComponentImpl implements 
ComputeComponent, SystemViewProvide
                             });
 
             inFlightFutures.registerFuture(future);
+            inFlightFutures.registerFuture(classLoaderFut);
 
             JobExecution<R> result = new DelegatingJobExecution<>(future);
+
+            if (cancellationToken != null) {
+                CancelHandleHelper.addCancelAction(cancellationToken, 
classLoaderFut);
+                CancelHandleHelper.addCancelAction(cancellationToken, future);
+                CancelHandleHelper.addCancelAction(cancellationToken, 
result::cancelAsync, result.resultAsync());
+            }
+
             result.idAsync().thenAccept(jobId -> 
executionManager.addExecution(jobId, result));
             return result;
         } finally {
@@ -192,6 +205,7 @@ public class ComputeComponentImpl implements 
ComputeComponent, SystemViewProvide
             ClusterNode remoteNode,
             List<DeploymentUnit> units,
             String jobClassName,
+            @Nullable CancellationToken cancellationToken,
             T arg
     ) {
         if (!busyLock.enterBusy()) {
@@ -208,6 +222,11 @@ public class ComputeComponentImpl implements 
ComputeComponent, SystemViewProvide
             inFlightFutures.registerFuture(resultFuture);
 
             JobExecution<R> result = new RemoteJobExecution<>(remoteNode, 
jobIdFuture, resultFuture, inFlightFutures, messaging);
+
+            if (cancellationToken != null) {
+                CancelHandleHelper.addCancelAction(cancellationToken, 
result::cancelAsync, result.resultAsync());
+            }
+
             jobIdFuture.thenAccept(jobId -> 
executionManager.addExecution(jobId, result));
             return result;
         } finally {
@@ -222,12 +241,13 @@ public class ComputeComponentImpl implements 
ComputeComponent, SystemViewProvide
             List<DeploymentUnit> units,
             String jobClassName,
             ExecutionOptions options,
-            T arg
+            @Nullable CancellationToken cancellationToken,
+            @Nullable T arg
     ) {
         JobExecution<R> result = (JobExecution<R>) new ComputeJobFailover<>(
                 this, logicalTopologyService, topologyService,
                 remoteNode, nextWorkerSelector, failoverExecutor, units,
-                jobClassName, options, arg
+                jobClassName, options, cancellationToken, arg
         ).failSafeExecute();
 
         result.idAsync().thenAccept(jobId -> 
executionManager.addExecution(jobId, result));
diff --git 
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeJobFailover.java
 
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeJobFailover.java
index da2953a74e..ae531a1ee0 100644
--- 
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeJobFailover.java
+++ 
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeJobFailover.java
@@ -30,8 +30,10 @@ import 
org.apache.ignite.internal.lang.IgniteInternalException;
 import org.apache.ignite.internal.logger.IgniteLogger;
 import org.apache.ignite.internal.logger.Loggers;
 import org.apache.ignite.internal.network.TopologyService;
+import org.apache.ignite.lang.CancellationToken;
 import org.apache.ignite.lang.ErrorGroups.Compute;
 import org.apache.ignite.network.ClusterNode;
+import org.jetbrains.annotations.Nullable;
 
 /**
  * This is a helper class for {@link ComputeComponent} to handle job failures. 
You can think about this class as a "retryable compute job
@@ -84,6 +86,9 @@ class ComputeJobFailover<R> {
      */
     private final RemoteExecutionContext<?, R> jobContext;
 
+    /** Cancellation token. */
+    @Nullable private final CancellationToken cancellationToken;
+
     /**
      * Creates a per-job instance.
      *
@@ -96,6 +101,7 @@ class ComputeJobFailover<R> {
      * @param units deployment units.
      * @param jobClassName the name of the job class.
      * @param executionOptions execution options like priority or max retries.
+     * @param cancellationToken Cancellation token or {@code null}.
      * @param args the arguments of the job.
      */
     ComputeJobFailover(
@@ -108,6 +114,7 @@ class ComputeJobFailover<R> {
             List<DeploymentUnit> units,
             String jobClassName,
             ExecutionOptions executionOptions,
+            @Nullable CancellationToken cancellationToken,
             Object args
     ) {
         this.computeComponent = computeComponent;
@@ -117,6 +124,7 @@ class ComputeJobFailover<R> {
         this.nextWorkerSelector = nextWorkerSelector;
         this.jobContext = new RemoteExecutionContext<>(units, jobClassName, 
executionOptions, args);
         this.executor = executor;
+        this.cancellationToken = cancellationToken;
     }
 
     /**
@@ -138,11 +146,13 @@ class ComputeJobFailover<R> {
     private JobExecution<R> launchJobOn(ClusterNode runningWorkerNode) {
         if 
(runningWorkerNode.name().equals(topologyService.localMember().name())) {
             return computeComponent.executeLocally(
-                    jobContext.executionOptions(), jobContext.units(), 
jobContext.jobClassName(), jobContext.arg()
+                    jobContext.executionOptions(), jobContext.units(), 
jobContext.jobClassName(), cancellationToken,
+                    jobContext.arg()
             );
         } else {
             return computeComponent.executeRemotely(
-                    jobContext.executionOptions(), runningWorkerNode, 
jobContext.units(), jobContext.jobClassName(), jobContext.arg()
+                    jobContext.executionOptions(), runningWorkerNode, 
jobContext.units(), jobContext.jobClassName(), cancellationToken,
+                    jobContext.arg()
             );
         }
     }
diff --git 
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/ExecutionOptions.java
 
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/ExecutionOptions.java
index 47604cd468..af381ec61d 100644
--- 
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/ExecutionOptions.java
+++ 
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/ExecutionOptions.java
@@ -70,6 +70,7 @@ public class ExecutionOptions {
         return Objects.hash(priority, maxRetries);
     }
 
+    /** Compose execution options.  */
     public static ExecutionOptions from(JobExecutionOptions 
jobExecutionOptions) {
         return 
builder().priority(jobExecutionOptions.priority()).maxRetries(jobExecutionOptions.maxRetries()).build();
     }
diff --git 
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/IgniteComputeImpl.java
 
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/IgniteComputeImpl.java
index 20d083b7b7..626b3df245 100644
--- 
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/IgniteComputeImpl.java
+++ 
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/IgniteComputeImpl.java
@@ -67,6 +67,8 @@ import 
org.apache.ignite.internal.table.StreamerReceiverRunner;
 import org.apache.ignite.internal.table.TableViewInternal;
 import org.apache.ignite.internal.util.CompletableFutures;
 import org.apache.ignite.internal.util.ExceptionUtils;
+import org.apache.ignite.lang.CancelHandleHelper;
+import org.apache.ignite.lang.CancellationToken;
 import org.apache.ignite.lang.ErrorGroups.Compute;
 import org.apache.ignite.lang.IgniteException;
 import org.apache.ignite.lang.TableNotFoundException;
@@ -108,8 +110,8 @@ public class IgniteComputeImpl implements 
IgniteComputeInternal, StreamerReceive
         tables.setStreamerReceiverRunner(this);
     }
 
-    @Override
-    public <T, R> JobExecution<R> submit(JobTarget target, JobDescriptor<T, R> 
descriptor, T args) {
+    <T, R> JobExecution<R> submit(JobTarget target, JobDescriptor<T, R> 
descriptor, @Nullable CancellationToken cancellationToken,
+            @Nullable T args) {
         Objects.requireNonNull(target);
         Objects.requireNonNull(descriptor);
 
@@ -121,7 +123,7 @@ public class IgniteComputeImpl implements 
IgniteComputeInternal, StreamerReceive
 
             return new ResultUnmarshallingJobExecution<>(
                     executeAsyncWithFailover(
-                            nodes, descriptor.units(), 
descriptor.jobClassName(), descriptor.options(),
+                            nodes, descriptor.units(), 
descriptor.jobClassName(), descriptor.options(), cancellationToken,
                             tryMarshalOrCast(argumentMarshaller, args)
                     ),
                     resultMarshaller
@@ -143,7 +145,7 @@ public class IgniteComputeImpl implements 
IgniteComputeInternal, StreamerReceive
                                         new 
NextColocatedWorkerSelector<>(placementDriver, topologyService, clock, table, 
key, mapper),
                                         descriptor.units(),
                                         descriptor.jobClassName(),
-                                        descriptor.options(),
+                                        descriptor.options(), 
cancellationToken,
                                         tryMarshalOrCast(argumentMarshaller, 
args)
                                 )));
 
@@ -155,6 +157,7 @@ public class IgniteComputeImpl implements 
IgniteComputeInternal, StreamerReceive
                                 descriptor.units(),
                                 descriptor.jobClassName(),
                                 descriptor.options(),
+                                cancellationToken,
                                 tryMarshalOrCast(argumentMarshaller, args)))
                         .thenApply(job -> (JobExecution<R>) job);
             }
@@ -166,8 +169,20 @@ public class IgniteComputeImpl implements 
IgniteComputeInternal, StreamerReceive
     }
 
     @Override
-    public <T, R> R execute(JobTarget target, JobDescriptor<T, R> descriptor, 
T args) {
-        return sync(executeAsync(target, descriptor, args));
+    public <T, R> JobExecution<R> submit(JobTarget target, JobDescriptor<T, R> 
descriptor, @Nullable T arg) {
+        return submit(target, descriptor, null, arg);
+    }
+
+    @Override
+    public <T, R> CompletableFuture<R> executeAsync(JobTarget target, 
JobDescriptor<T, R> descriptor,
+            @Nullable CancellationToken cancellationToken, @Nullable T arg) {
+        return submit(target, descriptor, cancellationToken, 
arg).resultAsync();
+    }
+
+    @Override
+    public <T, R> R execute(JobTarget target, JobDescriptor<T, R> descriptor, 
@Nullable CancellationToken cancellationToken,
+            @Nullable T args) {
+        return sync(executeAsync(target, descriptor, cancellationToken, args));
     }
 
     @Override
@@ -176,6 +191,7 @@ public class IgniteComputeImpl implements 
IgniteComputeInternal, StreamerReceive
             List<DeploymentUnit> units,
             String jobClassName,
             JobExecutionOptions options,
+            @Nullable CancellationToken cancellationToken,
             @Nullable Object arg
     ) {
         Set<ClusterNode> candidates = new HashSet<>();
@@ -201,6 +217,7 @@ public class IgniteComputeImpl implements 
IgniteComputeInternal, StreamerReceive
                         units,
                         jobClassName,
                         options,
+                        cancellationToken,
                         arg
                 ));
     }
@@ -222,16 +239,16 @@ public class IgniteComputeImpl implements 
IgniteComputeInternal, StreamerReceive
             List<DeploymentUnit> units,
             String jobClassName,
             JobExecutionOptions jobExecutionOptions,
+            @Nullable CancellationToken cancellationToken,
             @Nullable T arg
     ) {
         ExecutionOptions options = ExecutionOptions.from(jobExecutionOptions);
 
         if (isLocal(targetNode)) {
-            return computeComponent.executeLocally(options, units, 
jobClassName, arg);
+            return computeComponent.executeLocally(options, units, 
jobClassName, cancellationToken, arg);
         } else {
             return computeComponent.executeRemotelyWithFailover(
-                    targetNode, nextWorkerSelector, units, jobClassName, 
options, arg
-            );
+                    targetNode, nextWorkerSelector, units, jobClassName, 
options, cancellationToken, arg);
         }
     }
 
@@ -263,12 +280,13 @@ public class IgniteComputeImpl implements 
IgniteComputeInternal, StreamerReceive
             List<DeploymentUnit> units,
             String jobClassName,
             JobExecutionOptions options,
+            @Nullable CancellationToken cancellationToken,
             @Nullable Object arg) {
         return primaryReplicaForPartitionByTupleKey(table, key)
                 .thenApply(primaryNode -> executeOnOneNodeWithFailover(
                         primaryNode,
                         new NextColocatedWorkerSelector<>(placementDriver, 
topologyService, clock, table, key),
-                        units, jobClassName, options, arg
+                        units, jobClassName, options, cancellationToken, arg
                 ));
     }
 
@@ -334,22 +352,39 @@ public class IgniteComputeImpl implements 
IgniteComputeInternal, StreamerReceive
                                             executeOnOneNodeWithFailover(
                                                     node, 
CompletableFutures::nullCompletedFuture,
                                                     descriptor.units(), 
descriptor.jobClassName(),
-                                                    descriptor.options(), 
tryMarshalOrCast(argumentMarshaller, args))),
+                                                    descriptor.options(), 
null, tryMarshalOrCast(argumentMarshaller, args))),
                                     resultMarshaller);
                         }));
     }
 
+    @Override
+    public <T, R> CompletableFuture<R> executeMapReduceAsync(TaskDescriptor<T, 
R> taskDescriptor,
+            @Nullable CancellationToken cancellationToken, @Nullable T arg) {
+        return submitMapReduce(taskDescriptor, cancellationToken, 
arg).resultAsync();
+    }
+
     @Override
     public <T, R> TaskExecution<R> submitMapReduce(TaskDescriptor<T, R> 
taskDescriptor, @Nullable T arg) {
+        return submitMapReduce(taskDescriptor, null, arg);
+    }
+
+    <T, R> TaskExecution<R> submitMapReduce(TaskDescriptor<T, R> 
taskDescriptor, @Nullable CancellationToken cancellationToken,
+            @Nullable T arg) {
         Objects.requireNonNull(taskDescriptor);
 
-        return new TaskExecutionWrapper<>(
+        TaskExecutionWrapper<R> execution = new TaskExecutionWrapper<>(
                 computeComponent.executeTask(this::submitJob, 
taskDescriptor.units(), taskDescriptor.taskClassName(), arg));
+
+        if (cancellationToken != null) {
+            CancelHandleHelper.addCancelAction(cancellationToken, 
execution::cancelAsync, execution.resultAsync());
+        }
+
+        return execution;
     }
 
     @Override
-    public <T, R> R executeMapReduce(TaskDescriptor<T, R> taskDescriptor, 
@Nullable T arg) {
-        return sync(executeMapReduceAsync(taskDescriptor, arg));
+    public <T, R> R executeMapReduce(TaskDescriptor<T, R> taskDescriptor, 
@Nullable CancellationToken cancellationToken, @Nullable T arg) {
+        return sync(executeMapReduceAsync(taskDescriptor, cancellationToken, 
arg));
     }
 
     private <M, T> JobExecution<T> submitJob(MapReduceJob<M, T> runner) {
@@ -397,6 +432,7 @@ public class IgniteComputeImpl implements 
IgniteComputeInternal, StreamerReceive
                 deploymentUnits,
                 StreamerReceiverJob.class.getName(),
                 JobExecutionOptions.DEFAULT,
+                null,
                 payload);
 
         return jobExecution.resultAsync()
diff --git 
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/IgniteComputeInternal.java
 
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/IgniteComputeInternal.java
index 352b4563d4..90835bc4f4 100644
--- 
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/IgniteComputeInternal.java
+++ 
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/IgniteComputeInternal.java
@@ -29,6 +29,7 @@ import org.apache.ignite.compute.JobExecutionOptions;
 import org.apache.ignite.compute.JobState;
 import org.apache.ignite.deployment.DeploymentUnit;
 import org.apache.ignite.internal.table.TableViewInternal;
+import org.apache.ignite.lang.CancellationToken;
 import org.apache.ignite.network.ClusterNode;
 import org.apache.ignite.table.Tuple;
 import org.jetbrains.annotations.Nullable;
@@ -46,6 +47,7 @@ public interface IgniteComputeInternal extends IgniteCompute {
      * @param units Deployment units. Can be empty.
      * @param jobClassName Name of the job class to execute.
      * @param options Job execution options.
+     * @param cancellationToken Cancellation token or {@code null}.
      * @param payload Arguments of the job.
      * @return CompletableFuture Job result.
      */
@@ -54,6 +56,7 @@ public interface IgniteComputeInternal extends IgniteCompute {
             List<DeploymentUnit> units,
             String jobClassName,
             JobExecutionOptions options,
+            @Nullable CancellationToken cancellationToken,
             @Nullable Object payload
     );
 
@@ -66,6 +69,7 @@ public interface IgniteComputeInternal extends IgniteCompute {
      * @param units Deployment units. Can be empty.
      * @param jobClassName Name of the job class to execute.
      * @param options job execution options (priority, max retries).
+     * @param cancellationToken Cancellation token or {@code null}.
      * @param payload Arguments of the job.
      * @param <R> Job result type.
      * @return Job execution object.
@@ -76,6 +80,7 @@ public interface IgniteComputeInternal extends IgniteCompute {
             List<DeploymentUnit> units,
             String jobClassName,
             JobExecutionOptions options,
+            @Nullable CancellationToken cancellationToken,
             Object payload);
 
     /**
diff --git 
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/queue/QueueExecutionImpl.java
 
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/queue/QueueExecutionImpl.java
index ea28dd9e9b..3aaa54f1d8 100644
--- 
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/queue/QueueExecutionImpl.java
+++ 
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/queue/QueueExecutionImpl.java
@@ -177,11 +177,18 @@ class QueueExecutionImpl<R> implements QueueExecution<R> {
                     stateMachine.queueJob(jobId);
                     run();
                 } else {
-                    if (queueEntry.isInterrupted()) {
-                        stateMachine.cancelJob(jobId);
-                    } else {
-                        stateMachine.failJob(jobId);
+                    try {
+                        if (queueEntry.isInterrupted()) {
+                            stateMachine.cancelJob(jobId);
+                        } else {
+                            stateMachine.failJob(jobId);
+                        }
+                    // TODO: Need to be refactored after 
https://issues.apache.org/jira/browse/IGNITE-23769
+                    } catch (IllegalJobStatusTransition err) {
+                        throwable.addSuppressed(err);
+                        result.completeExceptionally(throwable);
                     }
+
                     result.completeExceptionally(throwable);
                 }
             } else {
diff --git 
a/modules/compute/src/test/java/org/apache/ignite/internal/compute/ComputeComponentImplTest.java
 
b/modules/compute/src/test/java/org/apache/ignite/internal/compute/ComputeComponentImplTest.java
index dfee8b8862..c9d6718629 100644
--- 
a/modules/compute/src/test/java/org/apache/ignite/internal/compute/ComputeComponentImplTest.java
+++ 
b/modules/compute/src/test/java/org/apache/ignite/internal/compute/ComputeComponentImplTest.java
@@ -28,6 +28,7 @@ import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutur
 import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureExceptionMatcher.willThrowWithCauseOrSuppressed;
 import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willBe;
 import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
+import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willSucceedFast;
 import static 
org.apache.ignite.internal.testframework.matchers.JobStateMatcher.jobStateWithStatus;
 import static 
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
 import static org.awaitility.Awaitility.await;
@@ -38,8 +39,10 @@ import static org.hamcrest.Matchers.is;
 import static org.hamcrest.Matchers.notNullValue;
 import static org.hamcrest.Matchers.nullValue;
 import static org.hamcrest.Matchers.startsWith;
+import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTimeoutPreemptively;
+import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.mockito.Answers.RETURNS_DEEP_STUBS;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyList;
@@ -104,6 +107,7 @@ import 
org.apache.ignite.internal.network.NetworkMessageHandler;
 import org.apache.ignite.internal.network.TopologyService;
 import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
 import org.apache.ignite.internal.thread.NamedThreadFactory;
+import org.apache.ignite.lang.CancelHandle;
 import org.apache.ignite.network.ClusterNode;
 import org.apache.ignite.network.NetworkAddress;
 import org.junit.jupiter.api.AfterEach;
@@ -195,6 +199,29 @@ class ComputeComponentImplTest extends 
BaseIgniteAbstractTest {
         assertThatNoRequestsWereSent();
     }
 
+    @Test
+    void testLongPreExecutionInitialization() {
+        CompletableFuture<?> infiniteFuture = new CompletableFuture<>();
+
+        doReturn(infiniteFuture)
+                .when(jobContextManager).acquireClassLoader(List.of());
+
+        CancelHandle cancelHandle = CancelHandle.create();
+
+        JobExecution<String> execution = 
computeComponent.executeLocally(DEFAULT, List.of(), SimpleJob.class.getName(),
+                cancelHandle.token(), "");
+
+        assertFalse(infiniteFuture.isDone());
+        assertFalse(execution.resultAsync().isDone());
+
+        cancelHandle.cancel();
+
+        assertThat(cancelHandle.cancelAsync(), willSucceedFast());
+
+        assertTrue(infiniteFuture.isCompletedExceptionally());
+        assertTrue(execution.resultAsync().isDone());
+    }
+
     @Test
     void getsStateAndCancelsLocally() {
         JobExecution<String> execution = 
computeComponent.executeLocally(List.of(), LongJob.class.getName(), null);
diff --git 
a/modules/compute/src/test/java/org/apache/ignite/internal/compute/IgniteComputeImplTest.java
 
b/modules/compute/src/test/java/org/apache/ignite/internal/compute/IgniteComputeImplTest.java
index 673455e632..debd9da238 100644
--- 
a/modules/compute/src/test/java/org/apache/ignite/internal/compute/IgniteComputeImplTest.java
+++ 
b/modules/compute/src/test/java/org/apache/ignite/internal/compute/IgniteComputeImplTest.java
@@ -20,14 +20,17 @@ package org.apache.ignite.internal.compute;
 import static java.util.UUID.randomUUID;
 import static java.util.concurrent.CompletableFuture.completedFuture;
 import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willBe;
+import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willSucceedFast;
 import static 
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
 import static 
org.apache.ignite.internal.util.CompletableFutures.trueCompletedFuture;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.aMapWithSize;
 import static org.hamcrest.Matchers.contains;
+import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyLong;
 import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.ArgumentMatchers.isNull;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.lenient;
 import static org.mockito.Mockito.mock;
@@ -53,6 +56,8 @@ import org.apache.ignite.internal.placementdriver.ReplicaMeta;
 import org.apache.ignite.internal.table.IgniteTablesInternal;
 import org.apache.ignite.internal.table.TableViewInternal;
 import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
+import org.apache.ignite.lang.CancelHandle;
+import org.apache.ignite.lang.CancellationToken;
 import org.apache.ignite.network.ClusterNode;
 import org.apache.ignite.network.NetworkAddress;
 import org.apache.ignite.table.Tuple;
@@ -118,7 +123,27 @@ class IgniteComputeImplTest extends BaseIgniteAbstractTest 
{
                 willBe("jobResponse")
         );
 
-        verify(computeComponent).executeLocally(ExecutionOptions.DEFAULT, 
testDeploymentUnits, JOB_CLASS_NAME, "a");
+        verify(computeComponent).executeLocally(ExecutionOptions.DEFAULT, 
testDeploymentUnits, JOB_CLASS_NAME, null, "a");
+    }
+
+    @Test
+    void safeCallCancelHandleAfterJobProcessing() {
+        CancelHandle cancelHandle = CancelHandle.create();
+
+        respondWhenExecutingSimpleJobLocally(ExecutionOptions.DEFAULT, 
cancelHandle.token());
+
+        assertThat(
+                compute.executeAsync(
+                        JobTarget.node(localNode),
+                        
JobDescriptor.builder(JOB_CLASS_NAME).units(testDeploymentUnits).build(),
+                        cancelHandle.token(),
+                        "a"),
+                willBe("jobResponse")
+        );
+
+        assertFalse(cancelHandle.isCancelled());
+        cancelHandle.cancel();
+        assertThat(cancelHandle.cancelAsync(), willSucceedFast());
     }
 
     @Test
@@ -136,7 +161,7 @@ class IgniteComputeImplTest extends BaseIgniteAbstractTest {
                 willBe("jobResponse")
         );
 
-        verify(computeComponent).executeLocally(ExecutionOptions.DEFAULT, 
testDeploymentUnits, JOB_CLASS_NAME, "a");
+        verify(computeComponent).executeLocally(ExecutionOptions.DEFAULT, 
testDeploymentUnits, JOB_CLASS_NAME, null, "a");
     }
 
     @Test
@@ -168,7 +193,7 @@ class IgniteComputeImplTest extends BaseIgniteAbstractTest {
                 willBe("jobResponse")
         );
 
-        verify(computeComponent).executeLocally(expectedOptions, 
testDeploymentUnits, JOB_CLASS_NAME, "a");
+        verify(computeComponent).executeLocally(expectedOptions, 
testDeploymentUnits, JOB_CLASS_NAME, null, "a");
     }
 
     @Test
@@ -243,19 +268,24 @@ class IgniteComputeImplTest extends 
BaseIgniteAbstractTest {
     }
 
     private void respondWhenExecutingSimpleJobLocally(ExecutionOptions 
executionOptions) {
-        when(computeComponent.executeLocally(eq(executionOptions), 
eq(testDeploymentUnits), eq(JOB_CLASS_NAME), eq("a")))
+        when(computeComponent.executeLocally(executionOptions, 
testDeploymentUnits, JOB_CLASS_NAME, null, "a"))
+                .thenReturn(completedExecution("jobResponse"));
+    }
+
+    private void respondWhenExecutingSimpleJobLocally(ExecutionOptions 
executionOptions, CancellationToken token) {
+        when(computeComponent.executeLocally(executionOptions, 
testDeploymentUnits, JOB_CLASS_NAME, token, "a"))
                 .thenReturn(completedExecution("jobResponse"));
     }
 
     private void respondWhenExecutingSimpleJobRemotely(ExecutionOptions 
options) {
         when(computeComponent.executeRemotelyWithFailover(
-                eq(remoteNode), any(), eq(testDeploymentUnits), 
eq(JOB_CLASS_NAME), eq(options), eq("a")
+                eq(remoteNode), any(), eq(testDeploymentUnits), 
eq(JOB_CLASS_NAME), eq(options), isNull(), eq("a")
         )).thenReturn(completedExecution("remoteResponse"));
     }
 
     private void verifyExecuteRemotelyWithFailover(ExecutionOptions options) {
         verify(computeComponent).executeRemotelyWithFailover(
-                eq(remoteNode), any(), eq(testDeploymentUnits), 
eq(JOB_CLASS_NAME), eq(options), eq("a")
+                eq(remoteNode), any(), eq(testDeploymentUnits), 
eq(JOB_CLASS_NAME), eq(options), isNull(), eq("a")
         );
     }
 
diff --git 
a/modules/core/src/main/java/org/apache/ignite/lang/CancelHandleHelper.java 
b/modules/core/src/main/java/org/apache/ignite/lang/CancelHandleHelper.java
index 933f0338e8..f10477d1b4 100644
--- a/modules/core/src/main/java/org/apache/ignite/lang/CancelHandleHelper.java
+++ b/modules/core/src/main/java/org/apache/ignite/lang/CancelHandleHelper.java
@@ -61,6 +61,23 @@ public final class CancelHandleHelper {
         t.addCancelAction(cancelAction, completionFut);
     }
 
+    /**
+     * Attaches a future to the given token. A cancellation procedure call 
{@link CompletableFuture#cancel} and handle completes
+     * when {@code completionFut} completes.
+     *
+     * @param token Cancellation token.
+     * @param completionFut Future that completes when operation completes and 
all resources it created are released.
+     */
+    public static void addCancelAction(
+            CancellationToken token,
+            CompletableFuture<?> completionFut
+    ) {
+        Objects.requireNonNull(token, "token");
+        Objects.requireNonNull(completionFut, "completionFut");
+
+        addCancelAction(token, () -> completionFut.cancel(true), 
completionFut);
+    }
+
     private static CancellationTokenImpl unwrapToken(CancellationToken token) {
         if (token instanceof CancellationTokenImpl) {
             return (CancellationTokenImpl) token;
diff --git 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItThinClientComputeTest.java
 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItThinClientComputeTest.java
index edcdf6db72..fb994373d7 100644
--- 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItThinClientComputeTest.java
+++ 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItThinClientComputeTest.java
@@ -64,6 +64,9 @@ import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionException;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
 import java.util.function.Supplier;
 import java.util.stream.Collectors;
 import org.apache.ignite.client.IgniteClient;
@@ -84,6 +87,8 @@ import org.apache.ignite.compute.task.TaskExecution;
 import org.apache.ignite.compute.task.TaskExecutionContext;
 import org.apache.ignite.deployment.DeploymentUnit;
 import org.apache.ignite.internal.compute.TaskToJobExecutionWrapper;
+import org.apache.ignite.internal.testframework.IgniteTestUtils;
+import org.apache.ignite.lang.CancelHandle;
 import org.apache.ignite.lang.IgniteException;
 import org.apache.ignite.network.ClusterNode;
 import org.apache.ignite.table.Tuple;
@@ -132,6 +137,84 @@ public class ItThinClientComputeTest extends 
ItAbstractThinClientTest {
         assertEquals("itcct_n_3345", res2);
     }
 
+    @Test
+    void computeExecuteAsyncWithCancelHandle() {
+        IgniteClient entryNode = client();
+        ClusterNode executeNode = node(1);
+
+        CancelHandle cancelHandle = CancelHandle.create();
+
+        JobDescriptor<Object, Void> job = 
JobDescriptor.builder(InfiniteJob.class).units(List.of()).build();
+        CompletableFuture<Void> execution = 
entryNode.compute().executeAsync(JobTarget.node(executeNode), job, 
cancelHandle.token(), null);
+
+        cancelHandle.cancel();
+
+        assertThrows(ExecutionException.class, () -> execution.get(10, 
TimeUnit.SECONDS));
+    }
+
+    @Test
+    void computeExecuteWithCancelHandle() {
+        IgniteClient entryNode = client();
+        ClusterNode executeNode = node(1);
+
+        CancelHandle cancelHandle = CancelHandle.create();
+
+        JobDescriptor<Object, Void> job = 
JobDescriptor.builder(InfiniteJob.class).units(List.of()).build();
+        CompletableFuture<Void> runFut = IgniteTestUtils.runAsync(() ->  
entryNode.compute()
+                .execute(JobTarget.node(executeNode), job, 
cancelHandle.token(), null));
+
+        cancelHandle.cancel();
+
+        assertThrows(ExecutionException.class, () -> runFut.get(10, 
TimeUnit.SECONDS));
+    }
+
+    @Test
+    void computeExecuteBroadcastAsyncWithCancelHandle() {
+        IgniteClient entryNode = client();
+        Set<ClusterNode> executeNodes = Set.of(node(0), node(1));
+
+        CancelHandle cancelHandle = CancelHandle.create();
+
+        CompletableFuture<Map<ClusterNode, Object>> executions = 
entryNode.compute().executeBroadcastAsync(
+                executeNodes,
+                JobDescriptor.builder(InfiniteJob.class.getName()).build(), 
cancelHandle.token(), 100L);
+
+        cancelHandle.cancel();
+
+        assertThrows(ExecutionException.class, () -> executions.get(10, 
TimeUnit.SECONDS));
+    }
+
+    @Test
+    void computeExecuteBroadcastWithCancelHandle() {
+        IgniteClient entryNode = client();
+        Set<ClusterNode> executeNodes = Set.of(node(0), node(1));
+
+        CancelHandle cancelHandle = CancelHandle.create();
+
+        CompletableFuture<Map<ClusterNode, Object>> runFut = 
IgniteTestUtils.runAsync(() -> entryNode.compute().executeBroadcast(
+                executeNodes,
+                JobDescriptor.builder(InfiniteJob.class.getName()).build(), 
cancelHandle.token(), 100L)
+        );
+
+        cancelHandle.cancel();
+
+        assertThrows(ExecutionException.class, () -> runFut.get(10, 
TimeUnit.SECONDS));
+    }
+
+    @Test
+    void cancelComputeExecuteMapReduceAsyncWithCancelHandle() {
+        IgniteClient entryNode = client();
+
+        CancelHandle cancelHandle = CancelHandle.create();
+
+        CompletableFuture<Void> execution = entryNode.compute()
+                
.executeMapReduceAsync(TaskDescriptor.builder(InfiniteMapReduceTask.class).build(),
 cancelHandle.token(), null);
+
+        cancelHandle.cancel();
+
+        assertThrows(ExecutionException.class, () -> execution.get(10, 
TimeUnit.SECONDS));
+    }
+
     @Test
     void testExecuteOnSpecificNodeAsync() {
         JobExecution<String> execution1 = client().compute().submit(
@@ -958,6 +1041,18 @@ public class ItThinClientComputeTest extends 
ItAbstractThinClientTest {
         }
     }
 
+    private static class InfiniteJob implements ComputeJob<Object, Void> {
+        @Override
+        public CompletableFuture<Void> executeAsync(JobExecutionContext 
context, Object ignored) {
+            try {
+                new CountDownLatch(1).await();
+            } catch (InterruptedException e) {
+                Thread.currentThread().interrupt();
+            }
+            return null;
+        }
+    }
+
     /**
      * Custom public exception class.
      */
@@ -966,4 +1061,29 @@ public class ItThinClientComputeTest extends 
ItAbstractThinClientTest {
             super(traceId, code, message, cause);
         }
     }
+
+    private static class InfiniteMapReduceTask implements MapReduceTask<Void, 
Void, Void, Void> {
+        @Override
+        public CompletableFuture<List<MapReduceJob<Void, Void>>> 
splitAsync(TaskExecutionContext taskContext, Void input) {
+            return completedFuture(List.of(
+                    MapReduceJob.<Void, Void>builder()
+                            .jobDescriptor(
+                                    
JobDescriptor.builder(InfiniteMapReduceJob.class).build())
+                            .nodes(taskContext.ignite().clusterNodes())
+                            .build()
+            ));
+        }
+
+        @Override
+        public CompletableFuture<Void> reduceAsync(TaskExecutionContext 
taskContext, Map<UUID, Void> results) {
+            return completedFuture(null);
+        }
+
+        private static class InfiniteMapReduceJob implements ComputeJob<Void, 
Void> {
+            @Override
+            public CompletableFuture<Void> executeAsync(JobExecutionContext 
context, Void input) {
+                return new CompletableFuture<>();
+            }
+        }
+    }
 }
diff --git 
a/modules/runner/src/main/java/org/apache/ignite/internal/restart/RestartProofIgniteCompute.java
 
b/modules/runner/src/main/java/org/apache/ignite/internal/restart/RestartProofIgniteCompute.java
index c0b115e1c6..a830a15bbf 100644
--- 
a/modules/runner/src/main/java/org/apache/ignite/internal/restart/RestartProofIgniteCompute.java
+++ 
b/modules/runner/src/main/java/org/apache/ignite/internal/restart/RestartProofIgniteCompute.java
@@ -29,6 +29,7 @@ import org.apache.ignite.compute.TaskDescriptor;
 import org.apache.ignite.compute.task.TaskExecution;
 import org.apache.ignite.internal.wrapper.Wrapper;
 import org.apache.ignite.internal.wrapper.Wrappers;
+import org.apache.ignite.lang.CancellationToken;
 import org.apache.ignite.network.ClusterNode;
 import org.jetbrains.annotations.Nullable;
 
@@ -53,13 +54,15 @@ class RestartProofIgniteCompute implements IgniteCompute, 
Wrapper {
     }
 
     @Override
-    public <T, R> R execute(JobTarget target, JobDescriptor<T, R> descriptor, 
@Nullable T arg) {
-        return attachmentLock.attached(ignite -> 
ignite.compute().execute(target, descriptor, arg));
+    public <T, R> R execute(JobTarget target, JobDescriptor<T, R> descriptor, 
@Nullable CancellationToken cancellationToken,
+            @Nullable T arg) {
+        return attachmentLock.attached(ignite -> 
ignite.compute().execute(target, descriptor, cancellationToken, arg));
     }
 
     @Override
-    public <T, R> CompletableFuture<R> executeAsync(JobTarget target, 
JobDescriptor<T, R> descriptor, @Nullable T arg) {
-        return attachmentLock.attachedAsync(ignite -> 
ignite.compute().executeAsync(target, descriptor, arg));
+    public <T, R> CompletableFuture<R> executeAsync(JobTarget target, 
JobDescriptor<T, R> descriptor,
+            @Nullable CancellationToken cancellationToken, @Nullable T arg) {
+        return attachmentLock.attachedAsync(ignite -> 
ignite.compute().executeAsync(target, descriptor, cancellationToken, arg));
     }
 
     @Override
@@ -77,13 +80,14 @@ class RestartProofIgniteCompute implements IgniteCompute, 
Wrapper {
     }
 
     @Override
-    public <T, R> R executeMapReduce(TaskDescriptor<T, R> taskDescriptor, 
@Nullable T arg) {
-        return attachmentLock.attached(ignite -> 
ignite.compute().executeMapReduce(taskDescriptor, arg));
+    public <T, R> R executeMapReduce(TaskDescriptor<T, R> taskDescriptor, 
@Nullable CancellationToken cancellationToken, @Nullable T arg) {
+        return attachmentLock.attached(ignite -> 
ignite.compute().executeMapReduce(taskDescriptor, cancellationToken, arg));
     }
 
     @Override
-    public <T, R> CompletableFuture<R> executeMapReduceAsync(TaskDescriptor<T, 
R> taskDescriptor, @Nullable T arg) {
-        return attachmentLock.attachedAsync(ignite -> 
ignite.compute().executeMapReduceAsync(taskDescriptor, arg));
+    public <T, R> CompletableFuture<R> executeMapReduceAsync(TaskDescriptor<T, 
R> taskDescriptor,
+            @Nullable CancellationToken cancellationToken, @Nullable T arg) {
+        return attachmentLock.attachedAsync(ignite -> 
ignite.compute().executeMapReduceAsync(taskDescriptor, cancellationToken, arg));
     }
 
     @Override

Reply via email to