This is an automated email from the ASF dual-hosted git repository.

mpochatkin pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 139ef93389 IGNITE-22427 Fix JobExecution.cancelAsync semantics (#3937)
139ef93389 is described below

commit 139ef93389b1af910e2ce8d58e9ec9ed84eb74b9
Author: Vadim Pakhnushev <[email protected]>
AuthorDate: Tue Jun 18 16:03:07 2024 +0300

    IGNITE-22427 Fix JobExecution.cancelAsync semantics (#3937)
---
 .../org/apache/ignite/compute/JobExecution.java    |  6 +-
 .../internal/compute/ItComputeTestEmbedded.java    | 72 +++++++++++++++-------
 2 files changed, 52 insertions(+), 26 deletions(-)

diff --git 
a/modules/api/src/main/java/org/apache/ignite/compute/JobExecution.java 
b/modules/api/src/main/java/org/apache/ignite/compute/JobExecution.java
index c900610ef6..c28cfb1bfc 100644
--- a/modules/api/src/main/java/org/apache/ignite/compute/JobExecution.java
+++ b/modules/api/src/main/java/org/apache/ignite/compute/JobExecution.java
@@ -56,7 +56,7 @@ public interface JobExecution<R> {
      * Cancels the job.
      *
      * @return The future which will be completed with {@code true} when the 
job is cancelled, {@code false} when the job couldn't be
-     *         cancelled (either it's not yet started, or it's already 
completed), or {@code null} if the job no longer exists due to
+     *         cancelled (if it's already completed or in the process of 
cancelling), or {@code null} if the job no longer exists due to
      *         exceeding the retention time limit.
      */
     CompletableFuture<@Nullable Boolean> cancelAsync();
@@ -66,8 +66,8 @@ public interface JobExecution<R> {
      *
      * @param newPriority new priority.
      * @return The future which will be completed with {@code true} when the 
priority is changed, {@code false} when the priority couldn't
-     *         be changed (it's already executing or completed), or {@code 
null} if the job no longer exists due to
-     *         exceeding the retention time limit.
+     *         be changed (if the job is already executing or completed), or 
{@code null} if the job no longer exists due to exceeding the
+     *         retention time limit.
      */
     CompletableFuture<@Nullable Boolean> changePriorityAsync(int newPriority);
 }
diff --git 
a/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItComputeTestEmbedded.java
 
b/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItComputeTestEmbedded.java
index 4aed1cf525..41269a8797 100644
--- 
a/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItComputeTestEmbedded.java
+++ 
b/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItComputeTestEmbedded.java
@@ -20,6 +20,10 @@ package org.apache.ignite.internal.compute;
 import static java.util.concurrent.CompletableFuture.allOf;
 import static java.util.stream.Collectors.toList;
 import static java.util.stream.Collectors.toSet;
+import static org.apache.ignite.compute.JobState.CANCELED;
+import static org.apache.ignite.compute.JobState.COMPLETED;
+import static org.apache.ignite.compute.JobState.EXECUTING;
+import static org.apache.ignite.compute.JobState.QUEUED;
 import static 
org.apache.ignite.internal.IgniteExceptionTestUtils.assertPublicCheckedException;
 import static 
org.apache.ignite.internal.IgniteExceptionTestUtils.assertPublicException;
 import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willBe;
@@ -52,7 +56,6 @@ import org.apache.ignite.compute.JobDescriptor;
 import org.apache.ignite.compute.JobExecution;
 import org.apache.ignite.compute.JobExecutionContext;
 import org.apache.ignite.compute.JobExecutionOptions;
-import org.apache.ignite.compute.JobState;
 import org.apache.ignite.internal.app.IgniteImpl;
 import org.apache.ignite.internal.lang.IgniteInternalCheckedException;
 import org.apache.ignite.internal.lang.IgniteInternalException;
@@ -85,11 +88,40 @@ class ItComputeTestEmbedded extends ItComputeBaseTest {
         JobDescriptor job = 
JobDescriptor.builder(WaitLatchJob.class).units(units()).build();
         JobExecution<String> execution = 
entryNode.compute().submit(Set.of(entryNode.node()), job, new 
CountDownLatch(1));
 
-        await().until(execution::statusAsync, 
willBe(jobStatusWithState(JobState.EXECUTING)));
+        await().until(execution::statusAsync, 
willBe(jobStatusWithState(EXECUTING)));
 
         assertThat(execution.cancelAsync(), willBe(true));
 
-        await().until(execution::statusAsync, 
willBe(jobStatusWithState(JobState.CANCELED)));
+        await().until(execution::statusAsync, 
willBe(jobStatusWithState(CANCELED)));
+    }
+
+    @Test
+    void cancelsQueuedJobLocally() {
+        IgniteImpl entryNode = node(0);
+        Set<ClusterNode> nodes = Set.of(entryNode.node());
+
+        CountDownLatch countDownLatch = new CountDownLatch(1);
+        JobDescriptor job = 
JobDescriptor.builder(WaitLatchJob.class).units(units()).build();
+
+        // Start 1 task in executor with 1 thread
+        JobExecution<String> execution1 = entryNode.compute().submit(nodes, 
job, countDownLatch);
+        await().until(execution1::statusAsync, 
willBe(jobStatusWithState(EXECUTING)));
+
+        // Start one more task
+        JobExecution<String> execution2 = entryNode.compute().submit(nodes, 
job, new CountDownLatch(1));
+        await().until(execution2::statusAsync, 
willBe(jobStatusWithState(QUEUED)));
+
+        // Task 2 is not complete, in queued state
+        assertThat(execution2.resultAsync().isDone(), is(false));
+
+        // Cancel queued task
+        assertThat(execution2.cancelAsync(), willBe(true));
+        assertThat(execution2.statusAsync(), 
willBe(jobStatusWithState(CANCELED)));
+
+        // Finish running task
+        countDownLatch.countDown();
+        await().until(execution1::statusAsync, 
willBe(jobStatusWithState(COMPLETED)));
+        assertThat(execution1.cancelAsync(), willBe(false));
     }
 
     @Test
@@ -99,11 +131,11 @@ class ItComputeTestEmbedded extends ItComputeBaseTest {
         JobDescriptor job = 
JobDescriptor.builder(WaitLatchJob.class).units(units()).build();
         JobExecution<String> execution = 
entryNode.compute().submit(Set.of(node(1).node()), job, new CountDownLatch(1));
 
-        await().until(execution::statusAsync, 
willBe(jobStatusWithState(JobState.EXECUTING)));
+        await().until(execution::statusAsync, 
willBe(jobStatusWithState(EXECUTING)));
 
         assertThat(execution.cancelAsync(), willBe(true));
 
-        await().until(execution::statusAsync, 
willBe(jobStatusWithState(JobState.CANCELED)));
+        await().until(execution::statusAsync, 
willBe(jobStatusWithState(CANCELED)));
     }
 
     @Test
@@ -112,7 +144,7 @@ class ItComputeTestEmbedded extends ItComputeBaseTest {
 
         JobDescriptor job = 
JobDescriptor.builder(WaitLatchJob.class).units(units()).build();
         JobExecution<String> execution = 
entryNode.compute().submit(Set.of(entryNode.node()), job, new 
CountDownLatch(1));
-        await().until(execution::statusAsync, 
willBe(jobStatusWithState(JobState.EXECUTING)));
+        await().until(execution::statusAsync, 
willBe(jobStatusWithState(EXECUTING)));
 
         assertThat(execution.changePriorityAsync(2), willBe(false));
         assertThat(execution.cancelAsync(), willBe(true));
@@ -124,7 +156,7 @@ class ItComputeTestEmbedded extends ItComputeBaseTest {
 
         JobDescriptor job = 
JobDescriptor.builder(WaitLatchJob.class).units(units()).build();
         JobExecution<String> execution = 
entryNode.compute().submit(Set.of(node(1).node()), job, new CountDownLatch(1));
-        await().until(execution::statusAsync, 
willBe(jobStatusWithState(JobState.EXECUTING)));
+        await().until(execution::statusAsync, 
willBe(jobStatusWithState(EXECUTING)));
 
         assertThat(execution.changePriorityAsync(2), willBe(false));
         assertThat(execution.cancelAsync(), willBe(true));
@@ -140,17 +172,17 @@ class ItComputeTestEmbedded extends ItComputeBaseTest {
 
         // Start 1 task in executor with 1 thread
         JobExecution<String> execution1 = entryNode.compute().submit(nodes, 
job, countDownLatch);
-        await().until(execution1::statusAsync, 
willBe(jobStatusWithState(JobState.EXECUTING)));
+        await().until(execution1::statusAsync, 
willBe(jobStatusWithState(EXECUTING)));
 
         // Start one more task
         JobExecution<String> execution2 = entryNode.compute().submit(nodes, 
job, new CountDownLatch(1));
-        await().until(execution2::statusAsync, 
willBe(jobStatusWithState(JobState.QUEUED)));
+        await().until(execution2::statusAsync, 
willBe(jobStatusWithState(QUEUED)));
 
         // Start third task
         JobExecution<String> execution3 = entryNode.compute().submit(nodes, 
job, countDownLatch);
-        await().until(execution3::statusAsync, 
willBe(jobStatusWithState(JobState.QUEUED)));
+        await().until(execution3::statusAsync, 
willBe(jobStatusWithState(QUEUED)));
 
-        // Task 1 and 2 are not competed, in queue state
+        // Task 2 and 3 are not completed, in queued state
         assertThat(execution2.resultAsync().isDone(), is(false));
         assertThat(execution3.resultAsync().isDone(), is(false));
 
@@ -181,11 +213,11 @@ class ItComputeTestEmbedded extends ItComputeBaseTest {
 
         // Start 1 task in executor with 1 thread
         JobExecution<String> execution1 = entryNode.compute().submit(nodes, 
job, new Object[]{countDownLatch});
-        await().until(execution1::statusAsync, 
willBe(jobStatusWithState(JobState.EXECUTING)));
+        await().until(execution1::statusAsync, 
willBe(jobStatusWithState(EXECUTING)));
 
         // Start one more task
         JobExecution<String> execution2 = entryNode.compute().submit(nodes, 
job, new Object[]{new CountDownLatch(1)});
-        await().until(execution2::statusAsync, 
willBe(jobStatusWithState(JobState.QUEUED)));
+        await().until(execution2::statusAsync, 
willBe(jobStatusWithState(QUEUED)));
 
         // Start third task it should be before task2 in the queue due to 
higher priority in options
         JobExecutionOptions options = 
JobExecutionOptions.builder().priority(1).maxRetries(2).build();
@@ -196,7 +228,7 @@ class ItComputeTestEmbedded extends ItComputeBaseTest {
                     .options(options)
                     .build(),
                 countDownLatch);
-        await().until(execution3::statusAsync, 
willBe(jobStatusWithState(JobState.QUEUED)));
+        await().until(execution3::statusAsync, 
willBe(jobStatusWithState(QUEUED)));
 
         // Task 1 and 2 are not competed, in queue state
         assertThat(execution2.resultAsync().isDone(), is(false));
@@ -289,9 +321,7 @@ class ItComputeTestEmbedded extends ItComputeBaseTest {
 
     @Test
     void executesNullReturningJobViaSyncBroadcast() {
-        int entryNodeIndex = 0;
-
-        IgniteImpl entryNode = node(entryNodeIndex);
+        IgniteImpl entryNode = node(0);
 
         Map<ClusterNode, Object> results = entryNode.compute()
                 .executeBroadcast(new HashSet<>(entryNode.clusterNodes()), 
JobDescriptor.builder(NullReturningJob.class).build());
@@ -302,9 +332,7 @@ class ItComputeTestEmbedded extends ItComputeBaseTest {
 
     @Test
     void executesNullReturningJobViaAsyncBroadcast() {
-        int entryNodeIndex = 0;
-
-        IgniteImpl entryNode = node(entryNodeIndex);
+        IgniteImpl entryNode = node(0);
 
         CompletableFuture<Map<ClusterNode, Object>> resultsFuture = 
entryNode.compute()
                 .executeBroadcastAsync(new 
HashSet<>(entryNode.clusterNodes()), 
JobDescriptor.builder(NullReturningJob.class).build());
@@ -317,9 +345,7 @@ class ItComputeTestEmbedded extends ItComputeBaseTest {
 
     @Test
     void executesNullReturningJobViaSubmitBroadcast() {
-        int entryNodeIndex = 0;
-
-        IgniteImpl entryNode = node(entryNodeIndex);
+        IgniteImpl entryNode = node(0);
 
         Map<ClusterNode, JobExecution<Object>> executionsMap = 
entryNode.compute().submitBroadcast(
                 new HashSet<>(entryNode.clusterNodes()),

Reply via email to