This is an automated email from the ASF dual-hosted git repository.
mpochatkin pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 5ee728e1dc IGNITE-23689 Add tests for standalone compute (#4731)
5ee728e1dc is described below
commit 5ee728e1dc381b12e4521a0a1cac71f2118f895e
Author: Vadim Pakhnushev <[email protected]>
AuthorDate: Mon Nov 18 15:58:05 2024 +0300
IGNITE-23689 Add tests for standalone compute (#4731)
---
.../ignite/internal/compute/ItComputeBaseTest.java | 83 ++++++++++++++++++++++
.../internal/compute/ItComputeTestEmbedded.java | 83 ----------------------
2 files changed, 83 insertions(+), 83 deletions(-)
diff --git
a/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItComputeBaseTest.java
b/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItComputeBaseTest.java
index 995f436ee1..222b41c347 100644
---
a/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItComputeBaseTest.java
+++
b/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItComputeBaseTest.java
@@ -18,14 +18,18 @@
package org.apache.ignite.internal.compute;
import static java.util.stream.Collectors.toList;
+import static org.apache.ignite.compute.JobStatus.CANCELED;
import static org.apache.ignite.compute.JobStatus.COMPLETED;
+import static org.apache.ignite.compute.JobStatus.EXECUTING;
import static org.apache.ignite.compute.JobStatus.FAILED;
+import static org.apache.ignite.compute.JobStatus.QUEUED;
import static
org.apache.ignite.internal.IgniteExceptionTestUtils.assertTraceableException;
import static org.apache.ignite.internal.TestWrappers.unwrapIgniteImpl;
import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willBe;
import static
org.apache.ignite.internal.testframework.matchers.JobStateMatcher.jobStateWithStatus;
import static
org.apache.ignite.lang.ErrorGroups.Compute.CLASS_INITIALIZATION_ERR;
import static
org.apache.ignite.lang.ErrorGroups.Compute.COMPUTE_JOB_FAILED_ERR;
+import static org.awaitility.Awaitility.await;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.aMapWithSize;
import static org.hamcrest.Matchers.contains;
@@ -465,6 +469,85 @@ public abstract class ItComputeBaseTest extends
ClusterPerClassIntegrationTest {
assertThat(result, is(sumOfNodeNamesLengths));
}
+ @Test
+ void cancelsJobLocally() {
+ Ignite entryNode = node(0);
+
+ JobDescriptor<Long, Void> job =
JobDescriptor.builder(SleepJob.class).units(units()).build();
+ JobExecution<Void> execution =
entryNode.compute().submit(JobTarget.node(clusterNode(entryNode)), job,
Long.MAX_VALUE);
+
+ await().until(execution::stateAsync,
willBe(jobStateWithStatus(EXECUTING)));
+
+ assertThat(execution.cancelAsync(), willBe(true));
+
+ await().until(execution::stateAsync,
willBe(jobStateWithStatus(CANCELED)));
+ }
+
+ @Test
+ void cancelsQueuedJobLocally() {
+ Ignite entryNode = node(0);
+ var nodes = JobTarget.node(clusterNode(entryNode));
+
+ JobDescriptor<Long, Void> job =
JobDescriptor.builder(SleepJob.class).units(units()).build();
+
+ // Start 1 task in executor with 1 thread
+ JobExecution<Void> execution1 = entryNode.compute().submit(nodes, job,
Long.MAX_VALUE);
+ await().until(execution1::stateAsync,
willBe(jobStateWithStatus(EXECUTING)));
+
+ // Start one more task
+ JobExecution<Void> execution2 = entryNode.compute().submit(nodes, job,
Long.MAX_VALUE);
+ await().until(execution2::stateAsync,
willBe(jobStateWithStatus(QUEUED)));
+
+ // Task 2 is not complete, in queued state
+ assertThat(execution2.resultAsync().isDone(), is(false));
+
+ // Cancel queued task
+ assertThat(execution2.cancelAsync(), willBe(true));
+ await().until(execution2::stateAsync,
willBe(jobStateWithStatus(CANCELED)));
+
+ // Cancel running task
+ assertThat(execution1.cancelAsync(), willBe(true));
+ await().until(execution1::stateAsync,
willBe(jobStateWithStatus(CANCELED)));
+ }
+
+ @Test
+ void cancelsJobRemotely() {
+ Ignite entryNode = node(0);
+
+ JobDescriptor<Long, Void> job =
JobDescriptor.builder(SleepJob.class).units(units()).build();
+ JobExecution<Void> execution =
entryNode.compute().submit(JobTarget.node(clusterNode(node(1))), job,
Long.MAX_VALUE);
+
+ await().until(execution::stateAsync,
willBe(jobStateWithStatus(EXECUTING)));
+
+ assertThat(execution.cancelAsync(), willBe(true));
+
+ await().until(execution::stateAsync,
willBe(jobStateWithStatus(CANCELED)));
+ }
+
+ @Test
+ void changeExecutingJobPriorityLocally() {
+ Ignite entryNode = node(0);
+
+ JobDescriptor<Long, Void> job =
JobDescriptor.builder(SleepJob.class).units(units()).build();
+ JobExecution<Void> execution =
entryNode.compute().submit(JobTarget.node(clusterNode(entryNode)), job,
Long.MAX_VALUE);
+ await().until(execution::stateAsync,
willBe(jobStateWithStatus(EXECUTING)));
+
+ assertThat(execution.changePriorityAsync(2), willBe(false));
+ assertThat(execution.cancelAsync(), willBe(true));
+ }
+
+ @Test
+ void changeExecutingJobPriorityRemotely() {
+ Ignite entryNode = node(0);
+
+ JobDescriptor<Long, Void> job =
JobDescriptor.builder(SleepJob.class).units(units()).build();
+ JobExecution<Void> execution =
entryNode.compute().submit(JobTarget.node(clusterNode(node(1))), job,
Long.MAX_VALUE);
+ await().until(execution::stateAsync,
willBe(jobStateWithStatus(EXECUTING)));
+
+ assertThat(execution.changePriorityAsync(2), willBe(false));
+ assertThat(execution.cancelAsync(), willBe(true));
+ }
+
static Ignite node(int i) {
return CLUSTER.node(i);
}
diff --git
a/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItComputeTestEmbedded.java
b/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItComputeTestEmbedded.java
index 81f5636974..f41fdea57a 100644
---
a/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItComputeTestEmbedded.java
+++
b/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItComputeTestEmbedded.java
@@ -20,8 +20,6 @@ package org.apache.ignite.internal.compute;
import static java.util.concurrent.CompletableFuture.allOf;
import static java.util.stream.Collectors.toList;
import static java.util.stream.Collectors.toSet;
-import static org.apache.ignite.compute.JobStatus.CANCELED;
-import static org.apache.ignite.compute.JobStatus.COMPLETED;
import static org.apache.ignite.compute.JobStatus.EXECUTING;
import static org.apache.ignite.compute.JobStatus.QUEUED;
import static
org.apache.ignite.internal.IgniteExceptionTestUtils.assertPublicCheckedException;
@@ -81,87 +79,6 @@ class ItComputeTestEmbedded extends ItComputeBaseTest {
return List.of();
}
- @Test
- void cancelsJobLocally() {
- Ignite entryNode = node(0);
-
- JobDescriptor<CountDownLatch, String> job =
JobDescriptor.builder(WaitLatchJob.class).units(units()).build();
- JobExecution<String> execution =
entryNode.compute().submit(JobTarget.node(clusterNode(entryNode)), job, new
CountDownLatch(1));
-
- await().until(execution::stateAsync,
willBe(jobStateWithStatus(EXECUTING)));
-
- assertThat(execution.cancelAsync(), willBe(true));
-
- await().until(execution::stateAsync,
willBe(jobStateWithStatus(CANCELED)));
- }
-
- @Test
- void cancelsQueuedJobLocally() {
- Ignite entryNode = node(0);
- var nodes = JobTarget.node(clusterNode(entryNode));
-
- CountDownLatch countDownLatch = new CountDownLatch(1);
- JobDescriptor<CountDownLatch, String> job =
JobDescriptor.builder(WaitLatchJob.class).units(units()).build();
-
- // Start 1 task in executor with 1 thread
- JobExecution<String> execution1 = entryNode.compute().submit(nodes,
job, countDownLatch);
- await().until(execution1::stateAsync,
willBe(jobStateWithStatus(EXECUTING)));
-
- // Start one more task
- JobExecution<String> execution2 = entryNode.compute().submit(nodes,
job, new CountDownLatch(1));
- await().until(execution2::stateAsync,
willBe(jobStateWithStatus(QUEUED)));
-
- // Task 2 is not complete, in queued state
- assertThat(execution2.resultAsync().isDone(), is(false));
-
- // Cancel queued task
- assertThat(execution2.cancelAsync(), willBe(true));
- assertThat(execution2.stateAsync(),
willBe(jobStateWithStatus(CANCELED)));
-
- // Finish running task
- countDownLatch.countDown();
- await().until(execution1::stateAsync,
willBe(jobStateWithStatus(COMPLETED)));
- assertThat(execution1.cancelAsync(), willBe(false));
- }
-
- @Test
- void cancelsJobRemotely() {
- Ignite entryNode = node(0);
-
- JobDescriptor<CountDownLatch, String> job =
JobDescriptor.builder(WaitLatchJob.class).units(units()).build();
- JobExecution<String> execution =
entryNode.compute().submit(JobTarget.node(clusterNode(node(1))), job, new
CountDownLatch(1));
-
- await().until(execution::stateAsync,
willBe(jobStateWithStatus(EXECUTING)));
-
- assertThat(execution.cancelAsync(), willBe(true));
-
- await().until(execution::stateAsync,
willBe(jobStateWithStatus(CANCELED)));
- }
-
- @Test
- void changeExecutingJobPriorityLocally() {
- Ignite entryNode = node(0);
-
- JobDescriptor<CountDownLatch, String> job =
JobDescriptor.builder(WaitLatchJob.class).units(units()).build();
- JobExecution<String> execution =
entryNode.compute().submit(JobTarget.node(clusterNode(entryNode)), job, new
CountDownLatch(1));
- await().until(execution::stateAsync,
willBe(jobStateWithStatus(EXECUTING)));
-
- assertThat(execution.changePriorityAsync(2), willBe(false));
- assertThat(execution.cancelAsync(), willBe(true));
- }
-
- @Test
- void changeExecutingJobPriorityRemotely() {
- Ignite entryNode = node(0);
-
- JobDescriptor<CountDownLatch, String> job =
JobDescriptor.builder(WaitLatchJob.class).units(units()).build();
- JobExecution<String> execution =
entryNode.compute().submit(JobTarget.node(clusterNode(node(1))), job, new
CountDownLatch(1));
- await().until(execution::stateAsync,
willBe(jobStateWithStatus(EXECUTING)));
-
- assertThat(execution.changePriorityAsync(2), willBe(false));
- assertThat(execution.cancelAsync(), willBe(true));
- }
-
@Test
void changeJobPriorityLocally() {
Ignite entryNode = node(0);