This is an automated email from the ASF dual-hosted git repository.

apkhmv pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new facb0f6b63 IGNITE-21066 Create job priority change API (#3019)
facb0f6b63 is described below

commit facb0f6b637c2cc0012e4257dcb086fb0e59eba0
Author: Dmitry Baranov <[email protected]>
AuthorDate: Thu Jan 18 17:57:48 2024 +0300

    IGNITE-21066 Create job priority change API (#3019)
---
 .../org/apache/ignite/compute/JobExecution.java    |   8 +
 .../java/org/apache/ignite/lang/ErrorGroups.java   |   6 +
 .../client/compute/ClientJobExecution.java         |   6 +
 .../apache/ignite/client/fakes/FakeCompute.java    |   5 +
 .../internal/compute/ItComputeTestEmbedded.java    |  75 ++++++++
 .../internal/compute/ComputeMessageTypes.java      |   8 +
 .../ignite/internal/compute/ComputeUtils.java      |  22 ++-
 .../internal/compute/DelegatingJobExecution.java   |   5 +
 .../internal/compute/FailSafeJobExecution.java     |   5 +
 .../compute/JobExecutionFutureWrapper.java         |   5 +
 .../internal/compute/JobExecutionWrapper.java      |   5 +
 .../compute/executor/JobExecutionInternal.java     |   9 +
 .../compute/message/JobChangePriorityRequest.java  |  43 +++++
 .../JobChangePriorityResponse.java}                |  34 ++--
 .../compute/messaging/ComputeMessaging.java        |  51 ++++++
 .../compute/messaging/RemoteJobExecution.java      |   7 +
 .../compute/queue/ComputeThreadPoolExecutor.java   |  82 +++++++++
 .../compute/queue/PriorityQueueExecutor.java       |  13 +-
 .../internal/compute/queue/QueueExecution.java     |  12 +-
 .../internal/compute/queue/QueueExecutionImpl.java |  46 +++--
 .../internal/compute/ComputeComponentImplTest.java |  78 ++++++++-
 .../internal/compute/IgniteComputeImplTest.java    |   5 +
 .../queue/ComputeThreadPoolExecutorTest.java       |  76 ++++++++
 .../compute/queue/PriorityQueueExecutorTest.java   | 193 +++++++++++++++++++++
 modules/platforms/cpp/ignite/common/error_codes.h  |   2 +
 modules/platforms/cpp/ignite/odbc/common_types.cpp |   2 +
 .../platforms/dotnet/Apache.Ignite/ErrorCodes.g.cs |   6 +
 .../internal/ClusterPerTestIntegrationTest.java    |   5 +-
 28 files changed, 767 insertions(+), 47 deletions(-)

diff --git 
a/modules/api/src/main/java/org/apache/ignite/compute/JobExecution.java 
b/modules/api/src/main/java/org/apache/ignite/compute/JobExecution.java
index 6b05eed81d..6977fadee3 100644
--- a/modules/api/src/main/java/org/apache/ignite/compute/JobExecution.java
+++ b/modules/api/src/main/java/org/apache/ignite/compute/JobExecution.java
@@ -57,4 +57,12 @@ public interface JobExecution<R> {
      * @return The future which will be completed when cancel request is 
processed.
      */
     CompletableFuture<Void> cancelAsync();
+
+    /**
+     * Changes job priority. After priority change job will be the last in the 
queue of jobs with the same priority.
+     *
+     * @param newPriority new priority.
+     * @return The future which will be completed when change priority request 
is processed.
+     */
+    CompletableFuture<Void> changePriorityAsync(int newPriority);
 }
diff --git a/modules/api/src/main/java/org/apache/ignite/lang/ErrorGroups.java 
b/modules/api/src/main/java/org/apache/ignite/lang/ErrorGroups.java
index a9a1e36e4a..87af8d63ec 100755
--- a/modules/api/src/main/java/org/apache/ignite/lang/ErrorGroups.java
+++ b/modules/api/src/main/java/org/apache/ignite/lang/ErrorGroups.java
@@ -553,6 +553,12 @@ public class ErrorGroups {
 
         /** Compute job failed. */
         public static final int COMPUTE_JOB_FAILED_ERR = 
COMPUTE_ERR_GROUP.registerErrorCode((short) 9);
+
+        /** Can not change job priority, compute job not found error. */
+        public static final int CHANGE_JOB_PRIORITY_NO_JOB_ERR = 
COMPUTE_ERR_GROUP.registerErrorCode((short) 10);
+
+        /** Can not change job priority, compute job is already executing. */
+        public static final int CHANGE_JOB_PRIORITY_JOB_EXECUTING_ERR = 
COMPUTE_ERR_GROUP.registerErrorCode((short) 11);
     }
 
     /** Catalog error group. */
diff --git 
a/modules/client/src/main/java/org/apache/ignite/internal/client/compute/ClientJobExecution.java
 
b/modules/client/src/main/java/org/apache/ignite/internal/client/compute/ClientJobExecution.java
index 27ca0e134c..d61822ebf0 100644
--- 
a/modules/client/src/main/java/org/apache/ignite/internal/client/compute/ClientJobExecution.java
+++ 
b/modules/client/src/main/java/org/apache/ignite/internal/client/compute/ClientJobExecution.java
@@ -50,4 +50,10 @@ class ClientJobExecution<R> implements JobExecution<R> {
         // TODO https://issues.apache.org/jira/browse/IGNITE-21148
         return nullCompletedFuture();
     }
+
+    @Override
+    public CompletableFuture<Void> changePriorityAsync(int newPriority) {
+        // TODO https://issues.apache.org/jira/browse/IGNITE-21148
+        return nullCompletedFuture();
+    }
 }
diff --git 
a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeCompute.java 
b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeCompute.java
index c5728a460e..9f9e195f6e 100644
--- 
a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeCompute.java
+++ 
b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeCompute.java
@@ -178,6 +178,11 @@ public class FakeCompute implements IgniteCompute {
             public CompletableFuture<Void> cancelAsync() {
                 return nullCompletedFuture();
             }
+
+            @Override
+            public CompletableFuture<Void> changePriorityAsync(int 
newPriority) {
+                return nullCompletedFuture();
+            }
         };
     }
 }
diff --git 
a/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItComputeTestEmbedded.java
 
b/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItComputeTestEmbedded.java
index ee81c077bd..2ce4432526 100644
--- 
a/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItComputeTestEmbedded.java
+++ 
b/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItComputeTestEmbedded.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.compute;
 
 import static java.util.stream.Collectors.joining;
 import static 
org.apache.ignite.internal.compute.utils.ComputeTestUtils.assertPublicException;
+import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureExceptionMatcher.willThrow;
 import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willBe;
 import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
 import static 
org.apache.ignite.internal.testframework.matchers.JobStatusMatcher.jobStatusWithState;
@@ -26,11 +27,13 @@ import static 
org.apache.ignite.lang.ErrorGroups.Compute.CLASS_INITIALIZATION_ER
 import static org.apache.ignite.lang.ErrorGroups.Compute.COMPUTE_ERR_GROUP;
 import static org.awaitility.Awaitility.await;
 import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.is;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 
 import java.util.Arrays;
 import java.util.List;
 import java.util.Set;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import org.apache.ignite.Ignite;
@@ -147,6 +150,62 @@ class ItComputeTestEmbedded extends ItComputeBaseTest {
         await().until(execution::statusAsync, 
willBe(jobStatusWithState(JobState.CANCELED)));
     }
 
