This is an automated email from the ASF dual-hosted git repository.

apkhmv pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new ce15f41b414 IGNITE-28344 Don't cancel job future when canceling the 
job (#7861)
ce15f41b414 is described below

commit ce15f41b414f511bc47d05fdb700492bfe6cc609
Author: Vadim Pakhnushev <[email protected]>
AuthorDate: Thu Apr 2 18:29:05 2026 +0300

    IGNITE-28344 Don't cancel job future when canceling the job (#7861)
---
 .../internal/client/ItThinClientComputeTest.java   |  21 ++-
 .../ignite/internal/compute/ItComputeBaseTest.java |  32 +++-
 .../compute/ItFailoverCandidateNotFoundTest.java   |  34 ++--
 .../internal/compute/utils/InteractiveJobs.java    |   5 +-
 .../internal/compute/utils/InteractiveTasks.java   |   7 +-
 .../jobs/embedded/AsyncDelayedCompleteJob.java     |  57 ++++++
 .../jobs/standalone/AsyncDelayedCompleteJob.java   |  57 ++++++
 .../ignite/internal/compute/queue/QueueEntry.java  |  26 +--
 .../internal/compute/queue/QueueExecutionImpl.java |  10 +-
 .../compute/task/TaskExecutionInternal.java        |  21 +--
 .../compute/executor/ComputeExecutorTest.java      | 208 +++++++++++++++++----
 .../compute/queue/PriorityQueueExecutorTest.java   |   6 +-
 .../cpp/tests/client-test/compute_test.cpp         |   3 +-
 .../Apache.Ignite.Tests/Compute/ComputeTests.cs    |  19 +-
 14 files changed, 394 insertions(+), 112 deletions(-)

diff --git 
a/modules/client/src/integrationTest/java/org/apache/ignite/internal/client/ItThinClientComputeTest.java
 
b/modules/client/src/integrationTest/java/org/apache/ignite/internal/client/ItThinClientComputeTest.java
index 823e46366c3..0677b1f18c5 100644
--- 
a/modules/client/src/integrationTest/java/org/apache/ignite/internal/client/ItThinClientComputeTest.java
+++ 
b/modules/client/src/integrationTest/java/org/apache/ignite/internal/client/ItThinClientComputeTest.java
@@ -19,7 +19,6 @@ package org.apache.ignite.internal.client;
 
 import static java.util.concurrent.CompletableFuture.completedFuture;
 import static java.util.concurrent.CompletableFuture.failedFuture;
-import static org.apache.ignite.compute.JobStatus.CANCELED;
 import static org.apache.ignite.compute.JobStatus.COMPLETED;
 import static org.apache.ignite.compute.JobStatus.EXECUTING;
 import static org.apache.ignite.compute.JobStatus.FAILED;
@@ -85,6 +84,7 @@ import org.apache.ignite.compute.ComputeJob;
 import org.apache.ignite.compute.JobDescriptor;
 import org.apache.ignite.compute.JobExecution;
 import org.apache.ignite.compute.JobExecutionContext;
+import org.apache.ignite.compute.JobStatus;
 import org.apache.ignite.compute.JobTarget;
 import org.apache.ignite.compute.TaskDescriptor;
 import org.apache.ignite.compute.TaskStatus;
@@ -295,8 +295,11 @@ public class ItThinClientComputeTest extends 
ItAbstractThinClientTest {
 
         assertThat(cancelHandle.cancelAsync(), willCompleteSuccessfully());
 
-        await().until(execution1::stateAsync, 
willBe(jobStateWithStatus(CANCELED)));
-        await().until(execution2::stateAsync, 
willBe(jobStateWithStatus(CANCELED)));
+        // Async job completes normally after cooperative cancellation 
(returns from isCancelled() check) — COMPLETED.
+        // Sync job throws RuntimeException on thread interruption — FAILED 
(not CancellationException).
+        JobStatus expectedStatus = asyncJob ? COMPLETED : FAILED;
+        await().until(execution1::stateAsync, 
willBe(jobStateWithStatus(expectedStatus)));
+        await().until(execution2::stateAsync, 
willBe(jobStateWithStatus(expectedStatus)));
     }
 
     @Test
@@ -329,7 +332,8 @@ public class ItThinClientComputeTest extends 
ItAbstractThinClientTest {
 
         // Cancel task 1, task 3 should start executing
         assertThat(cancelHandle1.cancelAsync(), willCompleteSuccessfully());
-        await().until(execution1::stateAsync, 
willBe(jobStateWithStatus(CANCELED)));
+        // SleepJob throws RuntimeException, not CancellationException.
+        await().until(execution1::stateAsync, 
willBe(jobStateWithStatus(FAILED)));
         await().until(execution3::stateAsync, 
willBe(jobStateWithStatus(EXECUTING)));
 
         // Task 2 is still queued
@@ -408,9 +412,10 @@ public class ItThinClientComputeTest extends 
ItAbstractThinClientTest {
 
         cancelHandle.cancel();
 
+        // SleepJob throws RuntimeException on interrupt, not 
CancellationException.
         await().until(() -> executions, contains(
-                jobExecutionWithStatus(CANCELED),
-                jobExecutionWithStatus(CANCELED)
+                jobExecutionWithStatus(FAILED),
+                jobExecutionWithStatus(FAILED)
         ));
 
         assertThat(broadcastExecution.resultsAsync(), 
willThrow(ComputeException.class));
@@ -725,7 +730,7 @@ public class ItThinClientComputeTest extends 
ItAbstractThinClientTest {
 
         assertThat(cancelHandle.cancelAsync(), willCompleteSuccessfully());
 
-        await().until(tupleExecution::stateAsync, 
willBe(jobStateWithStatus(CANCELED)));
+        await().until(tupleExecution::stateAsync, 
willBe(jobStateWithStatus(FAILED)));
     }
 
     @ParameterizedTest
@@ -747,7 +752,7 @@ public class ItThinClientComputeTest extends 
ItAbstractThinClientTest {
 
         assertThat(cancelHandle.cancelAsync(), willCompleteSuccessfully());
 
-        await().until(pojoExecution::stateAsync, 
willBe(jobStateWithStatus(CANCELED)));
+        await().until(pojoExecution::stateAsync, 
willBe(jobStateWithStatus(FAILED)));
     }
 
     @Test
diff --git 
a/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItComputeBaseTest.java
 
b/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItComputeBaseTest.java
index 63147f204cf..6b64fe363dc 100644
--- 
a/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItComputeBaseTest.java
+++ 
b/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItComputeBaseTest.java
@@ -679,7 +679,7 @@ public abstract class ItComputeBaseTest extends 
ClusterPerClassIntegrationTest {
         // RuntimeException is thrown when SleepJob catches the 
InterruptedException
         assertThat(runtimeException.toString(), 
containsString(InterruptedException.class.getName()));
 
-        await().until(execution::stateAsync, 
willBe(jobStateWithStatus(CANCELED)));
+        await().until(execution::stateAsync, 
willBe(jobStateWithStatus(FAILED)));
     }
 
     @ParameterizedTest
@@ -729,7 +729,31 @@ public abstract class ItComputeBaseTest extends 
ClusterPerClassIntegrationTest {
 
         // Cancel running task
         assertThat(cancelHandle1.cancelAsync(), willCompleteSuccessfully());
-        await().until(execution1::stateAsync, 
willBe(jobStateWithStatus(CANCELED)));
+        await().until(execution1::stateAsync, 
willBe(jobStateWithStatus(FAILED)));
+    }
+
+    @ParameterizedTest(name = "local: {0}")
+    @ValueSource(booleans = {true, false})
+    void asyncJobCompletesNormallyAfterCooperativeCancellation(boolean local) {
+        Ignite executeNode = local ? node(0) : node(1);
+
+        CancelHandle cancelHandle = CancelHandle.create();
+
+        JobExecution<String> execution = submit(
+                JobTarget.node(clusterNode(executeNode)),
+                asyncDelayedCompleteJob(),
+                cancelHandle.token(),
+                null
+        );
+
+        await().until(execution::stateAsync, 
willBe(jobStateWithStatus(EXECUTING)));
+
+        cancelHandle.cancel();
+
+        // The async job detects cancellation via isCancelled(), does cleanup, 
then completes with a result.
+        // Cooperative cancellation should honor the result — status must be 
COMPLETED, not CANCELED.
+        assertThat(execution.resultAsync(), 
willBe(is("completed-after-cancel")));
+        await().until(execution::stateAsync, 
willBe(jobStateWithStatus(COMPLETED)));
     }
 
     @ParameterizedTest
@@ -963,6 +987,10 @@ public abstract class ItComputeBaseTest extends 
ClusterPerClassIntegrationTest {
         return TaskDescriptor.<Void, 
Void>builder(jobClassName("InfiniteMapReduceTask")).units(units()).build();
     }
 
+    private JobDescriptor<Void, String> asyncDelayedCompleteJob() {
+        return JobDescriptor.<Void, 
String>builder(jobClassName("AsyncDelayedCompleteJob")).units(units()).build();
+    }
+
     private JobDescriptor<Tuple, Integer> tupleJob() {
         return JobDescriptor.<Tuple, 
Integer>builder(jobClassName("TupleJob")).units(units()).build();
     }
diff --git 
a/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItFailoverCandidateNotFoundTest.java
 
b/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItFailoverCandidateNotFoundTest.java
index 83ec8c62612..bf3d50c75cb 100644
--- 
a/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItFailoverCandidateNotFoundTest.java
+++ 
b/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItFailoverCandidateNotFoundTest.java
@@ -18,6 +18,7 @@
 package org.apache.ignite.internal.compute;
 
 import static 
org.apache.ignite.internal.compute.events.ComputeEventMetadata.Type.SINGLE;
+import static 
org.apache.ignite.internal.eventlog.api.IgniteEventType.COMPUTE_JOB_CANCELED;
 import static 
org.apache.ignite.internal.eventlog.api.IgniteEventType.COMPUTE_JOB_EXECUTING;
 import static 
org.apache.ignite.internal.eventlog.api.IgniteEventType.COMPUTE_JOB_FAILED;
 import static 
org.apache.ignite.internal.eventlog.api.IgniteEventType.COMPUTE_JOB_QUEUED;
@@ -134,14 +135,17 @@ abstract class ItFailoverCandidateNotFoundTest extends 
ClusterPerTestIntegration
 
         String jobClassName = InteractiveJobs.globalJob().name();
 
-        // When node is shut down gracefully, the job execution is interrupted 
and event could be logged anyway
-        // So there would be 2 events from a worker node, 1 failed events from 
a worker node and 1 failed event from the coordinator
-        await().until(logInspector::events, contains(
+        // When node is shut down gracefully, the job execution is interrupted 
and CancellationException is thrown.
+        // There would be 2 events from a worker node (QUEUED, EXECUTING), 
then a CANCELED event from the worker node
+        // and a FAILED event from the coordinator. The order of the last two 
events is not determined.
+        await().until(logInspector::events, hasSize(4));
+
+        assertThat(logInspector.events(), containsInRelativeOrder(
                 jobEvent(COMPUTE_JOB_QUEUED, jobId, jobClassName, 
workerNodeName),
-                jobEvent(COMPUTE_JOB_EXECUTING, jobId, jobClassName, 
workerNodeName),
-                jobEvent(COMPUTE_JOB_FAILED, jobId, jobClassName, 
workerNodeName),
-                jobEvent(COMPUTE_JOB_FAILED, jobId, jobClassName, 
workerNodeName)
+                jobEvent(COMPUTE_JOB_EXECUTING, jobId, jobClassName, 
workerNodeName)
         ));
+        assertThat(logInspector.events(), 
hasItem(jobEvent(COMPUTE_JOB_CANCELED, jobId, jobClassName, workerNodeName)));
+        assertThat(logInspector.events(), hasItem(jobEvent(COMPUTE_JOB_FAILED, 
jobId, jobClassName, workerNodeName)));
     }
 
     @Test
@@ -177,9 +181,9 @@ abstract class ItFailoverCandidateNotFoundTest extends 
ClusterPerTestIntegration
         // Then the job is failed, because there are no more failover workers.
         execution.assertFailed();
 
-        // When node is shut down gracefully, the job execution is interrupted 
and event could be logged anyway
-        // So there would be 4 events from worker nodes, 2 failed events from 
both worker nodes and 1 failed event from the coordinator
-        // The order of failed events is not determined
+        // Each worker node logs QUEUED, EXECUTING, CANCELED (interrupted on 
shutdown → CancellationException).
+        // The coordinator logs FAILED (no more failover candidates).
+        // The order of the last 3 terminal events is not determined.
         await().until(logInspector::events, hasSize(7));
 
         String jobClassName = InteractiveJobs.globalJob().name();
@@ -189,12 +193,18 @@ abstract class ItFailoverCandidateNotFoundTest extends 
ClusterPerTestIntegration
                 jobEvent(COMPUTE_JOB_EXECUTING, jobId, jobClassName, 
workerNodeName),
                 jobEvent(COMPUTE_JOB_QUEUED, jobId, jobClassName, 
failoverWorker),
                 jobEvent(COMPUTE_JOB_EXECUTING, jobId, jobClassName, 
failoverWorker),
-                jobEvent(COMPUTE_JOB_FAILED, jobId, jobClassName, 
failoverWorker)
+                // Interrupted on shutdown → CancellationException → CANCELED
+                jobEvent(COMPUTE_JOB_CANCELED, jobId, jobClassName, 
failoverWorker)
         ));
 
-        // Failed event from second worker node
+        // First worker was also interrupted on shutdown → CANCELED
         assertThat(logInspector.events(), hasItem(
-                jobEvent(COMPUTE_JOB_FAILED, jobId, jobClassName, 
workerNodeName)
+                jobEvent(COMPUTE_JOB_CANCELED, jobId, jobClassName, 
workerNodeName)
+        ));
+
+        // Coordinator's failover gives up (no more candidates) → FAILED, 
targetNode is the last worker attempted
+        assertThat(logInspector.events(), hasItem(
+                jobEvent(COMPUTE_JOB_FAILED, jobId, jobClassName, 
failoverWorker)
         ));
     }
 
