This is an automated email from the ASF dual-hosted git repository.

mpochatkin pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 7a479e147c IGNITE-21480 Check local node before broadcasting (#3568)
7a479e147c is described below

commit 7a479e147cc7a0efc8e7238d1eeaa4834c6376f3
Author: Vadim Pakhnushev <[email protected]>
AuthorDate: Tue Apr 9 15:51:20 2024 +0300

    IGNITE-21480 Check local node before broadcasting (#3568)
---
 .../internal/compute/ComputeComponentImpl.java     |  22 +-
 .../compute/messaging/ComputeMessaging.java        |   2 +
 .../internal/compute/ComputeComponentImplTest.java | 288 ++++++---------------
 3 files changed, 106 insertions(+), 206 deletions(-)

diff --git 
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeComponentImpl.java
 
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeComponentImpl.java
index 39f9079253..ac0f61eab6 100644
--- 
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeComponentImpl.java
+++ 
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeComponentImpl.java
@@ -17,6 +17,7 @@
 
 package org.apache.ignite.internal.compute;
 
+import static java.util.concurrent.CompletableFuture.completedFuture;
 import static java.util.concurrent.CompletableFuture.failedFuture;
 import static 
org.apache.ignite.internal.compute.ClassLoaderExceptionsMapper.mapClassLoaderExceptions;
 import static 
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
@@ -196,17 +197,32 @@ public class ComputeComponentImpl implements 
ComputeComponent {
 
     @Override
     public CompletableFuture<@Nullable JobStatus> statusAsync(UUID jobId) {
-        return messaging.broadcastStatusAsync(jobId);
+        return executionManager.statusAsync(jobId).thenCompose(jobStatus -> {
+            if (jobStatus != null) {
+                return completedFuture(jobStatus);
+            }
+            return messaging.broadcastStatusAsync(jobId);
+        });
     }
 
     @Override
     public CompletableFuture<@Nullable Boolean> cancelAsync(UUID jobId) {
-        return messaging.broadcastCancelAsync(jobId);
+        return executionManager.cancelAsync(jobId).thenCompose(result -> {
+            if (result != null) {
+                return completedFuture(result);
+            }
+            return messaging.broadcastCancelAsync(jobId);
+        });
     }
 
     @Override
     public CompletableFuture<@Nullable Boolean> changePriorityAsync(UUID 
jobId, int newPriority) {
-        return messaging.broadcastChangePriorityAsync(jobId, newPriority);
+        return executionManager.changePriorityAsync(jobId, 
newPriority).thenCompose(result -> {
+            if (result != null) {
+                return completedFuture(result);
+            }
+            return messaging.broadcastChangePriorityAsync(jobId, newPriority);
+        });
     }
 
     /** {@inheritDoc} */
diff --git 
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/messaging/ComputeMessaging.java
 
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/messaging/ComputeMessaging.java
index f242c58149..b6bcd0ed94 100644
--- 
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/messaging/ComputeMessaging.java
+++ 
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/messaging/ComputeMessaging.java
@@ -453,8 +453,10 @@ public class ComputeMessaging {
     ) {
         CompletableFuture<@Nullable R> result = new CompletableFuture<>();
 
+        ClusterNode localMember = topologyService.localMember();
         CompletableFuture<?>[] futures = topologyService.allMembers()
                 .stream()
+                .filter(node -> !node.equals(localMember))
                 .map(node -> request.apply(node)
                         .thenAccept(response -> {
                             if (response != null) {
diff --git 
a/modules/compute/src/test/java/org/apache/ignite/internal/compute/ComputeComponentImplTest.java
 
b/modules/compute/src/test/java/org/apache/ignite/internal/compute/ComputeComponentImplTest.java
index f90abc2aef..b883b0620d 100644
--- 
a/modules/compute/src/test/java/org/apache/ignite/internal/compute/ComputeComponentImplTest.java
+++ 
b/modules/compute/src/test/java/org/apache/ignite/internal/compute/ComputeComponentImplTest.java
@@ -21,12 +21,11 @@ import static 
java.util.concurrent.CompletableFuture.completedFuture;
 import static org.apache.ignite.compute.JobState.CANCELED;
 import static org.apache.ignite.compute.JobState.COMPLETED;
 import static org.apache.ignite.compute.JobState.EXECUTING;
+import static org.apache.ignite.compute.JobState.QUEUED;
 import static org.apache.ignite.internal.compute.ExecutionOptions.DEFAULT;
-import static 
org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
 import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureExceptionMatcher.willThrow;
 import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureExceptionMatcher.willThrowWithCauseOrSuppressed;
 import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willBe;
-import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
 import static 
org.apache.ignite.internal.testframework.matchers.JobStatusMatcher.jobStatusWithState;
 import static 
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
 import static org.awaitility.Awaitility.await;
@@ -39,7 +38,6 @@ import static org.hamcrest.Matchers.nullValue;
 import static org.hamcrest.Matchers.startsWith;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTimeoutPreemptively;
-import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.mockito.Answers.RETURNS_DEEP_STUBS;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyList;
@@ -61,7 +59,6 @@ import java.util.UUID;
 import java.util.concurrent.CancellationException;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
 import org.apache.ignite.Ignite;
@@ -112,7 +109,6 @@ import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.Timeout;
 import org.junit.jupiter.api.extension.ExtendWith;
 import org.mockito.ArgumentCaptor;
-import org.mockito.Captor;
 import org.mockito.Mock;
 import org.mockito.junit.jupiter.MockitoExtension;
 
@@ -134,7 +130,7 @@ class ComputeComponentImplTest extends 
BaseIgniteAbstractTest {
     @Mock
     private LogicalTopologyService logicalTopologyService;
 
-    @InjectConfiguration
+    @InjectConfiguration("mock{threadPoolSize=1, 
threadPoolStopTimeoutMillis=100}")
     private ComputeConfiguration computeConfiguration;
 
     @Mock
@@ -142,45 +138,11 @@ class ComputeComponentImplTest extends 
BaseIgniteAbstractTest {
 
     private ComputeComponent computeComponent;
 
-    private ComputeExecutor computeExecutor;
-
-    @Captor
-    private ArgumentCaptor<ExecuteRequest> executeRequestCaptor;
-
-    @Captor
-    private ArgumentCaptor<ExecuteResponse> executeResponseCaptor;
-
-    @Captor
-    private ArgumentCaptor<JobResultRequest> jobResultRequestCaptor;
-
-    @Captor
-    private ArgumentCaptor<JobResultResponse> jobResultResponseCaptor;
-
-    @Captor
-    private ArgumentCaptor<JobStatusRequest> jobStatusRequestCaptor;
-
-    @Captor
-    private ArgumentCaptor<JobStatusResponse> jobStatusResponseCaptor;
-
-    @Captor
-    private ArgumentCaptor<JobCancelRequest> jobCancelRequestCaptor;
-
-    @Captor
-    private ArgumentCaptor<JobCancelResponse> jobCancelResponseCaptor;
-
-    @Captor
-    private ArgumentCaptor<JobChangePriorityRequest> 
jobChangePriorityRequestCaptor;
-
-    @Captor
-    private ArgumentCaptor<JobChangePriorityResponse> 
jobChangePriorityResponseCaptor;
-
     private final ClusterNode testNode = new ClusterNodeImpl("test", "test", 
new NetworkAddress("test-host", 1));
     private final ClusterNode remoteNode = new ClusterNodeImpl("remote", 
"remote", new NetworkAddress("remote-host", 1));
 
     private final AtomicReference<NetworkMessageHandler> 
computeMessageHandlerRef = new AtomicReference<>();
 
-    private final AtomicBoolean responseSent = new AtomicBoolean(false);
-
     @BeforeEach
     void setUp() {
         lenient().when(ignite.name()).thenReturn(INSTANCE_NAME);
@@ -196,13 +158,8 @@ class ComputeComponentImplTest extends 
BaseIgniteAbstractTest {
             return null;
         
}).when(messagingService).addMessageHandler(eq(ComputeMessageTypes.class), 
any());
 
-        assertThat(computeConfiguration.change(computeChange ->
-                        
computeChange.changeThreadPoolStopTimeoutMillis(10_000L).changeThreadPoolSize(8)),
-                willCompleteSuccessfully()
-        );
-
         InMemoryComputeStateMachine stateMachine = new 
InMemoryComputeStateMachine(computeConfiguration, INSTANCE_NAME);
-        computeExecutor = new ComputeExecutorImpl(ignite, stateMachine, 
computeConfiguration);
+        ComputeExecutor computeExecutor = new ComputeExecutorImpl(ignite, 
stateMachine, computeConfiguration);
 
         computeComponent = new ComputeComponentImpl(
                 INSTANCE_NAME,
@@ -215,6 +172,7 @@ class ComputeComponentImplTest extends 
BaseIgniteAbstractTest {
         );
 
         computeComponent.start();
+        assertThat(computeMessageHandlerRef.get(), is(notNullValue()));
     }
 
     @AfterEach
@@ -247,6 +205,25 @@ class ComputeComponentImplTest extends 
BaseIgniteAbstractTest {
         assertThatNoRequestsWereSent();
     }
 
+    @Test
+    void statusCancelAndChangePriorityTriesLocalNodeFirst() {
+        JobExecution<String> runningExecution = 
computeComponent.executeLocally(List.of(), LongJob.class.getName());
+        await().until(runningExecution::statusAsync, 
willBe(jobStatusWithState(EXECUTING)));
+
+        JobExecution<String> queuedExecution = 
computeComponent.executeLocally(List.of(), LongJob.class.getName());
+        await().until(queuedExecution::statusAsync, 
willBe(jobStatusWithState(QUEUED)));
+
+        UUID jobId = queuedExecution.statusAsync().join().id();
+
+        assertThat(computeComponent.statusAsync(jobId), 
willBe(jobStatusWithState(QUEUED)));
+        assertThat(computeComponent.changePriorityAsync(jobId, 1), 
willBe(true));
+        assertThat(computeComponent.cancelAsync(jobId), willBe(true));
+
+        await().until(queuedExecution::statusAsync, 
willBe(jobStatusWithState(CANCELED)));
+
+        assertThatNoRequestsWereSent();
+    }
+
     private void assertThatNoRequestsWereSent() {
         verify(messagingService, never()).invoke(any(ClusterNode.class), 
any(), anyLong());
     }
@@ -255,7 +232,7 @@ class ComputeComponentImplTest extends 
BaseIgniteAbstractTest {
     void executesLocallyWithException() {
         ExecutionException ex = assertThrows(
                 ExecutionException.class,
-                () -> executeLocally(List.of(), 
FailingJob.class.getName()).get()
+                () -> executeLocally(FailingJob.class.getName()).get()
         );
 
         assertThat(ex.getCause(), is(instanceOf(JobException.class)));
@@ -392,42 +369,32 @@ class ComputeComponentImplTest extends 
BaseIgniteAbstractTest {
     }
 
     private void assertThatExecuteRequestWasSent(String jobClassName, 
Object... args) {
-        verify(messagingService).invoke(eq(remoteNode), 
executeRequestCaptor.capture(), anyLong());
-
-        ExecuteRequest capturedRequest = executeRequestCaptor.getValue();
+        ExecuteRequest capturedRequest = 
invokeAndCaptureRequest(ExecuteRequest.class);
 
         assertThat(capturedRequest.jobClassName(), is(jobClassName));
         assertThat(capturedRequest.args(), is(equalTo(args)));
     }
 
     private void assertThatJobResultRequestWasSent(UUID jobId) {
-        verify(messagingService).invoke(eq(remoteNode), 
jobResultRequestCaptor.capture(), anyLong());
-
-        JobResultRequest capturedRequest = jobResultRequestCaptor.getValue();
+        JobResultRequest capturedRequest = 
invokeAndCaptureRequest(JobResultRequest.class);
 
         assertThat(capturedRequest.jobId(), is(jobId));
     }
 
     private void assertThatJobStatusRequestWasSent(UUID jobId) {
-        verify(messagingService).invoke(eq(remoteNode), 
jobStatusRequestCaptor.capture(), anyLong());
-
-        JobStatusRequest capturedRequest = jobStatusRequestCaptor.getValue();
+        JobStatusRequest capturedRequest = 
invokeAndCaptureRequest(JobStatusRequest.class);
 
         assertThat(capturedRequest.jobId(), is(jobId));
     }
 
     private void assertThatJobCancelRequestWasSent(UUID jobId) {
-        verify(messagingService).invoke(eq(remoteNode), 
jobCancelRequestCaptor.capture(), anyLong());
-
-        JobCancelRequest capturedRequest = jobCancelRequestCaptor.getValue();
+        JobCancelRequest capturedRequest = 
invokeAndCaptureRequest(JobCancelRequest.class);
 
         assertThat(capturedRequest.jobId(), is(jobId));
     }
 
     private void assertThatJobChangePriorityRequestWasSent(UUID jobId) {
-        verify(messagingService).invoke(eq(remoteNode), 
jobChangePriorityRequestCaptor.capture(), anyLong());
-
-        JobChangePriorityRequest capturedRequest = 
jobChangePriorityRequestCaptor.getValue();
+        JobChangePriorityRequest capturedRequest = 
invokeAndCaptureRequest(JobChangePriorityRequest.class);
 
         assertThat(capturedRequest.jobId(), is(jobId));
     }
@@ -444,7 +411,7 @@ class ComputeComponentImplTest extends 
BaseIgniteAbstractTest {
 
         ExecutionException ex = assertThrows(
                 ExecutionException.class,
-                () -> executeRemotely(remoteNode, List.of(), 
FailingJob.class.getName()).get()
+                () -> executeRemotely(FailingJob.class.getName()).get()
         );
 
         assertThatExecuteRequestWasSent(FailingJob.class.getName());
@@ -455,72 +422,40 @@ class ComputeComponentImplTest extends 
BaseIgniteAbstractTest {
     }
 
     @Test
-    void executesJobAndRespondsWhenGetsExecuteRequest() throws Exception {
-        markResponseSentOnResponseSend();
-        assertThat(computeMessageHandlerRef.get(), is(notNullValue()));
-
+    void executesJobAndRespondsWhenGetsExecuteRequest() {
         ExecuteRequest executeRequest = new 
ComputeMessagesFactory().executeRequest()
                 .executeOptions(DEFAULT)
                 .deploymentUnits(List.of())
                 .jobClassName(SimpleJob.class.getName())
                 .args(new Object[]{"a", 42})
                 .build();
-        computeMessageHandlerRef.get().onReceived(executeRequest, testNode, 
123L);
+        ExecuteResponse executeResponse = 
sendRequestAndCaptureResponse(executeRequest, testNode, 123L);
 
-        UUID jobId = assertThatExecuteResponseIsSentTo(testNode);
-        responseSent.set(false);
-        markResponseSentOnResponseSend();
+        UUID jobId = executeResponse.jobId();
+        assertThat(jobId, is(notNullValue()));
+        assertThat(executeResponse.throwable(), is(nullValue()));
 
         JobResultRequest jobResultRequest = new 
ComputeMessagesFactory().jobResultRequest()
                 .jobId(jobId)
                 .build();
-        computeMessageHandlerRef.get().onReceived(jobResultRequest, testNode, 
456L);
+        JobResultResponse jobResultResponse = 
sendRequestAndCaptureResponse(jobResultRequest, testNode, 456L);
 
-        assertThatJobResultResponseIsSentTo(testNode);
-    }
-
-    private void markResponseSentOnResponseSend() {
-        when(messagingService.respond(any(ClusterNode.class), any(), 
anyLong()))
-                .thenAnswer(invocation -> {
-                    responseSent.set(true);
-                    return nullCompletedFuture();
-                });
-    }
-
-    private UUID assertThatExecuteResponseIsSentTo(ClusterNode sender) throws 
InterruptedException {
-        assertTrue(waitForCondition(responseSent::get, 1000), "No response 
sent");
-
-        verify(messagingService).respond(eq(sender), 
executeResponseCaptor.capture(), eq(123L));
-        ExecuteResponse response = executeResponseCaptor.getValue();
-
-        UUID jobId = response.jobId();
-        assertThat(jobId, is(notNullValue()));
-        assertThat(response.throwable(), is(nullValue()));
-        return jobId;
-    }
-
-    private void assertThatJobResultResponseIsSentTo(ClusterNode sender) 
throws InterruptedException {
-        assertTrue(waitForCondition(responseSent::get, 1000), "No response 
sent");
-
-        verify(messagingService).respond(eq(sender), 
jobResultResponseCaptor.capture(), eq(456L));
-        JobResultResponse response = jobResultResponseCaptor.getValue();
-
-        assertThat(response.result(), is("jobResponse"));
-        assertThat(response.throwable(), is(nullValue()));
+        assertThat(jobResultResponse.result(), is("jobResponse"));
+        assertThat(jobResultResponse.throwable(), is(nullValue()));
     }
 
     @Test
     void stoppedComponentReturnsExceptionOnLocalExecutionAttempt() throws 
Exception {
         computeComponent.stop();
 
-        CompletableFuture<String> result = executeLocally(List.of(), 
SimpleJob.class.getName());
+        CompletableFuture<String> result = 
executeLocally(SimpleJob.class.getName());
 
         assertThat(result, 
willThrowWithCauseOrSuppressed(NodeStoppingException.class));
     }
 
     @Test
     void localExecutionReleasesStopLock() throws Exception {
-        executeLocally(List.of(), SimpleJob.class.getName()).get();
+        executeLocally(SimpleJob.class.getName()).get();
 
         assertTimeoutPreemptively(Duration.ofSeconds(3), () -> 
computeComponent.stop());
     }
@@ -529,7 +464,7 @@ class ComputeComponentImplTest extends 
BaseIgniteAbstractTest {
     void stoppedComponentReturnsExceptionOnRemoteExecutionAttempt() throws 
Exception {
         computeComponent.stop();
 
-        CompletableFuture<String> result = executeRemotely(remoteNode, 
List.of(), SimpleJob.class.getName());
+        CompletableFuture<String> result = 
executeRemotely(SimpleJob.class.getName());
 
         assertThat(result, 
willThrowWithCauseOrSuppressed(NodeStoppingException.class));
     }
@@ -540,7 +475,7 @@ class ComputeComponentImplTest extends 
BaseIgniteAbstractTest {
         respondWithExecuteResponseWhenExecuteRequestIsSent(jobId);
         respondWithJobResultResponseWhenJobResultRequestIsSent(jobId);
 
-        executeRemotely(remoteNode, List.of(), 
SimpleJob.class.getName()).get();
+        executeRemotely(SimpleJob.class.getName()).get();
 
         assertTimeoutPreemptively(Duration.ofSeconds(3), () -> 
computeComponent.stop());
     }
@@ -549,25 +484,13 @@ class ComputeComponentImplTest extends 
BaseIgniteAbstractTest {
     void stoppedComponentReturnsExceptionOnExecuteRequestAttempt() throws 
Exception {
         computeComponent.stop();
 
-        markResponseSentOnResponseSend();
-        assertThat(computeMessageHandlerRef.get(), is(notNullValue()));
-
         ExecuteRequest request = new ComputeMessagesFactory().executeRequest()
                 .executeOptions(DEFAULT)
                 .deploymentUnits(List.of())
                 .jobClassName(SimpleJob.class.getName())
                 .args(new Object[]{"a", 42})
                 .build();
-        computeMessageHandlerRef.get().onReceived(request, testNode, 123L);
-
-        assertThatExecuteRequestSendsNodeStoppingExceptionTo(testNode);
-    }
-
-    private void 
assertThatExecuteRequestSendsNodeStoppingExceptionTo(ClusterNode sender) throws 
InterruptedException {
-        assertTrue(waitForCondition(responseSent::get, 1000), "No response 
sent");
-
-        verify(messagingService).respond(eq(sender), 
executeResponseCaptor.capture(), eq(123L));
-        ExecuteResponse response = executeResponseCaptor.getValue();
+        ExecuteResponse response = sendRequestAndCaptureResponse(request, 
testNode, 123L);
 
         assertThat(response.jobId(), is(nullValue()));
         assertThat(response.throwable(), 
is(instanceOf(IgniteInternalException.class)));
@@ -578,22 +501,10 @@ class ComputeComponentImplTest extends 
BaseIgniteAbstractTest {
     void stoppedComponentReturnsExceptionOnJobResultRequestAttempt() throws 
Exception {
         computeComponent.stop();
 
-        markResponseSentOnResponseSend();
-        assertThat(computeMessageHandlerRef.get(), is(notNullValue()));
-
         JobResultRequest jobResultRequest = new 
ComputeMessagesFactory().jobResultRequest()
                 .jobId(UUID.randomUUID())
                 .build();
-        computeMessageHandlerRef.get().onReceived(jobResultRequest, testNode, 
123L);
-
-        assertThatJobResultRequestSendsNodeStoppingExceptionTo(testNode);
-    }
-
-    private void 
assertThatJobResultRequestSendsNodeStoppingExceptionTo(ClusterNode sender) 
throws InterruptedException {
-        assertTrue(waitForCondition(responseSent::get, 1000), "No response 
sent");
-
-        verify(messagingService).respond(eq(sender), 
jobResultResponseCaptor.capture(), eq(123L));
-        JobResultResponse response = jobResultResponseCaptor.getValue();
+        JobResultResponse response = 
sendRequestAndCaptureResponse(jobResultRequest, testNode, 123L);
 
         assertThat(response.result(), is(nullValue()));
         assertThat(response.throwable(), 
is(instanceOf(IgniteInternalException.class)));
@@ -604,22 +515,10 @@ class ComputeComponentImplTest extends 
BaseIgniteAbstractTest {
     void stoppedComponentReturnsExceptionOnJobStatusRequestAttempt() throws 
Exception {
         computeComponent.stop();
 
-        markResponseSentOnResponseSend();
-        assertThat(computeMessageHandlerRef.get(), is(notNullValue()));
-
         JobStatusRequest jobStatusRequest = new 
ComputeMessagesFactory().jobStatusRequest()
                 .jobId(UUID.randomUUID())
                 .build();
-        computeMessageHandlerRef.get().onReceived(jobStatusRequest, testNode, 
123L);
-
-        assertThatJobStatusRequestSendsNodeStoppingExceptionTo(testNode);
-    }
-
-    private void 
assertThatJobStatusRequestSendsNodeStoppingExceptionTo(ClusterNode sender) 
throws InterruptedException {
-        assertTrue(waitForCondition(responseSent::get, 1000), "No response 
sent");
-
-        verify(messagingService).respond(eq(sender), 
jobStatusResponseCaptor.capture(), eq(123L));
-        JobStatusResponse response = jobStatusResponseCaptor.getValue();
+        JobStatusResponse response = 
sendRequestAndCaptureResponse(jobStatusRequest, testNode, 123L);
 
         assertThat(response.status(), is(nullValue()));
         assertThat(response.throwable(), 
is(instanceOf(IgniteInternalException.class)));
@@ -630,23 +529,12 @@ class ComputeComponentImplTest extends 
BaseIgniteAbstractTest {
     void stoppedComponentReturnsExceptionOnJobCancelRequestAttempt() throws 
Exception {
         computeComponent.stop();
 
-        markResponseSentOnResponseSend();
-        assertThat(computeMessageHandlerRef.get(), is(notNullValue()));
-
         JobCancelRequest jobCancelRequest = new 
ComputeMessagesFactory().jobCancelRequest()
                 .jobId(UUID.randomUUID())
                 .build();
-        computeMessageHandlerRef.get().onReceived(jobCancelRequest, testNode, 
456L);
-
-        assertThatJobCancelRequestSendsNodeStoppingExceptionTo(testNode);
-    }
-
-    private void 
assertThatJobCancelRequestSendsNodeStoppingExceptionTo(ClusterNode sender) 
throws InterruptedException {
-        assertTrue(waitForCondition(responseSent::get, 1000), "No response 
sent");
-
-        verify(messagingService).respond(eq(sender), 
jobCancelResponseCaptor.capture(), eq(456L));
-        JobCancelResponse response = jobCancelResponseCaptor.getValue();
+        JobCancelResponse response = 
sendRequestAndCaptureResponse(jobCancelRequest, testNode, 123L);
 
+        assertThat(response.result(), is(nullValue()));
         assertThat(response.throwable(), 
is(instanceOf(IgniteInternalException.class)));
         assertThat(response.throwable().getCause(), 
is(instanceOf(NodeStoppingException.class)));
     }
@@ -655,24 +543,13 @@ class ComputeComponentImplTest extends 
BaseIgniteAbstractTest {
     void stoppedComponentReturnsExceptionOnJobChangePriorityRequestAttempt() 
throws Exception {
         computeComponent.stop();
 
-        markResponseSentOnResponseSend();
-        assertThat(computeMessageHandlerRef.get(), is(notNullValue()));
-
         JobChangePriorityRequest jobChangePriorityRequest = new 
ComputeMessagesFactory().jobChangePriorityRequest()
                 .jobId(UUID.randomUUID())
                 .priority(1)
                 .build();
-        computeMessageHandlerRef.get().onReceived(jobChangePriorityRequest, 
testNode, 456L);
-
-        
assertThatJobChangePriorityRequestSendsNodeStoppingExceptionTo(testNode);
-    }
-
-    private void 
assertThatJobChangePriorityRequestSendsNodeStoppingExceptionTo(ClusterNode 
sender) throws InterruptedException {
-        assertTrue(waitForCondition(responseSent::get, 1000), "No response 
sent");
-
-        verify(messagingService).respond(eq(sender), 
jobChangePriorityResponseCaptor.capture(), eq(456L));
-        JobChangePriorityResponse response = 
jobChangePriorityResponseCaptor.getValue();
+        JobChangePriorityResponse response = 
sendRequestAndCaptureResponse(jobChangePriorityRequest, testNode, 123L);
 
+        assertThat(response.result(), is(nullValue()));
         assertThat(response.throwable(), 
is(instanceOf(IgniteInternalException.class)));
         assertThat(response.throwable().getCause(), 
is(instanceOf(NodeStoppingException.class)));
     }
@@ -680,43 +557,22 @@ class ComputeComponentImplTest extends 
BaseIgniteAbstractTest {
     @Test
     void executorThreadsAreNamedAccordingly() {
         assertThat(
-                executeLocally(List.of(), GetThreadNameJob.class.getName()),
+                executeLocally(GetThreadNameJob.class.getName()),
                 
willBe(startsWith(NamedThreadFactory.threadPrefix(INSTANCE_NAME, "compute")))
         );
     }
 
-    private void restrictPoolSizeTo1() {
-        assertThat(computeConfiguration.change(computeChange -> 
computeChange.changeThreadPoolSize(1)), willCompleteSuccessfully());
-    }
-
     @Test
     void stopCausesCancellationExceptionOnLocalExecution() throws Exception {
-        restrictPoolSizeTo1();
-
-        assertThat(computeConfiguration.change(computeChange -> 
computeChange.changeThreadPoolStopTimeoutMillis(100)),
-                willCompleteSuccessfully());
-
-        computeComponent = new ComputeComponentImpl(
-                INSTANCE_NAME,
-                messagingService,
-                topologyService,
-                logicalTopologyService,
-                jobContextManager,
-                computeExecutor,
-                computeConfiguration
-        );
-        computeComponent.start();
-
         // take the only executor thread
-        executeLocally(List.of(), LongJob.class.getName());
+        executeLocally(LongJob.class.getName());
 
         // the corresponding task goes to work queue
-        CompletableFuture<String> resultFuture = executeLocally(List.of(), 
SimpleJob.class.getName());
+        CompletableFuture<String> resultFuture = 
executeLocally(SimpleJob.class.getName());
 
         computeComponent.stop();
 
         // now work queue is dropped to the floor, so the future should be 
resolved with a cancellation
-
         assertThat(resultFuture, willThrow(CancellationException.class));
     }
 
@@ -725,11 +581,11 @@ class ComputeComponentImplTest extends 
BaseIgniteAbstractTest {
         respondWithExecuteResponseWhenExecuteRequestIsSent(UUID.randomUUID());
         respondWithIncompleteFutureWhenJobResultRequestIsSent();
 
-        CompletableFuture<String> resultFuture = executeRemotely(remoteNode, 
List.of(), SimpleJob.class.getName());
+        CompletableFuture<String> resultFuture = 
executeRemotely(SimpleJob.class.getName());
 
         computeComponent.stop();
 
-        assertThat(resultFuture, willThrow(CancellationException.class, 3, 
TimeUnit.SECONDS));
+        assertThat(resultFuture, willThrow(CancellationException.class));
     }
 
     private void respondWithIncompleteFutureWhenJobResultRequestIsSent() {
@@ -740,7 +596,7 @@ class ComputeComponentImplTest extends 
BaseIgniteAbstractTest {
     @Test
     void executionOfJobOfNonExistentClassResultsInException() {
         assertThat(
-                executeLocally(List.of(), "no-such-class"),
+                executeLocally("no-such-class"),
                 willThrow(Exception.class, "Cannot load job class by name 
'no-such-class'")
         );
     }
@@ -748,7 +604,7 @@ class ComputeComponentImplTest extends 
BaseIgniteAbstractTest {
     @Test
     void executionOfNonJobClassResultsInException() {
         assertThat(
-                executeLocally(List.of(), Object.class.getName()),
+                executeLocally(Object.class.getName()),
                 willThrow(Exception.class, "'java.lang.Object' does not 
implement ComputeJob interface")
         );
     }
@@ -783,17 +639,43 @@ class ComputeComponentImplTest extends 
BaseIgniteAbstractTest {
         );
     }
 
+    private <T extends NetworkMessage> T invokeAndCaptureRequest(Class<T> 
clazz) {
+        ArgumentCaptor<T> requestCaptor = ArgumentCaptor.forClass(clazz);
+        verify(messagingService).invoke(eq(remoteNode), 
requestCaptor.capture(), anyLong());
+        return requestCaptor.getValue();
+    }
+
+    private <T extends NetworkMessage> T 
sendRequestAndCaptureResponse(NetworkMessage request, ClusterNode sender, long 
correlationId) {
+        AtomicBoolean responseSent = new AtomicBoolean(false);
+
+        when(messagingService.respond(eq(sender), any(), eq(correlationId)))
+                .thenAnswer(invocation -> {
+                    responseSent.set(true);
+                    return nullCompletedFuture();
+                });
+
+        computeMessageHandlerRef.get().onReceived(request, testNode, 
correlationId);
+
+        await().until(responseSent::get, is(true));
+
+        ArgumentCaptor<T> responseCaptor = ArgumentCaptor.captor();
+        verify(messagingService).respond(eq(sender), responseCaptor.capture(), 
eq(correlationId));
+        return responseCaptor.getValue();
+    }
+
+    private CompletableFuture<String> executeLocally(String jobClassName, 
Object... args) {
+        return executeLocally(List.of(), jobClassName, args);
+    }
+
     private CompletableFuture<String> executeLocally(List<DeploymentUnit> 
units, String jobClassName, Object... args) {
         return computeComponent.<String>executeLocally(units, jobClassName, 
args).resultAsync();
     }
 
     private CompletableFuture<String> executeRemotely(
-            ClusterNode remoteNode,
-            List<DeploymentUnit> units,
             String jobClassName,
             Object... args
     ) {
-        return computeComponent.<String>executeRemotely(remoteNode, units, 
jobClassName, args).resultAsync();
+        return computeComponent.<String>executeRemotely(remoteNode, List.of(), 
jobClassName, args).resultAsync();
     }
 
     private static class SimpleJob implements ComputeJob<String> {

Reply via email to