+    @Test
+    void changeJobPriorityLocallyComputeException() {
+        IgniteImpl entryNode = node(0);
+
+        JobExecution<String> execution = 
entryNode.compute().executeAsync(Set.of(entryNode.node()), units(), 
LongJob.class.getName());
+        await().until(execution::statusAsync, 
willBe(jobStatusWithState(JobState.EXECUTING)));
+
+        assertThat(execution.changePriorityAsync(2), 
willThrow(ComputeException.class));
+    }
+
+    @Test
+    void changeJobPriorityRemotelyComputeException() {
+        IgniteImpl entryNode = node(0);
+
+        JobExecution<String> execution = 
entryNode.compute().executeAsync(Set.of(node(1).node()), units(), 
LongJob.class.getName());
+        await().until(execution::statusAsync, 
willBe(jobStatusWithState(JobState.EXECUTING)));
+
+        assertThat(execution.changePriorityAsync(2), 
willThrow(ComputeException.class));
+    }
+
+    @Test
+    void changeJobPriorityLocally() {
+        IgniteImpl entryNode = node(0);
+
+        // Start 1 task in executor with 1 thread
+        JobExecution<String> execution1 = 
entryNode.compute().executeAsync(Set.of(entryNode.node()), units(), 
WaitLatchJob.class.getName());
+        await().until(execution1::statusAsync, 
willBe(jobStatusWithState(JobState.EXECUTING)));
+
+        // Start one more long lasting task
+        JobExecution<String> execution2 = 
entryNode.compute().executeAsync(Set.of(entryNode.node()), units(), 
LongJob.class.getName());
+        await().until(execution2::statusAsync, 
willBe(jobStatusWithState(JobState.QUEUED)));
+
+        // Start third task
+        JobExecution<String> execution3 = 
entryNode.compute().executeAsync(Set.of(entryNode.node()), units(), 
WaitLatchJob.class.getName());
+        await().until(execution3::statusAsync, 
willBe(jobStatusWithState(JobState.QUEUED)));
+
+        // Task 1 and 2 are not competed, in queue state
+        assertThat(execution2.resultAsync().isDone(), is(false));
+        assertThat(execution3.resultAsync().isDone(), is(false));
+
+        // Change priority of task 3, so it should be executed before task 2
+        assertThat(execution3.changePriorityAsync(2), 
willCompleteSuccessfully());
+
+        // Run 1 and 3 task
+        WaitLatchJob.latch.countDown();
+
+        // Tasks 1 and 3 completed successfully
+        assertThat(execution1.resultAsync(), willCompleteSuccessfully());
+        assertThat(execution3.resultAsync(), willCompleteSuccessfully());
+        assertThat(execution1.resultAsync().isDone(), is(true));
+        assertThat(execution3.resultAsync().isDone(), is(true));
+
+        // Task 2 is not completed
+        assertThat(execution2.resultAsync().isDone(), is(false));
+    }
+
     private static class ConcatJob implements ComputeJob<String> {
         /** {@inheritDoc} */
         @Override
@@ -219,4 +278,20 @@ class ItComputeTestEmbedded extends ItComputeBaseTest {
             return null;
         }
     }
+
+    private static class WaitLatchJob implements ComputeJob<String> {
+
+        static final CountDownLatch latch = new CountDownLatch(1);
+
+        /** {@inheritDoc} */
+        @Override
+        public String execute(JobExecutionContext context, Object... args) {
+            try {
+                latch.await();
+            } catch (InterruptedException e) {
+                Thread.currentThread().interrupt();
+            }
+            return null;
+        }
+    }
 }
diff --git 
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeMessageTypes.java
 
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeMessageTypes.java
index 767a296a50..8a38cb32e4 100644
--- 
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeMessageTypes.java
+++ 
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeMessageTypes.java
@@ -20,6 +20,8 @@ package org.apache.ignite.internal.compute;
 import org.apache.ignite.internal.compute.message.DeploymentUnitMsg;
 import org.apache.ignite.internal.compute.message.ExecuteRequest;
 import org.apache.ignite.internal.compute.message.ExecuteResponse;
+import org.apache.ignite.internal.compute.message.JobChangePriorityRequest;
+import org.apache.ignite.internal.compute.message.JobChangePriorityResponse;
 import org.apache.ignite.internal.compute.message.JobResultRequest;
 import org.apache.ignite.internal.compute.message.JobResultResponse;
 import org.apache.ignite.internal.compute.message.JobStatusRequest;
@@ -63,4 +65,10 @@ public class ComputeMessageTypes {
 
     /** Type for {@link JobCancelResponse}. */
     public static final short JOB_CANCEL_RESPONSE = 8;
+
+    /** Type for {@link JobChangePriorityRequest}. */
+    public static final short JOB_CHANGE_PRIORITY_REQUEST = 9;
+
+    /** Type for {@link JobChangePriorityResponse}. */
+    public static final short JOB_CHANGE_PRIORITY_RESPONSE = 10;
 }
diff --git 
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeUtils.java
 
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeUtils.java
index 4da7d3fe3f..990d2f9456 100644
--- 
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeUtils.java
+++ 
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeUtils.java
@@ -35,6 +35,7 @@ import org.apache.ignite.compute.version.Version;
 import org.apache.ignite.internal.compute.message.DeploymentUnitMsg;
 import org.apache.ignite.internal.compute.message.ExecuteResponse;
 import org.apache.ignite.internal.compute.message.JobCancelResponse;
+import org.apache.ignite.internal.compute.message.JobChangePriorityResponse;
 import org.apache.ignite.internal.compute.message.JobResultResponse;
 import org.apache.ignite.internal.compute.message.JobStatusResponse;
 
