This is an automated email from the ASF dual-hosted git repository.

mpochatkin pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 6c9c9a26c6 IGNITE-22433 Add TaskExecutionContext.isCancelled (#3930)
6c9c9a26c6 is described below

commit 6c9c9a26c6e177380f5cadec3ca367d946cfdbdf
Author: Vadim Pakhnushev <[email protected]>
AuthorDate: Tue Jun 18 15:11:43 2024 +0300

    IGNITE-22433 Add TaskExecutionContext.isCancelled (#3930)
---
 .../apache/ignite/compute/task/MapReduceTask.java  |  7 +--
 .../ignite/compute/task/TaskExecutionContext.java  |  7 +++
 .../ignite/internal/compute/ItMapReduceTest.java   | 30 +++++++++---
 .../internal/compute/utils/InteractiveTasks.java   | 56 ++++++++++++++++-----
 .../apache/ignite/internal/compute/MapReduce.java  |  2 +-
 .../compute/executor/ComputeExecutorImpl.java      |  7 ++-
 .../compute/executor/JobExecutionInternal.java     |  2 +-
 .../compute/task/TaskExecutionContextImpl.java}    | 34 ++++++++++---
 .../compute/task/TaskExecutionInternal.java        | 20 ++++++--
 .../compute/executor/ComputeExecutorTest.java      | 16 +++---
 .../compute/task/TaskExecutionContextImplTest.java | 57 ++++++++++++++++++++++
 .../runner/app/client/ItThinClientComputeTest.java |  8 +--
 12 files changed, 198 insertions(+), 48 deletions(-)

diff --git 
a/modules/api/src/main/java/org/apache/ignite/compute/task/MapReduceTask.java 
b/modules/api/src/main/java/org/apache/ignite/compute/task/MapReduceTask.java
index bab9e338f4..c328e90c32 100644
--- 
a/modules/api/src/main/java/org/apache/ignite/compute/task/MapReduceTask.java
+++ 
b/modules/api/src/main/java/org/apache/ignite/compute/task/MapReduceTask.java
@@ -40,11 +40,12 @@ public interface MapReduceTask<R> {
 
     /**
      * This is a finishing step in the task execution. This method will be 
called with the map from identifiers of compute jobs submitted as
-     * a result of the {@link #split(TaskExecutionContext, Object...)} method 
call to the results of the execution of the corresponding
-     * job. The return value of this method will be returned as a result of 
this task.
+     * a result of the {@link #split(TaskExecutionContext, Object...)} method 
call to the results of the execution of the corresponding job.
+     * The return value of this method will be returned as a result of this 
task.
      *
+     * @param taskContext Task execution context.
      * @param results Map from compute job ids to their results.
      * @return Final task result.
      */
-    R reduce(Map<UUID, ?> results);
+    R reduce(TaskExecutionContext taskContext, Map<UUID, ?> results);
 }
diff --git 
a/modules/api/src/main/java/org/apache/ignite/compute/task/TaskExecutionContext.java
 
b/modules/api/src/main/java/org/apache/ignite/compute/task/TaskExecutionContext.java
index 2a45023f91..5199543c87 100644
--- 
a/modules/api/src/main/java/org/apache/ignite/compute/task/TaskExecutionContext.java
+++ 
b/modules/api/src/main/java/org/apache/ignite/compute/task/TaskExecutionContext.java
@@ -27,4 +27,11 @@ public interface TaskExecutionContext {
      * @return Ignite instance.
      */
     Ignite ignite();
+
+    /**
+     * Flag indicating whether the task was cancelled.
+     *
+     * @return {@code true} when the task was cancelled.
+     */
+    boolean isCancelled();
 }
diff --git 
a/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItMapReduceTest.java
 
b/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItMapReduceTest.java
index fde5d473a7..c8f07d86ae 100644
--- 
a/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItMapReduceTest.java
+++ 
b/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItMapReduceTest.java
@@ -48,6 +48,8 @@ import org.apache.ignite.lang.IgniteException;
 import org.hamcrest.Matcher;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
 
 @SuppressWarnings("resource")
 class ItMapReduceTest extends ClusterPerClassIntegrationTest {
@@ -127,12 +129,13 @@ class ItMapReduceTest extends 
ClusterPerClassIntegrationTest {
         assertThat(taskExecution.statusesAsync(), 
willThrow(IgniteException.class));
     }
 
-    @Test
-    void cancelSplit() throws Exception {
+    @ParameterizedTest
+    @ValueSource(booleans = {true, false})
+    void cancelSplit(boolean cooperativeCancel) throws Exception {
         IgniteImpl entryNode = CLUSTER.node(0);
 
         // Given running task.
-        TaskExecution<List<String>> taskExecution = startTask(entryNode);
+        TaskExecution<List<String>> taskExecution = startTask(entryNode, 
cooperativeCancel ? "NO_INTERRUPT" : "");
 
         // Save status before split.
         JobStatus statusBeforeSplit = taskExecution.statusAsync().join();
@@ -145,6 +148,9 @@ class ItMapReduceTest extends 
ClusterPerClassIntegrationTest {
 
         // And statuses list will fail.
         assertThat(taskExecution.statusesAsync(), 
willThrow(RuntimeException.class));
+
+        // And second cancel will fail.
+        assertThat(taskExecution.cancelAsync(), willBe(false));
     }
 
     @Test
@@ -191,6 +197,9 @@ class ItMapReduceTest extends 
ClusterPerClassIntegrationTest {
 
         // And statuses list contains canceled statuses.
         assertJobStates(taskExecution, CANCELED);
+
+        // And second cancel will fail.
+        assertThat(taskExecution.cancelAsync(), willBe(false));
     }
 
     @Test
@@ -219,12 +228,14 @@ class ItMapReduceTest extends 
ClusterPerClassIntegrationTest {
         assertJobStates(taskExecution, COMPLETED);
     }
 
-    @Test
-    void cancelReduce() throws Exception {
+    @ParameterizedTest
+    @ValueSource(booleans = {true, false})
+    void cancelReduce(boolean cooperativeCancel) throws Exception {
         IgniteImpl entryNode = CLUSTER.node(0);
 
         // Given running task.
-        TaskExecution<List<String>> taskExecution = 
entryNode.compute().submitMapReduce(List.of(), 
InteractiveTasks.GlobalApi.name());
+        String arg = cooperativeCancel ? "NO_INTERRUPT" : "";
+        TaskExecution<List<String>> taskExecution = 
entryNode.compute().submitMapReduce(List.of(), 
InteractiveTasks.GlobalApi.name(), arg);
         TestingJobExecution<List<String>> testExecution = new 
TestingJobExecution<>(taskExecution);
         testExecution.assertExecuting();
         InteractiveTasks.GlobalApi.assertAlive();
@@ -249,10 +260,13 @@ class ItMapReduceTest extends 
ClusterPerClassIntegrationTest {
 
         // And statuses list contains completed statuses.
         assertJobStates(taskExecution, COMPLETED);
+
+        // And second cancel will fail.
+        assertThat(taskExecution.cancelAsync(), willBe(false));
     }
 
-    private static TaskExecution<List<String>> startTask(IgniteImpl entryNode) 
throws InterruptedException {
-        TaskExecution<List<String>> taskExecution = 
entryNode.compute().submitMapReduce(List.of(), 
InteractiveTasks.GlobalApi.name());
+    private static TaskExecution<List<String>> startTask(IgniteImpl entryNode, 
Object... args) throws InterruptedException {
+        TaskExecution<List<String>> taskExecution = 
entryNode.compute().submitMapReduce(List.of(), 
InteractiveTasks.GlobalApi.name(), args);
         new TestingJobExecution<>(taskExecution).assertExecuting();
         InteractiveTasks.GlobalApi.assertAlive();
         return taskExecution;
diff --git 
a/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/utils/InteractiveTasks.java
 
b/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/utils/InteractiveTasks.java
index d8827cf734..cded885c9a 100644
--- 
a/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/utils/InteractiveTasks.java
+++ 
b/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/utils/InteractiveTasks.java
@@ -67,9 +67,9 @@ public final class InteractiveTasks {
     private static final AtomicInteger RUNNING_GLOBAL_SPLIT_CNT = new 
AtomicInteger(0);
 
     /**
-     * This counter indicated how many {@link 
GlobalInteractiveMapReduceTask#reduce(Map)} methods are running now. This 
counter increased
-     * each time the {@link GlobalInteractiveMapReduceTask#reduce(Map)} is 
called and decreased when the method is finished (whatever the
-     * result is). Checked in {@link #clearState}.
+     * This counter indicates how many {@link 
GlobalInteractiveMapReduceTask#reduce(TaskExecutionContext, Map)} methods are 
running now.
+     * This counter is increased every time the {@link 
GlobalInteractiveMapReduceTask#reduce(TaskExecutionContext, Map)} is called and
+     * decreased when the method is finished (whatever the result is). Checked 
in {@link #clearState}.
      */
     private static final AtomicInteger RUNNING_GLOBAL_REDUCE_CNT = new 
AtomicInteger(0);
 
@@ -119,15 +119,12 @@ public final class InteractiveTasks {
         /**
          * Ask reduce method to return a concatenation of jobs results.
          */
-        REDUCE_RETURN
-    }
+        REDUCE_RETURN,
 
-    private static Signal listenSignal() {
-        try {
-            return GLOBAL_SIGNALS.take();
-        } catch (InterruptedException e) {
-            throw new RuntimeException(e);
-        }
+        /**
+         * Ask the task to check for cancellation flag and finish if it's set.
+         */
+        CHECK_CANCEL
     }
 
     /**
@@ -152,11 +149,36 @@ public final class InteractiveTasks {
      * Interactive map reduce task that communicates via {@link 
#GLOBAL_CHANNEL} and {@link #GLOBAL_SIGNALS}.
      */
     private static class GlobalInteractiveMapReduceTask implements 
MapReduceTask<List<String>> {
+        // When listening for signal is interrupted, if this flag is true, 
then corresponding method will throw exception,
+        // otherwise it will restore the thread's interrupted status and return the CHECK_CANCEL signal.
+        private boolean throwExceptionOnInterruption = true;
+
+        private static final String NO_INTERRUPT_ARG_NAME = "NO_INTERRUPT";
+
+        private Signal listenSignal() {
+            try {
+                return GLOBAL_SIGNALS.take();
+            } catch (InterruptedException e) {
+                if (throwExceptionOnInterruption) {
+                    throw new RuntimeException(e);
+                } else {
+                    Thread.currentThread().interrupt();
+                    return Signal.CHECK_CANCEL;
+                }
+            }
+        }
+
         @Override
         public List<MapReduceJob> split(TaskExecutionContext context, 
Object... args) {
             RUNNING_GLOBAL_SPLIT_CNT.incrementAndGet();
 
             offerArgsAsSignals(args);
+            for (Object arg : args) {
+                if (NO_INTERRUPT_ARG_NAME.equals(arg)) {
+                    throwExceptionOnInterruption = false;
+                    break;
+                }
+            }
 
             try {
                 while (true) {
@@ -174,6 +196,11 @@ public final class InteractiveTasks {
                                             .nodes(Set.of(node))
                                             .build()
                             ).collect(toList());
+                        case CHECK_CANCEL:
+                            if (context.isCancelled()) {
+                                throw new RuntimeException("Task is 
cancelled");
+                            }
+                            break;
                         default:
                             throw new IllegalStateException("Unexpected value: 
" + receivedSignal);
                     }
@@ -184,7 +211,7 @@ public final class InteractiveTasks {
         }
 
         @Override
-        public List<String> reduce(Map<UUID, ?> results) {
+        public List<String> reduce(TaskExecutionContext context, Map<UUID, ?> 
results) {
             RUNNING_GLOBAL_REDUCE_CNT.incrementAndGet();
             try {
                 while (true) {
@@ -199,6 +226,11 @@ public final class InteractiveTasks {
                             return results.values().stream()
                                     .map(String.class::cast)
                                     .collect(toList());
+                        case CHECK_CANCEL:
+                            if (context.isCancelled()) {
+                                throw new RuntimeException("Task is 
cancelled");
+                            }
+                            break;
                         default:
                             throw new IllegalStateException("Unexpected value: 
" + receivedSignal);
                     }
diff --git 
a/modules/compute/src/jobs/java/org/apache/ignite/internal/compute/MapReduce.java
 
b/modules/compute/src/jobs/java/org/apache/ignite/internal/compute/MapReduce.java
index a48578e35b..78458216ef 100644
--- 
a/modules/compute/src/jobs/java/org/apache/ignite/internal/compute/MapReduce.java
+++ 
b/modules/compute/src/jobs/java/org/apache/ignite/internal/compute/MapReduce.java
@@ -51,7 +51,7 @@ public class MapReduce implements MapReduceTask<Integer> {
     }
 
     @Override
-    public Integer reduce(Map<UUID, ?> results) {
+    public Integer reduce(TaskExecutionContext taskContext, Map<UUID, ?> 
results) {
         return results.values().stream()
                 .map(String.class::cast)
                 .map(String::length)
diff --git 
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/executor/ComputeExecutorImpl.java
 
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/executor/ComputeExecutorImpl.java
index acb0c231ab..2656536e6e 100644
--- 
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/executor/ComputeExecutorImpl.java
+++ 
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/executor/ComputeExecutorImpl.java
@@ -25,6 +25,7 @@ import org.apache.ignite.Ignite;
 import org.apache.ignite.compute.ComputeJob;
 import org.apache.ignite.compute.JobExecutionContext;
 import org.apache.ignite.compute.task.MapReduceTask;
+import org.apache.ignite.compute.task.TaskExecutionContext;
 import org.apache.ignite.internal.compute.ComputeUtils;
 import org.apache.ignite.internal.compute.ExecutionOptions;
 import org.apache.ignite.internal.compute.JobExecutionContextImpl;
@@ -34,6 +35,7 @@ import 
org.apache.ignite.internal.compute.queue.PriorityQueueExecutor;
 import org.apache.ignite.internal.compute.queue.QueueExecution;
 import org.apache.ignite.internal.compute.state.ComputeStateMachine;
 import org.apache.ignite.internal.compute.task.JobSubmitter;
+import org.apache.ignite.internal.compute.task.TaskExecutionContextImpl;
 import org.apache.ignite.internal.compute.task.TaskExecutionInternal;
 import org.apache.ignite.internal.logger.IgniteLogger;
 import org.apache.ignite.internal.logger.Loggers;
@@ -99,7 +101,10 @@ public class ComputeExecutorImpl implements ComputeExecutor 
{
     ) {
         assert executorService != null;
 
-        return new TaskExecutionInternal<>(executorService, jobSubmitter, 
taskClass, () -> ignite, args);
+        AtomicBoolean isCancelled = new AtomicBoolean();
+        TaskExecutionContext context = new TaskExecutionContextImpl(ignite, 
isCancelled);
+
+        return new TaskExecutionInternal<>(executorService, jobSubmitter, 
taskClass, context, isCancelled, args);
     }
 
     @Override
diff --git 
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/executor/JobExecutionInternal.java
 
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/executor/JobExecutionInternal.java
index ce8c026351..c92b090d6f 100644
--- 
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/executor/JobExecutionInternal.java
+++ 
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/executor/JobExecutionInternal.java
@@ -37,7 +37,7 @@ public class JobExecutionInternal<R> {
      * Constructor.
      *
      * @param execution Internal execution state.
-     * @param isInterrupted Flag which is passed to the execution context so 
that the job can check if for cancellation request.
+     * @param isInterrupted Flag which is passed to the execution context so 
that the job can check it for cancellation request.
      */
     JobExecutionInternal(QueueExecution<R> execution, AtomicBoolean 
isInterrupted) {
         this.execution = execution;
diff --git 
a/modules/api/src/main/java/org/apache/ignite/compute/task/TaskExecutionContext.java
 
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/task/TaskExecutionContextImpl.java
similarity index 52%
copy from 
modules/api/src/main/java/org/apache/ignite/compute/task/TaskExecutionContext.java
copy to 
modules/compute/src/main/java/org/apache/ignite/internal/compute/task/TaskExecutionContextImpl.java
index 2a45023f91..062d30af37 100644
--- 
a/modules/api/src/main/java/org/apache/ignite/compute/task/TaskExecutionContext.java
+++ 
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/task/TaskExecutionContextImpl.java
@@ -15,16 +15,38 @@
  * limitations under the License.
  */
 
-package org.apache.ignite.compute.task;
+package org.apache.ignite.internal.compute.task;
 
+import java.util.concurrent.atomic.AtomicBoolean;
 import org.apache.ignite.Ignite;
+import org.apache.ignite.compute.task.TaskExecutionContext;
+
+/**
+ * Implementation of {@link TaskExecutionContext}.
+ */
+public class TaskExecutionContextImpl implements TaskExecutionContext {
+    private final Ignite ignite;
+
+    private final AtomicBoolean isCancelled;
 
-/** Context of the compute task execution. */
-public interface TaskExecutionContext {
     /**
-     * Ignite API entry point.
+     * Constructor.
      *
-     * @return Ignite instance.
+     * @param ignite Ignite instance.
+     * @param isCancelled Cancelled flag.
      */
-    Ignite ignite();
+    public TaskExecutionContextImpl(Ignite ignite, AtomicBoolean isCancelled) {
+        this.ignite = ignite;
+        this.isCancelled = isCancelled;
+    }
+
+    @Override
+    public Ignite ignite() {
+        return ignite;
+    }
+
+    @Override
+    public boolean isCancelled() {
+        return isCancelled.get();
+    }
 }
diff --git 
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/task/TaskExecutionInternal.java
 
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/task/TaskExecutionInternal.java
index c36aae3865..85a85a6785 100644
--- 
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/task/TaskExecutionInternal.java
+++ 
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/task/TaskExecutionInternal.java
@@ -27,6 +27,7 @@ import static org.apache.ignite.compute.JobState.FAILED;
 import static org.apache.ignite.internal.compute.ComputeUtils.instantiateTask;
 import static org.apache.ignite.internal.util.ArrayUtils.concat;
 import static org.apache.ignite.internal.util.CompletableFutures.allOfToList;
+import static 
org.apache.ignite.internal.util.CompletableFutures.falseCompletedFuture;
 import static 
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
 import static 
org.apache.ignite.internal.util.CompletableFutures.trueCompletedFuture;
 
@@ -39,6 +40,7 @@ import java.util.Objects;
 import java.util.UUID;
 import java.util.concurrent.CancellationException;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
 import org.apache.ignite.compute.JobExecution;
 import org.apache.ignite.compute.JobState;
@@ -56,7 +58,8 @@ import org.jetbrains.annotations.Nullable;
 /**
  * Internal map reduce task execution object. Runs the {@link 
MapReduceTask#split(TaskExecutionContext, Object...)} method of the task as a
  * compute job, then submits the resulting list of jobs. Waits for completion 
of all compute jobs, then submits the
- * {@link MapReduceTask#reduce(Map)} method as a compute job. The result of 
the task is the result of the split method.
+ * {@link MapReduceTask#reduce(TaskExecutionContext, Map)} method as a compute 
job. The result of the task is the result of the reduce
+ * method.
  *
  * @param <R> Task result type.
  */
@@ -74,6 +77,8 @@ public class TaskExecutionInternal<R> implements 
JobExecution<R> {
 
     private final AtomicReference<JobStatus> reduceFailedStatus = new 
AtomicReference<>();
 
+    private final AtomicBoolean isCancelled;
+
     /**
      * Construct an execution object and starts executing.
      *
@@ -81,6 +86,7 @@ public class TaskExecutionInternal<R> implements 
JobExecution<R> {
      * @param jobSubmitter Compute jobs submitter.
      * @param taskClass Map reduce task class.
      * @param context Task execution context.
+     * @param isCancelled Flag which is passed to the execution context so 
that the task can check it for cancellation request.
      * @param args Task arguments.
      */
     public TaskExecutionInternal(
@@ -88,8 +94,10 @@ public class TaskExecutionInternal<R> implements 
JobExecution<R> {
             JobSubmitter jobSubmitter,
             Class<? extends MapReduceTask<R>> taskClass,
             TaskExecutionContext context,
+            AtomicBoolean isCancelled,
             Object... args
     ) {
+        this.isCancelled = isCancelled;
         LOG.debug("Executing task {}", taskClass.getName());
         splitExecution = executorService.submit(
                 () -> {
@@ -116,7 +124,7 @@ public class TaskExecutionInternal<R> implements 
JobExecution<R> {
             MapReduceTask<R> task = 
splitExecution.resultAsync().thenApply(SplitResult::task).join();
 
             return executorService.submit(
-                    () -> completedFuture(task.reduce(results)),
+                    () -> completedFuture(task.reduce(context, results)),
                     Integer.MAX_VALUE,
                     0
             );
@@ -184,8 +192,14 @@ public class TaskExecutionInternal<R> implements 
JobExecution<R> {
 
     @Override
     public CompletableFuture<@Nullable Boolean> cancelAsync() {
+        if (!isCancelled.compareAndSet(false, true)) {
+            return falseCompletedFuture();
+        }
+
         // If the split job is not complete, this will cancel the executions 
future.
-        splitExecution.cancel();
+        if (splitExecution.cancel()) {
+            return trueCompletedFuture();
+        }
 
         // This means we didn't submit any jobs yet.
         if (executionsFuture.cancel(true)) {
diff --git 
a/modules/compute/src/test/java/org/apache/ignite/internal/compute/executor/ComputeExecutorTest.java
 
b/modules/compute/src/test/java/org/apache/ignite/internal/compute/executor/ComputeExecutorTest.java
index 059c5aee24..05e260e764 100644
--- 
a/modules/compute/src/test/java/org/apache/ignite/internal/compute/executor/ComputeExecutorTest.java
+++ 
b/modules/compute/src/test/java/org/apache/ignite/internal/compute/executor/ComputeExecutorTest.java
@@ -75,8 +75,7 @@ class ComputeExecutorTest extends BaseIgniteAbstractTest {
         JobExecutionInternal<Integer> execution = computeExecutor.executeJob(
                 ExecutionOptions.DEFAULT,
                 InterruptingJob.class,
-                null,
-                new Object[]{}
+                null
         );
         JobStatus executingStatus = await().until(execution::status, 
jobStatusWithState(EXECUTING));
         assertThat(execution.cancel(), is(true));
@@ -105,8 +104,7 @@ class ComputeExecutorTest extends BaseIgniteAbstractTest {
         JobExecutionInternal<Integer> execution = computeExecutor.executeJob(
                 ExecutionOptions.DEFAULT,
                 CancellingJob.class,
-                null,
-                new Object[]{}
+                null
         );
         JobStatus executingStatus = await().until(execution::status, 
jobStatusWithState(EXECUTING));
         assertThat(execution.cancel(), is(true));
@@ -142,7 +140,7 @@ class ComputeExecutorTest extends BaseIgniteAbstractTest {
                 ExecutionOptions.builder().maxRetries(maxRetries).build(),
                 RetryJobFail.class,
                 null,
-                new Object[]{runTimes}
+                runTimes
         );
 
         await().until(execution::status, jobStatusWithState(FAILED));
@@ -170,7 +168,8 @@ class ComputeExecutorTest extends BaseIgniteAbstractTest {
                 ExecutionOptions.builder().maxRetries(maxRetries).build(),
                 RetryJobSuccess.class,
                 null,
-                new Object[]{runTimes, maxRetries}
+                runTimes,
+                maxRetries
         );
 
         await().until(execution::status, jobStatusWithState(COMPLETED));
@@ -202,7 +201,7 @@ class ComputeExecutorTest extends BaseIgniteAbstractTest {
                 ExecutionOptions.builder().maxRetries(maxRetries).build(),
                 JobSuccess.class,
                 null,
-                new Object[]{runTimes}
+                runTimes
         );
 
         await().until(execution::status, jobStatusWithState(COMPLETED));
@@ -226,8 +225,7 @@ class ComputeExecutorTest extends BaseIgniteAbstractTest {
         JobExecutionInternal<Integer> execution = computeExecutor.executeJob(
                 ExecutionOptions.DEFAULT,
                 SimpleJob.class,
-                null,
-                new Object[]{}
+                null
         );
 
         await().until(execution::status, jobStatusWithState(COMPLETED));
diff --git 
a/modules/compute/src/test/java/org/apache/ignite/internal/compute/task/TaskExecutionContextImplTest.java
 
b/modules/compute/src/test/java/org/apache/ignite/internal/compute/task/TaskExecutionContextImplTest.java
new file mode 100644
index 0000000000..c5822fbcb6
--- /dev/null
+++ 
b/modules/compute/src/test/java/org/apache/ignite/internal/compute/task/TaskExecutionContextImplTest.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.compute.task;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.sameInstance;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.compute.task.TaskExecutionContext;
+import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+@ExtendWith(MockitoExtension.class)
+class TaskExecutionContextImplTest extends BaseIgniteAbstractTest {
+    @Mock
+    private Ignite ignite;
+
+    @Test
+    void returnsIgnite() {
+        TaskExecutionContext context = new TaskExecutionContextImpl(ignite, 
new AtomicBoolean());
+
+        assertThat(context.ignite(), is(sameInstance(ignite)));
+    }
+
+    @Test
+    void returnsCancelledFlag() {
+        AtomicBoolean isCancelled = new AtomicBoolean();
+
+        TaskExecutionContext context = new TaskExecutionContextImpl(ignite, 
isCancelled);
+
+        assertThat(context.isCancelled(), is(false));
+
+        isCancelled.set(true);
+
+        assertThat(context.isCancelled(), is(true));
+    }
+}
diff --git 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItThinClientComputeTest.java
 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItThinClientComputeTest.java
index 4bf18bc6df..9f47e4bda8 100644
--- 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItThinClientComputeTest.java
+++ 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItThinClientComputeTest.java
@@ -845,7 +845,7 @@ public class ItThinClientComputeTest extends 
ItAbstractThinClientTest {
         }
 
         @Override
-        public String reduce(Map<UUID, ?> results) {
+        public String reduce(TaskExecutionContext context, Map<UUID, ?> 
results) {
             return results.values().stream()
                     .map(String.class::cast)
                     .collect(Collectors.joining(","));
@@ -865,7 +865,7 @@ public class ItThinClientComputeTest extends 
ItAbstractThinClientTest {
         }
 
         @Override
-        public String reduce(Map<UUID, ?> results) {
+        public String reduce(TaskExecutionContext context, Map<UUID, ?> 
results) {
             return results.values().stream()
                     .map(String.class::cast)
                     .collect(Collectors.joining(","));
@@ -879,7 +879,7 @@ public class ItThinClientComputeTest extends 
ItAbstractThinClientTest {
         }
 
         @Override
-        public String reduce(Map<UUID, ?> results) {
+        public String reduce(TaskExecutionContext context, Map<UUID, ?> 
results) {
             return "expected split exception";
         }
     }
@@ -898,7 +898,7 @@ public class ItThinClientComputeTest extends 
ItAbstractThinClientTest {
         }
 
         @Override
-        public String reduce(Map<UUID, ?> results) {
+        public String reduce(TaskExecutionContext context, Map<UUID, ?> 
results) {
             throw new CustomException(TRACE_ID, COLUMN_ALREADY_EXISTS_ERR, 
"Custom job error", null);
         }
     }

Reply via email to