This is an automated email from the ASF dual-hosted git repository.
mpetrov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new 86c7e197c07 IGNITE-28189 Fixed incorrect status of a Compute Task when
it is canceled (#12876)
86c7e197c07 is described below
commit 86c7e197c07080ebb46f8916db641de16df17eb8
Author: Mikhail Petrov <[email protected]>
AuthorDate: Mon Mar 16 13:48:08 2026 +0300
IGNITE-28189 Fixed incorrect status of a Compute Task when it is canceled
(#12876)
---
.../internal/processors/job/GridJobProcessor.java | 2 +-
.../internal/processors/job/GridJobWorker.java | 75 ++++++++--------------
2 files changed, 28 insertions(+), 49 deletions(-)
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobProcessor.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobProcessor.java
index fd1b0122d6d..3a5924c9684 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobProcessor.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobProcessor.java
@@ -485,7 +485,7 @@ public class GridJobProcessor extends GridProcessorAdapter {
// Cancel only if we force grid to stop
if (cancel) {
for (GridJobWorker job : activeJobs.values()) {
- job.onStopping();
+ job.onNodeStopping();
cancelJob(job, false);
}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobWorker.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobWorker.java
index 90f32db551f..b59fd5b15da 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobWorker.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobWorker.java
@@ -63,7 +63,6 @@ import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.internal.util.worker.GridWorker;
import org.apache.ignite.lang.IgniteBiTuple;
-import org.apache.ignite.lang.IgniteRunnable;
import org.apache.ignite.lang.IgniteUuid;
import org.apache.ignite.marshaller.Marshaller;
import org.jetbrains.annotations.Nullable;
@@ -145,16 +144,16 @@ public class GridJobWorker extends GridWorker implements
GridTimeoutObject {
private final AtomicBoolean masterLeaveGuard = new AtomicBoolean();
/** */
- private volatile boolean timedOut;
+ private volatile boolean isStarted;
/** */
- private volatile boolean sysCancelled;
+ private volatile boolean isCancelledBySystem;
/** */
- private volatile boolean sysStopping;
+ private volatile boolean isTimedOut;
/** */
- private volatile boolean isStarted;
+ private volatile boolean isNodeStopping;
/** Deployed job. */
private ComputeJob job;
@@ -272,15 +271,6 @@ public class GridJobWorker extends GridWorker implements
GridTimeoutObject {
return dep;
}
- /**
- * Returns {@code True} if job was cancelled by the system.
- *
- * @return {@code True} if job was cancelled by the system.
- */
- boolean isSystemCanceled() {
- return sysCancelled;
- }
-
/**
* @return Create time.
*/
@@ -402,7 +392,7 @@ public class GridJobWorker extends GridWorker implements
GridTimeoutObject {
* @return {@code True} if job is timed out.
*/
public boolean isTimedOut() {
- return timedOut;
+ return isTimedOut;
}
/**
@@ -417,7 +407,7 @@ public class GridJobWorker extends GridWorker implements
GridTimeoutObject {
if (finishing.get())
return;
- timedOut = true;
+ isTimedOut = true;
U.warn(log, "Job has timed out: " + ses);
@@ -430,8 +420,8 @@ public class GridJobWorker extends GridWorker implements
GridTimeoutObject {
/**
* Callback for whenever grid is stopping.
*/
- public void onStopping() {
- sysStopping = true;
+ public void onNodeStopping() {
+ isNodeStopping = true;
}
/**
@@ -561,10 +551,6 @@ public class GridJobWorker extends GridWorker implements
GridTimeoutObject {
}
}
- if (isCancelled())
- // If job was cancelled prior to assigning runner to it?
- super.cancel();
-
if (!skipNtf) {
if (holdLsnr.onUnheld(this)) {
if (held.decrementAndGet() == 0)
@@ -618,7 +604,7 @@ public class GridJobWorker extends GridWorker implements
GridTimeoutObject {
}
}
catch (IgniteException e) {
- if (sysStopping &&
e.hasCause(IgniteInterruptedCheckedException.class,
InterruptedException.class)) {
+ if (isNodeStopping &&
e.hasCause(IgniteInterruptedCheckedException.class,
InterruptedException.class)) {
ex = handleThrowable(e);
assert ex != null;
@@ -700,7 +686,7 @@ public class GridJobWorker extends GridWorker implements
GridTimeoutObject {
// Special handling for weird interrupted exception which
// happens due to JDK 1.5 bug.
- if (e instanceof InterruptedException && !sysStopping) {
+ if (e instanceof InterruptedException && !isNodeStopping) {
msg = "Failed to execute job due to interrupted exception.";
// Turn interrupted exception into checked exception.
@@ -716,7 +702,7 @@ public class GridJobWorker extends GridWorker implements
GridTimeoutObject {
ex = new ComputeUserUndeclaredException(msg, e);
}
- else if (sysStopping && X.hasCause(e, InterruptedException.class,
IgniteInterruptedCheckedException.class)) {
+ else if (isNodeStopping && X.hasCause(e, InterruptedException.class,
IgniteInterruptedCheckedException.class)) {
msg = "Job got interrupted due to system stop (will attempt
failover).";
ex = new ComputeExecutionRejectedException(e);
@@ -747,40 +733,33 @@ public class GridJobWorker extends GridWorker implements
GridTimeoutObject {
* @param sys System flag.
*/
public void cancel(boolean sys) {
- try {
- final ComputeJob job0 = job;
-
- if (sys)
- sysCancelled = true;
+ if (log.isDebugEnabled())
+ log.debug("Cancelling job: " + ses);
- if (job0 != null) {
- if (log.isDebugEnabled())
- log.debug("Cancelling job: " + ses);
+ boolean firstCancel = isCancelled.compareAndSet(false, true);
- status = CANCELLED;
+ isCancelledBySystem = sys;
- U.wrapThreadLoader(dep.classLoader(), (IgniteRunnable)() -> {
- try (Scope ignored = ctx.security().withContext(secCtx)) {
- job0.cancel();
- }
- });
- }
+ status = CANCELLED;
- // Interrupting only when all 'cancelled' flags are set.
- // This allows the 'job' to determine it's a cancellation.
- super.cancel();
+ final ComputeJob job0 = job;
- if (!internal && ctx.event().isRecordable(EVT_JOB_CANCELLED))
- recordEvent(EVT_JOB_CANCELLED, "Job was cancelled: " + job0);
+ try (Scope ignored = ctx.security().withContext(secCtx)) {
+ U.wrapThreadLoader(dep.classLoader(), job0::cancel);
}
- // Catch throwable to protect against bad user code.
- catch (Throwable e) {
+ catch (Throwable e) { // Catch throwable to protect against bad user
code.
U.error(log, "Failed to cancel job due to undeclared user
exception [jobId=" + ses.getJobId() +
", ses=" + ses + ']', e);
if (e instanceof Error)
throw e;
}
+ finally {
+ onCancel(firstCancel);
+
+ if (!internal && ctx.event().isRecordable(EVT_JOB_CANCELLED))
+ recordEvent(EVT_JOB_CANCELLED, "Job was cancelled: " + job0);
+ }
}
/**
@@ -844,7 +823,7 @@ public class GridJobWorker extends GridWorker implements
GridTimeoutObject {
// Do not send reply if job has been cancelled from system.
if (sndReply)
- sndReply = !sysCancelled;
+ sndReply = !isCancelledBySystem;
// We should save message ID here since listener callback will reset
sequence.
ClusterNode sndNode = ctx.discovery().node(taskNode.id());