@@ -158,11 +159,26 @@ public class ComputeUtils {
     /**
      * Extract compute job cancel result from cancel response.
      *
-     * @param jobStatusResponse Job cancel message response.
+     * @param jobCancelResponse Job cancel message response.
      * @return Completable future with result.
      */
-    public static CompletableFuture<Void> 
cancelFromJobCancelResponse(JobCancelResponse jobStatusResponse) {
-        Throwable throwable = jobStatusResponse.throwable();
+    public static CompletableFuture<Void> 
cancelFromJobCancelResponse(JobCancelResponse jobCancelResponse) {
+        Throwable throwable = jobCancelResponse.throwable();
+        if (throwable != null) {
+            return failedFuture(throwable);
+        }
+
+        return nullCompletedFuture();
+    }
+
+    /**
+     * Extract compute job change priority result from change priority 
response.
+     *
+     * @param jobChangePriorityResponse Job change priority message response.
+     * @return Completable future with result.
+     */
+    public static CompletableFuture<Void> 
changePriorityFromJobChangePriorityResponse(JobChangePriorityResponse 
jobChangePriorityResponse) {
+        Throwable throwable = jobChangePriorityResponse.throwable();
         if (throwable != null) {
             return failedFuture(throwable);
         }
diff --git 
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/DelegatingJobExecution.java
 
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/DelegatingJobExecution.java
index 6730546b1a..bc1bdeedf3 100644
--- 
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/DelegatingJobExecution.java
+++ 
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/DelegatingJobExecution.java
@@ -48,4 +48,9 @@ class DelegatingJobExecution<R> implements JobExecution<R> {
     public CompletableFuture<Void> cancelAsync() {
         return delegate.thenAccept(JobExecutionInternal::cancel);
     }
+
+    @Override
+    public CompletableFuture<Void> changePriorityAsync(int newPriority) {
+        return delegate.thenAccept(jobExecutionInternal -> 
jobExecutionInternal.changePriority(newPriority));
+    }
 }
diff --git 
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/FailSafeJobExecution.java
 
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/FailSafeJobExecution.java
index a02b59460f..b2f16adc3f 100644
--- 
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/FailSafeJobExecution.java
+++ 
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/FailSafeJobExecution.java
@@ -159,6 +159,11 @@ class FailSafeJobExecution<T> implements JobExecution<T> {
         return runningJobExecution.get().cancelAsync();
     }
 
+    @Override
+    public CompletableFuture<Void> changePriorityAsync(int newPriority) {
+        return runningJobExecution.get().changePriorityAsync(newPriority);
+    }
+
     /**
      * Completes the future with the exception. This method can be called only 
once.
      *
diff --git 
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/JobExecutionFutureWrapper.java
 
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/JobExecutionFutureWrapper.java
index 22f2a09215..95ed3ce85f 100644
--- 
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/JobExecutionFutureWrapper.java
+++ 
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/JobExecutionFutureWrapper.java
@@ -49,4 +49,9 @@ class JobExecutionFutureWrapper<R> implements JobExecution<R> 
{
     public CompletableFuture<Void> cancelAsync() {
         return 
convertToPublicFuture(delegate.thenCompose(JobExecution::cancelAsync));
     }
+
+    @Override
+    public CompletableFuture<Void> changePriorityAsync(int newPriority) {
+        return convertToPublicFuture(delegate.thenCompose(jobExecution -> 
jobExecution.changePriorityAsync(newPriority)));
+    }
 }
diff --git 
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/JobExecutionWrapper.java
 
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/JobExecutionWrapper.java
index 8eb8a7b1ab..46a59888c6 100644
--- 
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/JobExecutionWrapper.java
+++ 
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/JobExecutionWrapper.java
@@ -49,4 +49,9 @@ class JobExecutionWrapper<R> implements JobExecution<R> {
     public CompletableFuture<Void> cancelAsync() {
         return convertToPublicFuture(delegate.cancelAsync());
     }
+
+    @Override
+    public CompletableFuture<Void> changePriorityAsync(int newPriority) {
+        return 
convertToPublicFuture(delegate.changePriorityAsync(newPriority));
+    }
 }
diff --git 
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/executor/JobExecutionInternal.java
 
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/executor/JobExecutionInternal.java
index b1c1106067..b7c9b8398d 100644
--- 
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/executor/JobExecutionInternal.java
+++ 
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/executor/JobExecutionInternal.java
@@ -57,4 +57,13 @@ public class JobExecutionInternal<R> {
         isInterrupted.set(true);
         execution.cancel();
     }
+
+    /**
+     * Change priority of job execution.
+     *
+     * @param newPriority new priority.
+     */
+    public void changePriority(int newPriority) {
+        execution.changePriority(newPriority);
+    }
 }
diff --git 
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/message/JobChangePriorityRequest.java
 
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/message/JobChangePriorityRequest.java
new file mode 100644
index 0000000000..6b98e8ba96
--- /dev/null
+++ 
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/message/JobChangePriorityRequest.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.compute.message;
+
+import java.util.UUID;
+import org.apache.ignite.internal.compute.ComputeMessageTypes;
+import org.apache.ignite.network.NetworkMessage;
+import org.apache.ignite.network.annotations.Transferable;
+
+/**
+ * Remote change job priority request.
+ */
+@Transferable(ComputeMessageTypes.JOB_CHANGE_PRIORITY_REQUEST)
+public interface JobChangePriorityRequest extends NetworkMessage {
+    /**
+     * Returns job id.
+     *
+     * @return Job id.
+     */
+    UUID jobId();
+
+    /**
+     * Returns job priority.
+     *
+     * @return job priority.
+     */
+    int priority();
+}
diff --git 
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/queue/QueueExecution.java
 
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/message/JobChangePriorityResponse.java
similarity index 50%
copy from 
modules/compute/src/main/java/org/apache/ignite/internal/compute/queue/QueueExecution.java
copy to 
modules/compute/src/main/java/org/apache/ignite/internal/compute/message/JobChangePriorityResponse.java
index 95872bfd97..fd822849fc 100644
--- 
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/queue/QueueExecution.java
+++ 
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/message/JobChangePriorityResponse.java
@@ -15,36 +15,26 @@
  * limitations under the License.
  */
 
-package org.apache.ignite.internal.compute.queue;
+package org.apache.ignite.internal.compute.message;
 
-import java.util.concurrent.CompletableFuture;
-import org.apache.ignite.compute.JobStatus;
+import org.apache.ignite.internal.compute.ComputeMessageTypes;
+import org.apache.ignite.network.NetworkMessage;
+import org.apache.ignite.network.annotations.Marshallable;
+import org.apache.ignite.network.annotations.Transferable;
 import org.jetbrains.annotations.Nullable;
 
 /**
- * Provides information about the task executing on the {@link 
PriorityQueueExecutor}, allows cancelling the task.
- *
- * @param <R> Job result type.
+ * Remote change job priority response.
  */
-public interface QueueExecution<R> {
-    /**
-     * Returns job's execution result.
-     *
-     * @return Job's execution result future.
-     */
-    CompletableFuture<R> resultAsync();
+@Transferable(ComputeMessageTypes.JOB_CHANGE_PRIORITY_RESPONSE)
+public interface JobChangePriorityResponse extends NetworkMessage {
 
     /**
-     * Returns the current status of the job. The job status may be deleted 
and thus return {@code null} if the time for retaining job
-     * status has been exceeded.
+     * Returns a {@link Throwable} that was thrown during change job priority 
request ({@code null} if the request was successful).
      *
-     * @return The current status of the job, or {@code null} if the job 
status no longer exists due to exceeding the retention time limit.
+     * @return {@link Throwable} that was thrown during change job priority 
request ({@code null} if the request was successful)
      */
     @Nullable
-    JobStatus status();
-
-    /**
-     * Cancels the job.
-     */
-    void cancel();
+    @Marshallable
+    Throwable throwable();
 }
diff --git 
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/messaging/ComputeMessaging.java
 
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/messaging/ComputeMessaging.java
index 840eac5761..cd94e83b8e 100644
--- 
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/messaging/ComputeMessaging.java
+++ 
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/messaging/ComputeMessaging.java
@@ -18,6 +18,7 @@
 package org.apache.ignite.internal.compute.messaging;
 
 import static 
org.apache.ignite.internal.compute.ComputeUtils.cancelFromJobCancelResponse;
+import static 
org.apache.ignite.internal.compute.ComputeUtils.changePriorityFromJobChangePriorityResponse;
 import static 
org.apache.ignite.internal.compute.ComputeUtils.jobIdFromExecuteResponse;
 import static 
org.apache.ignite.internal.compute.ComputeUtils.resultFromJobResultResponse;
 import static 
org.apache.ignite.internal.compute.ComputeUtils.statusFromJobStatusResponse;
@@ -45,6 +46,8 @@ import 
org.apache.ignite.internal.compute.message.ExecuteRequest;
 import org.apache.ignite.internal.compute.message.ExecuteResponse;
 import org.apache.ignite.internal.compute.message.JobCancelRequest;
 import org.apache.ignite.internal.compute.message.JobCancelResponse;
+import org.apache.ignite.internal.compute.message.JobChangePriorityRequest;
+import org.apache.ignite.internal.compute.message.JobChangePriorityResponse;
 import org.apache.ignite.internal.compute.message.JobResultRequest;
 import org.apache.ignite.internal.compute.message.JobResultResponse;
 import org.apache.ignite.internal.compute.message.JobStatusRequest;
@@ -119,6 +122,8 @@ public class ComputeMessaging {
             sendJobStatusResponse(null, ex, senderConsistentId, correlationId);
         } else if (message instanceof JobCancelRequest) {
             sendJobCancelResponse(ex, senderConsistentId, correlationId);
+        } else if (message instanceof JobChangePriorityRequest) {
+            sendJobChangePriorityResponse(ex, senderConsistentId, 
correlationId);
         }
     }
 
@@ -137,6 +142,8 @@ public class ComputeMessaging {
             processJobStatusRequest(jobStatus, (JobStatusRequest) message, 
senderConsistentId, correlationId);
         } else if (message instanceof JobCancelRequest) {
             processJobCancelRequest((JobCancelRequest) message, 
senderConsistentId, correlationId);
+        } else if (message instanceof JobChangePriorityRequest) {
+            processJobChangePriorityRequest((JobChangePriorityRequest) 
message, senderConsistentId, correlationId);
         }
     }
 
@@ -321,4 +328,48 @@ public class ComputeMessaging {
 
         messagingService.respond(senderConsistentId, jobCancelResponse, 
correlationId);
     }
+
+    /**
+     * Changes compute job priority on the remote node.
+     *
+     * @param remoteNode The priority of the job will be changed on this node.
+     * @param jobId Compute job id.
+     * @param newPriority new job priority.
+     *
+     * @return Job change priority future (will be completed when change 
priority request is processed).
+     */
+    CompletableFuture<Void> remoteChangePriorityAsync(ClusterNode remoteNode, 
UUID jobId, int newPriority) {
+        JobChangePriorityRequest jobChangePriorityRequest = 
messagesFactory.jobChangePriorityRequest()
+                .jobId(jobId)
+                .priority(newPriority)
+                .build();
+
+        return messagingService.invoke(remoteNode, jobChangePriorityRequest, 
NETWORK_TIMEOUT_MILLIS)
+                .thenCompose(networkMessage -> 
changePriorityFromJobChangePriorityResponse((JobChangePriorityResponse) 
networkMessage));
+    }
+
+    private void processJobChangePriorityRequest(JobChangePriorityRequest 
request, String senderConsistentId, long correlationId) {
+        UUID jobId = request.jobId();
+        JobExecution<Object> execution = executions.get(jobId);
+        if (execution != null) {
+            execution.changePriorityAsync(request.priority())
+                    .whenComplete((result, err) -> 
sendJobChangePriorityResponse(err, senderConsistentId, correlationId));
+        } else {
+            ComputeException ex = new 
ComputeException(Compute.CHANGE_JOB_PRIORITY_NO_JOB_ERR, "Can not change job 
priority,"
+                    + " job not found for the job id " + jobId);
+            sendJobChangePriorityResponse(ex, senderConsistentId, 
correlationId);
+        }
+    }
+
+    private void sendJobChangePriorityResponse(
+            @Nullable Throwable throwable,
+            String senderConsistentId,
+            Long correlationId
+    ) {
+        JobChangePriorityResponse jobChangePriorityResponse = 
messagesFactory.jobChangePriorityResponse()
+                .throwable(throwable)
+                .build();
+
+        messagingService.respond(senderConsistentId, 
jobChangePriorityResponse, correlationId);
+    }
 }
