This is an automated email from the ASF dual-hosted git repository.
mpochatkin pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 6c9c9a26c6 IGNITE-22433 Add TaskExecutionContext.isCancelled (#3930)
6c9c9a26c6 is described below
commit 6c9c9a26c6e177380f5cadec3ca367d946cfdbdf
Author: Vadim Pakhnushev <[email protected]>
AuthorDate: Tue Jun 18 15:11:43 2024 +0300
IGNITE-22433 Add TaskExecutionContext.isCancelled (#3930)
---
.../apache/ignite/compute/task/MapReduceTask.java | 7 +--
.../ignite/compute/task/TaskExecutionContext.java | 7 +++
.../ignite/internal/compute/ItMapReduceTest.java | 30 +++++++++---
.../internal/compute/utils/InteractiveTasks.java | 56 ++++++++++++++++-----
.../apache/ignite/internal/compute/MapReduce.java | 2 +-
.../compute/executor/ComputeExecutorImpl.java | 7 ++-
.../compute/executor/JobExecutionInternal.java | 2 +-
.../compute/task/TaskExecutionContextImpl.java} | 34 ++++++++++---
.../compute/task/TaskExecutionInternal.java | 20 ++++++--
.../compute/executor/ComputeExecutorTest.java | 16 +++---
.../compute/task/TaskExecutionContextImplTest.java | 57 ++++++++++++++++++++++
.../runner/app/client/ItThinClientComputeTest.java | 8 +--
12 files changed, 198 insertions(+), 48 deletions(-)
diff --git
a/modules/api/src/main/java/org/apache/ignite/compute/task/MapReduceTask.java
b/modules/api/src/main/java/org/apache/ignite/compute/task/MapReduceTask.java
index bab9e338f4..c328e90c32 100644
---
a/modules/api/src/main/java/org/apache/ignite/compute/task/MapReduceTask.java
+++
b/modules/api/src/main/java/org/apache/ignite/compute/task/MapReduceTask.java
@@ -40,11 +40,12 @@ public interface MapReduceTask<R> {
/**
* This is a finishing step in the task execution. This method will be
called with the map from identifiers of compute jobs submitted as
- * a result of the {@link #split(TaskExecutionContext, Object...)} method
call to the results of the execution of the corresponding
- * job. The return value of this method will be returned as a result of
this task.
+ * a result of the {@link #split(TaskExecutionContext, Object...)} method
call to the results of the execution of the corresponding job.
+ * The return value of this method will be returned as a result of this
task.
*
+ * @param taskContext Task execution context.
* @param results Map from compute job ids to their results.
* @return Final task result.
*/
- R reduce(Map<UUID, ?> results);
+ R reduce(TaskExecutionContext taskContext, Map<UUID, ?> results);
}
diff --git
a/modules/api/src/main/java/org/apache/ignite/compute/task/TaskExecutionContext.java
b/modules/api/src/main/java/org/apache/ignite/compute/task/TaskExecutionContext.java
index 2a45023f91..5199543c87 100644
---
a/modules/api/src/main/java/org/apache/ignite/compute/task/TaskExecutionContext.java
+++
b/modules/api/src/main/java/org/apache/ignite/compute/task/TaskExecutionContext.java
@@ -27,4 +27,11 @@ public interface TaskExecutionContext {
* @return Ignite instance.
*/
Ignite ignite();
+
+ /**
+ * Flag indicating whether the task was cancelled.
+ *
+ * @return {@code true} when the task was cancelled.
+ */
+ boolean isCancelled();
}
diff --git
a/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItMapReduceTest.java
b/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItMapReduceTest.java
index fde5d473a7..c8f07d86ae 100644
---
a/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItMapReduceTest.java
+++
b/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItMapReduceTest.java
@@ -48,6 +48,8 @@ import org.apache.ignite.lang.IgniteException;
import org.hamcrest.Matcher;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
@SuppressWarnings("resource")
class ItMapReduceTest extends ClusterPerClassIntegrationTest {
@@ -127,12 +129,13 @@ class ItMapReduceTest extends
ClusterPerClassIntegrationTest {
assertThat(taskExecution.statusesAsync(),
willThrow(IgniteException.class));
}
- @Test
- void cancelSplit() throws Exception {
+ @ParameterizedTest
+ @ValueSource(booleans = {true, false})
+ void cancelSplit(boolean cooperativeCancel) throws Exception {
IgniteImpl entryNode = CLUSTER.node(0);
// Given running task.
- TaskExecution<List<String>> taskExecution = startTask(entryNode);
+ TaskExecution<List<String>> taskExecution = startTask(entryNode,
cooperativeCancel ? "NO_INTERRUPT" : "");
// Save status before split.
JobStatus statusBeforeSplit = taskExecution.statusAsync().join();
@@ -145,6 +148,9 @@ class ItMapReduceTest extends
ClusterPerClassIntegrationTest {
// And statuses list will fail.
assertThat(taskExecution.statusesAsync(),
willThrow(RuntimeException.class));
+
+ // And second cancel will fail.
+ assertThat(taskExecution.cancelAsync(), willBe(false));
}
@Test
@@ -191,6 +197,9 @@ class ItMapReduceTest extends
ClusterPerClassIntegrationTest {
// And statuses list contains canceled statuses.
assertJobStates(taskExecution, CANCELED);
+
+ // And second cancel will fail.
+ assertThat(taskExecution.cancelAsync(), willBe(false));
}
@Test
@@ -219,12 +228,14 @@ class ItMapReduceTest extends
ClusterPerClassIntegrationTest {
assertJobStates(taskExecution, COMPLETED);
}
- @Test
- void cancelReduce() throws Exception {
+ @ParameterizedTest
+ @ValueSource(booleans = {true, false})
+ void cancelReduce(boolean cooperativeCancel) throws Exception {
IgniteImpl entryNode = CLUSTER.node(0);
// Given running task.
- TaskExecution<List<String>> taskExecution =
entryNode.compute().submitMapReduce(List.of(),
InteractiveTasks.GlobalApi.name());
+ String arg = cooperativeCancel ? "NO_INTERRUPT" : "";
+ TaskExecution<List<String>> taskExecution =
entryNode.compute().submitMapReduce(List.of(),
InteractiveTasks.GlobalApi.name(), arg);
TestingJobExecution<List<String>> testExecution = new
TestingJobExecution<>(taskExecution);
testExecution.assertExecuting();
InteractiveTasks.GlobalApi.assertAlive();
@@ -249,10 +260,13 @@ class ItMapReduceTest extends
ClusterPerClassIntegrationTest {
// And statuses list contains completed statuses.
assertJobStates(taskExecution, COMPLETED);
+
+ // And second cancel will fail.
+ assertThat(taskExecution.cancelAsync(), willBe(false));
}
- private static TaskExecution<List<String>> startTask(IgniteImpl entryNode)
throws InterruptedException {
- TaskExecution<List<String>> taskExecution =
entryNode.compute().submitMapReduce(List.of(),
InteractiveTasks.GlobalApi.name());
+ private static TaskExecution<List<String>> startTask(IgniteImpl entryNode,
Object... args) throws InterruptedException {
+ TaskExecution<List<String>> taskExecution =
entryNode.compute().submitMapReduce(List.of(),
InteractiveTasks.GlobalApi.name(), args);
new TestingJobExecution<>(taskExecution).assertExecuting();
InteractiveTasks.GlobalApi.assertAlive();
return taskExecution;
diff --git
a/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/utils/InteractiveTasks.java
b/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/utils/InteractiveTasks.java
index d8827cf734..cded885c9a 100644
---
a/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/utils/InteractiveTasks.java
+++
b/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/utils/InteractiveTasks.java
@@ -67,9 +67,9 @@ public final class InteractiveTasks {
private static final AtomicInteger RUNNING_GLOBAL_SPLIT_CNT = new
AtomicInteger(0);
/**
- * This counter indicated how many {@link
GlobalInteractiveMapReduceTask#reduce(Map)} methods are running now. This
counter increased
- * each time the {@link GlobalInteractiveMapReduceTask#reduce(Map)} is
called and decreased when the method is finished (whatever the
- * result is). Checked in {@link #clearState}.
+ * This counter indicates how many {@link
GlobalInteractiveMapReduceTask#reduce(TaskExecutionContext, Map)} methods are
running now.
+ * This counter is increased every time the {@link
GlobalInteractiveMapReduceTask#reduce(TaskExecutionContext, Map)} is called and
+ * decreased when the method is finished (whatever the result is). Checked
in {@link #clearState}.
*/
private static final AtomicInteger RUNNING_GLOBAL_REDUCE_CNT = new
AtomicInteger(0);
@@ -119,15 +119,12 @@ public final class InteractiveTasks {
/**
* Ask reduce method to return a concatenation of jobs results.
*/
- REDUCE_RETURN
- }
+ REDUCE_RETURN,
- private static Signal listenSignal() {
- try {
- return GLOBAL_SIGNALS.take();
- } catch (InterruptedException e) {
- throw new RuntimeException(e);
- }
+ /**
+ * Ask the task to check for cancellation flag and finish if it's set.
+ */
+ CHECK_CANCEL
}
/**
@@ -152,11 +149,36 @@ public final class InteractiveTasks {
* Interactive map reduce task that communicates via {@link
#GLOBAL_CHANNEL} and {@link #GLOBAL_SIGNALS}.
*/
private static class GlobalInteractiveMapReduceTask implements
MapReduceTask<List<String>> {
+ // When listening for a signal is interrupted: if this flag is true,
the corresponding method throws an exception,
+ // otherwise it restores the thread's interrupted status and reports a CHECK_CANCEL signal.
+ private boolean throwExceptionOnInterruption = true;
+
+ private static final String NO_INTERRUPT_ARG_NAME = "NO_INTERRUPT";
+
+ private Signal listenSignal() {
+ try {
+ return GLOBAL_SIGNALS.take();
+ } catch (InterruptedException e) {
+ if (throwExceptionOnInterruption) {
+ throw new RuntimeException(e);
+ } else {
+ Thread.currentThread().interrupt();
+ return Signal.CHECK_CANCEL;
+ }
+ }
+ }
+
@Override
public List<MapReduceJob> split(TaskExecutionContext context,
Object... args) {
RUNNING_GLOBAL_SPLIT_CNT.incrementAndGet();
offerArgsAsSignals(args);
+ for (Object arg : args) {
+ if (NO_INTERRUPT_ARG_NAME.equals(arg)) {
+ throwExceptionOnInterruption = false;
+ break;
+ }
+ }
try {
while (true) {
@@ -174,6 +196,11 @@ public final class InteractiveTasks {
.nodes(Set.of(node))
.build()
).collect(toList());
+ case CHECK_CANCEL:
+ if (context.isCancelled()) {
+ throw new RuntimeException("Task is
cancelled");
+ }
+ break;
default:
throw new IllegalStateException("Unexpected value:
" + receivedSignal);
}
@@ -184,7 +211,7 @@ public final class InteractiveTasks {
}
@Override
- public List<String> reduce(Map<UUID, ?> results) {
+ public List<String> reduce(TaskExecutionContext context, Map<UUID, ?>
results) {
RUNNING_GLOBAL_REDUCE_CNT.incrementAndGet();
try {
while (true) {
@@ -199,6 +226,11 @@ public final class InteractiveTasks {
return results.values().stream()
.map(String.class::cast)
.collect(toList());
+ case CHECK_CANCEL:
+ if (context.isCancelled()) {
+ throw new RuntimeException("Task is
cancelled");
+ }
+ break;
default:
throw new IllegalStateException("Unexpected value:
" + receivedSignal);
}
diff --git
a/modules/compute/src/jobs/java/org/apache/ignite/internal/compute/MapReduce.java
b/modules/compute/src/jobs/java/org/apache/ignite/internal/compute/MapReduce.java
index a48578e35b..78458216ef 100644
---
a/modules/compute/src/jobs/java/org/apache/ignite/internal/compute/MapReduce.java
+++
b/modules/compute/src/jobs/java/org/apache/ignite/internal/compute/MapReduce.java
@@ -51,7 +51,7 @@ public class MapReduce implements MapReduceTask<Integer> {
}
@Override
- public Integer reduce(Map<UUID, ?> results) {
+ public Integer reduce(TaskExecutionContext taskContext, Map<UUID, ?>
results) {
return results.values().stream()
.map(String.class::cast)
.map(String::length)
diff --git
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/executor/ComputeExecutorImpl.java
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/executor/ComputeExecutorImpl.java
index acb0c231ab..2656536e6e 100644
---
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/executor/ComputeExecutorImpl.java
+++
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/executor/ComputeExecutorImpl.java
@@ -25,6 +25,7 @@ import org.apache.ignite.Ignite;
import org.apache.ignite.compute.ComputeJob;
import org.apache.ignite.compute.JobExecutionContext;
import org.apache.ignite.compute.task.MapReduceTask;
+import org.apache.ignite.compute.task.TaskExecutionContext;
import org.apache.ignite.internal.compute.ComputeUtils;
import org.apache.ignite.internal.compute.ExecutionOptions;
import org.apache.ignite.internal.compute.JobExecutionContextImpl;
@@ -34,6 +35,7 @@ import
org.apache.ignite.internal.compute.queue.PriorityQueueExecutor;
import org.apache.ignite.internal.compute.queue.QueueExecution;
import org.apache.ignite.internal.compute.state.ComputeStateMachine;
import org.apache.ignite.internal.compute.task.JobSubmitter;
+import org.apache.ignite.internal.compute.task.TaskExecutionContextImpl;
import org.apache.ignite.internal.compute.task.TaskExecutionInternal;
import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
@@ -99,7 +101,10 @@ public class ComputeExecutorImpl implements ComputeExecutor
{
) {
assert executorService != null;
- return new TaskExecutionInternal<>(executorService, jobSubmitter,
taskClass, () -> ignite, args);
+ AtomicBoolean isCancelled = new AtomicBoolean();
+ TaskExecutionContext context = new TaskExecutionContextImpl(ignite,
isCancelled);
+
+ return new TaskExecutionInternal<>(executorService, jobSubmitter,
taskClass, context, isCancelled, args);
}
@Override
diff --git
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/executor/JobExecutionInternal.java
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/executor/JobExecutionInternal.java
index ce8c026351..c92b090d6f 100644
---
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/executor/JobExecutionInternal.java
+++
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/executor/JobExecutionInternal.java
@@ -37,7 +37,7 @@ public class JobExecutionInternal<R> {
* Constructor.
*
* @param execution Internal execution state.
- * @param isInterrupted Flag which is passed to the execution context so
that the job can check if for cancellation request.
+ * @param isInterrupted Flag which is passed to the execution context so
that the job can check it for cancellation request.
*/
JobExecutionInternal(QueueExecution<R> execution, AtomicBoolean
isInterrupted) {
this.execution = execution;
diff --git
a/modules/api/src/main/java/org/apache/ignite/compute/task/TaskExecutionContext.java
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/task/TaskExecutionContextImpl.java
similarity index 52%
copy from
modules/api/src/main/java/org/apache/ignite/compute/task/TaskExecutionContext.java
copy to
modules/compute/src/main/java/org/apache/ignite/internal/compute/task/TaskExecutionContextImpl.java
index 2a45023f91..062d30af37 100644
---
a/modules/api/src/main/java/org/apache/ignite/compute/task/TaskExecutionContext.java
+++
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/task/TaskExecutionContextImpl.java
@@ -15,16 +15,38 @@
* limitations under the License.
*/
-package org.apache.ignite.compute.task;
+package org.apache.ignite.internal.compute.task;
+import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.ignite.Ignite;
+import org.apache.ignite.compute.task.TaskExecutionContext;
+
+/**
+ * Implementation of {@link TaskExecutionContext}.
+ */
+public class TaskExecutionContextImpl implements TaskExecutionContext {
+ private final Ignite ignite;
+
+ private final AtomicBoolean isCancelled;
-/** Context of the compute task execution. */
-public interface TaskExecutionContext {
/**
- * Ignite API entry point.
+ * Constructor.
*
- * @return Ignite instance.
+ * @param ignite Ignite instance.
+ * @param isCancelled Cancelled flag.
*/
- Ignite ignite();
+ public TaskExecutionContextImpl(Ignite ignite, AtomicBoolean isCancelled) {
+ this.ignite = ignite;
+ this.isCancelled = isCancelled;
+ }
+
+ @Override
+ public Ignite ignite() {
+ return ignite;
+ }
+
+ @Override
+ public boolean isCancelled() {
+ return isCancelled.get();
+ }
}
diff --git
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/task/TaskExecutionInternal.java
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/task/TaskExecutionInternal.java
index c36aae3865..85a85a6785 100644
---
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/task/TaskExecutionInternal.java
+++
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/task/TaskExecutionInternal.java
@@ -27,6 +27,7 @@ import static org.apache.ignite.compute.JobState.FAILED;
import static org.apache.ignite.internal.compute.ComputeUtils.instantiateTask;
import static org.apache.ignite.internal.util.ArrayUtils.concat;
import static org.apache.ignite.internal.util.CompletableFutures.allOfToList;
+import static
org.apache.ignite.internal.util.CompletableFutures.falseCompletedFuture;
import static
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
import static
org.apache.ignite.internal.util.CompletableFutures.trueCompletedFuture;
@@ -39,6 +40,7 @@ import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.ignite.compute.JobExecution;
import org.apache.ignite.compute.JobState;
@@ -56,7 +58,8 @@ import org.jetbrains.annotations.Nullable;
/**
* Internal map reduce task execution object. Runs the {@link
MapReduceTask#split(TaskExecutionContext, Object...)} method of the task as a
* compute job, then submits the resulting list of jobs. Waits for completion
of all compute jobs, then submits the
- * {@link MapReduceTask#reduce(Map)} method as a compute job. The result of
the task is the result of the split method.
+ * {@link MapReduceTask#reduce(TaskExecutionContext, Map)} method as a compute
job. The result of the task is the result of the reduce
+ * method.
*
* @param <R> Task result type.
*/
@@ -74,6 +77,8 @@ public class TaskExecutionInternal<R> implements
JobExecution<R> {
private final AtomicReference<JobStatus> reduceFailedStatus = new
AtomicReference<>();
+ private final AtomicBoolean isCancelled;
+
/**
* Construct an execution object and starts executing.
*
@@ -81,6 +86,7 @@ public class TaskExecutionInternal<R> implements
JobExecution<R> {
* @param jobSubmitter Compute jobs submitter.
* @param taskClass Map reduce task class.
* @param context Task execution context.
+ * @param isCancelled Flag which is passed to the execution context so
that the task can check it for cancellation request.
* @param args Task arguments.
*/
public TaskExecutionInternal(
@@ -88,8 +94,10 @@ public class TaskExecutionInternal<R> implements
JobExecution<R> {
JobSubmitter jobSubmitter,
Class<? extends MapReduceTask<R>> taskClass,
TaskExecutionContext context,
+ AtomicBoolean isCancelled,
Object... args
) {
+ this.isCancelled = isCancelled;
LOG.debug("Executing task {}", taskClass.getName());
splitExecution = executorService.submit(
() -> {
@@ -116,7 +124,7 @@ public class TaskExecutionInternal<R> implements
JobExecution<R> {
MapReduceTask<R> task =
splitExecution.resultAsync().thenApply(SplitResult::task).join();
return executorService.submit(
- () -> completedFuture(task.reduce(results)),
+ () -> completedFuture(task.reduce(context, results)),
Integer.MAX_VALUE,
0
);
@@ -184,8 +192,14 @@ public class TaskExecutionInternal<R> implements
JobExecution<R> {
@Override
public CompletableFuture<@Nullable Boolean> cancelAsync() {
+ if (!isCancelled.compareAndSet(false, true)) {
+ return falseCompletedFuture();
+ }
+
// If the split job is not complete, this will cancel the executions
future.
- splitExecution.cancel();
+ if (splitExecution.cancel()) {
+ return trueCompletedFuture();
+ }
// This means we didn't submit any jobs yet.
if (executionsFuture.cancel(true)) {
diff --git
a/modules/compute/src/test/java/org/apache/ignite/internal/compute/executor/ComputeExecutorTest.java
b/modules/compute/src/test/java/org/apache/ignite/internal/compute/executor/ComputeExecutorTest.java
index 059c5aee24..05e260e764 100644
---
a/modules/compute/src/test/java/org/apache/ignite/internal/compute/executor/ComputeExecutorTest.java
+++
b/modules/compute/src/test/java/org/apache/ignite/internal/compute/executor/ComputeExecutorTest.java
@@ -75,8 +75,7 @@ class ComputeExecutorTest extends BaseIgniteAbstractTest {
JobExecutionInternal<Integer> execution = computeExecutor.executeJob(
ExecutionOptions.DEFAULT,
InterruptingJob.class,
- null,
- new Object[]{}
+ null
);
JobStatus executingStatus = await().until(execution::status,
jobStatusWithState(EXECUTING));
assertThat(execution.cancel(), is(true));
@@ -105,8 +104,7 @@ class ComputeExecutorTest extends BaseIgniteAbstractTest {
JobExecutionInternal<Integer> execution = computeExecutor.executeJob(
ExecutionOptions.DEFAULT,
CancellingJob.class,
- null,
- new Object[]{}
+ null
);
JobStatus executingStatus = await().until(execution::status,
jobStatusWithState(EXECUTING));
assertThat(execution.cancel(), is(true));
@@ -142,7 +140,7 @@ class ComputeExecutorTest extends BaseIgniteAbstractTest {
ExecutionOptions.builder().maxRetries(maxRetries).build(),
RetryJobFail.class,
null,
- new Object[]{runTimes}
+ runTimes
);
await().until(execution::status, jobStatusWithState(FAILED));
@@ -170,7 +168,8 @@ class ComputeExecutorTest extends BaseIgniteAbstractTest {
ExecutionOptions.builder().maxRetries(maxRetries).build(),
RetryJobSuccess.class,
null,
- new Object[]{runTimes, maxRetries}
+ runTimes,
+ maxRetries
);
await().until(execution::status, jobStatusWithState(COMPLETED));
@@ -202,7 +201,7 @@ class ComputeExecutorTest extends BaseIgniteAbstractTest {
ExecutionOptions.builder().maxRetries(maxRetries).build(),
JobSuccess.class,
null,
- new Object[]{runTimes}
+ runTimes
);
await().until(execution::status, jobStatusWithState(COMPLETED));
@@ -226,8 +225,7 @@ class ComputeExecutorTest extends BaseIgniteAbstractTest {
JobExecutionInternal<Integer> execution = computeExecutor.executeJob(
ExecutionOptions.DEFAULT,
SimpleJob.class,
- null,
- new Object[]{}
+ null
);
await().until(execution::status, jobStatusWithState(COMPLETED));
diff --git
a/modules/compute/src/test/java/org/apache/ignite/internal/compute/task/TaskExecutionContextImplTest.java
b/modules/compute/src/test/java/org/apache/ignite/internal/compute/task/TaskExecutionContextImplTest.java
new file mode 100644
index 0000000000..c5822fbcb6
--- /dev/null
+++
b/modules/compute/src/test/java/org/apache/ignite/internal/compute/task/TaskExecutionContextImplTest.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.compute.task;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.sameInstance;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.compute.task.TaskExecutionContext;
+import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+@ExtendWith(MockitoExtension.class)
+class TaskExecutionContextImplTest extends BaseIgniteAbstractTest {
+ @Mock
+ private Ignite ignite;
+
+ @Test
+ void returnsIgnite() {
+ TaskExecutionContext context = new TaskExecutionContextImpl(ignite,
new AtomicBoolean());
+
+ assertThat(context.ignite(), is(sameInstance(ignite)));
+ }
+
+ @Test
+ void returnsCancelledFlag() {
+ AtomicBoolean isCancelled = new AtomicBoolean();
+
+ TaskExecutionContext context = new TaskExecutionContextImpl(ignite,
isCancelled);
+
+ assertThat(context.isCancelled(), is(false));
+
+ isCancelled.set(true);
+
+ assertThat(context.isCancelled(), is(true));
+ }
+}
diff --git
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItThinClientComputeTest.java
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItThinClientComputeTest.java
index 4bf18bc6df..9f47e4bda8 100644
---
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItThinClientComputeTest.java
+++
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItThinClientComputeTest.java
@@ -845,7 +845,7 @@ public class ItThinClientComputeTest extends
ItAbstractThinClientTest {
}
@Override
- public String reduce(Map<UUID, ?> results) {
+ public String reduce(TaskExecutionContext context, Map<UUID, ?>
results) {
return results.values().stream()
.map(String.class::cast)
.collect(Collectors.joining(","));
@@ -865,7 +865,7 @@ public class ItThinClientComputeTest extends
ItAbstractThinClientTest {
}
@Override
- public String reduce(Map<UUID, ?> results) {
+ public String reduce(TaskExecutionContext context, Map<UUID, ?>
results) {
return results.values().stream()
.map(String.class::cast)
.collect(Collectors.joining(","));
@@ -879,7 +879,7 @@ public class ItThinClientComputeTest extends
ItAbstractThinClientTest {
}
@Override
- public String reduce(Map<UUID, ?> results) {
+ public String reduce(TaskExecutionContext context, Map<UUID, ?>
results) {
return "expected split exception";
}
}
@@ -898,7 +898,7 @@ public class ItThinClientComputeTest extends
ItAbstractThinClientTest {
}
@Override
- public String reduce(Map<UUID, ?> results) {
+ public String reduce(TaskExecutionContext context, Map<UUID, ?>
results) {
throw new CustomException(TRACE_ID, COLUMN_ALREADY_EXISTS_ERR,
"Custom job error", null);
}
}