This is an automated email from the ASF dual-hosted git repository.
mpochatkin pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 139ef93389 IGNITE-22427 Fix JobExecution.cancelAsync semantics (#3937)
139ef93389 is described below
commit 139ef93389b1af910e2ce8d58e9ec9ed84eb74b9
Author: Vadim Pakhnushev <[email protected]>
AuthorDate: Tue Jun 18 16:03:07 2024 +0300
IGNITE-22427 Fix JobExecution.cancelAsync semantics (#3937)
---
.../org/apache/ignite/compute/JobExecution.java | 6 +-
.../internal/compute/ItComputeTestEmbedded.java | 72 +++++++++++++++-------
2 files changed, 52 insertions(+), 26 deletions(-)
diff --git
a/modules/api/src/main/java/org/apache/ignite/compute/JobExecution.java
b/modules/api/src/main/java/org/apache/ignite/compute/JobExecution.java
index c900610ef6..c28cfb1bfc 100644
--- a/modules/api/src/main/java/org/apache/ignite/compute/JobExecution.java
+++ b/modules/api/src/main/java/org/apache/ignite/compute/JobExecution.java
@@ -56,7 +56,7 @@ public interface JobExecution<R> {
* Cancels the job.
*
* @return The future which will be completed with {@code true} when the
job is cancelled, {@code false} when the job couldn't be
- * cancelled (either it's not yet started, or it's already
completed), or {@code null} if the job no longer exists due to
+ * cancelled (if it's already completed or in the process of
cancelling), or {@code null} if the job no longer exists due to
* exceeding the retention time limit.
*/
CompletableFuture<@Nullable Boolean> cancelAsync();
@@ -66,8 +66,8 @@ public interface JobExecution<R> {
*
* @param newPriority new priority.
* @return The future which will be completed with {@code true} when the
priority is changed, {@code false} when the priority couldn't
- * be changed (it's already executing or completed), or {@code
null} if the job no longer exists due to
- * exceeding the retention time limit.
+ * be changed (if the job is already executing or completed), or
{@code null} if the job no longer exists due to exceeding the
+ * retention time limit.
*/
CompletableFuture<@Nullable Boolean> changePriorityAsync(int newPriority);
}
diff --git
a/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItComputeTestEmbedded.java
b/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItComputeTestEmbedded.java
index 4aed1cf525..41269a8797 100644
---
a/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItComputeTestEmbedded.java
+++
b/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItComputeTestEmbedded.java
@@ -20,6 +20,10 @@ package org.apache.ignite.internal.compute;
import static java.util.concurrent.CompletableFuture.allOf;
import static java.util.stream.Collectors.toList;
import static java.util.stream.Collectors.toSet;
+import static org.apache.ignite.compute.JobState.CANCELED;
+import static org.apache.ignite.compute.JobState.COMPLETED;
+import static org.apache.ignite.compute.JobState.EXECUTING;
+import static org.apache.ignite.compute.JobState.QUEUED;
import static
org.apache.ignite.internal.IgniteExceptionTestUtils.assertPublicCheckedException;
import static
org.apache.ignite.internal.IgniteExceptionTestUtils.assertPublicException;
import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willBe;
@@ -52,7 +56,6 @@ import org.apache.ignite.compute.JobDescriptor;
import org.apache.ignite.compute.JobExecution;
import org.apache.ignite.compute.JobExecutionContext;
import org.apache.ignite.compute.JobExecutionOptions;
-import org.apache.ignite.compute.JobState;
import org.apache.ignite.internal.app.IgniteImpl;
import org.apache.ignite.internal.lang.IgniteInternalCheckedException;
import org.apache.ignite.internal.lang.IgniteInternalException;
@@ -85,11 +88,40 @@ class ItComputeTestEmbedded extends ItComputeBaseTest {
JobDescriptor job =
JobDescriptor.builder(WaitLatchJob.class).units(units()).build();
JobExecution<String> execution =
entryNode.compute().submit(Set.of(entryNode.node()), job, new
CountDownLatch(1));
- await().until(execution::statusAsync,
willBe(jobStatusWithState(JobState.EXECUTING)));
+ await().until(execution::statusAsync,
willBe(jobStatusWithState(EXECUTING)));
assertThat(execution.cancelAsync(), willBe(true));
- await().until(execution::statusAsync,
willBe(jobStatusWithState(JobState.CANCELED)));
+ await().until(execution::statusAsync,
willBe(jobStatusWithState(CANCELED)));
+ }
+
+ @Test
+ void cancelsQueuedJobLocally() {
+ IgniteImpl entryNode = node(0);
+ Set<ClusterNode> nodes = Set.of(entryNode.node());
+
+ CountDownLatch countDownLatch = new CountDownLatch(1);
+ JobDescriptor job =
JobDescriptor.builder(WaitLatchJob.class).units(units()).build();
+
+ // Start 1 task in executor with 1 thread
+ JobExecution<String> execution1 = entryNode.compute().submit(nodes,
job, countDownLatch);
+ await().until(execution1::statusAsync,
willBe(jobStatusWithState(EXECUTING)));
+
+ // Start one more task
+ JobExecution<String> execution2 = entryNode.compute().submit(nodes,
job, new CountDownLatch(1));
+ await().until(execution2::statusAsync,
willBe(jobStatusWithState(QUEUED)));
+
+ // Task 2 is not complete, in queued state
+ assertThat(execution2.resultAsync().isDone(), is(false));
+
+ // Cancel queued task
+ assertThat(execution2.cancelAsync(), willBe(true));
+ assertThat(execution2.statusAsync(),
willBe(jobStatusWithState(CANCELED)));
+
+ // Finish running task
+ countDownLatch.countDown();
+ await().until(execution1::statusAsync,
willBe(jobStatusWithState(COMPLETED)));
+ assertThat(execution1.cancelAsync(), willBe(false));
}
@Test
@@ -99,11 +131,11 @@ class ItComputeTestEmbedded extends ItComputeBaseTest {
JobDescriptor job =
JobDescriptor.builder(WaitLatchJob.class).units(units()).build();
JobExecution<String> execution =
entryNode.compute().submit(Set.of(node(1).node()), job, new CountDownLatch(1));
- await().until(execution::statusAsync,
willBe(jobStatusWithState(JobState.EXECUTING)));
+ await().until(execution::statusAsync,
willBe(jobStatusWithState(EXECUTING)));
assertThat(execution.cancelAsync(), willBe(true));
- await().until(execution::statusAsync,
willBe(jobStatusWithState(JobState.CANCELED)));
+ await().until(execution::statusAsync,
willBe(jobStatusWithState(CANCELED)));
}
@Test
@@ -112,7 +144,7 @@ class ItComputeTestEmbedded extends ItComputeBaseTest {
JobDescriptor job =
JobDescriptor.builder(WaitLatchJob.class).units(units()).build();
JobExecution<String> execution =
entryNode.compute().submit(Set.of(entryNode.node()), job, new
CountDownLatch(1));
- await().until(execution::statusAsync,
willBe(jobStatusWithState(JobState.EXECUTING)));
+ await().until(execution::statusAsync,
willBe(jobStatusWithState(EXECUTING)));
assertThat(execution.changePriorityAsync(2), willBe(false));
assertThat(execution.cancelAsync(), willBe(true));
@@ -124,7 +156,7 @@ class ItComputeTestEmbedded extends ItComputeBaseTest {
JobDescriptor job =
JobDescriptor.builder(WaitLatchJob.class).units(units()).build();
JobExecution<String> execution =
entryNode.compute().submit(Set.of(node(1).node()), job, new CountDownLatch(1));
- await().until(execution::statusAsync,
willBe(jobStatusWithState(JobState.EXECUTING)));
+ await().until(execution::statusAsync,
willBe(jobStatusWithState(EXECUTING)));
assertThat(execution.changePriorityAsync(2), willBe(false));
assertThat(execution.cancelAsync(), willBe(true));
@@ -140,17 +172,17 @@ class ItComputeTestEmbedded extends ItComputeBaseTest {
// Start 1 task in executor with 1 thread
JobExecution<String> execution1 = entryNode.compute().submit(nodes,
job, countDownLatch);
- await().until(execution1::statusAsync,
willBe(jobStatusWithState(JobState.EXECUTING)));
+ await().until(execution1::statusAsync,
willBe(jobStatusWithState(EXECUTING)));
// Start one more task
JobExecution<String> execution2 = entryNode.compute().submit(nodes,
job, new CountDownLatch(1));
- await().until(execution2::statusAsync,
willBe(jobStatusWithState(JobState.QUEUED)));
+ await().until(execution2::statusAsync,
willBe(jobStatusWithState(QUEUED)));
// Start third task
JobExecution<String> execution3 = entryNode.compute().submit(nodes,
job, countDownLatch);
- await().until(execution3::statusAsync,
willBe(jobStatusWithState(JobState.QUEUED)));
+ await().until(execution3::statusAsync,
willBe(jobStatusWithState(QUEUED)));
- // Task 1 and 2 are not competed, in queue state
+ // Task 2 and 3 are not completed, in queued state
assertThat(execution2.resultAsync().isDone(), is(false));
assertThat(execution3.resultAsync().isDone(), is(false));
@@ -181,11 +213,11 @@ class ItComputeTestEmbedded extends ItComputeBaseTest {
// Start 1 task in executor with 1 thread
JobExecution<String> execution1 = entryNode.compute().submit(nodes,
job, new Object[]{countDownLatch});
- await().until(execution1::statusAsync,
willBe(jobStatusWithState(JobState.EXECUTING)));
+ await().until(execution1::statusAsync,
willBe(jobStatusWithState(EXECUTING)));
// Start one more task
JobExecution<String> execution2 = entryNode.compute().submit(nodes,
job, new Object[]{new CountDownLatch(1)});
- await().until(execution2::statusAsync,
willBe(jobStatusWithState(JobState.QUEUED)));
+ await().until(execution2::statusAsync,
willBe(jobStatusWithState(QUEUED)));
// Start third task; it should be before task 2 in the queue due to
higher priority in options
JobExecutionOptions options =
JobExecutionOptions.builder().priority(1).maxRetries(2).build();
@@ -196,7 +228,7 @@ class ItComputeTestEmbedded extends ItComputeBaseTest {
.options(options)
.build(),
countDownLatch);
- await().until(execution3::statusAsync,
willBe(jobStatusWithState(JobState.QUEUED)));
+ await().until(execution3::statusAsync,
willBe(jobStatusWithState(QUEUED)));
// Task 2 and 3 are not completed, in queued state
assertThat(execution2.resultAsync().isDone(), is(false));
@@ -289,9 +321,7 @@ class ItComputeTestEmbedded extends ItComputeBaseTest {
@Test
void executesNullReturningJobViaSyncBroadcast() {
- int entryNodeIndex = 0;
-
- IgniteImpl entryNode = node(entryNodeIndex);
+ IgniteImpl entryNode = node(0);
Map<ClusterNode, Object> results = entryNode.compute()
.executeBroadcast(new HashSet<>(entryNode.clusterNodes()),
JobDescriptor.builder(NullReturningJob.class).build());
@@ -302,9 +332,7 @@ class ItComputeTestEmbedded extends ItComputeBaseTest {
@Test
void executesNullReturningJobViaAsyncBroadcast() {
- int entryNodeIndex = 0;
-
- IgniteImpl entryNode = node(entryNodeIndex);
+ IgniteImpl entryNode = node(0);
CompletableFuture<Map<ClusterNode, Object>> resultsFuture =
entryNode.compute()
.executeBroadcastAsync(new
HashSet<>(entryNode.clusterNodes()),
JobDescriptor.builder(NullReturningJob.class).build());
@@ -317,9 +345,7 @@ class ItComputeTestEmbedded extends ItComputeBaseTest {
@Test
void executesNullReturningJobViaSubmitBroadcast() {
- int entryNodeIndex = 0;
-
- IgniteImpl entryNode = node(entryNodeIndex);
+ IgniteImpl entryNode = node(0);
Map<ClusterNode, JobExecution<Object>> executionsMap =
entryNode.compute().submitBroadcast(
new HashSet<>(entryNode.clusterNodes()),