This is an automated email from the ASF dual-hosted git repository.
apkhmv pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new facb0f6b63 IGNITE-21066 Create job priority change API (#3019)
facb0f6b63 is described below
commit facb0f6b637c2cc0012e4257dcb086fb0e59eba0
Author: Dmitry Baranov <[email protected]>
AuthorDate: Thu Jan 18 17:57:48 2024 +0300
IGNITE-21066 Create job priority change API (#3019)
---
.../org/apache/ignite/compute/JobExecution.java | 8 +
.../java/org/apache/ignite/lang/ErrorGroups.java | 6 +
.../client/compute/ClientJobExecution.java | 6 +
.../apache/ignite/client/fakes/FakeCompute.java | 5 +
.../internal/compute/ItComputeTestEmbedded.java | 75 ++++++++
.../internal/compute/ComputeMessageTypes.java | 8 +
.../ignite/internal/compute/ComputeUtils.java | 22 ++-
.../internal/compute/DelegatingJobExecution.java | 5 +
.../internal/compute/FailSafeJobExecution.java | 5 +
.../compute/JobExecutionFutureWrapper.java | 5 +
.../internal/compute/JobExecutionWrapper.java | 5 +
.../compute/executor/JobExecutionInternal.java | 9 +
.../compute/message/JobChangePriorityRequest.java | 43 +++++
.../JobChangePriorityResponse.java} | 34 ++--
.../compute/messaging/ComputeMessaging.java | 51 ++++++
.../compute/messaging/RemoteJobExecution.java | 7 +
.../compute/queue/ComputeThreadPoolExecutor.java | 82 +++++++++
.../compute/queue/PriorityQueueExecutor.java | 13 +-
.../internal/compute/queue/QueueExecution.java | 12 +-
.../internal/compute/queue/QueueExecutionImpl.java | 46 +++--
.../internal/compute/ComputeComponentImplTest.java | 78 ++++++++-
.../internal/compute/IgniteComputeImplTest.java | 5 +
.../queue/ComputeThreadPoolExecutorTest.java | 76 ++++++++
.../compute/queue/PriorityQueueExecutorTest.java | 193 +++++++++++++++++++++
modules/platforms/cpp/ignite/common/error_codes.h | 2 +
modules/platforms/cpp/ignite/odbc/common_types.cpp | 2 +
.../platforms/dotnet/Apache.Ignite/ErrorCodes.g.cs | 6 +
.../internal/ClusterPerTestIntegrationTest.java | 5 +-
28 files changed, 767 insertions(+), 47 deletions(-)
diff --git
a/modules/api/src/main/java/org/apache/ignite/compute/JobExecution.java
b/modules/api/src/main/java/org/apache/ignite/compute/JobExecution.java
index 6b05eed81d..6977fadee3 100644
--- a/modules/api/src/main/java/org/apache/ignite/compute/JobExecution.java
+++ b/modules/api/src/main/java/org/apache/ignite/compute/JobExecution.java
@@ -57,4 +57,12 @@ public interface JobExecution<R> {
* @return The future which will be completed when cancel request is
processed.
*/
CompletableFuture<Void> cancelAsync();
+
+ /**
+ * Changes the job priority. After the change, the job will be the last in the
queue of jobs with the same priority.
+ *
+ * @param newPriority new priority.
+ * @return The future which will be completed when change priority request
is processed.
+ */
+ CompletableFuture<Void> changePriorityAsync(int newPriority);
}
diff --git a/modules/api/src/main/java/org/apache/ignite/lang/ErrorGroups.java
b/modules/api/src/main/java/org/apache/ignite/lang/ErrorGroups.java
index a9a1e36e4a..87af8d63ec 100755
--- a/modules/api/src/main/java/org/apache/ignite/lang/ErrorGroups.java
+++ b/modules/api/src/main/java/org/apache/ignite/lang/ErrorGroups.java
@@ -553,6 +553,12 @@ public class ErrorGroups {
/** Compute job failed. */
public static final int COMPUTE_JOB_FAILED_ERR =
COMPUTE_ERR_GROUP.registerErrorCode((short) 9);
+
+ /** Cannot change job priority: compute job not found. */
+ public static final int CHANGE_JOB_PRIORITY_NO_JOB_ERR =
COMPUTE_ERR_GROUP.registerErrorCode((short) 10);
+
+ /** Cannot change job priority: compute job is already executing. */
+ public static final int CHANGE_JOB_PRIORITY_JOB_EXECUTING_ERR =
COMPUTE_ERR_GROUP.registerErrorCode((short) 11);
}
/** Catalog error group. */
diff --git
a/modules/client/src/main/java/org/apache/ignite/internal/client/compute/ClientJobExecution.java
b/modules/client/src/main/java/org/apache/ignite/internal/client/compute/ClientJobExecution.java
index 27ca0e134c..d61822ebf0 100644
---
a/modules/client/src/main/java/org/apache/ignite/internal/client/compute/ClientJobExecution.java
+++
b/modules/client/src/main/java/org/apache/ignite/internal/client/compute/ClientJobExecution.java
@@ -50,4 +50,10 @@ class ClientJobExecution<R> implements JobExecution<R> {
// TODO https://issues.apache.org/jira/browse/IGNITE-21148
return nullCompletedFuture();
}
+
+ @Override
+ public CompletableFuture<Void> changePriorityAsync(int newPriority) {
+ // TODO https://issues.apache.org/jira/browse/IGNITE-21148
+ return nullCompletedFuture();
+ }
}
diff --git
a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeCompute.java
b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeCompute.java
index c5728a460e..9f9e195f6e 100644
---
a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeCompute.java
+++
b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeCompute.java
@@ -178,6 +178,11 @@ public class FakeCompute implements IgniteCompute {
public CompletableFuture<Void> cancelAsync() {
return nullCompletedFuture();
}
+
+ @Override
+ public CompletableFuture<Void> changePriorityAsync(int
newPriority) {
+ return nullCompletedFuture();
+ }
};
}
}
diff --git
a/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItComputeTestEmbedded.java
b/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItComputeTestEmbedded.java
index ee81c077bd..2ce4432526 100644
---
a/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItComputeTestEmbedded.java
+++
b/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItComputeTestEmbedded.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.compute;
import static java.util.stream.Collectors.joining;
import static
org.apache.ignite.internal.compute.utils.ComputeTestUtils.assertPublicException;
+import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureExceptionMatcher.willThrow;
import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willBe;
import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
import static
org.apache.ignite.internal.testframework.matchers.JobStatusMatcher.jobStatusWithState;
@@ -26,11 +27,13 @@ import static
org.apache.ignite.lang.ErrorGroups.Compute.CLASS_INITIALIZATION_ER
import static org.apache.ignite.lang.ErrorGroups.Compute.COMPUTE_ERR_GROUP;
import static org.awaitility.Awaitility.await;
import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.is;
import static org.junit.jupiter.api.Assertions.assertThrows;
import java.util.Arrays;
import java.util.List;
import java.util.Set;
+import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import org.apache.ignite.Ignite;
@@ -147,6 +150,62 @@ class ItComputeTestEmbedded extends ItComputeBaseTest {
await().until(execution::statusAsync,
willBe(jobStatusWithState(JobState.CANCELED)));
}
+ @Test
+ void changeJobPriorityLocallyComputeException() {
+ IgniteImpl entryNode = node(0);
+
+ JobExecution<String> execution =
entryNode.compute().executeAsync(Set.of(entryNode.node()), units(),
LongJob.class.getName());
+ await().until(execution::statusAsync,
willBe(jobStatusWithState(JobState.EXECUTING)));
+
+ assertThat(execution.changePriorityAsync(2),
willThrow(ComputeException.class));
+ }
+
+ @Test
+ void changeJobPriorityRemotelyComputeException() {
+ IgniteImpl entryNode = node(0);
+
+ JobExecution<String> execution =
entryNode.compute().executeAsync(Set.of(node(1).node()), units(),
LongJob.class.getName());
+ await().until(execution::statusAsync,
willBe(jobStatusWithState(JobState.EXECUTING)));
+
+ assertThat(execution.changePriorityAsync(2),
willThrow(ComputeException.class));
+ }
+
+ @Test
+ void changeJobPriorityLocally() {
+ IgniteImpl entryNode = node(0);
+
+ // Start 1 task in executor with 1 thread
+ JobExecution<String> execution1 =
entryNode.compute().executeAsync(Set.of(entryNode.node()), units(),
WaitLatchJob.class.getName());
+ await().until(execution1::statusAsync,
willBe(jobStatusWithState(JobState.EXECUTING)));
+
+ // Start one more long lasting task
+ JobExecution<String> execution2 =
entryNode.compute().executeAsync(Set.of(entryNode.node()), units(),
LongJob.class.getName());
+ await().until(execution2::statusAsync,
willBe(jobStatusWithState(JobState.QUEUED)));
+
+ // Start third task
+ JobExecution<String> execution3 =
entryNode.compute().executeAsync(Set.of(entryNode.node()), units(),
WaitLatchJob.class.getName());
+ await().until(execution3::statusAsync,
willBe(jobStatusWithState(JobState.QUEUED)));
+
+ // Tasks 2 and 3 are not completed, they are still in the queue
+ assertThat(execution2.resultAsync().isDone(), is(false));
+ assertThat(execution3.resultAsync().isDone(), is(false));
+
+ // Change priority of task 3, so it should be executed before task 2
+ assertThat(execution3.changePriorityAsync(2),
willCompleteSuccessfully());
+
+ // Release the latch so that tasks 1 and 3 can run
+ WaitLatchJob.latch.countDown();
+
+ // Tasks 1 and 3 completed successfully
+ assertThat(execution1.resultAsync(), willCompleteSuccessfully());
+ assertThat(execution3.resultAsync(), willCompleteSuccessfully());
+ assertThat(execution1.resultAsync().isDone(), is(true));
+ assertThat(execution3.resultAsync().isDone(), is(true));
+
+ // Task 2 is not completed
+ assertThat(execution2.resultAsync().isDone(), is(false));
+ }
+
private static class ConcatJob implements ComputeJob<String> {
/** {@inheritDoc} */
@Override
@@ -219,4 +278,20 @@ class ItComputeTestEmbedded extends ItComputeBaseTest {
return null;
}
}
+
+ private static class WaitLatchJob implements ComputeJob<String> {
+
+ static final CountDownLatch latch = new CountDownLatch(1);
+
+ /** {@inheritDoc} */
+ @Override
+ public String execute(JobExecutionContext context, Object... args) {
+ try {
+ latch.await();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+ return null;
+ }
+ }
}
diff --git
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeMessageTypes.java
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeMessageTypes.java
index 767a296a50..8a38cb32e4 100644
---
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeMessageTypes.java
+++
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeMessageTypes.java
@@ -20,6 +20,8 @@ package org.apache.ignite.internal.compute;
import org.apache.ignite.internal.compute.message.DeploymentUnitMsg;
import org.apache.ignite.internal.compute.message.ExecuteRequest;
import org.apache.ignite.internal.compute.message.ExecuteResponse;
+import org.apache.ignite.internal.compute.message.JobChangePriorityRequest;
+import org.apache.ignite.internal.compute.message.JobChangePriorityResponse;
import org.apache.ignite.internal.compute.message.JobResultRequest;
import org.apache.ignite.internal.compute.message.JobResultResponse;
import org.apache.ignite.internal.compute.message.JobStatusRequest;
@@ -63,4 +65,10 @@ public class ComputeMessageTypes {
/** Type for {@link JobCancelResponse}. */
public static final short JOB_CANCEL_RESPONSE = 8;
+
+ /** Type for {@link JobChangePriorityRequest}. */
+ public static final short JOB_CHANGE_PRIORITY_REQUEST = 9;
+
+ /** Type for {@link JobChangePriorityResponse}. */
+ public static final short JOB_CHANGE_PRIORITY_RESPONSE = 10;
}
diff --git
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeUtils.java
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeUtils.java
index 4da7d3fe3f..990d2f9456 100644
---
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeUtils.java
+++
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeUtils.java
@@ -35,6 +35,7 @@ import org.apache.ignite.compute.version.Version;
import org.apache.ignite.internal.compute.message.DeploymentUnitMsg;
import org.apache.ignite.internal.compute.message.ExecuteResponse;
import org.apache.ignite.internal.compute.message.JobCancelResponse;
+import org.apache.ignite.internal.compute.message.JobChangePriorityResponse;
import org.apache.ignite.internal.compute.message.JobResultResponse;
import org.apache.ignite.internal.compute.message.JobStatusResponse;
@@ -158,11 +159,26 @@ public class ComputeUtils {
/**
* Extract compute job cancel result from cancel response.
*
- * @param jobStatusResponse Job cancel message response.
+ * @param jobCancelResponse Job cancel message response.
* @return Completable future with result.
*/
- public static CompletableFuture<Void>
cancelFromJobCancelResponse(JobCancelResponse jobStatusResponse) {
- Throwable throwable = jobStatusResponse.throwable();
+ public static CompletableFuture<Void>
cancelFromJobCancelResponse(JobCancelResponse jobCancelResponse) {
+ Throwable throwable = jobCancelResponse.throwable();
+ if (throwable != null) {
+ return failedFuture(throwable);
+ }
+
+ return nullCompletedFuture();
+ }
+
+ /**
+ * Extract compute job change priority result from change priority
response.
+ *
+ * @param jobChangePriorityResponse Job change priority message response.
+ * @return Completable future with result.
+ */
+ public static CompletableFuture<Void>
changePriorityFromJobChangePriorityResponse(JobChangePriorityResponse
jobChangePriorityResponse) {
+ Throwable throwable = jobChangePriorityResponse.throwable();
if (throwable != null) {
return failedFuture(throwable);
}
diff --git
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/DelegatingJobExecution.java
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/DelegatingJobExecution.java
index 6730546b1a..bc1bdeedf3 100644
---
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/DelegatingJobExecution.java
+++
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/DelegatingJobExecution.java
@@ -48,4 +48,9 @@ class DelegatingJobExecution<R> implements JobExecution<R> {
public CompletableFuture<Void> cancelAsync() {
return delegate.thenAccept(JobExecutionInternal::cancel);
}
+
+ @Override
+ public CompletableFuture<Void> changePriorityAsync(int newPriority) {
+ return delegate.thenAccept(jobExecutionInternal ->
jobExecutionInternal.changePriority(newPriority));
+ }
}
diff --git
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/FailSafeJobExecution.java
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/FailSafeJobExecution.java
index a02b59460f..b2f16adc3f 100644
---
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/FailSafeJobExecution.java
+++
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/FailSafeJobExecution.java
@@ -159,6 +159,11 @@ class FailSafeJobExecution<T> implements JobExecution<T> {
return runningJobExecution.get().cancelAsync();
}
+ @Override
+ public CompletableFuture<Void> changePriorityAsync(int newPriority) {
+ return runningJobExecution.get().changePriorityAsync(newPriority);
+ }
+
/**
* Completes the future with the exception. This method can be called only
once.
*
diff --git
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/JobExecutionFutureWrapper.java
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/JobExecutionFutureWrapper.java
index 22f2a09215..95ed3ce85f 100644
---
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/JobExecutionFutureWrapper.java
+++
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/JobExecutionFutureWrapper.java
@@ -49,4 +49,9 @@ class JobExecutionFutureWrapper<R> implements JobExecution<R>
{
public CompletableFuture<Void> cancelAsync() {
return
convertToPublicFuture(delegate.thenCompose(JobExecution::cancelAsync));
}
+
+ @Override
+ public CompletableFuture<Void> changePriorityAsync(int newPriority) {
+ return convertToPublicFuture(delegate.thenCompose(jobExecution ->
jobExecution.changePriorityAsync(newPriority)));
+ }
}
diff --git
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/JobExecutionWrapper.java
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/JobExecutionWrapper.java
index 8eb8a7b1ab..46a59888c6 100644
---
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/JobExecutionWrapper.java
+++
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/JobExecutionWrapper.java
@@ -49,4 +49,9 @@ class JobExecutionWrapper<R> implements JobExecution<R> {
public CompletableFuture<Void> cancelAsync() {
return convertToPublicFuture(delegate.cancelAsync());
}
+
+ @Override
+ public CompletableFuture<Void> changePriorityAsync(int newPriority) {
+ return
convertToPublicFuture(delegate.changePriorityAsync(newPriority));
+ }
}
diff --git
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/executor/JobExecutionInternal.java
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/executor/JobExecutionInternal.java
index b1c1106067..b7c9b8398d 100644
---
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/executor/JobExecutionInternal.java
+++
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/executor/JobExecutionInternal.java
@@ -57,4 +57,13 @@ public class JobExecutionInternal<R> {
isInterrupted.set(true);
execution.cancel();
}
+
+ /**
+ * Change priority of job execution.
+ *
+ * @param newPriority new priority.
+ */
+ public void changePriority(int newPriority) {
+ execution.changePriority(newPriority);
+ }
}
diff --git
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/message/JobChangePriorityRequest.java
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/message/JobChangePriorityRequest.java
new file mode 100644
index 0000000000..6b98e8ba96
--- /dev/null
+++
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/message/JobChangePriorityRequest.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.compute.message;
+
+import java.util.UUID;
+import org.apache.ignite.internal.compute.ComputeMessageTypes;
+import org.apache.ignite.network.NetworkMessage;
+import org.apache.ignite.network.annotations.Transferable;
+
+/**
+ * Remote change job priority request.
+ */
+@Transferable(ComputeMessageTypes.JOB_CHANGE_PRIORITY_REQUEST)
+public interface JobChangePriorityRequest extends NetworkMessage {
+ /**
+ * Returns job id.
+ *
+ * @return Job id.
+ */
+ UUID jobId();
+
+ /**
+ * Returns job priority.
+ *
+ * @return job priority.
+ */
+ int priority();
+}
diff --git
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/queue/QueueExecution.java
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/message/JobChangePriorityResponse.java
similarity index 50%
copy from
modules/compute/src/main/java/org/apache/ignite/internal/compute/queue/QueueExecution.java
copy to
modules/compute/src/main/java/org/apache/ignite/internal/compute/message/JobChangePriorityResponse.java
index 95872bfd97..fd822849fc 100644
---
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/queue/QueueExecution.java
+++
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/message/JobChangePriorityResponse.java
@@ -15,36 +15,26 @@
* limitations under the License.
*/
-package org.apache.ignite.internal.compute.queue;
+package org.apache.ignite.internal.compute.message;
-import java.util.concurrent.CompletableFuture;
-import org.apache.ignite.compute.JobStatus;
+import org.apache.ignite.internal.compute.ComputeMessageTypes;
+import org.apache.ignite.network.NetworkMessage;
+import org.apache.ignite.network.annotations.Marshallable;
+import org.apache.ignite.network.annotations.Transferable;
import org.jetbrains.annotations.Nullable;
/**
- * Provides information about the task executing on the {@link
PriorityQueueExecutor}, allows cancelling the task.
- *
- * @param <R> Job result type.
+ * Remote change job priority response.
*/
-public interface QueueExecution<R> {
- /**
- * Returns job's execution result.
- *
- * @return Job's execution result future.
- */
- CompletableFuture<R> resultAsync();
+@Transferable(ComputeMessageTypes.JOB_CHANGE_PRIORITY_RESPONSE)
+public interface JobChangePriorityResponse extends NetworkMessage {
/**
- * Returns the current status of the job. The job status may be deleted
and thus return {@code null} if the time for retaining job
- * status has been exceeded.
+ * Returns a {@link Throwable} that was thrown during change job priority
request ({@code null} if the request was successful).
*
- * @return The current status of the job, or {@code null} if the job
status no longer exists due to exceeding the retention time limit.
+ * @return {@link Throwable} that was thrown during change job priority
request ({@code null} if the request was successful)
*/
@Nullable
- JobStatus status();
-
- /**
- * Cancels the job.
- */
- void cancel();
+ @Marshallable
+ Throwable throwable();
}
diff --git
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/messaging/ComputeMessaging.java
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/messaging/ComputeMessaging.java
index 840eac5761..cd94e83b8e 100644
---
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/messaging/ComputeMessaging.java
+++
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/messaging/ComputeMessaging.java
@@ -18,6 +18,7 @@
package org.apache.ignite.internal.compute.messaging;
import static
org.apache.ignite.internal.compute.ComputeUtils.cancelFromJobCancelResponse;
+import static
org.apache.ignite.internal.compute.ComputeUtils.changePriorityFromJobChangePriorityResponse;
import static
org.apache.ignite.internal.compute.ComputeUtils.jobIdFromExecuteResponse;
import static
org.apache.ignite.internal.compute.ComputeUtils.resultFromJobResultResponse;
import static
org.apache.ignite.internal.compute.ComputeUtils.statusFromJobStatusResponse;
@@ -45,6 +46,8 @@ import
org.apache.ignite.internal.compute.message.ExecuteRequest;
import org.apache.ignite.internal.compute.message.ExecuteResponse;
import org.apache.ignite.internal.compute.message.JobCancelRequest;
import org.apache.ignite.internal.compute.message.JobCancelResponse;
+import org.apache.ignite.internal.compute.message.JobChangePriorityRequest;
+import org.apache.ignite.internal.compute.message.JobChangePriorityResponse;
import org.apache.ignite.internal.compute.message.JobResultRequest;
import org.apache.ignite.internal.compute.message.JobResultResponse;
import org.apache.ignite.internal.compute.message.JobStatusRequest;
@@ -119,6 +122,8 @@ public class ComputeMessaging {
sendJobStatusResponse(null, ex, senderConsistentId, correlationId);
} else if (message instanceof JobCancelRequest) {
sendJobCancelResponse(ex, senderConsistentId, correlationId);
+ } else if (message instanceof JobChangePriorityRequest) {
+ sendJobChangePriorityResponse(ex, senderConsistentId,
correlationId);
}
}
@@ -137,6 +142,8 @@ public class ComputeMessaging {
processJobStatusRequest(jobStatus, (JobStatusRequest) message,
senderConsistentId, correlationId);
} else if (message instanceof JobCancelRequest) {
processJobCancelRequest((JobCancelRequest) message,
senderConsistentId, correlationId);
+ } else if (message instanceof JobChangePriorityRequest) {
+ processJobChangePriorityRequest((JobChangePriorityRequest)
message, senderConsistentId, correlationId);
}
}
@@ -321,4 +328,48 @@ public class ComputeMessaging {
messagingService.respond(senderConsistentId, jobCancelResponse,
correlationId);
}
+
+ /**
+ * Changes compute job priority on the remote node.
+ *
+ * @param remoteNode The priority of the job will be changed on this node.
+ * @param jobId Compute job id.
+ * @param newPriority new job priority.
+ *
+ * @return Job change priority future (will be completed when change
priority request is processed).
+ */
+ CompletableFuture<Void> remoteChangePriorityAsync(ClusterNode remoteNode,
UUID jobId, int newPriority) {
+ JobChangePriorityRequest jobChangePriorityRequest =
messagesFactory.jobChangePriorityRequest()
+ .jobId(jobId)
+ .priority(newPriority)
+ .build();
+
+ return messagingService.invoke(remoteNode, jobChangePriorityRequest,
NETWORK_TIMEOUT_MILLIS)
+ .thenCompose(networkMessage ->
changePriorityFromJobChangePriorityResponse((JobChangePriorityResponse)
networkMessage));
+ }
+
+ private void processJobChangePriorityRequest(JobChangePriorityRequest
request, String senderConsistentId, long correlationId) {
+ UUID jobId = request.jobId();
+ JobExecution<Object> execution = executions.get(jobId);
+ if (execution != null) {
+ execution.changePriorityAsync(request.priority())
+ .whenComplete((result, err) ->
sendJobChangePriorityResponse(err, senderConsistentId, correlationId));
+ } else {
+ ComputeException ex = new
ComputeException(Compute.CHANGE_JOB_PRIORITY_NO_JOB_ERR, "Can not change job
priority,"
+ + " job not found for the job id " + jobId);
+ sendJobChangePriorityResponse(ex, senderConsistentId,
correlationId);
+ }
+ }
+
+ private void sendJobChangePriorityResponse(
+ @Nullable Throwable throwable,
+ String senderConsistentId,
+ Long correlationId
+ ) {
+ JobChangePriorityResponse jobChangePriorityResponse =
messagesFactory.jobChangePriorityResponse()
+ .throwable(throwable)
+ .build();
+
+ messagingService.respond(senderConsistentId,
jobChangePriorityResponse, correlationId);
+ }
}
diff --git
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/messaging/RemoteJobExecution.java
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/messaging/RemoteJobExecution.java
index d7dd3a7dbd..1ece202966 100644
---
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/messaging/RemoteJobExecution.java
+++
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/messaging/RemoteJobExecution.java
@@ -81,4 +81,11 @@ public class RemoteJobExecution<R> implements
JobExecution<R> {
jobIdFuture.thenCompose(jobId ->
messaging.remoteCancelAsync(remoteNode, jobId))
);
}
+
+ @Override
+ public CompletableFuture<Void> changePriorityAsync(int newPriority) {
+ return inFlightFutures.registerFuture(
+ jobIdFuture.thenCompose(jobId ->
messaging.remoteChangePriorityAsync(remoteNode, jobId, newPriority))
+ );
+ }
}
diff --git
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/queue/ComputeThreadPoolExecutor.java
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/queue/ComputeThreadPoolExecutor.java
new file mode 100644
index 0000000000..f82307f1e0
--- /dev/null
+++
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/queue/ComputeThreadPoolExecutor.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.compute.queue;
+
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import org.apache.ignite.internal.util.IgniteUtils;
+
+/**
+ * Wrapper for {@link ThreadPoolExecutor}. Allows removing task from work
queue.
+ */
+public class ComputeThreadPoolExecutor {
+
+ private final BlockingQueue<Runnable> workQueue;
+
+ private final ThreadPoolExecutor executor;
+
+ ComputeThreadPoolExecutor(int corePoolSize,
+ int maximumPoolSize,
+ long keepAliveTime,
+ TimeUnit unit,
+ BlockingQueue<Runnable> workQueue,
+ ThreadFactory threadFactory) {
+ this.workQueue = workQueue;
+ executor = new ThreadPoolExecutor(corePoolSize, maximumPoolSize,
keepAliveTime, unit, workQueue, threadFactory);
+ }
+
+ /**
+ * Executes the given task sometime in the future.
+ * {@link ThreadPoolExecutor#execute(Runnable command)}
+ */
+ public void execute(Runnable command) {
+ executor.execute(command);
+ }
+
+ /**
+ * Removes this task from the executor's internal queue if it is present.
+ * {@link ThreadPoolExecutor#remove(Runnable command)}
+ */
+ public boolean remove(Runnable task) {
+ return executor.remove(task);
+ }
+
+ /**
+ * Removes this task from the executor's internal queue if it is
+ * present, thus causing it not to be run if it has not already
+ * started.
+ *
+ * @param task the task to remove
+ * @return {@code true} if the task was removed
+ */
+ public boolean removeFromQueue(Runnable task) {
+ return workQueue.remove(task);
+ }
+
+ /**
+ * Shuts down the wrapped executor service gradually, first disabling new
submissions and later, if necessary, cancelling remaining
+ * tasks. The {@code stopTimeout} is passed on in milliseconds.
+ *
+ * {@link IgniteUtils#shutdownAndAwaitTermination}
+ */
+ public void shutdown(long stopTimeout) {
+ IgniteUtils.shutdownAndAwaitTermination(executor, stopTimeout,
TimeUnit.MILLISECONDS);
+ }
+}
diff --git
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/queue/PriorityQueueExecutor.java
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/queue/PriorityQueueExecutor.java
index 363040aac5..7823cf0524 100644
---
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/queue/PriorityQueueExecutor.java
+++
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/queue/PriorityQueueExecutor.java
@@ -19,13 +19,12 @@ package org.apache.ignite.internal.compute.queue;
import java.util.Objects;
import java.util.UUID;
+import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.ignite.internal.compute.configuration.ComputeConfiguration;
import org.apache.ignite.internal.compute.state.ComputeStateMachine;
-import org.apache.ignite.internal.util.IgniteUtils;
/**
* Compute job executor with priority mechanism.
@@ -35,7 +34,7 @@ public class PriorityQueueExecutor {
private final ComputeConfiguration configuration;
- private final ThreadPoolExecutor executor;
+ private final ComputeThreadPoolExecutor executor;
private final ComputeStateMachine stateMachine;
@@ -52,12 +51,13 @@ public class PriorityQueueExecutor {
) {
this.configuration = configuration;
this.stateMachine = stateMachine;
- executor = new ThreadPoolExecutor(
+ BlockingQueue<Runnable> workQueue = new
BoundedPriorityBlockingQueue<>(() -> configuration.queueMaxSize().value());
+ executor = new ComputeThreadPoolExecutor(
configuration.threadPoolSize().value(),
configuration.threadPoolSize().value(),
THREAD_KEEP_ALIVE_SECONDS,
TimeUnit.SECONDS,
- new BoundedPriorityBlockingQueue<>(() ->
configuration.queueMaxSize().value()),
+ workQueue,
threadFactory
);
}
@@ -96,7 +96,6 @@ public class PriorityQueueExecutor {
* Shutdown executor. After shutdown executor is not usable anymore.
*/
public void shutdown() {
- Long stopTimeout = configuration.threadPoolStopTimeoutMillis().value();
- IgniteUtils.shutdownAndAwaitTermination(executor, stopTimeout,
TimeUnit.MILLISECONDS);
+ executor.shutdown(configuration.threadPoolStopTimeoutMillis().value());
}
}
diff --git
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/queue/QueueExecution.java
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/queue/QueueExecution.java
index 95872bfd97..f603ec3530 100644
---
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/queue/QueueExecution.java
+++
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/queue/QueueExecution.java
@@ -22,7 +22,8 @@ import org.apache.ignite.compute.JobStatus;
import org.jetbrains.annotations.Nullable;
/**
- * Provides information about the task executing on the {@link
PriorityQueueExecutor}, allows cancelling the task.
+ * Provides information about the task executing on the {@link
PriorityQueueExecutor}, allows cancelling the task,
+ * changing the job priority.
*
* @param <R> Job result type.
*/
@@ -47,4 +48,13 @@ public interface QueueExecution<R> {
* Cancels the job.
*/
void cancel();
+
+ /**
+ * Changes the job priority. The priority can be changed only while the task is still waiting in the
+ * executor's queue.
+ * On a priority change the task is removed from the execution queue and submitted once again with the
+ * new priority.
+ * The re-submitted queue entry is executed last among the queued entries with the same priority (FIFO).
+ *
+ * <p>The {@code QueueExecutionImpl} implementation throws a {@code ComputeException} when the task can
+ * no longer be removed from the queue.
+ *
+ * @param newPriority New priority.
+ */
+ void changePriority(int newPriority);
}
diff --git
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/queue/QueueExecutionImpl.java
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/queue/QueueExecutionImpl.java
index fdbb0ca84d..bac4da281b 100644
---
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/queue/QueueExecutionImpl.java
+++
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/queue/QueueExecutionImpl.java
@@ -22,7 +22,7 @@ import static
org.apache.ignite.lang.ErrorGroups.Compute.QUEUE_OVERFLOW_ERR;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.ignite.compute.ComputeException;
import org.apache.ignite.compute.JobStatus;
@@ -30,6 +30,7 @@ import
org.apache.ignite.internal.compute.state.ComputeStateMachine;
import org.apache.ignite.internal.compute.state.IllegalJobStateTransition;
import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.lang.ErrorGroups.Compute;
import org.jetbrains.annotations.Nullable;
/**
@@ -42,14 +43,16 @@ class QueueExecutionImpl<R> implements QueueExecution<R> {
private final UUID jobId;
private final Callable<R> job;
- private final int priority;
- private final ThreadPoolExecutor executor;
+ private final AtomicInteger priority;
+ private final ComputeThreadPoolExecutor executor;
private final ComputeStateMachine stateMachine;
private final CompletableFuture<R> result = new CompletableFuture<>();
private final AtomicReference<QueueEntry<R>> queueEntry = new
AtomicReference<>();
+ private final AtomicInteger retries = new AtomicInteger();
+
/**
* Constructor.
*
@@ -63,12 +66,11 @@ class QueueExecutionImpl<R> implements QueueExecution<R> {
UUID jobId,
Callable<R> job,
int priority,
- ThreadPoolExecutor executor,
- ComputeStateMachine stateMachine
- ) {
+ ComputeThreadPoolExecutor executor,
+ ComputeStateMachine stateMachine) {
this.jobId = jobId;
this.job = job;
- this.priority = priority;
+ this.priority = new AtomicInteger(priority);
this.executor = executor;
this.stateMachine = stateMachine;
}
@@ -99,18 +101,40 @@ class QueueExecutionImpl<R> implements QueueExecution<R> {
}
}
+ /**
+ * Changes the priority of the job by re-queueing it: the current queue entry is removed from the
+ * executor's queue and a new entry is submitted with the new priority. Does nothing if the requested
+ * priority equals the current one.
+ *
+ * @param newPriority New priority.
+ * @throws ComputeException If the current queue entry could not be removed from the executor's queue.
+ */
+ @Override
+ public void changePriority(int newPriority) {
+ // Nothing to do when the priority stays the same.
+ if (newPriority == priority.get()) {
+ return;
+ }
+ QueueEntry<R> queueEntry = this.queueEntry.get();
+ // Re-queueing is possible only if the entry could be taken back out of the executor's queue.
+ if (executor.removeFromQueue(queueEntry)) {
+ this.priority.set(newPriority);
+ // Drop the removed entry; run() creates and stores a fresh one using the updated priority.
+ this.queueEntry.set(null);
+ run();
+ } else {
+ throw new
ComputeException(Compute.CHANGE_JOB_PRIORITY_JOB_EXECUTING_ERR, "Can not change
job priority,"
+ + " job already processing. [job id = " + jobId + "]");
+ }
+ }
+
/**
* Runs the job, completing the result future and retrying the execution
in case of failure at most {@code numRetries} times.
*
* @param numRetries Number of times to retry failed execution.
*/
void run(int numRetries) {
+ retries.set(numRetries);
+ run();
+ }
+
+ private void run() {
QueueEntry<R> queueEntry = new QueueEntry<>(() -> {
stateMachine.executeJob(jobId);
return job.call();
- }, priority);
+ }, priority.get());
- // Ignoring previous value since it can't be running because we are
calling run either after the construction or after the failure.
+ // Ignoring previous value since it can't be running because we are
calling run
+ // either after the construction or after the failure.
this.queueEntry.set(queueEntry);
try {
@@ -122,9 +146,9 @@ class QueueExecutionImpl<R> implements QueueExecution<R> {
queueEntry.toFuture().whenComplete((r, throwable) -> {
if (throwable != null) {
- if (numRetries > 0) {
+ if (retries.decrementAndGet() >= 0) {
stateMachine.queueJob(jobId);
- run(numRetries - 1);
+ run();
} else {
if (queueEntry.isInterrupted()) {
stateMachine.cancelJob(jobId);
diff --git
a/modules/compute/src/test/java/org/apache/ignite/internal/compute/ComputeComponentImplTest.java
b/modules/compute/src/test/java/org/apache/ignite/internal/compute/ComputeComponentImplTest.java
index 869f88cb2c..6a4ca07d31 100644
---
a/modules/compute/src/test/java/org/apache/ignite/internal/compute/ComputeComponentImplTest.java
+++
b/modules/compute/src/test/java/org/apache/ignite/internal/compute/ComputeComponentImplTest.java
@@ -65,6 +65,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.ignite.Ignite;
+import org.apache.ignite.compute.ComputeException;
import org.apache.ignite.compute.ComputeJob;
import org.apache.ignite.compute.DeploymentUnit;
import org.apache.ignite.compute.JobExecution;
@@ -82,6 +83,8 @@ import
org.apache.ignite.internal.compute.message.ExecuteRequest;
import org.apache.ignite.internal.compute.message.ExecuteResponse;
import org.apache.ignite.internal.compute.message.JobCancelRequest;
import org.apache.ignite.internal.compute.message.JobCancelResponse;
+import org.apache.ignite.internal.compute.message.JobChangePriorityRequest;
+import org.apache.ignite.internal.compute.message.JobChangePriorityResponse;
import org.apache.ignite.internal.compute.message.JobResultRequest;
import org.apache.ignite.internal.compute.message.JobResultResponse;
import org.apache.ignite.internal.compute.message.JobStatusRequest;
@@ -161,6 +164,12 @@ class ComputeComponentImplTest extends
BaseIgniteAbstractTest {
@Captor
private ArgumentCaptor<JobCancelResponse> jobCancelResponseCaptor;
+ @Captor
+ private ArgumentCaptor<JobChangePriorityRequest>
jobChangePriorityRequestCaptor;
+
+ @Captor
+ private ArgumentCaptor<JobChangePriorityResponse>
jobChangePriorityResponseCaptor;
+
private final ClusterNode remoteNode = new ClusterNodeImpl("remote",
"remote", new NetworkAddress("remote-host", 1));
private final AtomicReference<NetworkMessageHandler>
computeMessageHandlerRef = new AtomicReference<>();
@@ -205,6 +214,7 @@ class ComputeComponentImplTest extends
BaseIgniteAbstractTest {
assertThat(execution.resultAsync(), willBe("jobResponse"));
assertThat(execution.statusAsync(),
willBe(jobStatusWithState(COMPLETED)));
assertThat(execution.cancelAsync(),
willThrow(CancellingException.class));
+ assertThat(execution.changePriorityAsync(1),
willThrow(ComputeException.class));
assertThatNoRequestsWereSent();
}
@@ -244,7 +254,7 @@ class ComputeComponentImplTest extends
BaseIgniteAbstractTest {
respondWithExecuteResponseWhenExecuteRequestIsSent(jobId);
respondWithJobResultResponseWhenJobResultRequestIsSent(jobId);
respondWithJobStatusResponseWhenJobStatusRequestIsSent(jobId,
COMPLETED);
- respondWithCancellingExceptionhenJobCancelRequestIsSent(jobId);
+ respondWithCancellingExceptionWhenJobCancelRequestIsSent(jobId);
JobExecution<String> execution =
computeComponent.executeRemotely(remoteNode, List.of(),
SimpleJob.class.getName(), "a", 42);
assertThat(execution.resultAsync(), willBe("remoteResponse"));
@@ -279,6 +289,19 @@ class ComputeComponentImplTest extends
BaseIgniteAbstractTest {
assertThatJobCancelRequestWasSent(jobId);
}
+ @Test
+ void changePriorityRemotelyUsingNetworkCommunication() {
+ UUID jobId = UUID.randomUUID();
+ respondWithExecuteResponseWhenExecuteRequestIsSent(jobId);
+
respondWithJobChangePriorityResponseWhenJobChangePriorityRequestIsSent(jobId);
+
+ JobExecution<String> execution =
computeComponent.executeRemotely(remoteNode, List.of(),
LongJob.class.getName());
+
+ assertThat(execution.changePriorityAsync(1),
willCompleteSuccessfully());
+
+ assertThatJobChangePriorityRequestWasSent(jobId);
+ }
+
private void respondWithExecuteResponseWhenExecuteRequestIsSent(UUID
jobId) {
ExecuteResponse executeResponse = new
ComputeMessagesFactory().executeResponse()
.jobId(jobId)
@@ -310,7 +333,7 @@ class ComputeComponentImplTest extends
BaseIgniteAbstractTest {
.thenReturn(completedFuture(jobCancelResponse));
}
- private void respondWithCancellingExceptionhenJobCancelRequestIsSent(UUID
jobId) {
+ private void respondWithCancellingExceptionWhenJobCancelRequestIsSent(UUID
jobId) {
JobCancelResponse jobCancelResponse = new
ComputeMessagesFactory().jobCancelResponse()
.throwable(new CancellingException(jobId))
.build();
@@ -318,6 +341,13 @@ class ComputeComponentImplTest extends
BaseIgniteAbstractTest {
.thenReturn(completedFuture(jobCancelResponse));
}
+ private void
respondWithJobChangePriorityResponseWhenJobChangePriorityRequestIsSent(UUID
jobId) {
+ JobChangePriorityResponse jobChangePriorityResponse = new
ComputeMessagesFactory().jobChangePriorityResponse()
+ .build();
+ when(messagingService.invoke(any(ClusterNode.class), argThat(msg ->
jobChangePriorityRequestWithJobId(msg, jobId)), anyLong()))
+ .thenReturn(completedFuture(jobChangePriorityResponse));
+ }
+
private static boolean jobResultRequestWithJobId(NetworkMessage argument,
UUID jobId) {
if (argument instanceof JobResultRequest) {
JobResultRequest jobResultRequest = (JobResultRequest) argument;
@@ -342,6 +372,14 @@ class ComputeComponentImplTest extends
BaseIgniteAbstractTest {
return false;
}
+    /**
+     * Checks whether the message is a {@link JobChangePriorityRequest} that targets the given job.
+     *
+     * @param argument Message to check.
+     * @param jobId Expected job ID.
+     * @return {@code true} if the message is a change priority request carrying an equal job ID.
+     */
+    private static boolean jobChangePriorityRequestWithJobId(NetworkMessage argument, UUID jobId) {
+        if (argument instanceof JobChangePriorityRequest) {
+            JobChangePriorityRequest jobChangePriorityRequest = (JobChangePriorityRequest) argument;
+            // Compare by value rather than by reference: equal UUIDs are not guaranteed to be the same instance.
+            return jobId.equals(jobChangePriorityRequest.jobId());
+        }
+        return false;
+    }
+
private void assertThatExecuteRequestWasSent(String jobClassName,
Object... args) {
verify(messagingService).invoke(eq(remoteNode),
executeRequestCaptor.capture(), anyLong());
@@ -375,6 +413,14 @@ class ComputeComponentImplTest extends
BaseIgniteAbstractTest {
assertThat(capturedRequest.jobId(), is(jobId));
}
+ private void assertThatJobChangePriorityRequestWasSent(UUID jobId) {
+ verify(messagingService).invoke(eq(remoteNode),
jobChangePriorityRequestCaptor.capture(), anyLong());
+
+ JobChangePriorityRequest capturedRequest =
jobChangePriorityRequestCaptor.getValue();
+
+ assertThat(capturedRequest.jobId(), is(jobId));
+ }
+
@Test
void executesRemotelyWithException() {
UUID jobId = UUID.randomUUID();
@@ -604,6 +650,34 @@ class ComputeComponentImplTest extends
BaseIgniteAbstractTest {
assertThat(response.throwable().getCause(),
is(instanceOf(NodeStoppingException.class)));
}
+ @Test
+ void stoppedComponentReturnsExceptionOnJobChangePriorityRequestAttempt()
throws Exception {
+ computeComponent.stop();
+
+ markResponseSentOnResponseSend();
+ assertThat(computeMessageHandlerRef.get(), is(notNullValue()));
+
+ String sender = "test";
+
+ JobChangePriorityRequest jobChangePriorityRequest = new
ComputeMessagesFactory().jobChangePriorityRequest()
+ .jobId(UUID.randomUUID())
+ .priority(1)
+ .build();
+ computeMessageHandlerRef.get().onReceived(jobChangePriorityRequest,
sender, 456L);
+
+ assertThatJobChangePriorityRequestSendsNodeStoppingExceptionTo(sender);
+ }
+
+ private void
assertThatJobChangePriorityRequestSendsNodeStoppingExceptionTo(String sender)
throws InterruptedException {
+ assertTrue(waitForCondition(responseSent::get, 1000), "No response
sent");
+
+ verify(messagingService).respond(eq(sender),
jobChangePriorityResponseCaptor.capture(), eq(456L));
+ JobChangePriorityResponse response =
jobChangePriorityResponseCaptor.getValue();
+
+ assertThat(response.throwable(),
is(instanceOf(IgniteInternalException.class)));
+ assertThat(response.throwable().getCause(),
is(instanceOf(NodeStoppingException.class)));
+ }
+
@Test
void executorThreadsAreNamedAccordingly() {
assertThat(
diff --git
a/modules/compute/src/test/java/org/apache/ignite/internal/compute/IgniteComputeImplTest.java
b/modules/compute/src/test/java/org/apache/ignite/internal/compute/IgniteComputeImplTest.java
index 3b8ed364fd..32c194940c 100644
---
a/modules/compute/src/test/java/org/apache/ignite/internal/compute/IgniteComputeImplTest.java
+++
b/modules/compute/src/test/java/org/apache/ignite/internal/compute/IgniteComputeImplTest.java
@@ -171,6 +171,11 @@ class IgniteComputeImplTest extends BaseIgniteAbstractTest
{
public CompletableFuture<Void> cancelAsync() {
return nullCompletedFuture();
}
+
+ @Override
+ public CompletableFuture<Void> changePriorityAsync(int
newPriority) {
+ return nullCompletedFuture();
+ }
};
}
}
diff --git
a/modules/compute/src/test/java/org/apache/ignite/internal/compute/queue/ComputeThreadPoolExecutorTest.java
b/modules/compute/src/test/java/org/apache/ignite/internal/compute/queue/ComputeThreadPoolExecutorTest.java
new file mode 100644
index 0000000000..814fcba269
--- /dev/null
+++
b/modules/compute/src/test/java/org/apache/ignite/internal/compute/queue/ComputeThreadPoolExecutorTest.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.compute.queue;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.TimeUnit;
+import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.thread.NamedThreadFactory;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Tests for {@link ComputeThreadPoolExecutor}: checks which submitted tasks end up in the work queue
+ * and that a queued task can be removed from it.
+ */
+class ComputeThreadPoolExecutorTest {
+    /** Timeout used to stop the executor after each test, in milliseconds. */
+    private static final long SHUTDOWN_TIMEOUT_MILLIS = 100;
+
+    private ComputeThreadPoolExecutor computeThreadPoolExecutor;
+
+    private BlockingQueue<Runnable> workQueue;
+
+    @BeforeEach
+    public void setup() {
+        // The pool is limited to a single thread, the queue holds at most 5 entries.
+        workQueue = new BoundedPriorityBlockingQueue<>(() -> 5);
+        computeThreadPoolExecutor = new ComputeThreadPoolExecutor(1, 1, 10, TimeUnit.SECONDS, workQueue,
+                new NamedThreadFactory(NamedThreadFactory.threadPrefix("test", "compute"),
+                        Loggers.forClass(ComputeThreadPoolExecutorTest.class)));
+    }
+
+    @AfterEach
+    public void tearDown() {
+        // The submitted tasks sleep for a minute; stop the executor so its worker thread does not outlive the test.
+        computeThreadPoolExecutor.shutdown(SHUTDOWN_TIMEOUT_MILLIS);
+    }
+
+    @Test
+    public void testRemoveFromQueue2Tasks() {
+        // Add 2 tasks to the executor: the second one has to wait in the queue.
+        computeThreadPoolExecutor.execute(longTask());
+        QueueEntry<Void> longTask = longTask();
+        computeThreadPoolExecutor.execute(longTask);
+
+        // Check longTask is in workQueue
+        assertEquals(1, workQueue.size());
+        assertTrue(workQueue.contains(longTask));
+
+        // Remove task from executor, the removal must be reported as successful
+        assertTrue(computeThreadPoolExecutor.removeFromQueue(longTask));
+
+        // Check longTask was removed from workQueue
+        assertTrue(workQueue.isEmpty());
+    }
+
+    @Test
+    public void testRemoveFromQueue1Task() {
+        // Add 1 task to the executor
+        computeThreadPoolExecutor.execute(longTask());
+
+        // Check longTask is not in workQueue
+        assertTrue(workQueue.isEmpty());
+    }
+
+    /** Creates a queue entry with priority 0 whose job sleeps for one minute. */
+    private static QueueEntry<Void> longTask() {
+        return new QueueEntry<>(() -> {
+            TimeUnit.MINUTES.sleep(1);
+            return null;
+        }, 0);
+    }
+}
diff --git
a/modules/compute/src/test/java/org/apache/ignite/internal/compute/queue/PriorityQueueExecutorTest.java
b/modules/compute/src/test/java/org/apache/ignite/internal/compute/queue/PriorityQueueExecutorTest.java
index 990e1bbc3a..9b9a289fa2 100644
---
a/modules/compute/src/test/java/org/apache/ignite/internal/compute/queue/PriorityQueueExecutorTest.java
+++
b/modules/compute/src/test/java/org/apache/ignite/internal/compute/queue/PriorityQueueExecutorTest.java
@@ -37,6 +37,7 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.ignite.compute.ComputeException;
import org.apache.ignite.compute.JobStatus;
import org.apache.ignite.internal.compute.configuration.ComputeConfiguration;
import org.apache.ignite.internal.compute.state.InMemoryComputeStateMachine;
@@ -360,6 +361,198 @@ public class PriorityQueueExecutorTest extends
BaseIgniteAbstractTest {
assertThat(runTimes.get(), is(1));
}
+ @Test
+ public void testChangePriorityBeforeExecution() {
+ initExecutor(1);
+
+ CountDownLatch latch1 = new CountDownLatch(1);
+ CountDownLatch latch2 = new CountDownLatch(1);
+ CountDownLatch latch3 = new CountDownLatch(1);
+
+ //Start three tasks
+ CompletableFuture<Integer> task1 = submit(() -> {
+ latch1.await();
+ return 0;
+ }, 10);
+
+ CompletableFuture<Integer> task2 = submit(() -> {
+ latch2.await();
+ return 1;
+ }, 5);
+
+ QueueExecution<Integer> runningExecution3 =
priorityQueueExecutor.submit(() -> {
+ latch3.await();
+ return 2;
+ }, 1, 0);
+
+ CompletableFuture<Integer> task3 = runningExecution3.resultAsync();
+
+ assertThat(task1.isDone(), is(false));
+ assertThat(task2.isDone(), is(false));
+ assertThat(task3.isDone(), is(false));
+
+ //Change priority on task3, it should be executed before task2
+ runningExecution3.changePriority(20);
+
+ //Task 1 should be completed
+ latch1.countDown();
+ assertThat(task1, willCompleteSuccessfully());
+ assertThat(task2.isDone(), is(false));
+ assertThat(task3.isDone(), is(false));
+
+ //Current executing task is 3 because we changed priority
+ latch2.countDown();
+ assertThat(task2.isDone(), is(false));
+ assertThat(task3.isDone(), is(false));
+
+ latch3.countDown();
+ assertThat(task1, willCompleteSuccessfully());
+ assertThat(task2, willCompleteSuccessfully());
+ assertThat(task3, willCompleteSuccessfully());
+ }
+
+ @Test
+ public void testChangePriorityInTheMiddleExecution() {
+ initExecutor(1);
+
+ CountDownLatch latch1 = new CountDownLatch(1);
+ CountDownLatch latch2 = new CountDownLatch(1);
+ CountDownLatch latch3 = new CountDownLatch(1);
+ CountDownLatch latch4 = new CountDownLatch(1);
+
+ //Start four tasks
+ CompletableFuture<Integer> task1 = submit(() -> {
+ latch1.await();
+ return 0;
+ }, 10);
+
+ CompletableFuture<Integer> task2 = submit(() -> {
+ latch2.await();
+ return 1;
+ }, 5);
+
+ QueueExecution<Integer> runningExecution =
priorityQueueExecutor.submit(() -> {
+ latch3.await();
+ return 2;
+ }, 1, 0);
+
+ CompletableFuture<Integer> task3 = runningExecution.resultAsync();
+
+ CompletableFuture<Integer> task4 = submit(() -> {
+ latch4.await();
+ return 4;
+ }, 5);
+
+ assertThat(task1.isDone(), is(false));
+ assertThat(task2.isDone(), is(false));
+ assertThat(task3.isDone(), is(false));
+ assertThat(task4.isDone(), is(false));
+
+
+ //Task 1 should be completed
+ latch1.countDown();
+ assertThat(task1, willCompleteSuccessfully());
+ assertThat(task2.isDone(), is(false));
+ assertThat(task3.isDone(), is(false));
+ assertThat(task4.isDone(), is(false));
+
+ //Change priority on task3, it should be executed before task2 and
task4
+ runningExecution.changePriority(20);
+
+ //Current executing task is 3 because we changed priority
+ latch2.countDown();
+ assertThat(task2.isDone(), is(false));
+ assertThat(task3.isDone(), is(false));
+ assertThat(task4.isDone(), is(false));
+
+ //Current executing task is 3 because we changed priority
+ latch4.countDown();
+ assertThat(task2.isDone(), is(false));
+ assertThat(task3.isDone(), is(false));
+ assertThat(task4.isDone(), is(false));
+
+ //Complete task3, task2 and task4 will be executed as well
+ latch3.countDown();
+ assertThat(task1, willCompleteSuccessfully());
+ assertThat(task2, willCompleteSuccessfully());
+ assertThat(task3, willCompleteSuccessfully());
+ assertThat(task4, willCompleteSuccessfully());
+ }
+
+ /** Checks that changing the priority of a job that is expected to be executing already fails with an exception. */
+ @Test
+ public void testChangePriorityAlreadyExecuting() {
+ initExecutor(1);
+
+ CountDownLatch latch1 = new CountDownLatch(1);
+ CountDownLatch latch2 = new CountDownLatch(1);
+
+ // Start two tasks; with a single executor thread task1 is expected to start executing while task2 waits
+ QueueExecution<Integer> runningExecution1 =
priorityQueueExecutor.submit(() -> {
+ latch1.await();
+ return 2;
+ }, 1, 0);
+
+ CompletableFuture<Integer> task1 = runningExecution1.resultAsync();
+
+ CompletableFuture<Integer> task2 = submit(() -> {
+ latch2.await();
+ return 1;
+ }, 5);
+
+ assertThat(task1.isDone(), is(false));
+ assertThat(task2.isDone(), is(false));
+
+ // Change priority on task1: it is already in the executing stage, so an exception should be thrown
+ assertThrows(ComputeException.class, () ->
runningExecution1.changePriority(20));
+
+ // The failed priority change must not affect task1: it completes once its latch is released,
+ // while task2 cannot complete until latch2 is released
+ latch1.countDown();
+ assertThat(task1, willCompleteSuccessfully());
+ assertThat(task2.isDone(), is(false));
+
+ // Release task2, both tasks must be completed now
+ latch2.countDown();
+ assertThat(task1, willCompleteSuccessfully());
+ assertThat(task2, willCompleteSuccessfully());
+ }
+
+ /** Checks that changing the priority fails when the executor has enough threads to run every submitted job. */
+ @Test
+ public void testChangePriorityAllExecuting() {
+ initExecutor(3);
+
+ CountDownLatch latch1 = new CountDownLatch(1);
+ CountDownLatch latch2 = new CountDownLatch(1);
+
+ // Start two tasks; the executor has three threads, so neither task is expected to wait in the queue
+ QueueExecution<Integer> runningExecution =
priorityQueueExecutor.submit(() -> {
+ latch1.await();
+ return 2;
+ }, 1, 0);
+
+ CompletableFuture<Integer> task1 = runningExecution.resultAsync();
+
+ CompletableFuture<Integer> task2 = submit(() -> {
+ latch2.await();
+ return 1;
+ }, 1);
+
+ assertThat(task1.isDone(), is(false));
+ assertThat(task2.isDone(), is(false));
+
+ // Change priority on task1: it is not in the queue anymore, so an exception should be thrown
+ assertThrows(ComputeException.class, () ->
runningExecution.changePriority(2));
+
+ // task1 completes once its latch is released, task2 cannot complete until latch2 is released
+ latch1.countDown();
+ assertThat(task1, willCompleteSuccessfully());
+ assertThat(task2.isDone(), is(false));
+
+ // Release task2, both tasks must be completed now
+ latch2.countDown();
+ assertThat(task1, willCompleteSuccessfully());
+ assertThat(task2, willCompleteSuccessfully());
+ }
+
private void initExecutor(int threads) {
initExecutor(threads, Integer.MAX_VALUE);
}
diff --git a/modules/platforms/cpp/ignite/common/error_codes.h
b/modules/platforms/cpp/ignite/common/error_codes.h
index 6624b991ac..f2911b85b5 100644
--- a/modules/platforms/cpp/ignite/common/error_codes.h
+++ b/modules/platforms/cpp/ignite/common/error_codes.h
@@ -187,6 +187,8 @@ enum class code : underlying_t {
RESULT_NOT_FOUND = 0x100007,
FAIL_TO_GET_JOB_STATUS = 0x100008,
COMPUTE_JOB_FAILED = 0x100009,
+ CHANGE_JOB_PRIORITY_NO_JOB = 0x10000a,
+ CHANGE_JOB_PRIORITY_JOB_EXECUTING = 0x10000b,
// Catalog group. Group code: 17
VALIDATION = 0x110001,
diff --git a/modules/platforms/cpp/ignite/odbc/common_types.cpp
b/modules/platforms/cpp/ignite/odbc/common_types.cpp
index c3a2a0c6e3..f6eba7ee3d 100644
--- a/modules/platforms/cpp/ignite/odbc/common_types.cpp
+++ b/modules/platforms/cpp/ignite/odbc/common_types.cpp
@@ -276,6 +276,8 @@ sql_state error_code_to_sql_state(error::code code) {
case error::code::RESULT_NOT_FOUND:
case error::code::FAIL_TO_GET_JOB_STATUS:
case error::code::COMPUTE_JOB_FAILED:
+ case error::code::CHANGE_JOB_PRIORITY_NO_JOB:
+ case error::code::CHANGE_JOB_PRIORITY_JOB_EXECUTING:
return sql_state::SHY000_GENERAL_ERROR;
// Catalog group. Group code: 17
diff --git a/modules/platforms/dotnet/Apache.Ignite/ErrorCodes.g.cs
b/modules/platforms/dotnet/Apache.Ignite/ErrorCodes.g.cs
index 41408caf50..a8642e2b81 100644
--- a/modules/platforms/dotnet/Apache.Ignite/ErrorCodes.g.cs
+++ b/modules/platforms/dotnet/Apache.Ignite/ErrorCodes.g.cs
@@ -506,6 +506,12 @@ namespace Apache.Ignite
/// <summary> ComputeJobFailed error. </summary>
public const int ComputeJobFailed = (GroupCode << 16) | (9 &
0xFFFF);
+
+ /// <summary> ChangeJobPriorityNoJob error. </summary>
+ public const int ChangeJobPriorityNoJob = (GroupCode << 16) | (10
& 0xFFFF);
+
+ /// <summary> ChangeJobPriorityJobExecuting error. </summary>
+ public const int ChangeJobPriorityJobExecuting = (GroupCode << 16)
| (11 & 0xFFFF);
}
/// <summary> Catalog errors. </summary>
diff --git
a/modules/runner/src/testFixtures/java/org/apache/ignite/internal/ClusterPerTestIntegrationTest.java
b/modules/runner/src/testFixtures/java/org/apache/ignite/internal/ClusterPerTestIntegrationTest.java
index 97da1b0a6a..cc1d6fc133 100644
---
a/modules/runner/src/testFixtures/java/org/apache/ignite/internal/ClusterPerTestIntegrationTest.java
+++
b/modules/runner/src/testFixtures/java/org/apache/ignite/internal/ClusterPerTestIntegrationTest.java
@@ -47,7 +47,10 @@ public abstract class ClusterPerTestIntegrationTest extends
IgniteIntegrationTes
+ " },\n"
+ " clientConnector: { port:{} },\n"
+ " rest.port: {}\n"
- + "}";
+ + " compute: {\n"
+ + " threadPoolSize: 1\n"
+ + " }\n"
+ + "}\n";
/** Template for node bootstrap config with Scalecube settings for fast
failure detection. */
protected static final String
FAST_FAILURE_DETECTION_NODE_BOOTSTRAP_CFG_TEMPLATE = "{\n"