diff --git 
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/messaging/RemoteJobExecution.java
 
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/messaging/RemoteJobExecution.java
index d7dd3a7dbd..1ece202966 100644
--- 
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/messaging/RemoteJobExecution.java
+++ 
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/messaging/RemoteJobExecution.java
@@ -81,4 +81,11 @@ public class RemoteJobExecution<R> implements 
JobExecution<R> {
                 jobIdFuture.thenCompose(jobId -> 
messaging.remoteCancelAsync(remoteNode, jobId))
         );
     }
+
+    @Override
+    public CompletableFuture<Void> changePriorityAsync(int newPriority) {
+        return inFlightFutures.registerFuture(
+                jobIdFuture.thenCompose(jobId -> 
messaging.remoteChangePriorityAsync(remoteNode, jobId, newPriority))
+        );
+    }
 }
diff --git 
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/queue/ComputeThreadPoolExecutor.java
 
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/queue/ComputeThreadPoolExecutor.java
new file mode 100644
index 0000000000..f82307f1e0
--- /dev/null
+++ 
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/queue/ComputeThreadPoolExecutor.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.compute.queue;
+
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import org.apache.ignite.internal.util.IgniteUtils;
+
+/**
+ * Wrapper for {@link ThreadPoolExecutor}. Allows removing task from work 
queue.
+ */
+public class ComputeThreadPoolExecutor {
+
+    private final BlockingQueue<Runnable> workQueue;
+
+    private final ThreadPoolExecutor executor;
+
+    ComputeThreadPoolExecutor(int corePoolSize,
+            int maximumPoolSize,
+            long keepAliveTime,
+            TimeUnit unit,
+            BlockingQueue<Runnable> workQueue,
+            ThreadFactory threadFactory) {
+        this.workQueue = workQueue;
+        executor = new ThreadPoolExecutor(corePoolSize, maximumPoolSize, 
keepAliveTime, unit, workQueue, threadFactory);
+    }
+
+    /**
+     * Executes the given task sometime in the future.
+     * {@link ThreadPoolExecutor#execute(Runnable command)}
+     */
+    public void execute(Runnable command) {
+        executor.execute(command);
+    }
+
+    /**
+     * Removes this task from the executor's internal queue if it is present.
+     * {@link ThreadPoolExecutor#remove(Runnable command)}
+     */
+    public boolean remove(Runnable task) {
+        return executor.remove(task);
+    }
+
+    /**
+     * Removes this task from the executor's internal queue if it is
+     * present, thus causing it not to be run if it has not already
+     * started.
+     *
+     * @param task the task to remove
+     * @return {@code true} if the task was removed
+     */
+    public boolean removeFromQueue(Runnable task) {
+        return workQueue.remove(task);
+    }
+
+    /**
+     * Shuts down the given executor service gradually, first disabling new 
submissions and later, if necessary, cancelling remaining
+     * tasks.
+     *
+     * {@link IgniteUtils#shutdownAndAwaitTermination}
+     */
+    public void shutdown(long stopTimeout) {
+        IgniteUtils.shutdownAndAwaitTermination(executor, stopTimeout, 
TimeUnit.MILLISECONDS);
+    }
+}
diff --git 
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/queue/PriorityQueueExecutor.java
 
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/queue/PriorityQueueExecutor.java
index 363040aac5..7823cf0524 100644
--- 
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/queue/PriorityQueueExecutor.java
+++ 
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/queue/PriorityQueueExecutor.java
@@ -19,13 +19,12 @@ package org.apache.ignite.internal.compute.queue;
 
 import java.util.Objects;
 import java.util.UUID;
+import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import org.apache.ignite.internal.compute.configuration.ComputeConfiguration;
 import org.apache.ignite.internal.compute.state.ComputeStateMachine;
-import org.apache.ignite.internal.util.IgniteUtils;
 
 /**
  * Compute job executor with priority mechanism.
@@ -35,7 +34,7 @@ public class PriorityQueueExecutor {
 
     private final ComputeConfiguration configuration;
 
-    private final ThreadPoolExecutor executor;
+    private final ComputeThreadPoolExecutor executor;
 
     private final ComputeStateMachine stateMachine;
 
@@ -52,12 +51,13 @@ public class PriorityQueueExecutor {
     ) {
         this.configuration = configuration;
         this.stateMachine = stateMachine;
-        executor = new ThreadPoolExecutor(
+        BlockingQueue<Runnable> workQueue = new 
BoundedPriorityBlockingQueue<>(() -> configuration.queueMaxSize().value());
+        executor = new ComputeThreadPoolExecutor(
                 configuration.threadPoolSize().value(),
                 configuration.threadPoolSize().value(),
                 THREAD_KEEP_ALIVE_SECONDS,
                 TimeUnit.SECONDS,
-                new BoundedPriorityBlockingQueue<>(() -> 
configuration.queueMaxSize().value()),
+                workQueue,
                 threadFactory
         );
     }
@@ -96,7 +96,6 @@ public class PriorityQueueExecutor {
      * Shutdown executor. After shutdown executor is not usable anymore.
      */
     public void shutdown() {
-        Long stopTimeout = configuration.threadPoolStopTimeoutMillis().value();
-        IgniteUtils.shutdownAndAwaitTermination(executor, stopTimeout, 
TimeUnit.MILLISECONDS);
+        executor.shutdown(configuration.threadPoolStopTimeoutMillis().value());
     }
 }
