This is an automated email from the ASF dual-hosted git repository.
mpochatkin pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 7a479e147c IGNITE-21480 Check local node before broadcasting (#3568)
7a479e147c is described below
commit 7a479e147cc7a0efc8e7238d1eeaa4834c6376f3
Author: Vadim Pakhnushev <[email protected]>
AuthorDate: Tue Apr 9 15:51:20 2024 +0300
IGNITE-21480 Check local node before broadcasting (#3568)
---
.../internal/compute/ComputeComponentImpl.java | 22 +-
.../compute/messaging/ComputeMessaging.java | 2 +
.../internal/compute/ComputeComponentImplTest.java | 288 ++++++---------------
3 files changed, 106 insertions(+), 206 deletions(-)
diff --git
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeComponentImpl.java
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeComponentImpl.java
index 39f9079253..ac0f61eab6 100644
---
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeComponentImpl.java
+++
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeComponentImpl.java
@@ -17,6 +17,7 @@
package org.apache.ignite.internal.compute;
+import static java.util.concurrent.CompletableFuture.completedFuture;
import static java.util.concurrent.CompletableFuture.failedFuture;
import static
org.apache.ignite.internal.compute.ClassLoaderExceptionsMapper.mapClassLoaderExceptions;
import static
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
@@ -196,17 +197,32 @@ public class ComputeComponentImpl implements
ComputeComponent {
@Override
public CompletableFuture<@Nullable JobStatus> statusAsync(UUID jobId) {
- return messaging.broadcastStatusAsync(jobId);
+ return executionManager.statusAsync(jobId).thenCompose(jobStatus -> {
+ if (jobStatus != null) {
+ return completedFuture(jobStatus);
+ }
+ return messaging.broadcastStatusAsync(jobId);
+ });
}
@Override
public CompletableFuture<@Nullable Boolean> cancelAsync(UUID jobId) {
- return messaging.broadcastCancelAsync(jobId);
+ return executionManager.cancelAsync(jobId).thenCompose(result -> {
+ if (result != null) {
+ return completedFuture(result);
+ }
+ return messaging.broadcastCancelAsync(jobId);
+ });
}
@Override
public CompletableFuture<@Nullable Boolean> changePriorityAsync(UUID
jobId, int newPriority) {
- return messaging.broadcastChangePriorityAsync(jobId, newPriority);
+ return executionManager.changePriorityAsync(jobId,
newPriority).thenCompose(result -> {
+ if (result != null) {
+ return completedFuture(result);
+ }
+ return messaging.broadcastChangePriorityAsync(jobId, newPriority);
+ });
}
/** {@inheritDoc} */
diff --git
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/messaging/ComputeMessaging.java
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/messaging/ComputeMessaging.java
index f242c58149..b6bcd0ed94 100644
---
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/messaging/ComputeMessaging.java
+++
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/messaging/ComputeMessaging.java
@@ -453,8 +453,10 @@ public class ComputeMessaging {
) {
CompletableFuture<@Nullable R> result = new CompletableFuture<>();
+ ClusterNode localMember = topologyService.localMember();
CompletableFuture<?>[] futures = topologyService.allMembers()
.stream()
+ .filter(node -> !node.equals(localMember))
.map(node -> request.apply(node)
.thenAccept(response -> {
if (response != null) {
diff --git
a/modules/compute/src/test/java/org/apache/ignite/internal/compute/ComputeComponentImplTest.java
b/modules/compute/src/test/java/org/apache/ignite/internal/compute/ComputeComponentImplTest.java
index f90abc2aef..b883b0620d 100644
---
a/modules/compute/src/test/java/org/apache/ignite/internal/compute/ComputeComponentImplTest.java
+++
b/modules/compute/src/test/java/org/apache/ignite/internal/compute/ComputeComponentImplTest.java
@@ -21,12 +21,11 @@ import static
java.util.concurrent.CompletableFuture.completedFuture;
import static org.apache.ignite.compute.JobState.CANCELED;
import static org.apache.ignite.compute.JobState.COMPLETED;
import static org.apache.ignite.compute.JobState.EXECUTING;
+import static org.apache.ignite.compute.JobState.QUEUED;
import static org.apache.ignite.internal.compute.ExecutionOptions.DEFAULT;
-import static
org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureExceptionMatcher.willThrow;
import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureExceptionMatcher.willThrowWithCauseOrSuppressed;
import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willBe;
-import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
import static
org.apache.ignite.internal.testframework.matchers.JobStatusMatcher.jobStatusWithState;
import static
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
import static org.awaitility.Awaitility.await;
@@ -39,7 +38,6 @@ import static org.hamcrest.Matchers.nullValue;
import static org.hamcrest.Matchers.startsWith;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTimeoutPreemptively;
-import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.Answers.RETURNS_DEEP_STUBS;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyList;
@@ -61,7 +59,6 @@ import java.util.UUID;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.ignite.Ignite;
@@ -112,7 +109,6 @@ import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.ArgumentCaptor;
-import org.mockito.Captor;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
@@ -134,7 +130,7 @@ class ComputeComponentImplTest extends
BaseIgniteAbstractTest {
@Mock
private LogicalTopologyService logicalTopologyService;
- @InjectConfiguration
+ @InjectConfiguration("mock{threadPoolSize=1,
threadPoolStopTimeoutMillis=100}")
private ComputeConfiguration computeConfiguration;
@Mock
@@ -142,45 +138,11 @@ class ComputeComponentImplTest extends
BaseIgniteAbstractTest {
private ComputeComponent computeComponent;
- private ComputeExecutor computeExecutor;
-
- @Captor
- private ArgumentCaptor<ExecuteRequest> executeRequestCaptor;
-
- @Captor
- private ArgumentCaptor<ExecuteResponse> executeResponseCaptor;
-
- @Captor
- private ArgumentCaptor<JobResultRequest> jobResultRequestCaptor;
-
- @Captor
- private ArgumentCaptor<JobResultResponse> jobResultResponseCaptor;
-
- @Captor
- private ArgumentCaptor<JobStatusRequest> jobStatusRequestCaptor;
-
- @Captor
- private ArgumentCaptor<JobStatusResponse> jobStatusResponseCaptor;
-
- @Captor
- private ArgumentCaptor<JobCancelRequest> jobCancelRequestCaptor;
-
- @Captor
- private ArgumentCaptor<JobCancelResponse> jobCancelResponseCaptor;
-
- @Captor
- private ArgumentCaptor<JobChangePriorityRequest>
jobChangePriorityRequestCaptor;
-
- @Captor
- private ArgumentCaptor<JobChangePriorityResponse>
jobChangePriorityResponseCaptor;
-
private final ClusterNode testNode = new ClusterNodeImpl("test", "test",
new NetworkAddress("test-host", 1));
private final ClusterNode remoteNode = new ClusterNodeImpl("remote",
"remote", new NetworkAddress("remote-host", 1));
private final AtomicReference<NetworkMessageHandler>
computeMessageHandlerRef = new AtomicReference<>();
- private final AtomicBoolean responseSent = new AtomicBoolean(false);
-
@BeforeEach
void setUp() {
lenient().when(ignite.name()).thenReturn(INSTANCE_NAME);
@@ -196,13 +158,8 @@ class ComputeComponentImplTest extends
BaseIgniteAbstractTest {
return null;
}).when(messagingService).addMessageHandler(eq(ComputeMessageTypes.class),
any());
- assertThat(computeConfiguration.change(computeChange ->
-
computeChange.changeThreadPoolStopTimeoutMillis(10_000L).changeThreadPoolSize(8)),
- willCompleteSuccessfully()
- );
-
InMemoryComputeStateMachine stateMachine = new
InMemoryComputeStateMachine(computeConfiguration, INSTANCE_NAME);
- computeExecutor = new ComputeExecutorImpl(ignite, stateMachine,
computeConfiguration);
+ ComputeExecutor computeExecutor = new ComputeExecutorImpl(ignite,
stateMachine, computeConfiguration);
computeComponent = new ComputeComponentImpl(
INSTANCE_NAME,
@@ -215,6 +172,7 @@ class ComputeComponentImplTest extends
BaseIgniteAbstractTest {
);
computeComponent.start();
+ assertThat(computeMessageHandlerRef.get(), is(notNullValue()));
}
@AfterEach
@@ -247,6 +205,25 @@ class ComputeComponentImplTest extends
BaseIgniteAbstractTest {
assertThatNoRequestsWereSent();
}
+ @Test
+ void statusCancelAndChangePriorityTriesLocalNodeFirst() {
+ JobExecution<String> runningExecution =
computeComponent.executeLocally(List.of(), LongJob.class.getName());
+ await().until(runningExecution::statusAsync,
willBe(jobStatusWithState(EXECUTING)));
+
+ JobExecution<String> queuedExecution =
computeComponent.executeLocally(List.of(), LongJob.class.getName());
+ await().until(queuedExecution::statusAsync,
willBe(jobStatusWithState(QUEUED)));
+
+ UUID jobId = queuedExecution.statusAsync().join().id();
+
+ assertThat(computeComponent.statusAsync(jobId),
willBe(jobStatusWithState(QUEUED)));
+ assertThat(computeComponent.changePriorityAsync(jobId, 1),
willBe(true));
+ assertThat(computeComponent.cancelAsync(jobId), willBe(true));
+
+ await().until(queuedExecution::statusAsync,
willBe(jobStatusWithState(CANCELED)));
+
+ assertThatNoRequestsWereSent();
+ }
+
private void assertThatNoRequestsWereSent() {
verify(messagingService, never()).invoke(any(ClusterNode.class),
any(), anyLong());
}
@@ -255,7 +232,7 @@ class ComputeComponentImplTest extends
BaseIgniteAbstractTest {
void executesLocallyWithException() {
ExecutionException ex = assertThrows(
ExecutionException.class,
- () -> executeLocally(List.of(),
FailingJob.class.getName()).get()
+ () -> executeLocally(FailingJob.class.getName()).get()
);
assertThat(ex.getCause(), is(instanceOf(JobException.class)));
@@ -392,42 +369,32 @@ class ComputeComponentImplTest extends
BaseIgniteAbstractTest {
}
private void assertThatExecuteRequestWasSent(String jobClassName,
Object... args) {
- verify(messagingService).invoke(eq(remoteNode),
executeRequestCaptor.capture(), anyLong());
-
- ExecuteRequest capturedRequest = executeRequestCaptor.getValue();
+ ExecuteRequest capturedRequest =
invokeAndCaptureRequest(ExecuteRequest.class);
assertThat(capturedRequest.jobClassName(), is(jobClassName));
assertThat(capturedRequest.args(), is(equalTo(args)));
}
private void assertThatJobResultRequestWasSent(UUID jobId) {
- verify(messagingService).invoke(eq(remoteNode),
jobResultRequestCaptor.capture(), anyLong());
-
- JobResultRequest capturedRequest = jobResultRequestCaptor.getValue();
+ JobResultRequest capturedRequest =
invokeAndCaptureRequest(JobResultRequest.class);
assertThat(capturedRequest.jobId(), is(jobId));
}
private void assertThatJobStatusRequestWasSent(UUID jobId) {
- verify(messagingService).invoke(eq(remoteNode),
jobStatusRequestCaptor.capture(), anyLong());
-
- JobStatusRequest capturedRequest = jobStatusRequestCaptor.getValue();
+ JobStatusRequest capturedRequest =
invokeAndCaptureRequest(JobStatusRequest.class);
assertThat(capturedRequest.jobId(), is(jobId));
}
private void assertThatJobCancelRequestWasSent(UUID jobId) {
- verify(messagingService).invoke(eq(remoteNode),
jobCancelRequestCaptor.capture(), anyLong());
-
- JobCancelRequest capturedRequest = jobCancelRequestCaptor.getValue();
+ JobCancelRequest capturedRequest =
invokeAndCaptureRequest(JobCancelRequest.class);
assertThat(capturedRequest.jobId(), is(jobId));
}
private void assertThatJobChangePriorityRequestWasSent(UUID jobId) {
- verify(messagingService).invoke(eq(remoteNode),
jobChangePriorityRequestCaptor.capture(), anyLong());
-
- JobChangePriorityRequest capturedRequest =
jobChangePriorityRequestCaptor.getValue();
+ JobChangePriorityRequest capturedRequest =
invokeAndCaptureRequest(JobChangePriorityRequest.class);
assertThat(capturedRequest.jobId(), is(jobId));
}
@@ -444,7 +411,7 @@ class ComputeComponentImplTest extends
BaseIgniteAbstractTest {
ExecutionException ex = assertThrows(
ExecutionException.class,
- () -> executeRemotely(remoteNode, List.of(),
FailingJob.class.getName()).get()
+ () -> executeRemotely(FailingJob.class.getName()).get()
);
assertThatExecuteRequestWasSent(FailingJob.class.getName());
@@ -455,72 +422,40 @@ class ComputeComponentImplTest extends
BaseIgniteAbstractTest {
}
@Test
- void executesJobAndRespondsWhenGetsExecuteRequest() throws Exception {
- markResponseSentOnResponseSend();
- assertThat(computeMessageHandlerRef.get(), is(notNullValue()));
-
+ void executesJobAndRespondsWhenGetsExecuteRequest() {
ExecuteRequest executeRequest = new
ComputeMessagesFactory().executeRequest()
.executeOptions(DEFAULT)
.deploymentUnits(List.of())
.jobClassName(SimpleJob.class.getName())
.args(new Object[]{"a", 42})
.build();
- computeMessageHandlerRef.get().onReceived(executeRequest, testNode,
123L);
+ ExecuteResponse executeResponse =
sendRequestAndCaptureResponse(executeRequest, testNode, 123L);
- UUID jobId = assertThatExecuteResponseIsSentTo(testNode);
- responseSent.set(false);
- markResponseSentOnResponseSend();
+ UUID jobId = executeResponse.jobId();
+ assertThat(jobId, is(notNullValue()));
+ assertThat(executeResponse.throwable(), is(nullValue()));
JobResultRequest jobResultRequest = new
ComputeMessagesFactory().jobResultRequest()
.jobId(jobId)
.build();
- computeMessageHandlerRef.get().onReceived(jobResultRequest, testNode,
456L);
+ JobResultResponse jobResultResponse =
sendRequestAndCaptureResponse(jobResultRequest, testNode, 456L);
- assertThatJobResultResponseIsSentTo(testNode);
- }
-
- private void markResponseSentOnResponseSend() {
- when(messagingService.respond(any(ClusterNode.class), any(),
anyLong()))
- .thenAnswer(invocation -> {
- responseSent.set(true);
- return nullCompletedFuture();
- });
- }
-
- private UUID assertThatExecuteResponseIsSentTo(ClusterNode sender) throws
InterruptedException {
- assertTrue(waitForCondition(responseSent::get, 1000), "No response
sent");
-
- verify(messagingService).respond(eq(sender),
executeResponseCaptor.capture(), eq(123L));
- ExecuteResponse response = executeResponseCaptor.getValue();
-
- UUID jobId = response.jobId();
- assertThat(jobId, is(notNullValue()));
- assertThat(response.throwable(), is(nullValue()));
- return jobId;
- }
-
- private void assertThatJobResultResponseIsSentTo(ClusterNode sender)
throws InterruptedException {
- assertTrue(waitForCondition(responseSent::get, 1000), "No response
sent");
-
- verify(messagingService).respond(eq(sender),
jobResultResponseCaptor.capture(), eq(456L));
- JobResultResponse response = jobResultResponseCaptor.getValue();
-
- assertThat(response.result(), is("jobResponse"));
- assertThat(response.throwable(), is(nullValue()));
+ assertThat(jobResultResponse.result(), is("jobResponse"));
+ assertThat(jobResultResponse.throwable(), is(nullValue()));
}
@Test
void stoppedComponentReturnsExceptionOnLocalExecutionAttempt() throws
Exception {
computeComponent.stop();
- CompletableFuture<String> result = executeLocally(List.of(),
SimpleJob.class.getName());
+ CompletableFuture<String> result =
executeLocally(SimpleJob.class.getName());
assertThat(result,
willThrowWithCauseOrSuppressed(NodeStoppingException.class));
}
@Test
void localExecutionReleasesStopLock() throws Exception {
- executeLocally(List.of(), SimpleJob.class.getName()).get();
+ executeLocally(SimpleJob.class.getName()).get();
assertTimeoutPreemptively(Duration.ofSeconds(3), () ->
computeComponent.stop());
}
@@ -529,7 +464,7 @@ class ComputeComponentImplTest extends
BaseIgniteAbstractTest {
void stoppedComponentReturnsExceptionOnRemoteExecutionAttempt() throws
Exception {
computeComponent.stop();
- CompletableFuture<String> result = executeRemotely(remoteNode,
List.of(), SimpleJob.class.getName());
+ CompletableFuture<String> result =
executeRemotely(SimpleJob.class.getName());
assertThat(result,
willThrowWithCauseOrSuppressed(NodeStoppingException.class));
}
@@ -540,7 +475,7 @@ class ComputeComponentImplTest extends
BaseIgniteAbstractTest {
respondWithExecuteResponseWhenExecuteRequestIsSent(jobId);
respondWithJobResultResponseWhenJobResultRequestIsSent(jobId);
- executeRemotely(remoteNode, List.of(),
SimpleJob.class.getName()).get();
+ executeRemotely(SimpleJob.class.getName()).get();
assertTimeoutPreemptively(Duration.ofSeconds(3), () ->
computeComponent.stop());
}
@@ -549,25 +484,13 @@ class ComputeComponentImplTest extends
BaseIgniteAbstractTest {
void stoppedComponentReturnsExceptionOnExecuteRequestAttempt() throws
Exception {
computeComponent.stop();
- markResponseSentOnResponseSend();
- assertThat(computeMessageHandlerRef.get(), is(notNullValue()));
-
ExecuteRequest request = new ComputeMessagesFactory().executeRequest()
.executeOptions(DEFAULT)
.deploymentUnits(List.of())
.jobClassName(SimpleJob.class.getName())
.args(new Object[]{"a", 42})
.build();
- computeMessageHandlerRef.get().onReceived(request, testNode, 123L);
-
- assertThatExecuteRequestSendsNodeStoppingExceptionTo(testNode);
- }
-
- private void
assertThatExecuteRequestSendsNodeStoppingExceptionTo(ClusterNode sender) throws
InterruptedException {
- assertTrue(waitForCondition(responseSent::get, 1000), "No response
sent");
-
- verify(messagingService).respond(eq(sender),
executeResponseCaptor.capture(), eq(123L));
- ExecuteResponse response = executeResponseCaptor.getValue();
+ ExecuteResponse response = sendRequestAndCaptureResponse(request,
testNode, 123L);
assertThat(response.jobId(), is(nullValue()));
assertThat(response.throwable(),
is(instanceOf(IgniteInternalException.class)));
@@ -578,22 +501,10 @@ class ComputeComponentImplTest extends
BaseIgniteAbstractTest {
void stoppedComponentReturnsExceptionOnJobResultRequestAttempt() throws
Exception {
computeComponent.stop();
- markResponseSentOnResponseSend();
- assertThat(computeMessageHandlerRef.get(), is(notNullValue()));
-
JobResultRequest jobResultRequest = new
ComputeMessagesFactory().jobResultRequest()
.jobId(UUID.randomUUID())
.build();
- computeMessageHandlerRef.get().onReceived(jobResultRequest, testNode,
123L);
-
- assertThatJobResultRequestSendsNodeStoppingExceptionTo(testNode);
- }
-
- private void
assertThatJobResultRequestSendsNodeStoppingExceptionTo(ClusterNode sender)
throws InterruptedException {
- assertTrue(waitForCondition(responseSent::get, 1000), "No response
sent");
-
- verify(messagingService).respond(eq(sender),
jobResultResponseCaptor.capture(), eq(123L));
- JobResultResponse response = jobResultResponseCaptor.getValue();
+ JobResultResponse response =
sendRequestAndCaptureResponse(jobResultRequest, testNode, 123L);
assertThat(response.result(), is(nullValue()));
assertThat(response.throwable(),
is(instanceOf(IgniteInternalException.class)));
@@ -604,22 +515,10 @@ class ComputeComponentImplTest extends
BaseIgniteAbstractTest {
void stoppedComponentReturnsExceptionOnJobStatusRequestAttempt() throws
Exception {
computeComponent.stop();
- markResponseSentOnResponseSend();
- assertThat(computeMessageHandlerRef.get(), is(notNullValue()));
-
JobStatusRequest jobStatusRequest = new
ComputeMessagesFactory().jobStatusRequest()
.jobId(UUID.randomUUID())
.build();
- computeMessageHandlerRef.get().onReceived(jobStatusRequest, testNode,
123L);
-
- assertThatJobStatusRequestSendsNodeStoppingExceptionTo(testNode);
- }
-
- private void
assertThatJobStatusRequestSendsNodeStoppingExceptionTo(ClusterNode sender)
throws InterruptedException {
- assertTrue(waitForCondition(responseSent::get, 1000), "No response
sent");
-
- verify(messagingService).respond(eq(sender),
jobStatusResponseCaptor.capture(), eq(123L));
- JobStatusResponse response = jobStatusResponseCaptor.getValue();
+ JobStatusResponse response =
sendRequestAndCaptureResponse(jobStatusRequest, testNode, 123L);
assertThat(response.status(), is(nullValue()));
assertThat(response.throwable(),
is(instanceOf(IgniteInternalException.class)));
@@ -630,23 +529,12 @@ class ComputeComponentImplTest extends
BaseIgniteAbstractTest {
void stoppedComponentReturnsExceptionOnJobCancelRequestAttempt() throws
Exception {
computeComponent.stop();
- markResponseSentOnResponseSend();
- assertThat(computeMessageHandlerRef.get(), is(notNullValue()));
-
JobCancelRequest jobCancelRequest = new
ComputeMessagesFactory().jobCancelRequest()
.jobId(UUID.randomUUID())
.build();
- computeMessageHandlerRef.get().onReceived(jobCancelRequest, testNode,
456L);
-
- assertThatJobCancelRequestSendsNodeStoppingExceptionTo(testNode);
- }
-
- private void
assertThatJobCancelRequestSendsNodeStoppingExceptionTo(ClusterNode sender)
throws InterruptedException {
- assertTrue(waitForCondition(responseSent::get, 1000), "No response
sent");
-
- verify(messagingService).respond(eq(sender),
jobCancelResponseCaptor.capture(), eq(456L));
- JobCancelResponse response = jobCancelResponseCaptor.getValue();
+ JobCancelResponse response =
sendRequestAndCaptureResponse(jobCancelRequest, testNode, 123L);
+ assertThat(response.result(), is(nullValue()));
assertThat(response.throwable(),
is(instanceOf(IgniteInternalException.class)));
assertThat(response.throwable().getCause(),
is(instanceOf(NodeStoppingException.class)));
}
@@ -655,24 +543,13 @@ class ComputeComponentImplTest extends
BaseIgniteAbstractTest {
void stoppedComponentReturnsExceptionOnJobChangePriorityRequestAttempt()
throws Exception {
computeComponent.stop();
- markResponseSentOnResponseSend();
- assertThat(computeMessageHandlerRef.get(), is(notNullValue()));
-
JobChangePriorityRequest jobChangePriorityRequest = new
ComputeMessagesFactory().jobChangePriorityRequest()
.jobId(UUID.randomUUID())
.priority(1)
.build();
- computeMessageHandlerRef.get().onReceived(jobChangePriorityRequest,
testNode, 456L);
-
-
assertThatJobChangePriorityRequestSendsNodeStoppingExceptionTo(testNode);
- }
-
- private void
assertThatJobChangePriorityRequestSendsNodeStoppingExceptionTo(ClusterNode
sender) throws InterruptedException {
- assertTrue(waitForCondition(responseSent::get, 1000), "No response
sent");
-
- verify(messagingService).respond(eq(sender),
jobChangePriorityResponseCaptor.capture(), eq(456L));
- JobChangePriorityResponse response =
jobChangePriorityResponseCaptor.getValue();
+ JobChangePriorityResponse response =
sendRequestAndCaptureResponse(jobChangePriorityRequest, testNode, 123L);
+ assertThat(response.result(), is(nullValue()));
assertThat(response.throwable(),
is(instanceOf(IgniteInternalException.class)));
assertThat(response.throwable().getCause(),
is(instanceOf(NodeStoppingException.class)));
}
@@ -680,43 +557,22 @@ class ComputeComponentImplTest extends
BaseIgniteAbstractTest {
@Test
void executorThreadsAreNamedAccordingly() {
assertThat(
- executeLocally(List.of(), GetThreadNameJob.class.getName()),
+ executeLocally(GetThreadNameJob.class.getName()),
willBe(startsWith(NamedThreadFactory.threadPrefix(INSTANCE_NAME, "compute")))
);
}
- private void restrictPoolSizeTo1() {
- assertThat(computeConfiguration.change(computeChange ->
computeChange.changeThreadPoolSize(1)), willCompleteSuccessfully());
- }
-
@Test
void stopCausesCancellationExceptionOnLocalExecution() throws Exception {
- restrictPoolSizeTo1();
-
- assertThat(computeConfiguration.change(computeChange ->
computeChange.changeThreadPoolStopTimeoutMillis(100)),
- willCompleteSuccessfully());
-
- computeComponent = new ComputeComponentImpl(
- INSTANCE_NAME,
- messagingService,
- topologyService,
- logicalTopologyService,
- jobContextManager,
- computeExecutor,
- computeConfiguration
- );
- computeComponent.start();
-
// take the only executor thread
- executeLocally(List.of(), LongJob.class.getName());
+ executeLocally(LongJob.class.getName());
// the corresponding task goes to work queue
- CompletableFuture<String> resultFuture = executeLocally(List.of(),
SimpleJob.class.getName());
+ CompletableFuture<String> resultFuture =
executeLocally(SimpleJob.class.getName());
computeComponent.stop();
// now work queue is dropped to the floor, so the future should be
resolved with a cancellation
-
assertThat(resultFuture, willThrow(CancellationException.class));
}
@@ -725,11 +581,11 @@ class ComputeComponentImplTest extends
BaseIgniteAbstractTest {
respondWithExecuteResponseWhenExecuteRequestIsSent(UUID.randomUUID());
respondWithIncompleteFutureWhenJobResultRequestIsSent();
- CompletableFuture<String> resultFuture = executeRemotely(remoteNode,
List.of(), SimpleJob.class.getName());
+ CompletableFuture<String> resultFuture =
executeRemotely(SimpleJob.class.getName());
computeComponent.stop();
- assertThat(resultFuture, willThrow(CancellationException.class, 3,
TimeUnit.SECONDS));
+ assertThat(resultFuture, willThrow(CancellationException.class));
}
private void respondWithIncompleteFutureWhenJobResultRequestIsSent() {
@@ -740,7 +596,7 @@ class ComputeComponentImplTest extends
BaseIgniteAbstractTest {
@Test
void executionOfJobOfNonExistentClassResultsInException() {
assertThat(
- executeLocally(List.of(), "no-such-class"),
+ executeLocally("no-such-class"),
willThrow(Exception.class, "Cannot load job class by name
'no-such-class'")
);
}
@@ -748,7 +604,7 @@ class ComputeComponentImplTest extends
BaseIgniteAbstractTest {
@Test
void executionOfNonJobClassResultsInException() {
assertThat(
- executeLocally(List.of(), Object.class.getName()),
+ executeLocally(Object.class.getName()),
willThrow(Exception.class, "'java.lang.Object' does not
implement ComputeJob interface")
);
}
@@ -783,17 +639,43 @@ class ComputeComponentImplTest extends
BaseIgniteAbstractTest {
);
}
+ private <T extends NetworkMessage> T invokeAndCaptureRequest(Class<T>
clazz) {
+ ArgumentCaptor<T> requestCaptor = ArgumentCaptor.forClass(clazz);
+ verify(messagingService).invoke(eq(remoteNode),
requestCaptor.capture(), anyLong());
+ return requestCaptor.getValue();
+ }
+
+    /**
+     * Hands {@code request} to the registered compute message handler as if it was received from {@code sender} and returns
+     * the message that was passed to {@code messagingService.respond} for that sender and correlation id.
+     *
+     * <p>The response type {@code T} is inferred from the call site and is not verified by this helper; a mismatch would
+     * presumably surface as a {@link ClassCastException} in the caller.
+     *
+     * @param request Request message to deliver to the handler.
+     * @param sender Node the request is delivered from; the response is expected to be addressed to this node.
+     * @param correlationId Correlation id of the request; the response is expected to carry the same id.
+     * @param <T> Expected response message type.
+     * @return Captured response message.
+     */
+    private <T extends NetworkMessage> T sendRequestAndCaptureResponse(NetworkMessage request, ClusterNode sender, long correlationId) {
+        AtomicBoolean responseSent = new AtomicBoolean(false);
+
+        // Stub before delivering the request so that the flag is raised as soon as the handler responds.
+        when(messagingService.respond(eq(sender), any(), eq(correlationId)))
+                .thenAnswer(invocation -> {
+                    responseSent.set(true);
+                    return nullCompletedFuture();
+                });
+
+        // Deliver from the same node the stub and the verification are keyed on, not from a fixed node,
+        // otherwise a caller passing a different sender would never observe the response.
+        computeMessageHandlerRef.get().onReceived(request, sender, correlationId);
+
+        // Wait until the stubbed respond call has been observed.
+        await().until(responseSent::get, is(true));
+
+        ArgumentCaptor<T> responseCaptor = ArgumentCaptor.captor();
+        verify(messagingService).respond(eq(sender), responseCaptor.capture(), eq(correlationId));
+        return responseCaptor.getValue();
+    }
+
+ private CompletableFuture<String> executeLocally(String jobClassName,
Object... args) {
+ return executeLocally(List.of(), jobClassName, args);
+ }
+
private CompletableFuture<String> executeLocally(List<DeploymentUnit>
units, String jobClassName, Object... args) {
return computeComponent.<String>executeLocally(units, jobClassName,
args).resultAsync();
}
private CompletableFuture<String> executeRemotely(
- ClusterNode remoteNode,
- List<DeploymentUnit> units,
String jobClassName,
Object... args
) {
- return computeComponent.<String>executeRemotely(remoteNode, units,
jobClassName, args).resultAsync();
+ return computeComponent.<String>executeRemotely(remoteNode, List.of(),
jobClassName, args).resultAsync();
}
private static class SimpleJob implements ComputeJob<String> {