This is an automated email from the ASF dual-hosted git repository.
mpochatkin pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 2e33dec0d1 IGNITE-22974 Fix ItThinClientComputeTest.testCancelOnSpecificNodeAsync (#4216)
2e33dec0d1 is described below
commit 2e33dec0d1029d7cadc610f543919df22c72849c
Author: Vadim Pakhnushev <[email protected]>
AuthorDate: Tue Aug 13 20:24:08 2024 +0300
IGNITE-22974 Fix ItThinClientComputeTest.testCancelOnSpecificNodeAsync (#4216)
---
.../app/client/ItAbstractThinClientTest.java | 4 +--
.../runner/app/client/ItThinClientComputeTest.java | 31 +++++++++++++---------
2 files changed, 21 insertions(+), 14 deletions(-)
diff --git
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItAbstractThinClientTest.java
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItAbstractThinClientTest.java
index afcd0bd403..e5c3ee4c46 100644
---
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItAbstractThinClientTest.java
+++
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItAbstractThinClientTest.java
@@ -79,7 +79,7 @@ public abstract class ItAbstractThinClientTest extends
BaseIgniteAbstractTest {
node0Name,
"{\n"
+ " network.port: 3344,\n"
- + " network.nodeFinder.netClusterNodes: [
\"localhost:3344\", \"localhost:3345\", \"localhost:3346\" ]\n"
+ + " network.nodeFinder.netClusterNodes: [
\"localhost:3344\", \"localhost:3345\" ]\n"
+ " clientConnector.port: 10800,\n"
+ " rest.port: 10300\n"
+ " compute.threadPoolSize: 1\n"
@@ -90,7 +90,7 @@ public abstract class ItAbstractThinClientTest extends
BaseIgniteAbstractTest {
node1Name,
"{\n"
+ " network.port: 3345,\n"
- + " network.nodeFinder.netClusterNodes: [
\"localhost:3344\", \"localhost:3345\", \"localhost:3346\" ]\n"
+ + " network.nodeFinder.netClusterNodes: [
\"localhost:3344\", \"localhost:3345\" ]\n"
+ "
clientConnector.sendServerExceptionStackTraceToClient: true\n"
+ " clientConnector.metricsEnabled: true\n"
+ " clientConnector.port: 10801,\n"
diff --git
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItThinClientComputeTest.java
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItThinClientComputeTest.java
index d152c7aeaa..f45f447015 100644
---
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItThinClientComputeTest.java
+++
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItThinClientComputeTest.java
@@ -182,12 +182,12 @@ public class ItThinClientComputeTest extends
ItAbstractThinClientTest {
@ValueSource(booleans = {true, false})
void testCancelOnSpecificNodeAsync(boolean asyncJob) {
int sleepMs = 1_000_000;
- JobDescriptor sleepJob = JobDescriptor
+ JobDescriptor<Integer, Void> sleepJob = JobDescriptor
.builder(asyncJob ? AsyncSleepJob.class : SleepJob.class)
.build();
- JobExecution<String> execution1 =
client().compute().submit(JobTarget.node(node(0)), sleepJob, sleepMs);
- JobExecution<String> execution2 =
client().compute().submit(JobTarget.node(node(1)), sleepJob, sleepMs);
+ JobExecution<Void> execution1 =
client().compute().submit(JobTarget.node(node(0)), sleepJob, sleepMs);
+ JobExecution<Void> execution2 =
client().compute().submit(JobTarget.node(node(1)), sleepJob, sleepMs);
await().until(execution1::stateAsync,
willBe(jobStateWithStatus(EXECUTING)));
await().until(execution2::stateAsync,
willBe(jobStateWithStatus(EXECUTING)));
@@ -202,19 +202,19 @@ public class ItThinClientComputeTest extends
ItAbstractThinClientTest {
@Test
void changeJobPriority() {
int sleepMs = 1_000_000;
- JobDescriptor sleepJob = JobDescriptor.builder(SleepJob.class).build();
+ JobDescriptor<Integer, Void> sleepJob =
JobDescriptor.builder(SleepJob.class).build();
JobTarget target = JobTarget.node(node(0));
// Start 1 task in executor with 1 thread
- JobExecution<String> execution1 = client().compute().submit(target,
sleepJob, sleepMs);
+ JobExecution<Void> execution1 = client().compute().submit(target,
sleepJob, sleepMs);
await().until(execution1::stateAsync,
willBe(jobStateWithStatus(EXECUTING)));
// Start one more long lasting task
- JobExecution<String> execution2 = client().compute().submit(target,
sleepJob, sleepMs);
+ JobExecution<Void> execution2 = client().compute().submit(target,
sleepJob, sleepMs);
await().until(execution2::stateAsync,
willBe(jobStateWithStatus(QUEUED)));
// Start third task
- JobExecution<String> execution3 = client().compute().submit(target,
sleepJob, sleepMs);
+ JobExecution<Void> execution3 = client().compute().submit(target,
sleepJob, sleepMs);
await().until(execution3::stateAsync,
willBe(jobStateWithStatus(QUEUED)));
// Task 2 and 3 are not completed, in queue state
@@ -801,7 +801,7 @@ public class ItThinClientComputeTest extends
ItAbstractThinClientTest {
return completedFuture(
Arrays.stream(args.split(":"))
- .map(o -> o == null ? "null" : o.toString())
+ .map(o -> o == null ? "null" : o)
.collect(Collectors.joining("_")));
}
}
@@ -846,9 +846,9 @@ public class ItThinClientComputeTest extends
ItAbstractThinClientTest {
private static class SleepJob implements ComputeJob<Integer, Void> {
@Override
- public @Nullable CompletableFuture<Void>
executeAsync(JobExecutionContext context, Integer args) {
+ public @Nullable CompletableFuture<Void>
executeAsync(JobExecutionContext context, Integer sleepMs) {
try {
- Thread.sleep(args);
+ Thread.sleep(sleepMs);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
@@ -859,10 +859,17 @@ public class ItThinClientComputeTest extends
ItAbstractThinClientTest {
private static class AsyncSleepJob implements ComputeJob<Integer, Void> {
@Override
- public @Nullable CompletableFuture<Void>
executeAsync(JobExecutionContext context, Integer args) {
+ public @Nullable CompletableFuture<Void>
executeAsync(JobExecutionContext context, Integer sleepMs) {
return CompletableFuture.runAsync(() -> {
try {
- Thread.sleep(args);
+ int limit = sleepMs;
+ while (limit > 0) {
+ if (context.isCancelled()) {
+ return;
+ }
+ Thread.sleep(100);
+ limit -= 100;
+ }
} catch (InterruptedException e) {
throw new RuntimeException(e);
}