diff --git 
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/queue/QueueExecution.java
 
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/queue/QueueExecution.java
index 95872bfd97..f603ec3530 100644
--- 
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/queue/QueueExecution.java
+++ 
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/queue/QueueExecution.java
@@ -22,7 +22,8 @@ import org.apache.ignite.compute.JobStatus;
 import org.jetbrains.annotations.Nullable;
 
 /**
- * Provides information about the task executing on the {@link 
PriorityQueueExecutor}, allows cancelling the task.
+ * Provides information about the task executing on the {@link 
PriorityQueueExecutor}, allows cancelling the task,
+ * changing the job priority.
  *
  * @param <R> Job result type.
  */
@@ -47,4 +48,13 @@ public interface QueueExecution<R> {
      * Cancels the job.
      */
     void cancel();
+
+    /**
+     * Change job priority. Priority can be changed only if task still in 
executor's queue.
+     * After priority change task will be removed from the execution queue and 
run once again.
+     * Queue entry will be executed last in the queue of entries with the same 
priority (FIFO).
+     *
+     * @param newPriority new priority.
+     */
+    void changePriority(int newPriority);
 }
diff --git 
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/queue/QueueExecutionImpl.java
 
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/queue/QueueExecutionImpl.java
index fdbb0ca84d..bac4da281b 100644
--- 
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/queue/QueueExecutionImpl.java
+++ 
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/queue/QueueExecutionImpl.java
@@ -22,7 +22,7 @@ import static 
org.apache.ignite.lang.ErrorGroups.Compute.QUEUE_OVERFLOW_ERR;
 import java.util.UUID;
 import java.util.concurrent.Callable;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 import org.apache.ignite.compute.ComputeException;
 import org.apache.ignite.compute.JobStatus;
@@ -30,6 +30,7 @@ import 
org.apache.ignite.internal.compute.state.ComputeStateMachine;
 import org.apache.ignite.internal.compute.state.IllegalJobStateTransition;
 import org.apache.ignite.internal.logger.IgniteLogger;
 import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.lang.ErrorGroups.Compute;
 import org.jetbrains.annotations.Nullable;
 
 /**
@@ -42,14 +43,16 @@ class QueueExecutionImpl<R> implements QueueExecution<R> {
 
     private final UUID jobId;
     private final Callable<R> job;
-    private final int priority;
-    private final ThreadPoolExecutor executor;
+    private final AtomicInteger priority;
+    private final ComputeThreadPoolExecutor executor;
     private final ComputeStateMachine stateMachine;
 
     private final CompletableFuture<R> result = new CompletableFuture<>();
 
     private final AtomicReference<QueueEntry<R>> queueEntry = new 
AtomicReference<>();
 
+    private final AtomicInteger retries = new AtomicInteger();
+
     /**
      * Constructor.
      *
@@ -63,12 +66,11 @@ class QueueExecutionImpl<R> implements QueueExecution<R> {
             UUID jobId,
             Callable<R> job,
             int priority,
-            ThreadPoolExecutor executor,
-            ComputeStateMachine stateMachine
-    ) {
+            ComputeThreadPoolExecutor executor,
+            ComputeStateMachine stateMachine) {
         this.jobId = jobId;
         this.job = job;
-        this.priority = priority;
+        this.priority = new AtomicInteger(priority);
         this.executor = executor;
         this.stateMachine = stateMachine;
     }
@@ -99,18 +101,40 @@ class QueueExecutionImpl<R> implements QueueExecution<R> {
         }
     }
 
+    @Override
+    public void changePriority(int newPriority) {
+        if (newPriority == priority.get()) {
+            return;
+        }
+        QueueEntry<R> queueEntry = this.queueEntry.get();
+        if (executor.removeFromQueue(queueEntry)) {
+            this.priority.set(newPriority);
+            this.queueEntry.set(null);
+            run();
+        } else {
+            throw new 
ComputeException(Compute.CHANGE_JOB_PRIORITY_JOB_EXECUTING_ERR, "Can not change 
job priority,"
+                    + " job already processing. [job id = " + jobId + "]");
+        }
+    }
+
     /**
      * Runs the job, completing the result future and retrying the execution 
in case of failure at most {@code numRetries} times.
      *
      * @param numRetries Number of times to retry failed execution.
      */
     void run(int numRetries) {
+        retries.set(numRetries);
+        run();
+    }
+
+    private void run() {
         QueueEntry<R> queueEntry = new QueueEntry<>(() -> {
             stateMachine.executeJob(jobId);
             return job.call();
-        }, priority);
+        }, priority.get());
 
