This is an automated email from the ASF dual-hosted git repository.

mpetrov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new 86c7e197c07 IGNITE-28189 Fixed incorrect status of a Compute Task when it is canceled (#12876)
86c7e197c07 is described below

commit 86c7e197c07080ebb46f8916db641de16df17eb8
Author: Mikhail Petrov <[email protected]>
AuthorDate: Mon Mar 16 13:48:08 2026 +0300

    IGNITE-28189 Fixed incorrect status of a Compute Task when it is canceled (#12876)
---
 .../internal/processors/job/GridJobProcessor.java  |  2 +-
 .../internal/processors/job/GridJobWorker.java     | 75 ++++++++--------------
 2 files changed, 28 insertions(+), 49 deletions(-)

diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobProcessor.java
index fd1b0122d6d..3a5924c9684 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobProcessor.java
@@ -485,7 +485,7 @@ public class GridJobProcessor extends GridProcessorAdapter {
         // Cancel only if we force grid to stop
         if (cancel) {
             for (GridJobWorker job : activeJobs.values()) {
-                job.onStopping();
+                job.onNodeStopping();
 
                 cancelJob(job, false);
             }
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobWorker.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobWorker.java
index 90f32db551f..b59fd5b15da 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobWorker.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobWorker.java
@@ -63,7 +63,6 @@ import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.internal.util.worker.GridWorker;
 import org.apache.ignite.lang.IgniteBiTuple;
-import org.apache.ignite.lang.IgniteRunnable;
 import org.apache.ignite.lang.IgniteUuid;
 import org.apache.ignite.marshaller.Marshaller;
 import org.jetbrains.annotations.Nullable;
@@ -145,16 +144,16 @@ public class GridJobWorker extends GridWorker implements GridTimeoutObject {
     private final AtomicBoolean masterLeaveGuard = new AtomicBoolean();
 
     /** */
-    private volatile boolean timedOut;
+    private volatile boolean isStarted;
 
     /** */
-    private volatile boolean sysCancelled;
+    private volatile boolean isCancelledBySystem;
 
     /** */
-    private volatile boolean sysStopping;
+    private volatile boolean isTimedOut;
 
     /** */
-    private volatile boolean isStarted;
+    private volatile boolean isNodeStopping;
 
     /** Deployed job. */
     private ComputeJob job;
@@ -272,15 +271,6 @@ public class GridJobWorker extends GridWorker implements GridTimeoutObject {
         return dep;
     }
 
-    /**
-     * Returns {@code True} if job was cancelled by the system.
-     *
-     * @return {@code True} if job was cancelled by the system.
-     */
-    boolean isSystemCanceled() {
-        return sysCancelled;
-    }
-
     /**
      * @return Create time.
      */
@@ -402,7 +392,7 @@ public class GridJobWorker extends GridWorker implements GridTimeoutObject {
      * @return {@code True} if job is timed out.
      */
     public boolean isTimedOut() {
-        return timedOut;
+        return isTimedOut;
     }
 
     /**
@@ -417,7 +407,7 @@ public class GridJobWorker extends GridWorker implements GridTimeoutObject {
         if (finishing.get())
             return;
 
-        timedOut = true;
+        isTimedOut = true;
 
         U.warn(log, "Job has timed out: " + ses);
 
@@ -430,8 +420,8 @@ public class GridJobWorker extends GridWorker implements GridTimeoutObject {
     /**
      * Callback for whenever grid is stopping.
      */
-    public void onStopping() {
-        sysStopping = true;
+    public void onNodeStopping() {
+        isNodeStopping = true;
     }
 
     /**
@@ -561,10 +551,6 @@ public class GridJobWorker extends GridWorker implements GridTimeoutObject {
                 }
             }
 
-            if (isCancelled())
-                // If job was cancelled prior to assigning runner to it?
-                super.cancel();
-
             if (!skipNtf) {
                 if (holdLsnr.onUnheld(this)) {
                     if (held.decrementAndGet() == 0)
@@ -618,7 +604,7 @@ public class GridJobWorker extends GridWorker implements GridTimeoutObject {
                 }
             }
             catch (IgniteException e) {
-                if (sysStopping && e.hasCause(IgniteInterruptedCheckedException.class, InterruptedException.class)) {
+                if (isNodeStopping && e.hasCause(IgniteInterruptedCheckedException.class, InterruptedException.class)) {
                     ex = handleThrowable(e);
 
                     assert ex != null;
@@ -700,7 +686,7 @@ public class GridJobWorker extends GridWorker implements GridTimeoutObject {
 
         // Special handling for weird interrupted exception which
         // happens due to JDk 1.5 bug.
-        if (e instanceof InterruptedException && !sysStopping) {
+        if (e instanceof InterruptedException && !isNodeStopping) {
             msg = "Failed to execute job due to interrupted exception.";
 
             // Turn interrupted exception into checked exception.
@@ -716,7 +702,7 @@ public class GridJobWorker extends GridWorker implements GridTimeoutObject {
 
             ex = new ComputeUserUndeclaredException(msg, e);
         }
-        else if (sysStopping && X.hasCause(e, InterruptedException.class, IgniteInterruptedCheckedException.class)) {
+        else if (isNodeStopping && X.hasCause(e, InterruptedException.class, IgniteInterruptedCheckedException.class)) {
             msg = "Job got interrupted due to system stop (will attempt failover).";
 
             ex = new ComputeExecutionRejectedException(e);
@@ -747,40 +733,33 @@ public class GridJobWorker extends GridWorker implements GridTimeoutObject {
      * @param sys System flag.
      */
     public void cancel(boolean sys) {
-        try {
-            final ComputeJob job0 = job;
-
-            if (sys)
-                sysCancelled = true;
+        if (log.isDebugEnabled())
+            log.debug("Cancelling job: " + ses);
 
-            if (job0 != null) {
-                if (log.isDebugEnabled())
-                    log.debug("Cancelling job: " + ses);
+        boolean firstCancel = isCancelled.compareAndSet(false, true);
 
-                status = CANCELLED;
+        isCancelledBySystem = sys;
 
-                U.wrapThreadLoader(dep.classLoader(), (IgniteRunnable)() -> {
-                    try (Scope ignored = ctx.security().withContext(secCtx)) {
-                        job0.cancel();
-                    }
-                });
-            }
+        status = CANCELLED;
 
-            // Interrupting only when all 'cancelled' flags are set.
-            // This allows the 'job' to determine it's a cancellation.
-            super.cancel();
+        final ComputeJob job0 = job;
 
-            if (!internal && ctx.event().isRecordable(EVT_JOB_CANCELLED))
-                recordEvent(EVT_JOB_CANCELLED, "Job was cancelled: " + job0);
+        try (Scope ignored = ctx.security().withContext(secCtx)) {
+            U.wrapThreadLoader(dep.classLoader(), job0::cancel);
         }
-        // Catch throwable to protect against bad user code.
-        catch (Throwable e) {
+        catch (Throwable e) { // Catch throwable to protect against bad user code.
             U.error(log, "Failed to cancel job due to undeclared user exception [jobId=" + ses.getJobId() +
                 ", ses=" + ses + ']', e);
 
             if (e instanceof Error)
                 throw e;
         }
+        finally {
+            onCancel(firstCancel);
+
+            if (!internal && ctx.event().isRecordable(EVT_JOB_CANCELLED))
+                recordEvent(EVT_JOB_CANCELLED, "Job was cancelled: " + job0);
+        }
     }
 
     /**
@@ -844,7 +823,7 @@ public class GridJobWorker extends GridWorker implements GridTimeoutObject {
 
         // Do not send reply if job has been cancelled from system.
         if (sndReply)
-            sndReply = !sysCancelled;
+            sndReply = !isCancelledBySystem;
 
         // We should save message ID here since listener callback will reset sequence.
         ClusterNode sndNode = ctx.discovery().node(taskNode.id());

Reply via email to