This is an automated email from the ASF dual-hosted git repository.

mpochatkin pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 5ee728e1dc IGNITE-23689 Add tests for standalone compute (#4731)
5ee728e1dc is described below

commit 5ee728e1dc381b12e4521a0a1cac71f2118f895e
Author: Vadim Pakhnushev <[email protected]>
AuthorDate: Mon Nov 18 15:58:05 2024 +0300

    IGNITE-23689 Add tests for standalone compute (#4731)
---
 .../ignite/internal/compute/ItComputeBaseTest.java | 83 ++++++++++++++++++++++
 .../internal/compute/ItComputeTestEmbedded.java    | 83 ----------------------
 2 files changed, 83 insertions(+), 83 deletions(-)

diff --git 
a/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItComputeBaseTest.java
 
b/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItComputeBaseTest.java
index 995f436ee1..222b41c347 100644
--- 
a/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItComputeBaseTest.java
+++ 
b/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItComputeBaseTest.java
@@ -18,14 +18,18 @@
 package org.apache.ignite.internal.compute;
 
 import static java.util.stream.Collectors.toList;
+import static org.apache.ignite.compute.JobStatus.CANCELED;
 import static org.apache.ignite.compute.JobStatus.COMPLETED;
+import static org.apache.ignite.compute.JobStatus.EXECUTING;
 import static org.apache.ignite.compute.JobStatus.FAILED;
+import static org.apache.ignite.compute.JobStatus.QUEUED;
 import static 
org.apache.ignite.internal.IgniteExceptionTestUtils.assertTraceableException;
 import static org.apache.ignite.internal.TestWrappers.unwrapIgniteImpl;
 import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willBe;
 import static 
org.apache.ignite.internal.testframework.matchers.JobStateMatcher.jobStateWithStatus;
 import static 
org.apache.ignite.lang.ErrorGroups.Compute.CLASS_INITIALIZATION_ERR;
 import static 
org.apache.ignite.lang.ErrorGroups.Compute.COMPUTE_JOB_FAILED_ERR;
+import static org.awaitility.Awaitility.await;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.aMapWithSize;
 import static org.hamcrest.Matchers.contains;
@@ -465,6 +469,85 @@ public abstract class ItComputeBaseTest extends 
ClusterPerClassIntegrationTest {
         assertThat(result, is(sumOfNodeNamesLengths));
     }
 
+    @Test
+    void cancelsJobLocally() {
+        Ignite entryNode = node(0);
+
+        JobDescriptor<Long, Void> job = 
JobDescriptor.builder(SleepJob.class).units(units()).build();
+        JobExecution<Void> execution = 
entryNode.compute().submit(JobTarget.node(clusterNode(entryNode)), job, 
Long.MAX_VALUE);
+
+        await().until(execution::stateAsync, 
willBe(jobStateWithStatus(EXECUTING)));
+
+        assertThat(execution.cancelAsync(), willBe(true));
+
+        await().until(execution::stateAsync, 
willBe(jobStateWithStatus(CANCELED)));
+    }
+
+    @Test
+    void cancelsQueuedJobLocally() {
+        Ignite entryNode = node(0);
+        var nodes = JobTarget.node(clusterNode(entryNode));
+
+        JobDescriptor<Long, Void> job = 
JobDescriptor.builder(SleepJob.class).units(units()).build();
+
+        // Start 1 task in executor with 1 thread
+        JobExecution<Void> execution1 = entryNode.compute().submit(nodes, job, 
Long.MAX_VALUE);
+        await().until(execution1::stateAsync, 
willBe(jobStateWithStatus(EXECUTING)));
+
+        // Start one more task
+        JobExecution<Void> execution2 = entryNode.compute().submit(nodes, job, 
Long.MAX_VALUE);
+        await().until(execution2::stateAsync, 
willBe(jobStateWithStatus(QUEUED)));
+
+        // Task 2 is not complete, in queued state
+        assertThat(execution2.resultAsync().isDone(), is(false));
+
+        // Cancel queued task
+        assertThat(execution2.cancelAsync(), willBe(true));
+        await().until(execution2::stateAsync, 
willBe(jobStateWithStatus(CANCELED)));
+
+        // Cancel running task
+        assertThat(execution1.cancelAsync(), willBe(true));
+        await().until(execution1::stateAsync, 
willBe(jobStateWithStatus(CANCELED)));
+    }
+
+    @Test
+    void cancelsJobRemotely() {
+        Ignite entryNode = node(0);
+
+        JobDescriptor<Long, Void> job = 
JobDescriptor.builder(SleepJob.class).units(units()).build();
+        JobExecution<Void> execution = 
entryNode.compute().submit(JobTarget.node(clusterNode(node(1))), job, 
Long.MAX_VALUE);
+
+        await().until(execution::stateAsync, 
willBe(jobStateWithStatus(EXECUTING)));
+
+        assertThat(execution.cancelAsync(), willBe(true));
+
+        await().until(execution::stateAsync, 
willBe(jobStateWithStatus(CANCELED)));
+    }
+
+    @Test
+    void changeExecutingJobPriorityLocally() {
+        Ignite entryNode = node(0);
+
+        JobDescriptor<Long, Void> job = 
JobDescriptor.builder(SleepJob.class).units(units()).build();
+        JobExecution<Void> execution = 
entryNode.compute().submit(JobTarget.node(clusterNode(entryNode)), job, 
Long.MAX_VALUE);
+        await().until(execution::stateAsync, 
willBe(jobStateWithStatus(EXECUTING)));
+
+        assertThat(execution.changePriorityAsync(2), willBe(false));
+        assertThat(execution.cancelAsync(), willBe(true));
+    }
+
+    @Test
+    void changeExecutingJobPriorityRemotely() {
+        Ignite entryNode = node(0);
+
+        JobDescriptor<Long, Void> job = 
JobDescriptor.builder(SleepJob.class).units(units()).build();
+        JobExecution<Void> execution = 
entryNode.compute().submit(JobTarget.node(clusterNode(node(1))), job, 
Long.MAX_VALUE);
+        await().until(execution::stateAsync, 
willBe(jobStateWithStatus(EXECUTING)));
+
+        assertThat(execution.changePriorityAsync(2), willBe(false));
+        assertThat(execution.cancelAsync(), willBe(true));
+    }
+
     static Ignite node(int i) {
         return CLUSTER.node(i);
     }
diff --git 
a/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItComputeTestEmbedded.java
 
b/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItComputeTestEmbedded.java
index 81f5636974..f41fdea57a 100644
--- 
a/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItComputeTestEmbedded.java
+++ 
b/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItComputeTestEmbedded.java
@@ -20,8 +20,6 @@ package org.apache.ignite.internal.compute;
 import static java.util.concurrent.CompletableFuture.allOf;
 import static java.util.stream.Collectors.toList;
 import static java.util.stream.Collectors.toSet;
-import static org.apache.ignite.compute.JobStatus.CANCELED;
-import static org.apache.ignite.compute.JobStatus.COMPLETED;
 import static org.apache.ignite.compute.JobStatus.EXECUTING;
 import static org.apache.ignite.compute.JobStatus.QUEUED;
 import static 
org.apache.ignite.internal.IgniteExceptionTestUtils.assertPublicCheckedException;
@@ -81,87 +79,6 @@ class ItComputeTestEmbedded extends ItComputeBaseTest {
         return List.of();
     }
 
-    @Test
-    void cancelsJobLocally() {
-        Ignite entryNode = node(0);
-
-        JobDescriptor<CountDownLatch, String> job = 
JobDescriptor.builder(WaitLatchJob.class).units(units()).build();
-        JobExecution<String> execution = 
entryNode.compute().submit(JobTarget.node(clusterNode(entryNode)), job, new 
CountDownLatch(1));
-
-        await().until(execution::stateAsync, 
willBe(jobStateWithStatus(EXECUTING)));
-
-        assertThat(execution.cancelAsync(), willBe(true));
-
-        await().until(execution::stateAsync, 
willBe(jobStateWithStatus(CANCELED)));
-    }
-
-    @Test
-    void cancelsQueuedJobLocally() {
-        Ignite entryNode = node(0);
-        var nodes = JobTarget.node(clusterNode(entryNode));
-
-        CountDownLatch countDownLatch = new CountDownLatch(1);
-        JobDescriptor<CountDownLatch, String> job = 
JobDescriptor.builder(WaitLatchJob.class).units(units()).build();
-
-        // Start 1 task in executor with 1 thread
-        JobExecution<String> execution1 = entryNode.compute().submit(nodes, 
job, countDownLatch);
-        await().until(execution1::stateAsync, 
willBe(jobStateWithStatus(EXECUTING)));
-
-        // Start one more task
-        JobExecution<String> execution2 = entryNode.compute().submit(nodes, 
job, new CountDownLatch(1));
-        await().until(execution2::stateAsync, 
willBe(jobStateWithStatus(QUEUED)));
-
-        // Task 2 is not complete, in queued state
-        assertThat(execution2.resultAsync().isDone(), is(false));
-
-        // Cancel queued task
-        assertThat(execution2.cancelAsync(), willBe(true));
-        assertThat(execution2.stateAsync(), 
willBe(jobStateWithStatus(CANCELED)));
-
-        // Finish running task
-        countDownLatch.countDown();
-        await().until(execution1::stateAsync, 
willBe(jobStateWithStatus(COMPLETED)));
-        assertThat(execution1.cancelAsync(), willBe(false));
-    }
-
-    @Test
-    void cancelsJobRemotely() {
-        Ignite entryNode = node(0);
-
-        JobDescriptor<CountDownLatch, String> job = 
JobDescriptor.builder(WaitLatchJob.class).units(units()).build();
-        JobExecution<String> execution = 
entryNode.compute().submit(JobTarget.node(clusterNode(node(1))), job, new 
CountDownLatch(1));
-
-        await().until(execution::stateAsync, 
willBe(jobStateWithStatus(EXECUTING)));
-
-        assertThat(execution.cancelAsync(), willBe(true));
-
-        await().until(execution::stateAsync, 
willBe(jobStateWithStatus(CANCELED)));
-    }
-
-    @Test
-    void changeExecutingJobPriorityLocally() {
-        Ignite entryNode = node(0);
-
-        JobDescriptor<CountDownLatch, String> job = 
JobDescriptor.builder(WaitLatchJob.class).units(units()).build();
-        JobExecution<String> execution = 
entryNode.compute().submit(JobTarget.node(clusterNode(entryNode)), job, new 
CountDownLatch(1));
-        await().until(execution::stateAsync, 
willBe(jobStateWithStatus(EXECUTING)));
-
-        assertThat(execution.changePriorityAsync(2), willBe(false));
-        assertThat(execution.cancelAsync(), willBe(true));
-    }
-
-    @Test
-    void changeExecutingJobPriorityRemotely() {
-        Ignite entryNode = node(0);
-
-        JobDescriptor<CountDownLatch, String> job = 
JobDescriptor.builder(WaitLatchJob.class).units(units()).build();
-        JobExecution<String> execution = 
entryNode.compute().submit(JobTarget.node(clusterNode(node(1))), job, new 
CountDownLatch(1));
-        await().until(execution::stateAsync, 
willBe(jobStateWithStatus(EXECUTING)));
-
-        assertThat(execution.changePriorityAsync(2), willBe(false));
-        assertThat(execution.cancelAsync(), willBe(true));
-    }
-
     @Test
     void changeJobPriorityLocally() {
         Ignite entryNode = node(0);

Reply via email to