-        // Ignoring previous value since it can't be running because we are 
calling run either after the construction or after the failure.
+        // Ignoring previous value since it can't be running because we are 
calling run
+        // either after the construction or after the failure.
         this.queueEntry.set(queueEntry);
 
         try {
@@ -122,9 +146,9 @@ class QueueExecutionImpl<R> implements QueueExecution<R> {
 
         queueEntry.toFuture().whenComplete((r, throwable) -> {
             if (throwable != null) {
-                if (numRetries > 0) {
+                if (retries.decrementAndGet() >= 0) {
                     stateMachine.queueJob(jobId);
-                    run(numRetries - 1);
+                    run();
                 } else {
                     if (queueEntry.isInterrupted()) {
                         stateMachine.cancelJob(jobId);
diff --git 
a/modules/compute/src/test/java/org/apache/ignite/internal/compute/ComputeComponentImplTest.java
 
b/modules/compute/src/test/java/org/apache/ignite/internal/compute/ComputeComponentImplTest.java
index 869f88cb2c..6a4ca07d31 100644
--- 
a/modules/compute/src/test/java/org/apache/ignite/internal/compute/ComputeComponentImplTest.java
+++ 
b/modules/compute/src/test/java/org/apache/ignite/internal/compute/ComputeComponentImplTest.java
@@ -65,6 +65,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
 import org.apache.ignite.Ignite;
+import org.apache.ignite.compute.ComputeException;
 import org.apache.ignite.compute.ComputeJob;
 import org.apache.ignite.compute.DeploymentUnit;
 import org.apache.ignite.compute.JobExecution;
@@ -82,6 +83,8 @@ import 
org.apache.ignite.internal.compute.message.ExecuteRequest;
 import org.apache.ignite.internal.compute.message.ExecuteResponse;
 import org.apache.ignite.internal.compute.message.JobCancelRequest;
 import org.apache.ignite.internal.compute.message.JobCancelResponse;
+import org.apache.ignite.internal.compute.message.JobChangePriorityRequest;
+import org.apache.ignite.internal.compute.message.JobChangePriorityResponse;
 import org.apache.ignite.internal.compute.message.JobResultRequest;
 import org.apache.ignite.internal.compute.message.JobResultResponse;
 import org.apache.ignite.internal.compute.message.JobStatusRequest;
@@ -161,6 +164,12 @@ class ComputeComponentImplTest extends 
BaseIgniteAbstractTest {
     @Captor
     private ArgumentCaptor<JobCancelResponse> jobCancelResponseCaptor;
 
+    @Captor
+    private ArgumentCaptor<JobChangePriorityRequest> 
jobChangePriorityRequestCaptor;
+
+    @Captor
+    private ArgumentCaptor<JobChangePriorityResponse> 
jobChangePriorityResponseCaptor;
+
     private final ClusterNode remoteNode = new ClusterNodeImpl("remote", 
"remote", new NetworkAddress("remote-host", 1));
 
     private final AtomicReference<NetworkMessageHandler> 
computeMessageHandlerRef = new AtomicReference<>();
@@ -205,6 +214,7 @@ class ComputeComponentImplTest extends 
BaseIgniteAbstractTest {
         assertThat(execution.resultAsync(), willBe("jobResponse"));
         assertThat(execution.statusAsync(), 
willBe(jobStatusWithState(COMPLETED)));
         assertThat(execution.cancelAsync(), 
willThrow(CancellingException.class));
+        assertThat(execution.changePriorityAsync(1), 
willThrow(ComputeException.class));
 
         assertThatNoRequestsWereSent();
     }
@@ -244,7 +254,7 @@ class ComputeComponentImplTest extends 
BaseIgniteAbstractTest {
         respondWithExecuteResponseWhenExecuteRequestIsSent(jobId);
         respondWithJobResultResponseWhenJobResultRequestIsSent(jobId);
         respondWithJobStatusResponseWhenJobStatusRequestIsSent(jobId, 
COMPLETED);
-        respondWithCancellingExceptionhenJobCancelRequestIsSent(jobId);
+        respondWithCancellingExceptionWhenJobCancelRequestIsSent(jobId);
 
         JobExecution<String> execution = 
computeComponent.executeRemotely(remoteNode, List.of(), 
SimpleJob.class.getName(), "a", 42);
         assertThat(execution.resultAsync(), willBe("remoteResponse"));
@@ -279,6 +289,19 @@ class ComputeComponentImplTest extends 
BaseIgniteAbstractTest {
         assertThatJobCancelRequestWasSent(jobId);
     }
 
+    @Test
+    void changePriorityRemotelyUsingNetworkCommunication() {
+        UUID jobId = UUID.randomUUID();
+        respondWithExecuteResponseWhenExecuteRequestIsSent(jobId);
+        
respondWithJobChangePriorityResponseWhenJobChangePriorityRequestIsSent(jobId);
+
+        JobExecution<String> execution = 
computeComponent.executeRemotely(remoteNode, List.of(), 
LongJob.class.getName());
+
+        assertThat(execution.changePriorityAsync(1), 
willCompleteSuccessfully());
+
+        assertThatJobChangePriorityRequestWasSent(jobId);
+    }
+
     private void respondWithExecuteResponseWhenExecuteRequestIsSent(UUID 
jobId) {
         ExecuteResponse executeResponse = new 
ComputeMessagesFactory().executeResponse()
                 .jobId(jobId)
@@ -310,7 +333,7 @@ class ComputeComponentImplTest extends 
BaseIgniteAbstractTest {
                 .thenReturn(completedFuture(jobCancelResponse));
     }
 
-    private void respondWithCancellingExceptionhenJobCancelRequestIsSent(UUID 
jobId) {
+    private void respondWithCancellingExceptionWhenJobCancelRequestIsSent(UUID 
jobId) {
         JobCancelResponse jobCancelResponse = new 
ComputeMessagesFactory().jobCancelResponse()
                 .throwable(new CancellingException(jobId))
                 .build();
@@ -318,6 +341,13 @@ class ComputeComponentImplTest extends 
BaseIgniteAbstractTest {
                 .thenReturn(completedFuture(jobCancelResponse));
     }
 
+    private void 
respondWithJobChangePriorityResponseWhenJobChangePriorityRequestIsSent(UUID 
jobId) {
+        JobChangePriorityResponse jobChangePriorityResponse = new 
ComputeMessagesFactory().jobChangePriorityResponse()
+                .build();
+        when(messagingService.invoke(any(ClusterNode.class), argThat(msg -> 
jobChangePriorityRequestWithJobId(msg, jobId)), anyLong()))
+                .thenReturn(completedFuture(jobChangePriorityResponse));
+    }
+
     private static boolean jobResultRequestWithJobId(NetworkMessage argument, 
UUID jobId) {
         if (argument instanceof JobResultRequest) {
             JobResultRequest jobResultRequest = (JobResultRequest) argument;
@@ -342,6 +372,14 @@ class ComputeComponentImplTest extends 
BaseIgniteAbstractTest {
         return false;
     }
 
+    private static boolean jobChangePriorityRequestWithJobId(NetworkMessage 
argument, UUID jobId) {
+        if (argument instanceof JobChangePriorityRequest) {
+            JobChangePriorityRequest jobChangePriorityRequest = 
(JobChangePriorityRequest) argument;
+            return jobChangePriorityRequest.jobId() == jobId;
+        }
+        return false;
+    }
+
     private void assertThatExecuteRequestWasSent(String jobClassName, 
Object... args) {
         verify(messagingService).invoke(eq(remoteNode), 
executeRequestCaptor.capture(), anyLong());
 
@@ -375,6 +413,14 @@ class ComputeComponentImplTest extends 
BaseIgniteAbstractTest {
         assertThat(capturedRequest.jobId(), is(jobId));
     }
 
+    private void assertThatJobChangePriorityRequestWasSent(UUID jobId) {
+        verify(messagingService).invoke(eq(remoteNode), 
jobChangePriorityRequestCaptor.capture(), anyLong());
+
+        JobChangePriorityRequest capturedRequest = 
jobChangePriorityRequestCaptor.getValue();
+
+        assertThat(capturedRequest.jobId(), is(jobId));
+    }
+
     @Test
     void executesRemotelyWithException() {
         UUID jobId = UUID.randomUUID();
@@ -604,6 +650,34 @@ class ComputeComponentImplTest extends 
BaseIgniteAbstractTest {
         assertThat(response.throwable().getCause(), 
is(instanceOf(NodeStoppingException.class)));
     }
 
+    @Test
+    void stoppedComponentReturnsExceptionOnJobChangePriorityRequestAttempt() 
throws Exception {
+        computeComponent.stop();
+
+        markResponseSentOnResponseSend();
+        assertThat(computeMessageHandlerRef.get(), is(notNullValue()));
+
+        String sender = "test";
+
+        JobChangePriorityRequest jobChangePriorityRequest = new 
ComputeMessagesFactory().jobChangePriorityRequest()
+                .jobId(UUID.randomUUID())
+                .priority(1)
+                .build();
+        computeMessageHandlerRef.get().onReceived(jobChangePriorityRequest, 
sender, 456L);
+
+        assertThatJobChangePriorityRequestSendsNodeStoppingExceptionTo(sender);
+    }
+
+    private void 
assertThatJobChangePriorityRequestSendsNodeStoppingExceptionTo(String sender) 
throws InterruptedException {
+        assertTrue(waitForCondition(responseSent::get, 1000), "No response 
sent");
+
+        verify(messagingService).respond(eq(sender), 
jobChangePriorityResponseCaptor.capture(), eq(456L));
+        JobChangePriorityResponse response = 
jobChangePriorityResponseCaptor.getValue();
+
+        assertThat(response.throwable(), 
is(instanceOf(IgniteInternalException.class)));
+        assertThat(response.throwable().getCause(), 
is(instanceOf(NodeStoppingException.class)));
+    }
+
     @Test
     void executorThreadsAreNamedAccordingly() {
         assertThat(
diff --git 
a/modules/compute/src/test/java/org/apache/ignite/internal/compute/IgniteComputeImplTest.java
 
b/modules/compute/src/test/java/org/apache/ignite/internal/compute/IgniteComputeImplTest.java
index 3b8ed364fd..32c194940c 100644
--- 
a/modules/compute/src/test/java/org/apache/ignite/internal/compute/IgniteComputeImplTest.java
+++ 
b/modules/compute/src/test/java/org/apache/ignite/internal/compute/IgniteComputeImplTest.java
@@ -171,6 +171,11 @@ class IgniteComputeImplTest extends BaseIgniteAbstractTest 
{
             public CompletableFuture<Void> cancelAsync() {
                 return nullCompletedFuture();
             }
+
+            @Override
+            public CompletableFuture<Void> changePriorityAsync(int 
newPriority) {
+                return nullCompletedFuture();
+            }
         };
     }
 }
diff --git 
a/modules/compute/src/test/java/org/apache/ignite/internal/compute/queue/ComputeThreadPoolExecutorTest.java
 
b/modules/compute/src/test/java/org/apache/ignite/internal/compute/queue/ComputeThreadPoolExecutorTest.java
new file mode 100644
index 0000000000..814fcba269
--- /dev/null
+++ 
b/modules/compute/src/test/java/org/apache/ignite/internal/compute/queue/ComputeThreadPoolExecutorTest.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.compute.queue;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.TimeUnit;
+import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.thread.NamedThreadFactory;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+class ComputeThreadPoolExecutorTest {
+
+    private ComputeThreadPoolExecutor computeThreadPoolExecutor;
+    private BlockingQueue<Runnable> workQueue;
+
+    @BeforeEach
+    public void setup() {
+        workQueue = new BoundedPriorityBlockingQueue<>(() -> 5);
+        computeThreadPoolExecutor = new ComputeThreadPoolExecutor(1, 1, 10, 
TimeUnit.SECONDS, workQueue,
+                new NamedThreadFactory(NamedThreadFactory.threadPrefix("test", 
"compute"),
+                        
Loggers.forClass(ComputeThreadPoolExecutorTest.class)));
+    }
+
+    @Test
+    public void testRemoveFromQueue2Tasks() {
+        // Add 2 tasks to the executor
+        computeThreadPoolExecutor.execute(longTask());
+        QueueEntry<Void> longTask = longTask();
+        computeThreadPoolExecutor.execute(longTask);
+
+        // Check longTask is in workQueue
+        assertEquals(1, workQueue.size());
+        assertTrue(workQueue.contains(longTask));
+
+        // Remove task from executor
+        computeThreadPoolExecutor.removeFromQueue(longTask);
+
+        // Check longTask was removed from workQueue
+        assertTrue(workQueue.isEmpty());
+    }
+
+    @Test
+    public void testRemoveFromQueue1Task() {
+        // Add 1 tasks to the executor
+        computeThreadPoolExecutor.execute(longTask());
+
+        // Check longTask is not in workQueue
+        assertTrue(workQueue.isEmpty());
+    }
+
+    private static QueueEntry<Void> longTask() {
+        return new QueueEntry<>(() -> {
+            TimeUnit.MINUTES.sleep(1);
+            return null;
+        }, 0);
+    }
+}
diff --git 
a/modules/compute/src/test/java/org/apache/ignite/internal/compute/queue/PriorityQueueExecutorTest.java
 
b/modules/compute/src/test/java/org/apache/ignite/internal/compute/queue/PriorityQueueExecutorTest.java
index 990e1bbc3a..9b9a289fa2 100644
--- 
a/modules/compute/src/test/java/org/apache/ignite/internal/compute/queue/PriorityQueueExecutorTest.java
+++ 
b/modules/compute/src/test/java/org/apache/ignite/internal/compute/queue/PriorityQueueExecutorTest.java
@@ -37,6 +37,7 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.ignite.compute.ComputeException;
 import org.apache.ignite.compute.JobStatus;
 import org.apache.ignite.internal.compute.configuration.ComputeConfiguration;
 import org.apache.ignite.internal.compute.state.InMemoryComputeStateMachine;
@@ -360,6 +361,198 @@ public class PriorityQueueExecutorTest extends 
BaseIgniteAbstractTest {
         assertThat(runTimes.get(), is(1));
     }
 
+    @Test
+    public void testChangePriorityBeforeExecution() {
+        initExecutor(1);
+
+        CountDownLatch latch1 = new CountDownLatch(1);
+        CountDownLatch latch2 = new CountDownLatch(1);
+        CountDownLatch latch3 = new CountDownLatch(1);
+
+        //Start three tasks
+        CompletableFuture<Integer> task1 = submit(() -> {
+            latch1.await();
+            return 0;
+        }, 10);
+
+        CompletableFuture<Integer> task2 = submit(() -> {
+            latch2.await();
+            return 1;
+        }, 5);
+
+        QueueExecution<Integer> runningExecution3 =  
priorityQueueExecutor.submit(() -> {
+            latch3.await();
+            return 2;
+        }, 1, 0);
+
+        CompletableFuture<Integer> task3 = runningExecution3.resultAsync();
+
+        assertThat(task1.isDone(), is(false));
+        assertThat(task2.isDone(), is(false));
+        assertThat(task3.isDone(), is(false));
+
+        //Change priority on task3, it should be executed before task2
+        runningExecution3.changePriority(20);
+
+        //Task 1 should be completed
+        latch1.countDown();
+        assertThat(task1, willCompleteSuccessfully());
+        assertThat(task2.isDone(), is(false));
+        assertThat(task3.isDone(), is(false));
+
+        //Current executing task is 3 because we changed priority
+        latch2.countDown();
+        assertThat(task2.isDone(), is(false));
+        assertThat(task3.isDone(), is(false));
+
+        latch3.countDown();
+        assertThat(task1, willCompleteSuccessfully());
+        assertThat(task2, willCompleteSuccessfully());
+        assertThat(task3, willCompleteSuccessfully());
+    }
+
+    @Test
+    public void testChangePriorityInTheMiddleExecution() {
+        initExecutor(1);
+
+        CountDownLatch latch1 = new CountDownLatch(1);
+        CountDownLatch latch2 = new CountDownLatch(1);
+        CountDownLatch latch3 = new CountDownLatch(1);
+        CountDownLatch latch4 = new CountDownLatch(1);
+
+        //Start four tasks
+        CompletableFuture<Integer> task1 = submit(() -> {
+            latch1.await();
+            return 0;
+        }, 10);
+
+        CompletableFuture<Integer> task2 = submit(() -> {
+            latch2.await();
+            return 1;
+        }, 5);
+
+        QueueExecution<Integer> runningExecution =  
priorityQueueExecutor.submit(() -> {
+            latch3.await();
+            return 2;
+        }, 1, 0);
+
+        CompletableFuture<Integer> task3 = runningExecution.resultAsync();
+
+        CompletableFuture<Integer> task4 = submit(() -> {
+            latch4.await();
+            return 4;
+        }, 5);
+
+        assertThat(task1.isDone(), is(false));
+        assertThat(task2.isDone(), is(false));
+        assertThat(task3.isDone(), is(false));
+        assertThat(task4.isDone(), is(false));
+
+
+        //Task 1 should be completed
+        latch1.countDown();
+        assertThat(task1, willCompleteSuccessfully());
+        assertThat(task2.isDone(), is(false));
+        assertThat(task3.isDone(), is(false));
+        assertThat(task4.isDone(), is(false));
+
+        //Change priority on task3, it should be executed before task2 and 
task4
+        runningExecution.changePriority(20);
+
+        //Current executing task is 3 because we changed priority
+        latch2.countDown();
+        assertThat(task2.isDone(), is(false));
+        assertThat(task3.isDone(), is(false));
+        assertThat(task4.isDone(), is(false));
+
+        //Current executing task is 3 because we changed priority
+        latch4.countDown();
+        assertThat(task2.isDone(), is(false));
+        assertThat(task3.isDone(), is(false));
+        assertThat(task4.isDone(), is(false));
+
+        //Complete task3, task2 and task4 will be executed as well
+        latch3.countDown();
+        assertThat(task1, willCompleteSuccessfully());
+        assertThat(task2, willCompleteSuccessfully());
+        assertThat(task3, willCompleteSuccessfully());
+        assertThat(task4, willCompleteSuccessfully());
+    }
+
+    @Test
+    public void testChangePriorityAlreadyExecuting() {
+        initExecutor(1);
+
+        CountDownLatch latch1 = new CountDownLatch(1);
+        CountDownLatch latch2 = new CountDownLatch(1);
+
+        //Start two tasks
+        QueueExecution<Integer> runningExecution1 =  
priorityQueueExecutor.submit(() -> {
+            latch1.await();
+            return 2;
+        }, 1, 0);
+
+        CompletableFuture<Integer> task1 = runningExecution1.resultAsync();
+
+        CompletableFuture<Integer> task2 = submit(() -> {
+            latch2.await();
+            return 1;
+        }, 5);
+
+        assertThat(task1.isDone(), is(false));
+        assertThat(task2.isDone(), is(false));
+
+        //Change priority on task1, it is already in executing stage, 
exception should be thrown
+        assertThrows(ComputeException.class, () -> 
runningExecution1.changePriority(20));
+
+        //Task 1 should not be completed because change priority failed and 
task2 has higher priority
+        latch1.countDown();
+        assertThat(task1, willCompleteSuccessfully());
+        assertThat(task2.isDone(), is(false));
+
+        //Complete task2 and task1 will be executed after task2
+        latch2.countDown();
+        assertThat(task1, willCompleteSuccessfully());
+        assertThat(task2, willCompleteSuccessfully());
+    }
+
+    @Test
+    public void testChangePriorityAllExecuting() {
+        initExecutor(3);
+
+        CountDownLatch latch1 = new CountDownLatch(1);
+        CountDownLatch latch2 = new CountDownLatch(1);
+
+        //Start three tasks
+        QueueExecution<Integer> runningExecution =  
priorityQueueExecutor.submit(() -> {
+            latch1.await();
+            return 2;
+        }, 1, 0);
+
+        CompletableFuture<Integer> task1 = runningExecution.resultAsync();
+
+        CompletableFuture<Integer> task2 = submit(() -> {
+            latch2.await();
+            return 1;
+        }, 1);
+
+        assertThat(task1.isDone(), is(false));
+        assertThat(task2.isDone(), is(false));
+
+        //Change priority on task3, it should be executed before task2
+        assertThrows(ComputeException.class, () -> 
runningExecution.changePriority(2));
+
+        //Task 1 should not be completed because of changed priority of task3
+        latch1.countDown();
+        assertThat(task1, willCompleteSuccessfully());
+        assertThat(task2.isDone(), is(false));
+
+        //Current executing task is 3 because we changed priority
+        latch2.countDown();
+        assertThat(task1, willCompleteSuccessfully());
+        assertThat(task2, willCompleteSuccessfully());
+    }
+
     private void initExecutor(int threads) {
         initExecutor(threads, Integer.MAX_VALUE);
     }
diff --git a/modules/platforms/cpp/ignite/common/error_codes.h 
b/modules/platforms/cpp/ignite/common/error_codes.h
index 6624b991ac..f2911b85b5 100644
--- a/modules/platforms/cpp/ignite/common/error_codes.h
+++ b/modules/platforms/cpp/ignite/common/error_codes.h
@@ -187,6 +187,8 @@ enum class code : underlying_t {
     RESULT_NOT_FOUND = 0x100007,
     FAIL_TO_GET_JOB_STATUS = 0x100008,
     COMPUTE_JOB_FAILED = 0x100009,
+    CHANGE_JOB_PRIORITY_NO_JOB = 0x10000a,
+    CHANGE_JOB_PRIORITY_JOB_EXECUTING = 0x10000b,
 
     // Catalog group. Group code: 17
     VALIDATION = 0x110001,
diff --git a/modules/platforms/cpp/ignite/odbc/common_types.cpp 
b/modules/platforms/cpp/ignite/odbc/common_types.cpp
index c3a2a0c6e3..f6eba7ee3d 100644
--- a/modules/platforms/cpp/ignite/odbc/common_types.cpp
+++ b/modules/platforms/cpp/ignite/odbc/common_types.cpp
@@ -276,6 +276,8 @@ sql_state error_code_to_sql_state(error::code code) {
         case error::code::RESULT_NOT_FOUND:
         case error::code::FAIL_TO_GET_JOB_STATUS:
         case error::code::COMPUTE_JOB_FAILED:
+        case error::code::CHANGE_JOB_PRIORITY_NO_JOB:
+        case error::code::CHANGE_JOB_PRIORITY_JOB_EXECUTING:
             return sql_state::SHY000_GENERAL_ERROR;
 
         // Catalog group. Group code: 17
diff --git a/modules/platforms/dotnet/Apache.Ignite/ErrorCodes.g.cs 
b/modules/platforms/dotnet/Apache.Ignite/ErrorCodes.g.cs
index 41408caf50..a8642e2b81 100644
--- a/modules/platforms/dotnet/Apache.Ignite/ErrorCodes.g.cs
+++ b/modules/platforms/dotnet/Apache.Ignite/ErrorCodes.g.cs
@@ -506,6 +506,12 @@ namespace Apache.Ignite
 
             /// <summary> ComputeJobFailed error. </summary>
             public const int ComputeJobFailed = (GroupCode << 16) | (9 & 
0xFFFF);
+
+            /// <summary> ChangeJobPriorityNoJob error. </summary>
+            public const int ChangeJobPriorityNoJob = (GroupCode << 16) | (10 
& 0xFFFF);
+
+            /// <summary> ChangeJobPriorityJobExecuting error. </summary>
+            public const int ChangeJobPriorityJobExecuting = (GroupCode << 16) 
| (11 & 0xFFFF);
         }
 
         /// <summary> Catalog errors. </summary>
diff --git 
a/modules/runner/src/testFixtures/java/org/apache/ignite/internal/ClusterPerTestIntegrationTest.java
 
b/modules/runner/src/testFixtures/java/org/apache/ignite/internal/ClusterPerTestIntegrationTest.java
index 97da1b0a6a..cc1d6fc133 100644
--- 
a/modules/runner/src/testFixtures/java/org/apache/ignite/internal/ClusterPerTestIntegrationTest.java
+++ 
b/modules/runner/src/testFixtures/java/org/apache/ignite/internal/ClusterPerTestIntegrationTest.java
@@ -47,7 +47,10 @@ public abstract class ClusterPerTestIntegrationTest extends 
IgniteIntegrationTes
             + "  },\n"
             + "  clientConnector: { port:{} },\n"
             + "  rest.port: {}\n"
-            + "}";
+            + "  compute: {\n"
+            + "    threadPoolSize: 1\n"
+            + "  }\n"
+            + "}\n";
 
     /** Template for node bootstrap config with Scalecube settings for fast 
failure detection. */
     protected static final String 
FAST_FAILURE_DETECTION_NODE_BOOTSTRAP_CFG_TEMPLATE = "{\n"

Reply via email to