diff --git 
a/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/utils/InteractiveJobs.java
 
b/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/utils/InteractiveJobs.java
index 71522768cba..c122320aba3 100644
--- 
a/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/utils/InteractiveJobs.java
+++ 
b/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/utils/InteractiveJobs.java
@@ -26,6 +26,7 @@ import static org.hamcrest.Matchers.notNullValue;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CancellationException;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.LinkedBlockingQueue;
@@ -172,7 +173,7 @@ public final class InteractiveJobs {
             try {
                 return GLOBAL_SIGNALS.take();
             } catch (InterruptedException e) {
-                throw new RuntimeException(e);
+                throw new CancellationException();
             }
         }
 
@@ -233,7 +234,7 @@ public final class InteractiveJobs {
             try {
                 return channel.take();
             } catch (InterruptedException e) {
-                throw new RuntimeException(e);
+                throw new CancellationException();
             }
         }
 
diff --git 
a/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/utils/InteractiveTasks.java
 
b/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/utils/InteractiveTasks.java
index 8572f156251..f12ff230a80 100644
--- 
a/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/utils/InteractiveTasks.java
+++ 
b/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/utils/InteractiveTasks.java
@@ -29,6 +29,7 @@ import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CancellationException;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
@@ -166,7 +167,7 @@ public final class InteractiveTasks {
                 return GLOBAL_SIGNALS.take();
             } catch (InterruptedException e) {
                 if (throwExceptionOnInterruption) {
-                    throw new RuntimeException(e);
+                    throw new CancellationException();
                 } else {
                     Thread.currentThread().interrupt();
                     return Signal.CHECK_CANCEL;
@@ -201,7 +202,7 @@ public final class InteractiveTasks {
                             ).collect(toList()));
                         case CHECK_CANCEL:
                             if (context.isCancelled()) {
-                                throw new RuntimeException("Task is 
cancelled");
+                                throw new CancellationException("Task is 
cancelled");
                             }
                             break;
                         default:
@@ -229,7 +230,7 @@ public final class InteractiveTasks {
                             return completedFuture(new 
ArrayList<>(results.values()));
                         case CHECK_CANCEL:
                             if (context.isCancelled()) {
-                                throw new RuntimeException("Task is 
cancelled");
+                                throw new CancellationException("Task is 
cancelled");
                             }
                             break;
                         default:
diff --git 
a/modules/compute/src/integrationTest/java/org/example/jobs/embedded/AsyncDelayedCompleteJob.java
 
b/modules/compute/src/integrationTest/java/org/example/jobs/embedded/AsyncDelayedCompleteJob.java
new file mode 100644
index 00000000000..0259e960794
--- /dev/null
+++ 
b/modules/compute/src/integrationTest/java/org/example/jobs/embedded/AsyncDelayedCompleteJob.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.example.jobs.embedded;
+
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.compute.ComputeJob;
+import org.apache.ignite.compute.JobExecutionContext;
+
+/**
+ * Async compute job that returns a future immediately. A background thread 
polls
+ * {@code isCancelled()}, then completes the future with a result after a 
brief delay.
+ * This tests that cooperative cancellation of async jobs is honored.
+ */
+public class AsyncDelayedCompleteJob implements ComputeJob<Void, String> {
+    @Override
+    public CompletableFuture<String> executeAsync(JobExecutionContext context, 
Void arg) {
+        CompletableFuture<String> result = new CompletableFuture<>();
+
+        Thread thread = new Thread(() -> {
+            while (!context.isCancelled()) {
+                try {
+                    Thread.sleep(50);
+                } catch (InterruptedException e) {
+                    break;
+                }
+            }
+
+            // Simulate cleanup after cancellation
+            try {
+                Thread.sleep(500);
+            } catch (InterruptedException ignored) {
+                // ignored
+            }
+
+            result.complete("completed-after-cancel");
+        });
+        thread.setDaemon(true);
+        thread.start();
+
+        return result;
+    }
+}
diff --git 
a/modules/compute/src/jobs/java/org/example/jobs/standalone/AsyncDelayedCompleteJob.java
 
b/modules/compute/src/jobs/java/org/example/jobs/standalone/AsyncDelayedCompleteJob.java
new file mode 100644
index 00000000000..15f828a4d27
--- /dev/null
+++ 
b/modules/compute/src/jobs/java/org/example/jobs/standalone/AsyncDelayedCompleteJob.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.example.jobs.standalone;
+
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.compute.ComputeJob;
+import org.apache.ignite.compute.JobExecutionContext;
+
+/**
+ * Async compute job that returns a future immediately. A background thread 
polls
+ * {@code isCancelled()}, then completes the future with a result after a 
brief delay.
+ * This tests that cooperative cancellation of async jobs is honored.
+ */
+public class AsyncDelayedCompleteJob implements ComputeJob<Void, String> {
+    @Override
+    public CompletableFuture<String> executeAsync(JobExecutionContext context, 
Void arg) {
+        CompletableFuture<String> result = new CompletableFuture<>();
+
+        Thread thread = new Thread(() -> {
+            while (!context.isCancelled()) {
+                try {
+                    Thread.sleep(50);
+                } catch (InterruptedException e) {
+                    break;
+                }
+            }
+
+            // Simulate cleanup after cancellation
+            try {
+                Thread.sleep(500);
+            } catch (InterruptedException ignored) {
+                // ignored
+            }
+
+            result.complete("completed-after-cancel");
+        });
+        thread.setDaemon(true);
+        thread.start();
+
+        return result;
+    }
+}
diff --git 
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/queue/QueueEntry.java
 
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/queue/QueueEntry.java
index c5842a98b6f..da7bf332b0a 100644
--- 
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/queue/QueueEntry.java
+++ 
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/queue/QueueEntry.java
@@ -49,13 +49,8 @@ class QueueEntry<R> implements Runnable, 
Comparable<QueueEntry<R>> {
     /** Thread used to run the job, initialized once the job starts executing. 
*/
     private @Nullable Thread workerThread;
 
-    /** Future returned from jobAction.call(). */
-    private @Nullable CompletableFuture<R> jobFuture;
-
     private final Lock lock = new ReentrantLock();
 
-    private volatile boolean isInterrupted;
-
     /**
      * Constructor.
      *
@@ -78,7 +73,7 @@ class QueueEntry<R> implements Runnable, 
Comparable<QueueEntry<R>> {
         }
 
         try {
-            jobFuture = jobAction.call();
+            CompletableFuture<R> jobFuture = jobAction.call();
 
             if (jobFuture == null) {
                 // Allow null futures for synchronous jobs.
@@ -111,34 +106,17 @@ class QueueEntry<R> implements Runnable, 
Comparable<QueueEntry<R>> {
      * Sets interrupt status of the worker thread.
      */
     void interrupt() {
-        // Interrupt under the lock to prevent interrupting thread used by the 
pool for another task
+        // Interrupt under the lock to prevent interrupting thread used by the 
pool for another task.
         lock.lock();
         try {
             if (workerThread != null) {
-                // Set the interrupted flag first since it's used to determine 
the final status of the job.
-                // Job could handle interruption and exit before this flag is 
set moving the job to completed state rather than canceled.
-                isInterrupted = true;
                 workerThread.interrupt();
             }
-
-            if (jobFuture != null) {
-                isInterrupted = true;
-                jobFuture.cancel(true);
-            }
         } finally {
             lock.unlock();
         }
     }
 
-    /**
-     * Indicates whether the execution was interrupted externally.
-     *
-     * @return {@code true} when the execution was interrupted externally.
-     */
-    boolean isInterrupted() {
-        return isInterrupted;
-    }
-
     @Override
     public int compareTo(QueueEntry o) {
         int compare = Integer.compare(o.priority, this.priority);
diff --git 
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/queue/QueueExecutionImpl.java
 
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/queue/QueueExecutionImpl.java
index d914f62b10f..9f49c39155f 100644
--- 
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/queue/QueueExecutionImpl.java
+++ 
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/queue/QueueExecutionImpl.java
@@ -22,6 +22,7 @@ import static 
org.apache.ignite.internal.compute.events.ComputeEventsFactory.log
 import static 
org.apache.ignite.internal.compute.events.ComputeEventsFactory.logJobCompletedEvent;
 import static 
org.apache.ignite.internal.compute.events.ComputeEventsFactory.logJobExecutingEvent;
 import static 
org.apache.ignite.internal.compute.events.ComputeEventsFactory.logJobFailedEvent;
+import static org.apache.ignite.internal.util.ExceptionUtils.unwrapCause;
 import static org.apache.ignite.lang.ErrorGroups.Compute.QUEUE_OVERFLOW_ERR;
 
 import java.util.UUID;
@@ -209,11 +210,11 @@ class QueueExecutionImpl<R> implements QueueExecution<R> {
                 if (throwable instanceof QueueEntryCanceledException) {
                     logJobCanceledEvent(eventLog, eventMetadata);
                     result.completeExceptionally(new CancellationException());
-                } else if (queueEntry.isInterrupted()) {
+                } else if (unwrapCause(throwable) instanceof 
CancellationException) {
                     stateMachine.cancelJob(jobId);
                     logJobCanceledEvent(eventLog, eventMetadata);
                     result.completeExceptionally(throwable);
-                } else if (retries.decrementAndGet() >= 0) {
+                } else if (!isCanceling() && retries.decrementAndGet() >= 0) {
                     stateMachine.queueJob(jobId);
                     run();
                 } else {
@@ -240,4 +241,9 @@ class QueueExecutionImpl<R> implements QueueExecution<R> {
         JobState state = stateMachine.currentState(jobId);
         return state != null && state.status() == JobStatus.CANCELED;
     }
+
+    private boolean isCanceling() {
+        JobState state = stateMachine.currentState(jobId);
+        return state != null && state.status() == JobStatus.CANCELING;
+    }
 }
diff --git 
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/task/TaskExecutionInternal.java
 
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/task/TaskExecutionInternal.java
index efa0a0c2fb9..dcd4f536f2f 100644
--- 
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/task/TaskExecutionInternal.java
+++ 
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/task/TaskExecutionInternal.java
@@ -39,6 +39,7 @@ import static 
org.apache.ignite.internal.util.CompletableFutures.allOfToList;
 import static 
org.apache.ignite.internal.util.CompletableFutures.falseCompletedFuture;
 import static 
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
 import static 
org.apache.ignite.internal.util.CompletableFutures.trueCompletedFuture;
+import static org.apache.ignite.internal.util.ExceptionUtils.unwrapCause;
 
 import java.time.Instant;
 import java.util.Arrays;
@@ -47,12 +48,12 @@ import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.UUID;
+import java.util.concurrent.CancellationException;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
 import org.apache.ignite.compute.JobExecution;
 import org.apache.ignite.compute.JobState;
-import org.apache.ignite.compute.JobStatus;
 import org.apache.ignite.compute.TaskState;
 import org.apache.ignite.compute.TaskStatus;
 import org.apache.ignite.compute.task.MapReduceJob;
@@ -190,15 +191,14 @@ public class TaskExecutionInternal<I, M, T, R> implements 
CancellableTaskExecuti
     }
 
     private void captureReduceSubmitFailure(Throwable throwable) {
-        if (isCancelled.get()) {
+        TaskStatus status = unwrapCause(throwable) instanceof 
CancellationException ? CANCELED : FAILED;
+
+        if (status == CANCELED) {
             LOG.warn("Reduce job for task {} was cancelled.", taskId);
         } else {
             LOG.error("Failed to submit reduce job for task {}", throwable, 
taskId);
         }
 
-        // Capture the reduce submit failure reason and time.
-        TaskStatus status = isCancelled.get() ? CANCELED : FAILED;
-
         logEvent(eventLog, status == CANCELED ? COMPUTE_TASK_CANCELED : 
COMPUTE_TASK_FAILED, eventMetadata);
 
         JobState state = splitExecution.state();
@@ -215,15 +215,12 @@ public class TaskExecutionInternal<I, M, T, R> implements 
CancellableTaskExecuti
 
     private void handleReduceResult(QueueExecution<R> reduceExecution) {
         reduceExecution.resultAsync().whenComplete((result, throwable) -> {
-            if (result != null) {
+            if (throwable == null) {
                 logEvent(eventLog, COMPUTE_TASK_COMPLETED, eventMetadata);
             } else {
-                JobState reduceState = reduceExecution.state();
-                // The state should never be null since it was just submitted, 
but check just in case.
-                if (reduceState != null) {
-                    IgniteEventType type = reduceState.status() == 
JobStatus.FAILED ? COMPUTE_TASK_FAILED : COMPUTE_TASK_CANCELED;
-                    logEvent(eventLog, type, eventMetadata);
-                }
+                IgniteEventType type = unwrapCause(throwable) instanceof 
CancellationException
+                        ? COMPUTE_TASK_CANCELED : COMPUTE_TASK_FAILED;
+                logEvent(eventLog, type, eventMetadata);
             }
         });
     }
diff --git 
a/modules/compute/src/test/java/org/apache/ignite/internal/compute/executor/ComputeExecutorTest.java
 
b/modules/compute/src/test/java/org/apache/ignite/internal/compute/executor/ComputeExecutorTest.java
index 4792737a2c2..48b84af1c92 100644
--- 
a/modules/compute/src/test/java/org/apache/ignite/internal/compute/executor/ComputeExecutorTest.java
+++ 
b/modules/compute/src/test/java/org/apache/ignite/internal/compute/executor/ComputeExecutorTest.java
@@ -24,6 +24,7 @@ import static org.apache.ignite.compute.JobStatus.EXECUTING;
 import static org.apache.ignite.compute.JobStatus.FAILED;
 import static 
org.apache.ignite.internal.compute.ComputeUtils.getJobExecuteArgumentType;
 import static 
org.apache.ignite.internal.compute.ComputeUtils.getTaskSplitArgumentType;
+import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureExceptionMatcher.willThrow;
 import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willBe;
 import static 
org.apache.ignite.internal.testframework.matchers.JobStateMatcher.jobStateWithStatus;
 import static 
org.apache.ignite.internal.testframework.matchers.JobStateMatcher.jobStateWithStatusAndCreateTimeStartTime;
@@ -45,6 +46,7 @@ import org.apache.ignite.compute.JobState;
 import org.apache.ignite.compute.task.MapReduceJob;
 import org.apache.ignite.compute.task.MapReduceTask;
 import org.apache.ignite.compute.task.TaskExecutionContext;
+import org.apache.ignite.internal.compute.ComputeJobDataHolder;
 import org.apache.ignite.internal.compute.ExecutionOptions;
 import org.apache.ignite.internal.compute.SharedComputeUtils;
 import org.apache.ignite.internal.compute.configuration.ComputeConfiguration;
@@ -104,13 +106,7 @@ class ComputeExecutorTest extends BaseIgniteAbstractTest {
 
     @Test
     void threadInterruption() {
-        JobExecutionInternal<?> execution = computeExecutor.executeJob(
-                ExecutionOptions.DEFAULT,
-                InterruptingJob.class.getName(),
-                jobClassLoader,
-                ComputeEventMetadata.builder(),
-                null
-        );
+        JobExecutionInternal<?> execution = executeJob(InterruptingJob.class);
         JobState executingState = await().until(execution::state, 
jobStateWithStatus(EXECUTING));
         assertThat(execution.cancel(), is(true));
         // InterruptingJob catches interruption and completes normally — 
cooperative cancellation honors the result.
@@ -136,13 +132,7 @@ class ComputeExecutorTest extends BaseIgniteAbstractTest {
 
     @Test
     void cooperativeCancellation() {
-        JobExecutionInternal<?> execution = computeExecutor.executeJob(
-                ExecutionOptions.DEFAULT,
-                CancellingJob.class.getName(),
-                jobClassLoader,
-                ComputeEventMetadata.builder(),
-                null
-        );
+        JobExecutionInternal<?> execution = executeJob(CancellingJob.class);
         JobState executingState = await().until(execution::state, 
jobStateWithStatus(EXECUTING));
         assertThat(execution.cancel(), is(true));
         // CancellingJob checks isCancelled() and completes normally — 
cooperative cancellation honors the result.
@@ -170,13 +160,7 @@ class ComputeExecutorTest extends BaseIgniteAbstractTest {
 
     @Test
     void cancelAwareCancellation() {
-        JobExecutionInternal<?> execution = computeExecutor.executeJob(
-                ExecutionOptions.DEFAULT,
-                CancelAwareJob.class.getName(),
-                jobClassLoader,
-                ComputeEventMetadata.builder(),
-                null
-        );
+        JobExecutionInternal<?> execution = executeJob(CancelAwareJob.class);
         JobState executingState = await().until(execution::state, 
jobStateWithStatus(EXECUTING));
         assertThat(execution.cancel(), is(true));
         // CancelAwareJob catches interruption and throws 
CancellationException — job is canceled.
@@ -204,11 +188,9 @@ class ComputeExecutorTest extends BaseIgniteAbstractTest {
         int maxRetries = 5;
         RetryJobFail.runTimes.set(0);
 
-        JobExecutionInternal<?> execution = computeExecutor.executeJob(
+        JobExecutionInternal<?> execution = executeJob(
                 ExecutionOptions.builder().maxRetries(maxRetries).build(),
-                RetryJobFail.class.getName(),
-                jobClassLoader,
-                ComputeEventMetadata.builder(),
+                RetryJobFail.class,
                 null
         );
 
@@ -233,11 +215,9 @@ class ComputeExecutorTest extends BaseIgniteAbstractTest {
         int maxRetries = 5;
         RetryJobSuccess.runTimes.set(0);
 
-        JobExecutionInternal<?> execution = computeExecutor.executeJob(
+        JobExecutionInternal<?> execution = executeJob(
                 ExecutionOptions.builder().maxRetries(maxRetries).build(),
-                RetryJobSuccess.class.getName(),
-                jobClassLoader,
-                ComputeEventMetadata.builder(),
+                RetryJobSuccess.class,
                 SharedComputeUtils.marshalArgOrResult(maxRetries, null)
         );
 
@@ -267,11 +247,9 @@ class ComputeExecutorTest extends BaseIgniteAbstractTest {
         int maxRetries = 5;
         JobSuccess.runTimes.set(0);
 
-        JobExecutionInternal<?> execution = computeExecutor.executeJob(
+        JobExecutionInternal<?> execution = executeJob(
                 ExecutionOptions.builder().maxRetries(maxRetries).build(),
-                JobSuccess.class.getName(),
-                jobClassLoader,
-                ComputeEventMetadata.builder(),
+                JobSuccess.class,
                 null
         );
 
@@ -315,13 +293,7 @@ class ComputeExecutorTest extends BaseIgniteAbstractTest {
 
     @Test
     void cancelCompletedJob() {
-        JobExecutionInternal<?> execution = computeExecutor.executeJob(
-                ExecutionOptions.DEFAULT,
-                SimpleJob.class.getName(),
-                jobClassLoader,
-                ComputeEventMetadata.builder(),
-                null
-        );
+        JobExecutionInternal<?> execution = executeJob(SimpleJob.class);
 
         await().until(execution::state, jobStateWithStatus(COMPLETED));
 
@@ -334,4 +306,160 @@ class ComputeExecutorTest extends BaseIgniteAbstractTest {
             return completedFuture(0);
         }
     }
+
+    /**
+     * Async job returns a future immediately. A background thread polls 
isCancelled(),
+     * then completes the future after a brief delay. On cancel, the job 
should end in
+     * COMPLETED state with a successful result.
+     *
+     * <p>This test catches the bug where {@code jobFuture.cancel(true)} in
+     * {@code QueueEntry.interrupt()} overrides the job's result with 
CancellationException,
+     * preventing the job from completing normally after cooperative 
cancellation.
+     */
+    @Test
+    void cancelAsyncJobThatCompletesAfterCancellation() {
+        JobExecutionInternal<?> execution = 
executeJob(AsyncDelayedCompleteJob.class);
+
+        JobState executingState = await().until(execution::state, 
jobStateWithStatus(EXECUTING));
+        assertThat(execution.cancel(), is(true));
+
+        // The job detects cancellation via isCancelled(), does brief cleanup, 
then completes with a result.
+        // Cooperative cancellation should honor the result — status must be 
COMPLETED, not CANCELED.
+        await().until(
+                execution::state,
+                jobStateWithStatusAndCreateTimeStartTime(COMPLETED, 
executingState.createTime(), executingState.startTime())
+        );
+        assertThat(execution.resultAsync().thenApply(h -> 
SharedComputeUtils.unmarshalResult(h, null, null)), willBe(42));
+    }
+
+    /** Async job that returns a future immediately. A background thread polls 
isCancelled(), then completes. */
+    private static class AsyncDelayedCompleteJob implements 
ComputeJob<Object[], Integer> {
+        @Override
+        public CompletableFuture<Integer> executeAsync(JobExecutionContext 
context, Object... args) {
+            CompletableFuture<Integer> result = new CompletableFuture<>();
+
+            Thread thread = new Thread(() -> {
+                while (!context.isCancelled()) {
+                    try {
+                        Thread.sleep(50);
+                    } catch (InterruptedException e) {
+                        break;
+                    }
+                }
+
+                // Simulate cleanup after cancellation
+                try {
+                    Thread.sleep(500);
+                } catch (InterruptedException ignored) {
+                    // ignored
+                }
+
+                result.complete(42);
+            });
+            thread.setDaemon(true);
+            thread.start();
+
+            return result;
+        }
+    }
+
+    @Test
+    void cancelAsyncJobThatThrowsOnCancellation() {
+        JobExecutionInternal<?> execution = 
executeJob(AsyncThrowOnCancelJob.class);
+
+        JobState executingState = await().until(execution::state, 
jobStateWithStatus(EXECUTING));
+        assertThat(execution.cancel(), is(true));
+
+        // The job detects cancellation and throws RuntimeException (not 
CancellationException).
+        // Since it's not a CancellationException, the job transitions to 
FAILED.
+        await().until(
+                execution::state,
+                jobStateWithStatusAndCreateTimeStartTime(FAILED, 
executingState.createTime(), executingState.startTime())
+        );
+        assertThat(execution.resultAsync(), willThrow(RuntimeException.class, 
"Job cancelled"));
+    }
+
+    @Test
+    void cancelJobWithRetriesDoesNotRetry() {
+        int maxRetries = 5;
+        AsyncThrowOnCancelJob.runTimes.set(0);
+
+        JobExecutionInternal<?> execution = executeJob(
+                ExecutionOptions.builder().maxRetries(maxRetries).build(),
+                AsyncThrowOnCancelJob.class,
+                null
+        );
+
+        await().until(execution::state, jobStateWithStatus(EXECUTING));
+        assertThat(execution.cancel(), is(true));
+
+        // The job throws RuntimeException (not CancellationException) after 
cancel.
+        // Even though retries are configured, the job must not be retried 
because cancel was requested.
+        await().until(execution::state, jobStateWithStatus(FAILED));
+        assertThat(AsyncThrowOnCancelJob.runTimes.get(), is(1));
+    }
+
+    /** Async job that throws RuntimeException when cancellation is detected. 
*/
+    private static class AsyncThrowOnCancelJob implements ComputeJob<Object[], 
Integer> {
+        static final AtomicInteger runTimes = new AtomicInteger();
+
+        @Override
+        public CompletableFuture<Integer> executeAsync(JobExecutionContext 
context, Object... args) {
+            runTimes.incrementAndGet();
+            CompletableFuture<Integer> result = new CompletableFuture<>();
+
+            Thread thread = new Thread(() -> {
+                while (!context.isCancelled()) {
+                    try {
+                        Thread.sleep(50);
+                    } catch (InterruptedException e) {
+                        break;
+                    }
+                }
+
+                result.completeExceptionally(new RuntimeException("Job 
cancelled"));
+            });
+            thread.setDaemon(true);
+            thread.start();
+
+            return result;
+        }
+    }
+
+    /**
+     * Job throws CancellationException without external cancel() call.
+     * The exception type alone should move the job to CANCELED.
+     */
+    @Test
+    void jobThrowingCancellationExceptionTransitionsToCanceled() {
+        JobExecutionInternal<?> execution = 
executeJob(SelfCancellingJob.class);
+
+        await().until(
+                execution::state,
+                jobStateWithStatus(CANCELED)
+        );
+        assertThat(execution.resultAsync(), 
willThrow(CancellationException.class));
+    }
+
+    /** Job that immediately throws CancellationException without external 
cancel(). */
+    private static class SelfCancellingJob implements ComputeJob<Object[], 
Void> {
+        @Override
+        public CompletableFuture<Void> executeAsync(JobExecutionContext 
context, Object... args) {
+            return CompletableFuture.failedFuture(new 
CancellationException("self-cancelled"));
+        }
+    }
+
+    private JobExecutionInternal<?> executeJob(Class<?> jobClass) {
+        return executeJob(ExecutionOptions.DEFAULT, jobClass, null);
+    }
+
+    private JobExecutionInternal<?> executeJob(ExecutionOptions options, 
Class<?> jobClass, @Nullable ComputeJobDataHolder arg) {
+        return computeExecutor.executeJob(
+                options,
+                jobClass.getName(),
+                jobClassLoader,
+                ComputeEventMetadata.builder(),
+                arg
+        );
+    }
 }
diff --git 
a/modules/compute/src/test/java/org/apache/ignite/internal/compute/queue/PriorityQueueExecutorTest.java
 
b/modules/compute/src/test/java/org/apache/ignite/internal/compute/queue/PriorityQueueExecutorTest.java
index d1837da09bb..4a4779afcab 100644
--- 
a/modules/compute/src/test/java/org/apache/ignite/internal/compute/queue/PriorityQueueExecutorTest.java
+++ 
b/modules/compute/src/test/java/org/apache/ignite/internal/compute/queue/PriorityQueueExecutorTest.java
@@ -284,9 +284,10 @@ public class PriorityQueueExecutorTest extends 
BaseIgniteAbstractTest {
 
         assertThat(execution.cancel(), is(true));
 
+        // InterruptedException is not CancellationException, so the job 
transitions to FAILED.
         await().until(
                 execution::state,
-                jobStateWithStatusAndCreateTimeStartTime(CANCELED, 
executingState.createTime(), executingState.startTime())
+                jobStateWithStatusAndCreateTimeStartTime(FAILED, 
executingState.createTime(), executingState.startTime())
         );
         assertThat(execution.resultAsync(), 
willThrow(InterruptedException.class));
     }
@@ -368,7 +369,8 @@ public class PriorityQueueExecutorTest extends 
BaseIgniteAbstractTest {
         await().until(execution::state, jobStateWithStatus(EXECUTING));
         execution.cancel();
 
-        await().until(execution::state, jobStateWithStatus(CANCELED));
+        // InterruptedException is not CancellationException, so the job 
transitions to FAILED (no retry).
+        await().until(execution::state, jobStateWithStatus(FAILED));
         assertThat(runTimes.get(), is(1));
     }
 
diff --git a/modules/platforms/cpp/tests/client-test/compute_test.cpp 
b/modules/platforms/cpp/tests/client-test/compute_test.cpp
index 2025eee0230..90b45ec9ff0 100644
--- a/modules/platforms/cpp/tests/client-test/compute_test.cpp
+++ b/modules/platforms/cpp/tests/client-test/compute_test.cpp
@@ -559,7 +559,8 @@ TEST_F(compute_test, job_execution_cancel) {
     auto state = execution.get_state();
 
     ASSERT_TRUE(state.has_value());
-    EXPECT_EQ(job_status::CANCELED, state->status);
+    // SleepJob throws RuntimeException on interrupt, not 
CancellationException.
+    EXPECT_EQ(job_status::FAILED, state->status);
 }
 
 TEST_F(compute_test, job_execution_change_priority) {
diff --git 
a/modules/platforms/dotnet/Apache.Ignite.Tests/Compute/ComputeTests.cs 
b/modules/platforms/dotnet/Apache.Ignite.Tests/Compute/ComputeTests.cs
index 90db3da5ebd..c36722e1a73 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Tests/Compute/ComputeTests.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Tests/Compute/ComputeTests.cs
@@ -684,7 +684,8 @@ namespace Apache.Ignite.Tests.Compute
 
             await cts.CancelAsync();
 
-            await AssertWaitJobStatus(jobExecution, JobStatus.Canceled, 
beforeStart);
+            // SleepJob throws RuntimeException on interrupt, not 
CancellationException.
+            await AssertWaitJobStatus(jobExecution, JobStatus.Failed, 
beforeStart);
 
             var ex = Assert.ThrowsAsync<ComputeException>(async () => await 
jobExecution.GetResultAsync());
 
@@ -711,7 +712,8 @@ namespace Apache.Ignite.Tests.Compute
 
             foreach (var jobExec in jobExecution.JobExecutions)
             {
-                await AssertWaitJobStatus(jobExec, JobStatus.Canceled, 
beforeStart);
+                // SleepJob throws RuntimeException on interrupt, not 
CancellationException.
+                await AssertWaitJobStatus(jobExec, JobStatus.Failed, 
beforeStart);
 
                 var ex = Assert.ThrowsAsync<ComputeException>(async () => 
await jobExec.GetResultAsync());
 
@@ -728,6 +730,14 @@ namespace Apache.Ignite.Tests.Compute
 
             var taskExec = await 
Client.Compute.SubmitMapReduceAsync(SleepTask, 10_000, cts.Token);
 
+            // Wait until all jobs are executing to avoid timing-dependent 
status.
+            await TestUtils.WaitForConditionAsync(
+                async () =>
+                {
+                    var states = await taskExec.GetJobStatesAsync();
+                    return states.All(s => s?.Status == JobStatus.Executing);
+                });
+
             await cts.CancelAsync();
 
             var ex = Assert.ThrowsAsync<OperationCanceledException>(async () 
=> await taskExec.GetResultAsync());
@@ -737,12 +747,13 @@ namespace Apache.Ignite.Tests.Compute
 
             foreach (var jobState in jobStates)
             {
-                Assert.AreEqual(JobStatus.Canceled, jobState?.Status);
+                // SleepJob throws RuntimeException, not CancellationException.
+                Assert.AreEqual(JobStatus.Failed, jobState?.Status);
             }
 
             TaskState? state = await taskExec.GetStateAsync();
 
-            Assert.AreEqual(TaskStatus.Canceled, state?.Status);
+            Assert.AreEqual(TaskStatus.Canceled, state?.Status); // Task was 
explicitly cancelled.
         }
 
         [Test]


Reply via email to