This is an automated email from the ASF dual-hosted git repository.
apkhmv pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new ce15f41b414 IGNITE-28344 Don't cancel job future when canceling the
job (#7861)
ce15f41b414 is described below
commit ce15f41b414f511bc47d05fdb700492bfe6cc609
Author: Vadim Pakhnushev <[email protected]>
AuthorDate: Thu Apr 2 18:29:05 2026 +0300
IGNITE-28344 Don't cancel job future when canceling the job (#7861)
---
.../internal/client/ItThinClientComputeTest.java | 21 ++-
.../ignite/internal/compute/ItComputeBaseTest.java | 32 +++-
.../compute/ItFailoverCandidateNotFoundTest.java | 34 ++--
.../internal/compute/utils/InteractiveJobs.java | 5 +-
.../internal/compute/utils/InteractiveTasks.java | 7 +-
.../jobs/embedded/AsyncDelayedCompleteJob.java | 57 ++++++
.../jobs/standalone/AsyncDelayedCompleteJob.java | 57 ++++++
.../ignite/internal/compute/queue/QueueEntry.java | 26 +--
.../internal/compute/queue/QueueExecutionImpl.java | 10 +-
.../compute/task/TaskExecutionInternal.java | 21 +--
.../compute/executor/ComputeExecutorTest.java | 208 +++++++++++++++++----
.../compute/queue/PriorityQueueExecutorTest.java | 6 +-
.../cpp/tests/client-test/compute_test.cpp | 3 +-
.../Apache.Ignite.Tests/Compute/ComputeTests.cs | 19 +-
14 files changed, 394 insertions(+), 112 deletions(-)
diff --git
a/modules/client/src/integrationTest/java/org/apache/ignite/internal/client/ItThinClientComputeTest.java
b/modules/client/src/integrationTest/java/org/apache/ignite/internal/client/ItThinClientComputeTest.java
index 823e46366c3..0677b1f18c5 100644
---
a/modules/client/src/integrationTest/java/org/apache/ignite/internal/client/ItThinClientComputeTest.java
+++
b/modules/client/src/integrationTest/java/org/apache/ignite/internal/client/ItThinClientComputeTest.java
@@ -19,7 +19,6 @@ package org.apache.ignite.internal.client;
import static java.util.concurrent.CompletableFuture.completedFuture;
import static java.util.concurrent.CompletableFuture.failedFuture;
-import static org.apache.ignite.compute.JobStatus.CANCELED;
import static org.apache.ignite.compute.JobStatus.COMPLETED;
import static org.apache.ignite.compute.JobStatus.EXECUTING;
import static org.apache.ignite.compute.JobStatus.FAILED;
@@ -85,6 +84,7 @@ import org.apache.ignite.compute.ComputeJob;
import org.apache.ignite.compute.JobDescriptor;
import org.apache.ignite.compute.JobExecution;
import org.apache.ignite.compute.JobExecutionContext;
+import org.apache.ignite.compute.JobStatus;
import org.apache.ignite.compute.JobTarget;
import org.apache.ignite.compute.TaskDescriptor;
import org.apache.ignite.compute.TaskStatus;
@@ -295,8 +295,11 @@ public class ItThinClientComputeTest extends
ItAbstractThinClientTest {
assertThat(cancelHandle.cancelAsync(), willCompleteSuccessfully());
- await().until(execution1::stateAsync,
willBe(jobStateWithStatus(CANCELED)));
- await().until(execution2::stateAsync,
willBe(jobStateWithStatus(CANCELED)));
+ // Async job completes normally after cooperative cancellation
(returns from isCancelled() check) — COMPLETED.
+ // Sync job throws RuntimeException on thread interruption — FAILED
(not CancellationException).
+ JobStatus expectedStatus = asyncJob ? COMPLETED : FAILED;
+ await().until(execution1::stateAsync,
willBe(jobStateWithStatus(expectedStatus)));
+ await().until(execution2::stateAsync,
willBe(jobStateWithStatus(expectedStatus)));
}
@Test
@@ -329,7 +332,8 @@ public class ItThinClientComputeTest extends
ItAbstractThinClientTest {
// Cancel task 1, task 3 should start executing
assertThat(cancelHandle1.cancelAsync(), willCompleteSuccessfully());
- await().until(execution1::stateAsync,
willBe(jobStateWithStatus(CANCELED)));
+ // SleepJob throws RuntimeException, not CancellationException.
+ await().until(execution1::stateAsync,
willBe(jobStateWithStatus(FAILED)));
await().until(execution3::stateAsync,
willBe(jobStateWithStatus(EXECUTING)));
// Task 2 is still queued
@@ -408,9 +412,10 @@ public class ItThinClientComputeTest extends
ItAbstractThinClientTest {
cancelHandle.cancel();
+ // SleepJob throws RuntimeException on interrupt, not
CancellationException.
await().until(() -> executions, contains(
- jobExecutionWithStatus(CANCELED),
- jobExecutionWithStatus(CANCELED)
+ jobExecutionWithStatus(FAILED),
+ jobExecutionWithStatus(FAILED)
));
assertThat(broadcastExecution.resultsAsync(),
willThrow(ComputeException.class));
@@ -725,7 +730,7 @@ public class ItThinClientComputeTest extends
ItAbstractThinClientTest {
assertThat(cancelHandle.cancelAsync(), willCompleteSuccessfully());
- await().until(tupleExecution::stateAsync,
willBe(jobStateWithStatus(CANCELED)));
+ await().until(tupleExecution::stateAsync,
willBe(jobStateWithStatus(FAILED)));
}
@ParameterizedTest
@@ -747,7 +752,7 @@ public class ItThinClientComputeTest extends
ItAbstractThinClientTest {
assertThat(cancelHandle.cancelAsync(), willCompleteSuccessfully());
- await().until(pojoExecution::stateAsync,
willBe(jobStateWithStatus(CANCELED)));
+ await().until(pojoExecution::stateAsync,
willBe(jobStateWithStatus(FAILED)));
}
@Test
diff --git
a/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItComputeBaseTest.java
b/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItComputeBaseTest.java
index 63147f204cf..6b64fe363dc 100644
---
a/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItComputeBaseTest.java
+++
b/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItComputeBaseTest.java
@@ -679,7 +679,7 @@ public abstract class ItComputeBaseTest extends
ClusterPerClassIntegrationTest {
// RuntimeException is thrown when SleepJob catches the
InterruptedException
assertThat(runtimeException.toString(),
containsString(InterruptedException.class.getName()));
- await().until(execution::stateAsync,
willBe(jobStateWithStatus(CANCELED)));
+ await().until(execution::stateAsync,
willBe(jobStateWithStatus(FAILED)));
}
@ParameterizedTest
@@ -729,7 +729,31 @@ public abstract class ItComputeBaseTest extends
ClusterPerClassIntegrationTest {
// Cancel running task
assertThat(cancelHandle1.cancelAsync(), willCompleteSuccessfully());
- await().until(execution1::stateAsync,
willBe(jobStateWithStatus(CANCELED)));
+ await().until(execution1::stateAsync,
willBe(jobStateWithStatus(FAILED)));
+ }
+
+ @ParameterizedTest(name = "local: {0}")
+ @ValueSource(booleans = {true, false})
+ void asyncJobCompletesNormallyAfterCooperativeCancellation(boolean local) {
+ Ignite executeNode = local ? node(0) : node(1);
+
+ CancelHandle cancelHandle = CancelHandle.create();
+
+ JobExecution<String> execution = submit(
+ JobTarget.node(clusterNode(executeNode)),
+ asyncDelayedCompleteJob(),
+ cancelHandle.token(),
+ null
+ );
+
+ await().until(execution::stateAsync,
willBe(jobStateWithStatus(EXECUTING)));
+
+ cancelHandle.cancel();
+
+ // The async job detects cancellation via isCancelled(), does cleanup,
then completes with a result.
+ // Cooperative cancellation should honor the result — status must be
COMPLETED, not CANCELED.
+ assertThat(execution.resultAsync(),
willBe(is("completed-after-cancel")));
+ await().until(execution::stateAsync,
willBe(jobStateWithStatus(COMPLETED)));
}
@ParameterizedTest
@@ -963,6 +987,10 @@ public abstract class ItComputeBaseTest extends
ClusterPerClassIntegrationTest {
return TaskDescriptor.<Void,
Void>builder(jobClassName("InfiniteMapReduceTask")).units(units()).build();
}
+ private JobDescriptor<Void, String> asyncDelayedCompleteJob() {
+ return JobDescriptor.<Void,
String>builder(jobClassName("AsyncDelayedCompleteJob")).units(units()).build();
+ }
+
private JobDescriptor<Tuple, Integer> tupleJob() {
return JobDescriptor.<Tuple,
Integer>builder(jobClassName("TupleJob")).units(units()).build();
}
diff --git
a/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItFailoverCandidateNotFoundTest.java
b/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItFailoverCandidateNotFoundTest.java
index 83ec8c62612..bf3d50c75cb 100644
---
a/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItFailoverCandidateNotFoundTest.java
+++
b/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItFailoverCandidateNotFoundTest.java
@@ -18,6 +18,7 @@
package org.apache.ignite.internal.compute;
import static
org.apache.ignite.internal.compute.events.ComputeEventMetadata.Type.SINGLE;
+import static
org.apache.ignite.internal.eventlog.api.IgniteEventType.COMPUTE_JOB_CANCELED;
import static
org.apache.ignite.internal.eventlog.api.IgniteEventType.COMPUTE_JOB_EXECUTING;
import static
org.apache.ignite.internal.eventlog.api.IgniteEventType.COMPUTE_JOB_FAILED;
import static
org.apache.ignite.internal.eventlog.api.IgniteEventType.COMPUTE_JOB_QUEUED;
@@ -134,14 +135,17 @@ abstract class ItFailoverCandidateNotFoundTest extends
ClusterPerTestIntegration
String jobClassName = InteractiveJobs.globalJob().name();
- // When node is shut down gracefully, the job execution is interrupted
and event could be logged anyway
- // So there would be 2 events from a worker node, 1 failed events from
a worker node and 1 failed event from the coordinator
- await().until(logInspector::events, contains(
+ // When node is shut down gracefully, the job execution is interrupted
and CancellationException is thrown.
+ // There would be 2 events from a worker node (QUEUED, EXECUTING),
then a CANCELED event from the worker node
+ // and a FAILED event from the coordinator. The order of the last two
events is not determined.
+ await().until(logInspector::events, hasSize(4));
+
+ assertThat(logInspector.events(), containsInRelativeOrder(
jobEvent(COMPUTE_JOB_QUEUED, jobId, jobClassName,
workerNodeName),
- jobEvent(COMPUTE_JOB_EXECUTING, jobId, jobClassName,
workerNodeName),
- jobEvent(COMPUTE_JOB_FAILED, jobId, jobClassName,
workerNodeName),
- jobEvent(COMPUTE_JOB_FAILED, jobId, jobClassName,
workerNodeName)
+ jobEvent(COMPUTE_JOB_EXECUTING, jobId, jobClassName,
workerNodeName)
));
+ assertThat(logInspector.events(),
hasItem(jobEvent(COMPUTE_JOB_CANCELED, jobId, jobClassName, workerNodeName)));
+ assertThat(logInspector.events(), hasItem(jobEvent(COMPUTE_JOB_FAILED,
jobId, jobClassName, workerNodeName)));
}
@Test
@@ -177,9 +181,9 @@ abstract class ItFailoverCandidateNotFoundTest extends
ClusterPerTestIntegration
// Then the job is failed, because there are no more failover workers.
execution.assertFailed();
- // When node is shut down gracefully, the job execution is interrupted
and event could be logged anyway
- // So there would be 4 events from worker nodes, 2 failed events from
both worker nodes and 1 failed event from the coordinator
- // The order of failed events is not determined
+ // Each worker node logs QUEUED, EXECUTING, CANCELED (interrupted on
shutdown → CancellationException).
+ // The coordinator logs FAILED (no more failover candidates).
+ // The order of the last 3 terminal events is not determined.
await().until(logInspector::events, hasSize(7));
String jobClassName = InteractiveJobs.globalJob().name();
@@ -189,12 +193,18 @@ abstract class ItFailoverCandidateNotFoundTest extends
ClusterPerTestIntegration
jobEvent(COMPUTE_JOB_EXECUTING, jobId, jobClassName,
workerNodeName),
jobEvent(COMPUTE_JOB_QUEUED, jobId, jobClassName,
failoverWorker),
jobEvent(COMPUTE_JOB_EXECUTING, jobId, jobClassName,
failoverWorker),
- jobEvent(COMPUTE_JOB_FAILED, jobId, jobClassName,
failoverWorker)
+ // Interrupted on shutdown → CancellationException → CANCELED
+ jobEvent(COMPUTE_JOB_CANCELED, jobId, jobClassName,
failoverWorker)
));
- // Failed event from second worker node
+ // First worker was also interrupted on shutdown → CANCELED
assertThat(logInspector.events(), hasItem(
- jobEvent(COMPUTE_JOB_FAILED, jobId, jobClassName,
workerNodeName)
+ jobEvent(COMPUTE_JOB_CANCELED, jobId, jobClassName,
workerNodeName)
+ ));
+
+ // Coordinator's failover gives up (no more candidates) → FAILED,
targetNode is the last worker attempted
+ assertThat(logInspector.events(), hasItem(
+ jobEvent(COMPUTE_JOB_FAILED, jobId, jobClassName,
failoverWorker)
));
}
diff --git
a/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/utils/InteractiveJobs.java
b/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/utils/InteractiveJobs.java
index 71522768cba..c122320aba3 100644
---
a/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/utils/InteractiveJobs.java
+++
b/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/utils/InteractiveJobs.java
@@ -26,6 +26,7 @@ import static org.hamcrest.Matchers.notNullValue;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
@@ -172,7 +173,7 @@ public final class InteractiveJobs {
try {
return GLOBAL_SIGNALS.take();
} catch (InterruptedException e) {
- throw new RuntimeException(e);
+ throw new CancellationException();
}
}
@@ -233,7 +234,7 @@ public final class InteractiveJobs {
try {
return channel.take();
} catch (InterruptedException e) {
- throw new RuntimeException(e);
+ throw new CancellationException();
}
}
diff --git
a/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/utils/InteractiveTasks.java
b/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/utils/InteractiveTasks.java
index 8572f156251..f12ff230a80 100644
---
a/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/utils/InteractiveTasks.java
+++
b/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/utils/InteractiveTasks.java
@@ -29,6 +29,7 @@ import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
@@ -166,7 +167,7 @@ public final class InteractiveTasks {
return GLOBAL_SIGNALS.take();
} catch (InterruptedException e) {
if (throwExceptionOnInterruption) {
- throw new RuntimeException(e);
+ throw new CancellationException();
} else {
Thread.currentThread().interrupt();
return Signal.CHECK_CANCEL;
@@ -201,7 +202,7 @@ public final class InteractiveTasks {
).collect(toList()));
case CHECK_CANCEL:
if (context.isCancelled()) {
- throw new RuntimeException("Task is
cancelled");
+ throw new CancellationException("Task is
cancelled");
}
break;
default:
@@ -229,7 +230,7 @@ public final class InteractiveTasks {
return completedFuture(new
ArrayList<>(results.values()));
case CHECK_CANCEL:
if (context.isCancelled()) {
- throw new RuntimeException("Task is
cancelled");
+ throw new CancellationException("Task is
cancelled");
}
break;
default:
diff --git
a/modules/compute/src/integrationTest/java/org/example/jobs/embedded/AsyncDelayedCompleteJob.java
b/modules/compute/src/integrationTest/java/org/example/jobs/embedded/AsyncDelayedCompleteJob.java
new file mode 100644
index 00000000000..0259e960794
--- /dev/null
+++
b/modules/compute/src/integrationTest/java/org/example/jobs/embedded/AsyncDelayedCompleteJob.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.example.jobs.embedded;
+
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.compute.ComputeJob;
+import org.apache.ignite.compute.JobExecutionContext;
+
+/**
+ * Async compute job that returns a future immediately. A background thread
polls
+ * {@code isCancelled()}, then completes the future with a result after a
brief delay.
+ * This tests that cooperative cancellation of async jobs is honored.
+ */
+public class AsyncDelayedCompleteJob implements ComputeJob<Void, String> {
+ @Override
+ public CompletableFuture<String> executeAsync(JobExecutionContext context,
Void arg) {
+ CompletableFuture<String> result = new CompletableFuture<>();
+
+ Thread thread = new Thread(() -> {
+ while (!context.isCancelled()) {
+ try {
+ Thread.sleep(50);
+ } catch (InterruptedException e) {
+ break;
+ }
+ }
+
+ // Simulate cleanup after cancellation
+ try {
+ Thread.sleep(500);
+ } catch (InterruptedException ignored) {
+ // ignored
+ }
+
+ result.complete("completed-after-cancel");
+ });
+ thread.setDaemon(true);
+ thread.start();
+
+ return result;
+ }
+}
diff --git
a/modules/compute/src/jobs/java/org/example/jobs/standalone/AsyncDelayedCompleteJob.java
b/modules/compute/src/jobs/java/org/example/jobs/standalone/AsyncDelayedCompleteJob.java
new file mode 100644
index 00000000000..15f828a4d27
--- /dev/null
+++
b/modules/compute/src/jobs/java/org/example/jobs/standalone/AsyncDelayedCompleteJob.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.example.jobs.standalone;
+
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.compute.ComputeJob;
+import org.apache.ignite.compute.JobExecutionContext;
+
+/**
+ * Async compute job that returns a future immediately. A background thread
polls
+ * {@code isCancelled()}, then completes the future with a result after a
brief delay.
+ * This tests that cooperative cancellation of async jobs is honored.
+ */
+public class AsyncDelayedCompleteJob implements ComputeJob<Void, String> {
+ @Override
+ public CompletableFuture<String> executeAsync(JobExecutionContext context,
Void arg) {
+ CompletableFuture<String> result = new CompletableFuture<>();
+
+ Thread thread = new Thread(() -> {
+ while (!context.isCancelled()) {
+ try {
+ Thread.sleep(50);
+ } catch (InterruptedException e) {
+ break;
+ }
+ }
+
+ // Simulate cleanup after cancellation
+ try {
+ Thread.sleep(500);
+ } catch (InterruptedException ignored) {
+ // ignored
+ }
+
+ result.complete("completed-after-cancel");
+ });
+ thread.setDaemon(true);
+ thread.start();
+
+ return result;
+ }
+}
diff --git
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/queue/QueueEntry.java
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/queue/QueueEntry.java
index c5842a98b6f..da7bf332b0a 100644
---
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/queue/QueueEntry.java
+++
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/queue/QueueEntry.java
@@ -49,13 +49,8 @@ class QueueEntry<R> implements Runnable,
Comparable<QueueEntry<R>> {
/** Thread used to run the job, initialized once the job starts executing.
*/
private @Nullable Thread workerThread;
- /** Future returned from jobAction.call(). */
- private @Nullable CompletableFuture<R> jobFuture;
-
private final Lock lock = new ReentrantLock();
- private volatile boolean isInterrupted;
-
/**
* Constructor.
*
@@ -78,7 +73,7 @@ class QueueEntry<R> implements Runnable,
Comparable<QueueEntry<R>> {
}
try {
- jobFuture = jobAction.call();
+ CompletableFuture<R> jobFuture = jobAction.call();
if (jobFuture == null) {
// Allow null futures for synchronous jobs.
@@ -111,34 +106,17 @@ class QueueEntry<R> implements Runnable,
Comparable<QueueEntry<R>> {
* Sets interrupt status of the worker thread.
*/
void interrupt() {
- // Interrupt under the lock to prevent interrupting thread used by the
pool for another task
+ // Interrupt under the lock to prevent interrupting thread used by the
pool for another task.
lock.lock();
try {
if (workerThread != null) {
- // Set the interrupted flag first since it's used to determine
the final status of the job.
- // Job could handle interruption and exit before this flag is
set moving the job to completed state rather than canceled.
- isInterrupted = true;
workerThread.interrupt();
}
-
- if (jobFuture != null) {
- isInterrupted = true;
- jobFuture.cancel(true);
- }
} finally {
lock.unlock();
}
}
- /**
- * Indicates whether the execution was interrupted externally.
- *
- * @return {@code true} when the execution was interrupted externally.
- */
- boolean isInterrupted() {
- return isInterrupted;
- }
-
@Override
public int compareTo(QueueEntry o) {
int compare = Integer.compare(o.priority, this.priority);
diff --git
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/queue/QueueExecutionImpl.java
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/queue/QueueExecutionImpl.java
index d914f62b10f..9f49c39155f 100644
---
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/queue/QueueExecutionImpl.java
+++
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/queue/QueueExecutionImpl.java
@@ -22,6 +22,7 @@ import static
org.apache.ignite.internal.compute.events.ComputeEventsFactory.log
import static
org.apache.ignite.internal.compute.events.ComputeEventsFactory.logJobCompletedEvent;
import static
org.apache.ignite.internal.compute.events.ComputeEventsFactory.logJobExecutingEvent;
import static
org.apache.ignite.internal.compute.events.ComputeEventsFactory.logJobFailedEvent;
+import static org.apache.ignite.internal.util.ExceptionUtils.unwrapCause;
import static org.apache.ignite.lang.ErrorGroups.Compute.QUEUE_OVERFLOW_ERR;
import java.util.UUID;
@@ -209,11 +210,11 @@ class QueueExecutionImpl<R> implements QueueExecution<R> {
if (throwable instanceof QueueEntryCanceledException) {
logJobCanceledEvent(eventLog, eventMetadata);
result.completeExceptionally(new CancellationException());
- } else if (queueEntry.isInterrupted()) {
+ } else if (unwrapCause(throwable) instanceof
CancellationException) {
stateMachine.cancelJob(jobId);
logJobCanceledEvent(eventLog, eventMetadata);
result.completeExceptionally(throwable);
- } else if (retries.decrementAndGet() >= 0) {
+ } else if (!isCanceling() && retries.decrementAndGet() >= 0) {
stateMachine.queueJob(jobId);
run();
} else {
@@ -240,4 +241,9 @@ class QueueExecutionImpl<R> implements QueueExecution<R> {
JobState state = stateMachine.currentState(jobId);
return state != null && state.status() == JobStatus.CANCELED;
}
+
+ private boolean isCanceling() {
+ JobState state = stateMachine.currentState(jobId);
+ return state != null && state.status() == JobStatus.CANCELING;
+ }
}
diff --git
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/task/TaskExecutionInternal.java
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/task/TaskExecutionInternal.java
index efa0a0c2fb9..dcd4f536f2f 100644
---
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/task/TaskExecutionInternal.java
+++
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/task/TaskExecutionInternal.java
@@ -39,6 +39,7 @@ import static
org.apache.ignite.internal.util.CompletableFutures.allOfToList;
import static
org.apache.ignite.internal.util.CompletableFutures.falseCompletedFuture;
import static
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
import static
org.apache.ignite.internal.util.CompletableFutures.trueCompletedFuture;
+import static org.apache.ignite.internal.util.ExceptionUtils.unwrapCause;
import java.time.Instant;
import java.util.Arrays;
@@ -47,12 +48,12 @@ import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.UUID;
+import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.ignite.compute.JobExecution;
import org.apache.ignite.compute.JobState;
-import org.apache.ignite.compute.JobStatus;
import org.apache.ignite.compute.TaskState;
import org.apache.ignite.compute.TaskStatus;
import org.apache.ignite.compute.task.MapReduceJob;
@@ -190,15 +191,14 @@ public class TaskExecutionInternal<I, M, T, R> implements
CancellableTaskExecuti
}
private void captureReduceSubmitFailure(Throwable throwable) {
- if (isCancelled.get()) {
+ TaskStatus status = unwrapCause(throwable) instanceof
CancellationException ? CANCELED : FAILED;
+
+ if (status == CANCELED) {
LOG.warn("Reduce job for task {} was cancelled.", taskId);
} else {
LOG.error("Failed to submit reduce job for task {}", throwable,
taskId);
}
- // Capture the reduce submit failure reason and time.
- TaskStatus status = isCancelled.get() ? CANCELED : FAILED;
-
logEvent(eventLog, status == CANCELED ? COMPUTE_TASK_CANCELED :
COMPUTE_TASK_FAILED, eventMetadata);
JobState state = splitExecution.state();
@@ -215,15 +215,12 @@ public class TaskExecutionInternal<I, M, T, R> implements
CancellableTaskExecuti
private void handleReduceResult(QueueExecution<R> reduceExecution) {
reduceExecution.resultAsync().whenComplete((result, throwable) -> {
- if (result != null) {
+ if (throwable == null) {
logEvent(eventLog, COMPUTE_TASK_COMPLETED, eventMetadata);
} else {
- JobState reduceState = reduceExecution.state();
- // The state should never be null since it was just submitted,
but check just in case.
- if (reduceState != null) {
- IgniteEventType type = reduceState.status() ==
JobStatus.FAILED ? COMPUTE_TASK_FAILED : COMPUTE_TASK_CANCELED;
- logEvent(eventLog, type, eventMetadata);
- }
+ IgniteEventType type = unwrapCause(throwable) instanceof
CancellationException
+ ? COMPUTE_TASK_CANCELED : COMPUTE_TASK_FAILED;
+ logEvent(eventLog, type, eventMetadata);
}
});
}
diff --git
a/modules/compute/src/test/java/org/apache/ignite/internal/compute/executor/ComputeExecutorTest.java
b/modules/compute/src/test/java/org/apache/ignite/internal/compute/executor/ComputeExecutorTest.java
index 4792737a2c2..48b84af1c92 100644
---
a/modules/compute/src/test/java/org/apache/ignite/internal/compute/executor/ComputeExecutorTest.java
+++
b/modules/compute/src/test/java/org/apache/ignite/internal/compute/executor/ComputeExecutorTest.java
@@ -24,6 +24,7 @@ import static org.apache.ignite.compute.JobStatus.EXECUTING;
import static org.apache.ignite.compute.JobStatus.FAILED;
import static
org.apache.ignite.internal.compute.ComputeUtils.getJobExecuteArgumentType;
import static
org.apache.ignite.internal.compute.ComputeUtils.getTaskSplitArgumentType;
+import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureExceptionMatcher.willThrow;
import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willBe;
import static
org.apache.ignite.internal.testframework.matchers.JobStateMatcher.jobStateWithStatus;
import static
org.apache.ignite.internal.testframework.matchers.JobStateMatcher.jobStateWithStatusAndCreateTimeStartTime;
@@ -45,6 +46,7 @@ import org.apache.ignite.compute.JobState;
import org.apache.ignite.compute.task.MapReduceJob;
import org.apache.ignite.compute.task.MapReduceTask;
import org.apache.ignite.compute.task.TaskExecutionContext;
+import org.apache.ignite.internal.compute.ComputeJobDataHolder;
import org.apache.ignite.internal.compute.ExecutionOptions;
import org.apache.ignite.internal.compute.SharedComputeUtils;
import org.apache.ignite.internal.compute.configuration.ComputeConfiguration;
@@ -104,13 +106,7 @@ class ComputeExecutorTest extends BaseIgniteAbstractTest {
@Test
void threadInterruption() {
- JobExecutionInternal<?> execution = computeExecutor.executeJob(
- ExecutionOptions.DEFAULT,
- InterruptingJob.class.getName(),
- jobClassLoader,
- ComputeEventMetadata.builder(),
- null
- );
+ JobExecutionInternal<?> execution = executeJob(InterruptingJob.class);
JobState executingState = await().until(execution::state,
jobStateWithStatus(EXECUTING));
assertThat(execution.cancel(), is(true));
// InterruptingJob catches interruption and completes normally —
cooperative cancellation honors the result.
@@ -136,13 +132,7 @@ class ComputeExecutorTest extends BaseIgniteAbstractTest {
@Test
void cooperativeCancellation() {
- JobExecutionInternal<?> execution = computeExecutor.executeJob(
- ExecutionOptions.DEFAULT,
- CancellingJob.class.getName(),
- jobClassLoader,
- ComputeEventMetadata.builder(),
- null
- );
+ JobExecutionInternal<?> execution = executeJob(CancellingJob.class);
JobState executingState = await().until(execution::state,
jobStateWithStatus(EXECUTING));
assertThat(execution.cancel(), is(true));
// CancellingJob checks isCancelled() and completes normally —
cooperative cancellation honors the result.
@@ -170,13 +160,7 @@ class ComputeExecutorTest extends BaseIgniteAbstractTest {
@Test
void cancelAwareCancellation() {
- JobExecutionInternal<?> execution = computeExecutor.executeJob(
- ExecutionOptions.DEFAULT,
- CancelAwareJob.class.getName(),
- jobClassLoader,
- ComputeEventMetadata.builder(),
- null
- );
+ JobExecutionInternal<?> execution = executeJob(CancelAwareJob.class);
JobState executingState = await().until(execution::state,
jobStateWithStatus(EXECUTING));
assertThat(execution.cancel(), is(true));
// CancelAwareJob catches interruption and throws
CancellationException — job is canceled.
@@ -204,11 +188,9 @@ class ComputeExecutorTest extends BaseIgniteAbstractTest {
int maxRetries = 5;
RetryJobFail.runTimes.set(0);
- JobExecutionInternal<?> execution = computeExecutor.executeJob(
+ JobExecutionInternal<?> execution = executeJob(
ExecutionOptions.builder().maxRetries(maxRetries).build(),
- RetryJobFail.class.getName(),
- jobClassLoader,
- ComputeEventMetadata.builder(),
+ RetryJobFail.class,
null
);
@@ -233,11 +215,9 @@ class ComputeExecutorTest extends BaseIgniteAbstractTest {
int maxRetries = 5;
RetryJobSuccess.runTimes.set(0);
- JobExecutionInternal<?> execution = computeExecutor.executeJob(
+ JobExecutionInternal<?> execution = executeJob(
ExecutionOptions.builder().maxRetries(maxRetries).build(),
- RetryJobSuccess.class.getName(),
- jobClassLoader,
- ComputeEventMetadata.builder(),
+ RetryJobSuccess.class,
SharedComputeUtils.marshalArgOrResult(maxRetries, null)
);
@@ -267,11 +247,9 @@ class ComputeExecutorTest extends BaseIgniteAbstractTest {
int maxRetries = 5;
JobSuccess.runTimes.set(0);
- JobExecutionInternal<?> execution = computeExecutor.executeJob(
+ JobExecutionInternal<?> execution = executeJob(
ExecutionOptions.builder().maxRetries(maxRetries).build(),
- JobSuccess.class.getName(),
- jobClassLoader,
- ComputeEventMetadata.builder(),
+ JobSuccess.class,
null
);
@@ -315,13 +293,7 @@ class ComputeExecutorTest extends BaseIgniteAbstractTest {
 
     @Test
     void cancelCompletedJob() {
-        JobExecutionInternal<?> execution = computeExecutor.executeJob(
-                ExecutionOptions.DEFAULT,
-                SimpleJob.class.getName(),
-                jobClassLoader,
-                ComputeEventMetadata.builder(),
-                null
-        );
+        JobExecutionInternal<?> execution = executeJob(SimpleJob.class);
 
         await().until(execution::state, jobStateWithStatus(COMPLETED));
 
@@ -334,4 +306,160 @@ class ComputeExecutorTest extends BaseIgniteAbstractTest {
             return completedFuture(0);
         }
     }
+
+    /**
+     * Async job returns a future immediately. A background thread polls isCancelled(),
+     * then completes the future after a brief delay. On cancel, the job should end in
+     * COMPLETED state with a successful result.
+     *
+     * <p>This test catches the bug where {@code jobFuture.cancel(true)} in
+     * {@code QueueEntry.interrupt()} overrides the job's result with CancellationException,
+     * preventing the job from completing normally after cooperative cancellation.
+     */
+    @Test
+    void cancelAsyncJobThatCompletesAfterCancellation() {
+        JobExecutionInternal<?> execution = executeJob(AsyncDelayedCompleteJob.class);
+
+        JobState executingState = await().until(execution::state, jobStateWithStatus(EXECUTING));
+        assertThat(execution.cancel(), is(true));
+
+        // The job detects cancellation via isCancelled(), does brief cleanup, then completes with a result.
+        // Cooperative cancellation should honor the result — status must be COMPLETED, not CANCELED.
+        await().until(
+                execution::state,
+                jobStateWithStatusAndCreateTimeStartTime(COMPLETED, executingState.createTime(), executingState.startTime())
+        );
+        assertThat(execution.resultAsync().thenApply(h -> SharedComputeUtils.unmarshalResult(h, null, null)), willBe(42));
+    }
+
+    /** Async job that returns a future immediately. A background thread polls isCancelled(), then completes. */
+    private static class AsyncDelayedCompleteJob implements ComputeJob<Object[], Integer> {
+        @Override
+        public CompletableFuture<Integer> executeAsync(JobExecutionContext context, Object... args) {
+            CompletableFuture<Integer> result = new CompletableFuture<>();
+
+            Thread thread = new Thread(() -> {
+                while (!context.isCancelled()) {
+                    try {
+                        Thread.sleep(50);
+                    } catch (InterruptedException e) {
+                        break;
+                    }
+                }
+
+                // Simulate cleanup after cancellation
+                try {
+                    Thread.sleep(500);
+                } catch (InterruptedException ignored) {
+                    // ignored
+                }
+
+                result.complete(42);
+            });
+            thread.setDaemon(true);
+            thread.start();
+
+            return result;
+        }
+    }
+
+    @Test
+    void cancelAsyncJobThatThrowsOnCancellation() {
+        JobExecutionInternal<?> execution = executeJob(AsyncThrowOnCancelJob.class);
+
+        JobState executingState = await().until(execution::state, jobStateWithStatus(EXECUTING));
+        assertThat(execution.cancel(), is(true));
+
+        // The job detects cancellation and throws RuntimeException (not CancellationException).
+        // Since it's not a CancellationException, the job transitions to FAILED.
+        await().until(
+                execution::state,
+                jobStateWithStatusAndCreateTimeStartTime(FAILED, executingState.createTime(), executingState.startTime())
+        );
+        assertThat(execution.resultAsync(), willThrow(RuntimeException.class, "Job cancelled"));
+    }
+
+    @Test
+    void cancelJobWithRetriesDoesNotRetry() {
+        int maxRetries = 5;
+        AsyncThrowOnCancelJob.runTimes.set(0);
+
+        JobExecutionInternal<?> execution = executeJob(
+                ExecutionOptions.builder().maxRetries(maxRetries).build(),
+                AsyncThrowOnCancelJob.class,
+                null
+        );
+
+        await().until(execution::state, jobStateWithStatus(EXECUTING));
+        assertThat(execution.cancel(), is(true));
+
+        // The job throws RuntimeException (not CancellationException) after cancel.
+        // Even though retries are configured, the job must not be retried because cancel was requested.
+        await().until(execution::state, jobStateWithStatus(FAILED));
+        assertThat(AsyncThrowOnCancelJob.runTimes.get(), is(1));
+    }
+
+    /** Async job that throws RuntimeException when cancellation is detected. */
+    private static class AsyncThrowOnCancelJob implements ComputeJob<Object[], Integer> {
+        static final AtomicInteger runTimes = new AtomicInteger();
+
+        @Override
+        public CompletableFuture<Integer> executeAsync(JobExecutionContext context, Object... args) {
+            runTimes.incrementAndGet();
+            CompletableFuture<Integer> result = new CompletableFuture<>();
+
+            Thread thread = new Thread(() -> {
+                while (!context.isCancelled()) {
+                    try {
+                        Thread.sleep(50);
+                    } catch (InterruptedException e) {
+                        break;
+                    }
+                }
+
+                result.completeExceptionally(new RuntimeException("Job cancelled"));
+            });
+            thread.setDaemon(true);
+            thread.start();
+
+            return result;
+        }
+    }
+
+    /**
+     * Job throws CancellationException without external cancel() call.
+     * The exception type alone should move the job to CANCELED.
+     */
+    @Test
+    void jobThrowingCancellationExceptionTransitionsToCanceled() {
+        JobExecutionInternal<?> execution = executeJob(SelfCancellingJob.class);
+
+        await().until(
+                execution::state,
+                jobStateWithStatus(CANCELED)
+        );
+        assertThat(execution.resultAsync(), willThrow(CancellationException.class));
+    }
+
+    /** Job that immediately throws CancellationException without external cancel(). */
+    private static class SelfCancellingJob implements ComputeJob<Object[], Void> {
+        @Override
+        public CompletableFuture<Void> executeAsync(JobExecutionContext context, Object... args) {
+            return CompletableFuture.failedFuture(new CancellationException("self-cancelled"));
+        }
+    }
+
+    private JobExecutionInternal<?> executeJob(Class<?> jobClass) {
+        return executeJob(ExecutionOptions.DEFAULT, jobClass, null);
+    }
+
+    private JobExecutionInternal<?> executeJob(ExecutionOptions options, Class<?> jobClass, @Nullable ComputeJobDataHolder arg) {
+        return computeExecutor.executeJob(
+                options,
+                jobClass.getName(),
+                jobClassLoader,
+                ComputeEventMetadata.builder(),
+                arg
+        );
+    }
 }
diff --git a/modules/compute/src/test/java/org/apache/ignite/internal/compute/queue/PriorityQueueExecutorTest.java b/modules/compute/src/test/java/org/apache/ignite/internal/compute/queue/PriorityQueueExecutorTest.java
index d1837da09bb..4a4779afcab 100644
--- a/modules/compute/src/test/java/org/apache/ignite/internal/compute/queue/PriorityQueueExecutorTest.java
+++ b/modules/compute/src/test/java/org/apache/ignite/internal/compute/queue/PriorityQueueExecutorTest.java
@@ -284,9 +284,10 @@ public class PriorityQueueExecutorTest extends BaseIgniteAbstractTest {
 
         assertThat(execution.cancel(), is(true));
 
+        // InterruptedException is not CancellationException, so the job transitions to FAILED.
         await().until(
                 execution::state,
-                jobStateWithStatusAndCreateTimeStartTime(CANCELED, executingState.createTime(), executingState.startTime())
+                jobStateWithStatusAndCreateTimeStartTime(FAILED, executingState.createTime(), executingState.startTime())
         );
         assertThat(execution.resultAsync(), willThrow(InterruptedException.class));
     }
@@ -368,7 +369,8 @@ public class PriorityQueueExecutorTest extends BaseIgniteAbstractTest {
 
         await().until(execution::state, jobStateWithStatus(EXECUTING));
         execution.cancel();
-        await().until(execution::state, jobStateWithStatus(CANCELED));
+        // InterruptedException is not CancellationException, so the job transitions to FAILED (no retry).
+        await().until(execution::state, jobStateWithStatus(FAILED));
         assertThat(runTimes.get(), is(1));
     }
 
diff --git a/modules/platforms/cpp/tests/client-test/compute_test.cpp b/modules/platforms/cpp/tests/client-test/compute_test.cpp
index 2025eee0230..90b45ec9ff0 100644
--- a/modules/platforms/cpp/tests/client-test/compute_test.cpp
+++ b/modules/platforms/cpp/tests/client-test/compute_test.cpp
@@ -559,7 +559,8 @@ TEST_F(compute_test, job_execution_cancel) {
 
     auto state = execution.get_state();
     ASSERT_TRUE(state.has_value());
-    EXPECT_EQ(job_status::CANCELED, state->status);
+    // SleepJob throws RuntimeException on interrupt, not CancellationException.
+    EXPECT_EQ(job_status::FAILED, state->status);
 }
 
 TEST_F(compute_test, job_execution_change_priority) {
diff --git a/modules/platforms/dotnet/Apache.Ignite.Tests/Compute/ComputeTests.cs b/modules/platforms/dotnet/Apache.Ignite.Tests/Compute/ComputeTests.cs
index 90db3da5ebd..c36722e1a73 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Tests/Compute/ComputeTests.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Tests/Compute/ComputeTests.cs
@@ -684,7 +684,8 @@ namespace Apache.Ignite.Tests.Compute
 
             await cts.CancelAsync();
 
-            await AssertWaitJobStatus(jobExecution, JobStatus.Canceled, beforeStart);
+            // SleepJob throws RuntimeException on interrupt, not CancellationException.
+            await AssertWaitJobStatus(jobExecution, JobStatus.Failed, beforeStart);
 
             var ex = Assert.ThrowsAsync<ComputeException>(async () => await jobExecution.GetResultAsync());
 
@@ -711,7 +712,8 @@ namespace Apache.Ignite.Tests.Compute
 
             foreach (var jobExec in jobExecution.JobExecutions)
             {
-                await AssertWaitJobStatus(jobExec, JobStatus.Canceled, beforeStart);
+                // SleepJob throws RuntimeException on interrupt, not CancellationException.
+                await AssertWaitJobStatus(jobExec, JobStatus.Failed, beforeStart);
 
                 var ex = Assert.ThrowsAsync<ComputeException>(async () => await jobExec.GetResultAsync());
 
@@ -728,6 +730,14 @@ namespace Apache.Ignite.Tests.Compute
 
             var taskExec = await Client.Compute.SubmitMapReduceAsync(SleepTask, 10_000, cts.Token);
 
+            // Wait until all jobs are executing to avoid timing-dependent status.
+            await TestUtils.WaitForConditionAsync(
+                async () =>
+                {
+                    var states = await taskExec.GetJobStatesAsync();
+                    return states.All(s => s?.Status == JobStatus.Executing);
+                });
+
             await cts.CancelAsync();
 
             var ex = Assert.ThrowsAsync<OperationCanceledException>(async () => await taskExec.GetResultAsync());
@@ -737,12 +747,13 @@ namespace Apache.Ignite.Tests.Compute
 
             foreach (var jobState in jobStates)
             {
-                Assert.AreEqual(JobStatus.Canceled, jobState?.Status);
+                // SleepJob throws RuntimeException, not CancellationException.
+                Assert.AreEqual(JobStatus.Failed, jobState?.Status);
             }
 
             TaskState? state = await taskExec.GetStateAsync();
 
-            Assert.AreEqual(TaskStatus.Canceled, state?.Status);
+            Assert.AreEqual(TaskStatus.Canceled, state?.Status); // Task was explicitly cancelled.
         }
 
         [Test]