This is an automated email from the ASF dual-hosted git repository.
mpochatkin pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 6c9beb025a3 IGNITE-27857 Set job status to COMPLETED when job ran to
completion (#7744)
6c9beb025a3 is described below
commit 6c9beb025a3a7d3a902f476d0fba08373928fa35
Author: Vadim Pakhnushev <[email protected]>
AuthorDate: Mon Mar 16 19:49:09 2026 +0300
IGNITE-27857 Set job status to COMPLETED when job ran to completion (#7744)
---
.../ignite/internal/compute/ItComputeBaseTest.java | 27 +++++++--------
.../compute/events/ItComputeEventsTest.java | 32 +++++++++++++++++-
.../example/jobs/embedded/CancelAwareSleepJob.java | 37 +++++++++++++++++++++
.../jobs/standalone/CancelAwareSleepJob.java | 37 +++++++++++++++++++++
.../internal/compute/queue/QueueExecutionImpl.java | 14 +++-----
.../internal/compute/ComputeComponentImplTest.java | 3 +-
.../compute/executor/ComputeExecutorTest.java | 38 ++++++++++++++++++++--
.../compute/queue/PriorityQueueExecutorTest.java | 6 ++--
.../rest/compute/ItComputeControllerTest.java | 3 +-
.../runner/app/client/ItThinClientComputeTest.java | 5 +--
.../systemviews/ItComputeSystemViewTest.java | 6 ++--
11 files changed, 169 insertions(+), 39 deletions(-)
diff --git
a/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItComputeBaseTest.java
b/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItComputeBaseTest.java
index afe480465ce..68aee720172 100644
---
a/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItComputeBaseTest.java
+++
b/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItComputeBaseTest.java
@@ -512,7 +512,7 @@ public abstract class ItComputeBaseTest extends
ClusterPerClassIntegrationTest {
CancelHandle cancelHandle = CancelHandle.create();
CompletableFuture<Void> execution = compute()
- .executeAsync(JobTarget.node(clusterNode(executeNode)),
silentSleepJob(), 100L, cancelHandle.token());
+ .executeAsync(JobTarget.node(clusterNode(executeNode)),
cancelAwareSleepJob(), 100L, cancelHandle.token());
cancelHandle.cancel();
@@ -527,7 +527,7 @@ public abstract class ItComputeBaseTest extends
ClusterPerClassIntegrationTest {
CancelHandle cancelHandle = CancelHandle.create();
CompletableFuture<Void> runFut = IgniteTestUtils.runAsync(() ->
compute()
- .execute(JobTarget.node(clusterNode(executeNode)),
silentSleepJob(), 100L, cancelHandle.token()));
+ .execute(JobTarget.node(clusterNode(executeNode)),
cancelAwareSleepJob(), 100L, cancelHandle.token()));
cancelHandle.cancel();
@@ -545,7 +545,7 @@ public abstract class ItComputeBaseTest extends
ClusterPerClassIntegrationTest {
CompletableFuture<Collection<Void>> resultsFut =
compute().executeAsync(
BroadcastJobTarget.nodes(executeNodes),
- silentSleepJob(), 100L, cancelHandle.token()
+ cancelAwareSleepJob(), 100L, cancelHandle.token()
);
cancelHandle.cancel();
@@ -564,7 +564,7 @@ public abstract class ItComputeBaseTest extends
ClusterPerClassIntegrationTest {
CompletableFuture<Collection<Void>> runFut =
IgniteTestUtils.runAsync(() -> compute().execute(
BroadcastJobTarget.nodes(executeNodes),
- silentSleepJob(), 100L, cancelHandle.token()
+ cancelAwareSleepJob(), 100L, cancelHandle.token()
));
cancelHandle.cancel();
@@ -699,19 +699,10 @@ public abstract class ItComputeBaseTest extends
ClusterPerClassIntegrationTest {
assertThat(cancelHandle.cancelAsync(), willCompleteSuccessfully());
- CompletionException completionException =
assertThrows(CompletionException.class, () -> execution.resultAsync().join());
-
- // Unwrap CompletionException, ComputeException should be the cause
thrown from the API
- assertThat(completionException.getCause(),
instanceOf(ComputeException.class));
- ComputeException computeException = (ComputeException)
completionException.getCause();
+ // Job ignores cancellation and completes normally — cooperative
cancellation means the result is honored.
+ assertThat(execution.resultAsync(), willBe(nullValue()));
- // ComputeException should be caused by the CancellationException
thrown from the executor which detects that the job completes,
- // but was previously cancelled
- assertThat(computeException.getCause(),
instanceOf(CancellationException.class));
- CancellationException cancellationException = (CancellationException)
computeException.getCause();
- assertThat(cancellationException.getCause(), is(nullValue()));
-
- await().until(execution::stateAsync,
willBe(jobStateWithStatus(CANCELED)));
+ await().until(execution::stateAsync,
willBe(jobStateWithStatus(COMPLETED)));
}
@ParameterizedTest
@@ -941,6 +932,10 @@ public abstract class ItComputeBaseTest extends
ClusterPerClassIntegrationTest {
return JobDescriptor.<Long,
Void>builder(jobClassName("SilentSleepJob")).units(units()).build();
}
+ private JobDescriptor<Long, Void> cancelAwareSleepJob() {
+ return JobDescriptor.<Long,
Void>builder(jobClassName("CancelAwareSleepJob")).units(units()).build();
+ }
+
private JobDescriptor<Void, Long> getPartitionJob() {
return JobDescriptor.<Void,
Long>builder(jobClassName("GetPartitionJob")).units(units()).build();
}
diff --git
a/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/events/ItComputeEventsTest.java
b/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/events/ItComputeEventsTest.java
index 9e7e163792c..fb837feb558 100644
---
a/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/events/ItComputeEventsTest.java
+++
b/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/events/ItComputeEventsTest.java
@@ -82,6 +82,7 @@ import org.apache.ignite.lang.CancellationToken;
import org.apache.ignite.table.QualifiedName;
import org.apache.ignite.table.Tuple;
import org.apache.ignite.table.mapper.Mapper;
+import org.example.jobs.embedded.CancelAwareSleepJob;
import org.example.jobs.embedded.FailingJob;
import org.example.jobs.embedded.FailingJobMapReduceTask;
import org.example.jobs.embedded.FailingReduceMapReduceTask;
@@ -247,6 +248,35 @@ abstract class ItComputeEventsTest extends
ClusterPerClassIntegrationTest {
cancelHandle.cancel();
+ // SilentSleepJob catches interruption and completes normally —
cooperative cancellation honors the result.
+ assertThat(execution.resultAsync(), willBe(nullValue()));
+
+ UUID jobId = execution.idAsync().join(); // Safe to join since
execution is complete.
+ String jobClassName = jobDescriptor.jobClassName();
+ String targetNode = node(targetNodeIndex).name();
+
+ assertEvents(
+ jobEvent(COMPUTE_JOB_QUEUED, jobId, jobClassName, targetNode),
+ jobEvent(COMPUTE_JOB_EXECUTING, jobId, jobClassName,
targetNode),
+ jobEvent(COMPUTE_JOB_CANCELING, jobId, jobClassName,
targetNode),
+ jobEvent(COMPUTE_JOB_COMPLETED, jobId, jobClassName,
targetNode)
+ );
+ }
+
+ @ParameterizedTest
+ @ValueSource(ints = {0, 1})
+ void cancelAwareJob(int targetNodeIndex) {
+ CancelHandle cancelHandle = CancelHandle.create();
+
+ JobDescriptor<Long, Void> jobDescriptor =
JobDescriptor.builder(CancelAwareSleepJob.class).build();
+ JobTarget target = JobTarget.node(clusterNode(targetNodeIndex));
+ JobExecution<Void> execution = submit(target, jobDescriptor,
cancelHandle.token(), Long.MAX_VALUE);
+
+ // Wait for start executing
+ await().until(execution::stateAsync,
willBe(jobStateWithStatus(EXECUTING)));
+
+ cancelHandle.cancel();
+
assertThat(execution.resultAsync(), willThrow(ComputeException.class));
UUID jobId = execution.idAsync().join(); // Safe to join since
execution is complete.
@@ -267,7 +297,7 @@ abstract class ItComputeEventsTest extends
ClusterPerClassIntegrationTest {
// Start first job
CancelHandle cancelHandle1 = CancelHandle.create();
- JobDescriptor<Long, Void> jobDescriptor =
JobDescriptor.builder(SilentSleepJob.class).build();
+ JobDescriptor<Long, Void> jobDescriptor =
JobDescriptor.builder(CancelAwareSleepJob.class).build();
JobTarget target = JobTarget.node(clusterNode(targetNodeIndex));
JobExecution<Void> execution1 = submit(target, jobDescriptor,
cancelHandle1.token(), Long.MAX_VALUE);
// Wait for it to start executing
diff --git
a/modules/compute/src/integrationTest/java/org/example/jobs/embedded/CancelAwareSleepJob.java
b/modules/compute/src/integrationTest/java/org/example/jobs/embedded/CancelAwareSleepJob.java
new file mode 100644
index 00000000000..cb16fb85879
--- /dev/null
+++
b/modules/compute/src/integrationTest/java/org/example/jobs/embedded/CancelAwareSleepJob.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.example.jobs.embedded;
+
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import org.apache.ignite.compute.ComputeJob;
+import org.apache.ignite.compute.JobExecutionContext;
+
+/** Compute job that sleeps for a number of seconds and throws {@link
CancellationException} if interrupted. */
+public class CancelAwareSleepJob implements ComputeJob<Long, Void> {
+ @Override
+ public CompletableFuture<Void> executeAsync(JobExecutionContext
jobExecutionContext, Long timeout) {
+ try {
+ TimeUnit.SECONDS.sleep(timeout);
+ } catch (InterruptedException e) {
+ throw new CancellationException();
+ }
+ return null;
+ }
+}
diff --git
a/modules/compute/src/jobs/java/org/example/jobs/standalone/CancelAwareSleepJob.java
b/modules/compute/src/jobs/java/org/example/jobs/standalone/CancelAwareSleepJob.java
new file mode 100644
index 00000000000..0283507a6b4
--- /dev/null
+++
b/modules/compute/src/jobs/java/org/example/jobs/standalone/CancelAwareSleepJob.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.example.jobs.standalone;
+
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import org.apache.ignite.compute.ComputeJob;
+import org.apache.ignite.compute.JobExecutionContext;
+
+/** Compute job that sleeps for a number of seconds and throws {@link
CancellationException} if interrupted. */
+public class CancelAwareSleepJob implements ComputeJob<Long, Void> {
+ @Override
+ public CompletableFuture<Void> executeAsync(JobExecutionContext
jobExecutionContext, Long timeout) {
+ try {
+ TimeUnit.SECONDS.sleep(timeout);
+ } catch (InterruptedException e) {
+ throw new CancellationException();
+ }
+ return null;
+ }
+}
diff --git
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/queue/QueueExecutionImpl.java
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/queue/QueueExecutionImpl.java
index 1e072a8f60c..d914f62b10f 100644
---
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/queue/QueueExecutionImpl.java
+++
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/queue/QueueExecutionImpl.java
@@ -222,15 +222,11 @@ class QueueExecutionImpl<R> implements QueueExecution<R> {
result.completeExceptionally(throwable);
}
} else {
- if (queueEntry.isInterrupted()) {
- stateMachine.cancelJob(jobId);
- logJobCanceledEvent(eventLog, eventMetadata);
- result.completeExceptionally(new CancellationException());
- } else {
- stateMachine.completeJob(jobId);
- logJobCompletedEvent(eventLog, eventMetadata);
- result.complete(r);
- }
+ // Job completed normally. Even if cancellation was requested,
the job chose to ignore it
+ // (cooperative cancellation). Honor the result.
+ stateMachine.completeJob(jobId);
+ logJobCompletedEvent(eventLog, eventMetadata);
+ result.complete(r);
}
});
}
diff --git
a/modules/compute/src/test/java/org/apache/ignite/internal/compute/ComputeComponentImplTest.java
b/modules/compute/src/test/java/org/apache/ignite/internal/compute/ComputeComponentImplTest.java
index 1db8c5dd93a..27d6f640250 100644
---
a/modules/compute/src/test/java/org/apache/ignite/internal/compute/ComputeComponentImplTest.java
+++
b/modules/compute/src/test/java/org/apache/ignite/internal/compute/ComputeComponentImplTest.java
@@ -249,7 +249,8 @@ class ComputeComponentImplTest extends
BaseIgniteAbstractTest {
assertThat(cancelHandle.cancelAsync(), willCompleteSuccessfully());
- await().until(execution::stateAsync,
willBe(jobStateWithStatus(CANCELED)));
+ // LongJob catches interruption and completes normally — cooperative
cancellation honors the result.
+ await().until(execution::stateAsync,
willBe(jobStateWithStatus(COMPLETED)));
assertThatNoRequestsWereSent();
}
diff --git
a/modules/compute/src/test/java/org/apache/ignite/internal/compute/executor/ComputeExecutorTest.java
b/modules/compute/src/test/java/org/apache/ignite/internal/compute/executor/ComputeExecutorTest.java
index cf735092688..ab113c518c1 100644
---
a/modules/compute/src/test/java/org/apache/ignite/internal/compute/executor/ComputeExecutorTest.java
+++
b/modules/compute/src/test/java/org/apache/ignite/internal/compute/executor/ComputeExecutorTest.java
@@ -35,6 +35,7 @@ import static org.mockito.Answers.RETURNS_DEEP_STUBS;
import java.util.List;
import java.util.Map;
import java.util.UUID;
+import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.ignite.Ignite;
@@ -105,9 +106,10 @@ class ComputeExecutorTest extends BaseIgniteAbstractTest {
);
JobState executingState = await().until(execution::state,
jobStateWithStatus(EXECUTING));
assertThat(execution.cancel(), is(true));
+ // InterruptingJob catches interruption and completes normally —
cooperative cancellation honors the result.
await().until(
execution::state,
- jobStateWithStatusAndCreateTimeStartTime(CANCELED,
executingState.createTime(), executingState.startTime())
+ jobStateWithStatusAndCreateTimeStartTime(COMPLETED,
executingState.createTime(), executingState.startTime())
);
}
@@ -136,9 +138,10 @@ class ComputeExecutorTest extends BaseIgniteAbstractTest {
);
JobState executingState = await().until(execution::state,
jobStateWithStatus(EXECUTING));
assertThat(execution.cancel(), is(true));
+ // CancellingJob checks isCancelled() and completes normally —
cooperative cancellation honors the result.
await().until(
execution::state,
- jobStateWithStatusAndCreateTimeStartTime(CANCELED,
executingState.createTime(), executingState.startTime())
+ jobStateWithStatusAndCreateTimeStartTime(COMPLETED,
executingState.createTime(), executingState.startTime())
);
}
@@ -158,6 +161,37 @@ class ComputeExecutorTest extends BaseIgniteAbstractTest {
}
}
+ @Test
+ void cancelAwareCancellation() {
+ JobExecutionInternal<?> execution = computeExecutor.executeJob(
+ ExecutionOptions.DEFAULT,
+ CancelAwareJob.class.getName(),
+ jobClassLoader,
+ ComputeEventMetadata.builder(),
+ null
+ );
+ JobState executingState = await().until(execution::state,
jobStateWithStatus(EXECUTING));
+ assertThat(execution.cancel(), is(true));
+ // CancelAwareJob catches interruption and throws
CancellationException — job is canceled.
+ await().until(
+ execution::state,
+ jobStateWithStatusAndCreateTimeStartTime(CANCELED,
executingState.createTime(), executingState.startTime())
+ );
+ }
+
+ private static class CancelAwareJob implements ComputeJob<Object[],
Integer> {
+ @Override
+ public CompletableFuture<Integer> executeAsync(JobExecutionContext
context, Object... args) {
+ while (true) {
+ try {
+ Thread.sleep(100);
+ } catch (InterruptedException e) {
+ throw new CancellationException();
+ }
+ }
+ }
+ }
+
@Test
void retryJobFail() {
int maxRetries = 5;
diff --git
a/modules/compute/src/test/java/org/apache/ignite/internal/compute/queue/PriorityQueueExecutorTest.java
b/modules/compute/src/test/java/org/apache/ignite/internal/compute/queue/PriorityQueueExecutorTest.java
index 095808a6f17..d1837da09bb 100644
---
a/modules/compute/src/test/java/org/apache/ignite/internal/compute/queue/PriorityQueueExecutorTest.java
+++
b/modules/compute/src/test/java/org/apache/ignite/internal/compute/queue/PriorityQueueExecutorTest.java
@@ -234,7 +234,7 @@ public class PriorityQueueExecutorTest extends
BaseIgniteAbstractTest {
try {
latch.await();
} catch (InterruptedException e) {
- return completedFuture(0);
+ throw new CancellationException();
}
}
});
@@ -257,8 +257,8 @@ public class PriorityQueueExecutorTest extends
BaseIgniteAbstractTest {
QueueExecution<Object> execution = priorityQueueExecutor.submit(() -> {
try {
new CountDownLatch(1).await();
- } catch (InterruptedException ignored) {
- // ignored
+ } catch (InterruptedException e) {
+ throw new CancellationException();
}
return null;
});
diff --git
a/modules/rest/src/integrationTest/java/org/apache/ignite/internal/rest/compute/ItComputeControllerTest.java
b/modules/rest/src/integrationTest/java/org/apache/ignite/internal/rest/compute/ItComputeControllerTest.java
index 40f0c5bd155..138c7b86e22 100644
---
a/modules/rest/src/integrationTest/java/org/apache/ignite/internal/rest/compute/ItComputeControllerTest.java
+++
b/modules/rest/src/integrationTest/java/org/apache/ignite/internal/rest/compute/ItComputeControllerTest.java
@@ -49,6 +49,7 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
+import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import org.apache.ignite.Ignite;
@@ -352,7 +353,7 @@ public class ItComputeControllerTest extends
ClusterPerClassIntegrationTest {
try {
LOCK.wait();
} catch (InterruptedException e) {
- // No-op.
+ throw new CancellationException();
}
}
diff --git
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItThinClientComputeTest.java
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItThinClientComputeTest.java
index 309b9962282..55d95d92b87 100644
---
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItThinClientComputeTest.java
+++
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItThinClientComputeTest.java
@@ -68,6 +68,7 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
+import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CountDownLatch;
@@ -166,7 +167,7 @@ public class ItThinClientComputeTest extends
ItAbstractThinClientTest {
CancelHandle cancelHandle = CancelHandle.create();
JobDescriptor<Object, Void> job =
JobDescriptor.builder(InfiniteJob.class).units(List.of()).build();
- CompletableFuture<Void> runFut = IgniteTestUtils.runAsync(() ->
client().compute()
+ CompletableFuture<Void> runFut = IgniteTestUtils.runAsync(() ->
client().compute()
.execute(JobTarget.node(executeNode), job, null,
cancelHandle.token()));
cancelHandle.cancel();
@@ -1118,7 +1119,7 @@ public class ItThinClientComputeTest extends
ItAbstractThinClientTest {
try {
new CountDownLatch(1).await();
} catch (InterruptedException e) {
- Thread.currentThread().interrupt();
+ throw new CancellationException();
}
return null;
}
diff --git
a/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/engine/systemviews/ItComputeSystemViewTest.java
b/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/engine/systemviews/ItComputeSystemViewTest.java
index 575564aac73..32c2b2fff30 100644
---
a/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/engine/systemviews/ItComputeSystemViewTest.java
+++
b/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/engine/systemviews/ItComputeSystemViewTest.java
@@ -41,6 +41,7 @@ import java.time.Instant;
import java.util.List;
import java.util.Map;
import java.util.UUID;
+import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import org.apache.ignite.Ignite;
import org.apache.ignite.client.IgniteClient;
@@ -277,12 +278,9 @@ public class ItComputeSystemViewTest extends
AbstractSystemViewTest {
try {
Thread.sleep(500);
} catch (InterruptedException e) {
- // No op, just return from loop
- break;
+ throw new CancellationException();
}
}
-
- return null;
}
}