This is an automated email from the ASF dual-hosted git repository.

apkhmv pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 7f98bd2b1a IGNITE-20843 Introduce Compute Job states. (#2894)
7f98bd2b1a is described below

commit 7f98bd2b1aa86b8047aaa58cd7d1f8f16d87fb8b
Author: Mikhail <[email protected]>
AuthorDate: Wed Dec 6 14:01:10 2023 +0300

    IGNITE-20843 Introduce Compute Job states. (#2894)
---
 .../org/apache/ignite/compute/IgniteCompute.java   |  20 +-
 .../java/org/apache/ignite/compute/JobState.java}  |  40 +++-
 .../java/org/apache/ignite/lang/ErrorGroups.java   |   3 +
 .../internal/compute/ComputeComponentImpl.java     | 186 +++++-------------
 .../ignite/internal/compute/ComputeUtils.java      |  52 +++++
 .../ComputeExecutor.java => JobStarter.java}       |  31 ++-
 .../configuration/ComputeConfigurationSchema.java  |   8 +
 .../{queue => executor}/ComputeExecutor.java       |   2 +-
 .../{queue => executor}/ComputeExecutorImpl.java   |  31 ++-
 .../compute/messaging/ComputeMessaging.java        | 144 ++++++++++++++
 .../compute/queue/PriorityQueueExecutor.java       |  24 ++-
 .../ignite/internal/compute/queue/QueueEntry.java  |  10 +-
 .../compute/state/ComputeStateMachine.java         |  91 +++++++++
 .../compute/state/IllegalJobStateTransition.java   |  43 +++++
 .../compute/state/InMemoryComputeStateMachine.java | 167 ++++++++++++++++
 .../internal/compute/ComputeComponentImplTest.java |  50 ++---
 .../compute/queue/PriorityQueueExecutorTest.java   |   4 +-
 .../state/InMemoryComputeStateMachineTest.java     | 212 +++++++++++++++++++++
 .../org/apache/ignite/internal/app/IgniteImpl.java |   7 +-
 19 files changed, 912 insertions(+), 213 deletions(-)

diff --git 
a/modules/api/src/main/java/org/apache/ignite/compute/IgniteCompute.java 
b/modules/api/src/main/java/org/apache/ignite/compute/IgniteCompute.java
index 085c6de970..f52ee20122 100644
--- a/modules/api/src/main/java/org/apache/ignite/compute/IgniteCompute.java
+++ b/modules/api/src/main/java/org/apache/ignite/compute/IgniteCompute.java
@@ -35,11 +35,11 @@ public interface IgniteCompute {
     /**
      * Executes a {@link ComputeJob} of the given class on a single node from 
a set of candidate nodes.
      *
-     * @param nodes    Candidate nodes; the job will be executed on one of 
them.
-     * @param units    Deployment units. Can be empty.
+     * @param nodes Candidate nodes; the job will be executed on one of them.
+     * @param units Deployment units. Can be empty.
      * @param jobClassName Name of the job class to execute.
-     * @param args     Arguments of the job.
-     * @param <R>      Job result type
+     * @param args Arguments of the job.
+     * @param <R> Job result type.
      * @return CompletableFuture Job result.
      */
     <R> CompletableFuture<R> executeAsync(
@@ -52,11 +52,11 @@ public interface IgniteCompute {
     /**
      * Executes a {@link ComputeJob} of the given class on a single node from 
a set of candidate nodes.
      *
-     * @param nodes    Candidate nodes; the job will be executed on one of 
them.
-     * @param units    Deployment units. Can be empty.
+     * @param nodes Candidate nodes; the job will be executed on one of them.
+     * @param units Deployment units. Can be empty.
      * @param jobClassName Name of the job class to execute.
-     * @param args     Arguments of the job.
-     * @param <R>      Job result type
+     * @param args Arguments of the job.
+     * @param <R> Job result type.
      * @return Job result.
      */
     <R> R execute(
@@ -153,8 +153,8 @@ public interface IgniteCompute {
      * @param nodes Nodes to execute the job on.
      * @param units Deployment units. Can be empty.
      * @param jobClassName Name of the job class to execute.
-     * @param args     Arguments of the job.
-     * @param <R>      Job result type.
+     * @param args Arguments of the job.
+     * @param <R> Job result type.
      * @return Map from node to job result future.
      */
     <R> Map<ClusterNode, CompletableFuture<R>> broadcastAsync(
diff --git 
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/queue/ComputeExecutor.java
 b/modules/api/src/main/java/org/apache/ignite/compute/JobState.java
similarity index 56%
copy from 
modules/compute/src/main/java/org/apache/ignite/internal/compute/queue/ComputeExecutor.java
copy to modules/api/src/main/java/org/apache/ignite/compute/JobState.java
index 7d6ad4c645..c89a3f866a 100644
--- 
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/queue/ComputeExecutor.java
+++ b/modules/api/src/main/java/org/apache/ignite/compute/JobState.java
@@ -15,19 +15,39 @@
  * limitations under the License.
  */
 
-package org.apache.ignite.internal.compute.queue;
-
-import java.util.concurrent.CompletableFuture;
-import org.apache.ignite.compute.ComputeJob;
-import org.apache.ignite.internal.compute.ExecutionOptions;
+package org.apache.ignite.compute;
 
 /**
- * Executor of Compute jobs.
+ * Compute job's state enum.
  */
-public interface ComputeExecutor {
-    <R> CompletableFuture<R> executeJob(ExecutionOptions options, 
Class<ComputeJob<R>> jobClass, Object[] args);
+public enum JobState {
+    /**
+     * The job is submitted and waiting for an execution start.
+     */
+    QUEUED,
+
+    /**
+     * The job is being executed.
+     */
+    EXECUTING,
+
+    /**
+     * The job was unexpectedly terminated during execution.
+     */
+    FAILED,
+
+    /**
+     * The job was executed successfully and the execution result was returned.
+     */
+    COMPLETED,
 
-    void start();
+    /**
+     * The job has received the cancel command, but it is still running.
+     */
+    CANCELING,
 
-    void stop();
+    /**
+     * The job was successfully canceled.
+     */
+    CANCELED
 }
diff --git a/modules/api/src/main/java/org/apache/ignite/lang/ErrorGroups.java 
b/modules/api/src/main/java/org/apache/ignite/lang/ErrorGroups.java
index d8721a12f6..d3623b2131 100755
--- a/modules/api/src/main/java/org/apache/ignite/lang/ErrorGroups.java
+++ b/modules/api/src/main/java/org/apache/ignite/lang/ErrorGroups.java
@@ -516,6 +516,9 @@ public class ErrorGroups {
 
         /** Compute execution queue overflow error. */
         public static final int QUEUE_OVERFLOW_ERR = 
COMPUTE_ERR_GROUP.registerErrorCode((short) 4);
+
+        /** Compute job state transition error. */
+        public static final int COMPUTE_JOB_STATE_TRANSITION_ERR = 
COMPUTE_ERR_GROUP.registerErrorCode((short) 5);
     }
 
     /** Catalog error group. */
diff --git 
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeComponentImpl.java
 
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeComponentImpl.java
index e4f7c62133..af211cbd61 100644
--- 
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeComponentImpl.java
+++ 
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeComponentImpl.java
@@ -17,39 +17,29 @@
 
 package org.apache.ignite.internal.compute;
 
-import static java.util.concurrent.CompletableFuture.completedFuture;
 import static java.util.concurrent.CompletableFuture.failedFuture;
 import static 
org.apache.ignite.internal.compute.ClassLoaderExceptionsMapper.mapClassLoaderExceptions;
-import static org.apache.ignite.internal.compute.ComputeUtils.jobClass;
+import static org.apache.ignite.lang.ErrorGroups.Common.NODE_STOPPING_ERR;
 
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.stream.Collectors;
-import org.apache.ignite.compute.ComputeJob;
 import org.apache.ignite.compute.DeploymentUnit;
-import org.apache.ignite.compute.version.Version;
+import org.apache.ignite.internal.compute.executor.ComputeExecutor;
 import org.apache.ignite.internal.compute.loader.JobContext;
 import org.apache.ignite.internal.compute.loader.JobContextManager;
-import org.apache.ignite.internal.compute.message.DeploymentUnitMsg;
-import org.apache.ignite.internal.compute.message.ExecuteRequest;
-import org.apache.ignite.internal.compute.message.ExecuteResponse;
-import org.apache.ignite.internal.compute.queue.ComputeExecutor;
+import org.apache.ignite.internal.compute.messaging.ComputeMessaging;
 import org.apache.ignite.internal.future.InFlightFutures;
+import org.apache.ignite.internal.lang.IgniteInternalException;
 import org.apache.ignite.internal.lang.NodeStoppingException;
 import org.apache.ignite.internal.util.IgniteSpinBusyLock;
 import org.apache.ignite.network.ClusterNode;
 import org.apache.ignite.network.MessagingService;
-import org.jetbrains.annotations.Nullable;
 
 /**
  * Implementation of {@link ComputeComponent}.
  */
 public class ComputeComponentImpl implements ComputeComponent {
-    private static final long NETWORK_TIMEOUT_MILLIS = Long.MAX_VALUE;
-
-    private final ComputeMessagesFactory messagesFactory = new 
ComputeMessagesFactory();
-
     /** Busy lock to stop synchronously. */
     private final IgniteSpinBusyLock busyLock = new IgniteSpinBusyLock();
 
@@ -58,12 +48,12 @@ public class ComputeComponentImpl implements 
ComputeComponent {
 
     private final InFlightFutures inFlightFutures = new InFlightFutures();
 
-    private final MessagingService messagingService;
-
     private final JobContextManager jobContextManager;
 
     private final ComputeExecutor executor;
 
+    private final ComputeMessaging messaging;
+
     /**
      * Creates a new instance.
      */
@@ -72,37 +62,9 @@ public class ComputeComponentImpl implements 
ComputeComponent {
             JobContextManager jobContextManager,
             ComputeExecutor executor
     ) {
-        this.messagingService = messagingService;
         this.jobContextManager = jobContextManager;
         this.executor = executor;
-    }
-
-    /** {@inheritDoc} */
-    @Override
-    public void start() {
-        executor.start();
-
-        messagingService.addMessageHandler(ComputeMessageTypes.class, 
(message, senderConsistentId, correlationId) -> {
-            assert correlationId != null;
-
-            if (message instanceof ExecuteRequest) {
-                processExecuteRequest((ExecuteRequest) message, 
senderConsistentId, correlationId);
-            }
-        });
-    }
-
-    /** {@inheritDoc} */
-    @Override
-    public void stop() throws Exception {
-        if (!stopGuard.compareAndSet(false, true)) {
-            return;
-        }
-
-        busyLock.block();
-
-        executor.stop();
-
-        inFlightFutures.cancelInFlightFutures();
+        messaging = new ComputeMessaging(messagingService);
     }
 
     /** {@inheritDoc} */
@@ -113,30 +75,7 @@ public class ComputeComponentImpl implements 
ComputeComponent {
             String jobClassName,
             Object... args
     ) {
-        if (!busyLock.enterBusy()) {
-            return failedFuture(new NodeStoppingException());
-        }
-
-        try {
-            return mapClassLoaderExceptions(jobClassLoader(units), 
jobClassName)
-                    .thenCompose(context ->
-                            doExecuteLocally(options, 
ComputeUtils.<R>jobClass(context.classLoader(), jobClassName), args)
-                                    .whenComplete((r, e) -> context.close())
-                    );
-        } finally {
-            busyLock.leaveBusy();
-        }
-    }
-
-    private <R> CompletableFuture<R> doExecuteLocally(
-            ExecutionOptions options,
-            Class<ComputeJob<R>> jobClass,
-            Object[] args
-    ) {
-        CompletableFuture<R> future = executor.executeJob(options, jobClass, 
args);
-        inFlightFutures.registerFuture(future);
-
-        return future;
+        return start(options, units, jobClassName, args);
     }
 
     /** {@inheritDoc} */
@@ -149,106 +88,67 @@ public class ComputeComponentImpl implements 
ComputeComponent {
             Object... args
     ) {
         if (!busyLock.enterBusy()) {
-            return failedFuture(new NodeStoppingException());
+            return failedFuture(new IgniteInternalException(NODE_STOPPING_ERR, 
new NodeStoppingException()));
         }
 
         try {
-            return doExecuteRemotely(options, remoteNode, units, jobClassName, 
args);
+            CompletableFuture<R> future = 
messaging.remoteExecuteRequest(options, remoteNode, units, jobClassName, args);
+            inFlightFutures.registerFuture(future);
+            return future;
         } finally {
             busyLock.leaveBusy();
         }
     }
 
-    private <R> CompletableFuture<R> doExecuteRemotely(
+    private <R> CompletableFuture<R> start(
             ExecutionOptions options,
-            ClusterNode remoteNode,
             List<DeploymentUnit> units,
             String jobClassName,
-            Object[] args
+            Object... args
     ) {
-        List<DeploymentUnitMsg> deploymentUnitMsgs = units.stream()
-                .map(this::toDeploymentUnitMsg)
-                .collect(Collectors.toList());
-
-        ExecuteRequest executeRequest = messagesFactory.executeRequest()
-                .executeOptions(options)
-                .deploymentUnits(deploymentUnitMsgs)
-                .jobClassName(jobClassName)
-                .args(args)
-                .build();
-
-        CompletableFuture<R> future = messagingService.invoke(remoteNode, 
executeRequest, NETWORK_TIMEOUT_MILLIS)
-                .thenCompose(message -> 
resultFromExecuteResponse((ExecuteResponse) message));
-        inFlightFutures.registerFuture(future);
-        return future;
-    }
-
-    private void processExecuteRequest(ExecuteRequest executeRequest, String 
senderConsistentId, long correlationId) {
         if (!busyLock.enterBusy()) {
-            sendExecuteResponse(null, new NodeStoppingException(), 
senderConsistentId, correlationId);
-            return;
+            return failedFuture(new IgniteInternalException(NODE_STOPPING_ERR, 
new NodeStoppingException()));
         }
 
         try {
-            List<DeploymentUnit> units = 
toDeploymentUnit(executeRequest.deploymentUnits());
-
-            mapClassLoaderExceptions(jobClassLoader(units), 
executeRequest.jobClassName())
-                    .whenComplete((context, err) -> {
-                        if (err != null) {
-                            if (context != null) {
-                                context.close();
-                            }
-
-                            sendExecuteResponse(null, err, senderConsistentId, 
correlationId);
-                        }
-
-                        doExecuteLocally(
-                                executeRequest.executeOptions(),
-                                jobClass(context.classLoader(), 
executeRequest.jobClassName()),
-                                executeRequest.args()
-                        ).whenComplete((r, e) -> context.close())
-                                .handle((result, ex) -> 
sendExecuteResponse(result, ex, senderConsistentId, correlationId));
-                    });
+            CompletableFuture<R> future = 
mapClassLoaderExceptions(jobContextManager.acquireClassLoader(units), 
jobClassName)
+                    .thenCompose(context -> this.<R>exec(context, options, 
jobClassName, args)
+                            .whenComplete((r, e) -> context.close()));
+
+            inFlightFutures.registerFuture(future);
+
+            return future;
         } finally {
             busyLock.leaveBusy();
         }
     }
 
-    @Nullable
-    private Object sendExecuteResponse(@Nullable Object result, @Nullable 
Throwable ex, String senderConsistentId, Long correlationId) {
-        ExecuteResponse executeResponse = messagesFactory.executeResponse()
-                .result(result)
-                .throwable(ex)
-                .build();
-
-        messagingService.respond(senderConsistentId, executeResponse, 
correlationId);
-
-        return null;
+    /** {@inheritDoc} */
+    @Override
+    public void start() {
+        executor.start();
+        messaging.start(this::start);
     }
 
-    private CompletableFuture<JobContext> jobClassLoader(List<DeploymentUnit> 
units) {
-        return jobContextManager.acquireClassLoader(units);
-    }
+    /** {@inheritDoc} */
+    @Override
+    public void stop() throws Exception {
+        if (!stopGuard.compareAndSet(false, true)) {
+            return;
+        }
 
-    private DeploymentUnitMsg toDeploymentUnitMsg(DeploymentUnit unit) {
-        return messagesFactory.deploymentUnitMsg()
-                .name(unit.name())
-                .version(unit.version().toString())
-                .build();
-    }
+        busyLock.block();
+        inFlightFutures.cancelInFlightFutures();
 
-    private static List<DeploymentUnit> 
toDeploymentUnit(List<DeploymentUnitMsg> unitMsgs) {
-        return unitMsgs.stream()
-                .map(it -> new DeploymentUnit(it.name(), 
Version.parseVersion(it.version())))
-                .collect(Collectors.toList());
+        messaging.stop();
+        executor.stop();
     }
 
-    private static <R> CompletableFuture<R> 
resultFromExecuteResponse(ExecuteResponse executeResponse) {
-        Throwable throwable = executeResponse.throwable();
-        if (throwable != null) {
-            return failedFuture(throwable);
-        }
-
-        return completedFuture((R) executeResponse.result());
+    private <R> CompletableFuture<R> exec(JobContext context, ExecutionOptions 
options, String jobClassName, Object[] args) {
+        return executor.executeJob(
+                options,
+                ComputeUtils.jobClass(context.classLoader(), jobClassName),
+                args
+        );
     }
 }
diff --git 
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeUtils.java
 
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeUtils.java
index 59c0eb71f8..9b7368a9c3 100644
--- 
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeUtils.java
+++ 
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeUtils.java
@@ -17,10 +17,19 @@
 
 package org.apache.ignite.internal.compute;
 
+import static java.util.concurrent.CompletableFuture.completedFuture;
+import static java.util.concurrent.CompletableFuture.failedFuture;
 import static 
org.apache.ignite.lang.ErrorGroups.Compute.CLASS_INITIALIZATION_ERR;
 
 import java.lang.reflect.Constructor;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
 import org.apache.ignite.compute.ComputeJob;
+import org.apache.ignite.compute.DeploymentUnit;
+import org.apache.ignite.compute.version.Version;
+import org.apache.ignite.internal.compute.message.DeploymentUnitMsg;
+import org.apache.ignite.internal.compute.message.ExecuteResponse;
 import org.apache.ignite.internal.lang.IgniteInternalException;
 import org.apache.ignite.lang.IgniteException;
 
@@ -28,6 +37,8 @@ import org.apache.ignite.lang.IgniteException;
  * Utility class for compute.
  */
 public class ComputeUtils {
+    private static final ComputeMessagesFactory MESSAGES_FACTORY = new 
ComputeMessagesFactory();
+
     /**
      * Instantiate compute job via provided class loader by provided job class 
name.
      *
@@ -79,4 +90,45 @@ public class ComputeUtils {
             );
         }
     }
+
+    /**
+     * Transform deployment unit object to message {@link DeploymentUnitMsg}.
+     *
+     * @param unit Deployment unit.
+     * @return Deployment unit message.
+     */
+    public static DeploymentUnitMsg toDeploymentUnitMsg(DeploymentUnit unit) {
+        return MESSAGES_FACTORY.deploymentUnitMsg()
+                .name(unit.name())
+                .version(unit.version().toString())
+                .build();
+    }
+
+    /**
+     * Extract Compute job result from execute response.
+     *
+     * @param executeResponse Execution message response.
+     * @param <R> Compute job return type.
+     * @return Completable future with result.
+     */
+    public static <R> CompletableFuture<R> 
resultFromExecuteResponse(ExecuteResponse executeResponse) {
+        Throwable throwable = executeResponse.throwable();
+        if (throwable != null) {
+            return failedFuture(throwable);
+        }
+
+        return completedFuture((R) executeResponse.result());
+    }
+
+    /**
+     * Transform list of deployment unit messages to list of deployment units.
+     *
+     * @param unitMsgs Deployment units messages.
+     * @return Deployment units.
+     */
+    public static List<DeploymentUnit> 
toDeploymentUnit(List<DeploymentUnitMsg> unitMsgs) {
+        return unitMsgs.stream()
+                .map(it -> new DeploymentUnit(it.name(), 
Version.parseVersion(it.version())))
+                .collect(Collectors.toList());
+    }
 }
diff --git 
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/queue/ComputeExecutor.java
 
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/JobStarter.java
similarity index 55%
copy from 
modules/compute/src/main/java/org/apache/ignite/internal/compute/queue/ComputeExecutor.java
copy to 
modules/compute/src/main/java/org/apache/ignite/internal/compute/JobStarter.java
index 7d6ad4c645..4f01d9bda8 100644
--- 
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/queue/ComputeExecutor.java
+++ 
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/JobStarter.java
@@ -15,19 +15,30 @@
  * limitations under the License.
  */
 
-package org.apache.ignite.internal.compute.queue;
+package org.apache.ignite.internal.compute;
 
+import java.util.List;
 import java.util.concurrent.CompletableFuture;
-import org.apache.ignite.compute.ComputeJob;
-import org.apache.ignite.internal.compute.ExecutionOptions;
+import org.apache.ignite.compute.DeploymentUnit;
 
 /**
- * Executor of Compute jobs.
+ * Compute job starter interface.
  */
-public interface ComputeExecutor {
-    <R> CompletableFuture<R> executeJob(ExecutionOptions options, 
Class<ComputeJob<R>> jobClass, Object[] args);
-
-    void start();
-
-    void stop();
+public interface JobStarter {
+    /**
+     * Start compute job.
+     *
+     * @param options Compute job execution options.
+     * @param units Deployment units. Can be empty.
+     * @param jobClassName Name of the job class to execute.
+     * @param args Arguments of the job.
+     * @param <R> Job result type.
+     * @return CompletableFuture Job result.
+     */
+    <R> CompletableFuture<R> start(
+            ExecutionOptions options,
+            List<DeploymentUnit> units,
+            String jobClassName,
+            Object... args
+    );
 }
diff --git 
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/configuration/ComputeConfigurationSchema.java
 
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/configuration/ComputeConfigurationSchema.java
index 525a56be9f..5237c211e8 100644
--- 
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/configuration/ComputeConfigurationSchema.java
+++ 
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/configuration/ComputeConfigurationSchema.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.compute.configuration;
 
 import static java.lang.Math.max;
 
+import java.util.concurrent.TimeUnit;
 import org.apache.ignite.configuration.annotation.ConfigurationRoot;
 import org.apache.ignite.configuration.annotation.ConfigurationType;
 import org.apache.ignite.configuration.annotation.Value;
@@ -46,4 +47,11 @@ public class ComputeConfigurationSchema {
     @Range(min = 1)
     @Value(hasDefault = true)
     public final int queueMaxSize = Integer.MAX_VALUE;
+
+    /**
+     * The lifetime of job states in milliseconds after the Compute job 
finishes.
+     */
+    @Range(min = 0)
+    @Value(hasDefault = true)
+    public final long statesLifetimeMillis = TimeUnit.MINUTES.toMillis(1);
 }
diff --git 
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/queue/ComputeExecutor.java
 
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/executor/ComputeExecutor.java
similarity index 95%
rename from 
modules/compute/src/main/java/org/apache/ignite/internal/compute/queue/ComputeExecutor.java
rename to 
modules/compute/src/main/java/org/apache/ignite/internal/compute/executor/ComputeExecutor.java
index 7d6ad4c645..91f9b8fe01 100644
--- 
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/queue/ComputeExecutor.java
+++ 
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/executor/ComputeExecutor.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.ignite.internal.compute.queue;
+package org.apache.ignite.internal.compute.executor;
 
 import java.util.concurrent.CompletableFuture;
 import org.apache.ignite.compute.ComputeJob;
diff --git 
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/queue/ComputeExecutorImpl.java
 
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/executor/ComputeExecutorImpl.java
similarity index 71%
rename from 
modules/compute/src/main/java/org/apache/ignite/internal/compute/queue/ComputeExecutorImpl.java
rename to 
modules/compute/src/main/java/org/apache/ignite/internal/compute/executor/ComputeExecutorImpl.java
index 40fdacfb4a..99966fac09 100644
--- 
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/queue/ComputeExecutorImpl.java
+++ 
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/executor/ComputeExecutorImpl.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.ignite.internal.compute.queue;
+package org.apache.ignite.internal.compute.executor;
 
 import java.util.concurrent.CompletableFuture;
 import org.apache.ignite.Ignite;
@@ -25,6 +25,8 @@ import org.apache.ignite.internal.compute.ComputeUtils;
 import org.apache.ignite.internal.compute.ExecutionOptions;
 import org.apache.ignite.internal.compute.JobExecutionContextImpl;
 import org.apache.ignite.internal.compute.configuration.ComputeConfiguration;
+import org.apache.ignite.internal.compute.queue.PriorityQueueExecutor;
+import org.apache.ignite.internal.compute.state.ComputeStateMachine;
 import org.apache.ignite.internal.logger.IgniteLogger;
 import org.apache.ignite.internal.logger.Loggers;
 import org.apache.ignite.internal.thread.NamedThreadFactory;
@@ -39,15 +41,33 @@ public class ComputeExecutorImpl implements ComputeExecutor 
{
 
     private final ComputeConfiguration configuration;
 
+    private final ComputeStateMachine stateMachine;
+
     private PriorityQueueExecutor executorService;
 
-    public ComputeExecutorImpl(Ignite ignite, ComputeConfiguration 
configuration) {
+    /**
+     * Constructor.
+     *
+     * @param ignite Ignite instance for public API access.
+     * @param stateMachine Compute jobs state machine.
+     * @param configuration Compute configuration.
+     */
+    public ComputeExecutorImpl(
+            Ignite ignite,
+            ComputeStateMachine stateMachine,
+            ComputeConfiguration configuration
+    ) {
         this.ignite = ignite;
         this.configuration = configuration;
+        this.stateMachine = stateMachine;
     }
 
     @Override
-    public <R> CompletableFuture<R> executeJob(ExecutionOptions options, 
Class<ComputeJob<R>> jobClass, Object[] args) {
+    public <R> CompletableFuture<R> executeJob(
+            ExecutionOptions options,
+            Class<ComputeJob<R>> jobClass,
+            Object[] args
+    ) {
         assert executorService != null;
 
         JobExecutionContext context = new JobExecutionContextImpl(ignite);
@@ -57,14 +77,17 @@ public class ComputeExecutorImpl implements ComputeExecutor 
{
 
     @Override
     public void start() {
+        stateMachine.start();
         executorService = new PriorityQueueExecutor(
                 configuration,
-                new 
NamedThreadFactory(NamedThreadFactory.threadPrefix(ignite.name(), "compute"), 
LOG)
+                new 
NamedThreadFactory(NamedThreadFactory.threadPrefix(ignite.name(), "compute"), 
LOG),
+                stateMachine
         );
     }
 
     @Override
     public void stop() {
+        stateMachine.stop();
         executorService.shutdown();
     }
 }
diff --git 
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/messaging/ComputeMessaging.java
 
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/messaging/ComputeMessaging.java
new file mode 100644
index 0000000000..66141f7d0d
--- /dev/null
+++ 
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/messaging/ComputeMessaging.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.compute.messaging;
+
+import static 
org.apache.ignite.internal.compute.ComputeUtils.resultFromExecuteResponse;
+import static org.apache.ignite.internal.compute.ComputeUtils.toDeploymentUnit;
+import static org.apache.ignite.lang.ErrorGroups.Common.NODE_STOPPING_ERR;
+
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
+import org.apache.ignite.compute.DeploymentUnit;
+import org.apache.ignite.internal.compute.ComputeMessageTypes;
+import org.apache.ignite.internal.compute.ComputeMessagesFactory;
+import org.apache.ignite.internal.compute.ComputeUtils;
+import org.apache.ignite.internal.compute.ExecutionOptions;
+import org.apache.ignite.internal.compute.JobStarter;
+import org.apache.ignite.internal.compute.message.DeploymentUnitMsg;
+import org.apache.ignite.internal.compute.message.ExecuteRequest;
+import org.apache.ignite.internal.compute.message.ExecuteResponse;
+import org.apache.ignite.internal.lang.IgniteInternalException;
+import org.apache.ignite.internal.lang.NodeStoppingException;
+import org.apache.ignite.internal.util.IgniteSpinBusyLock;
+import org.apache.ignite.network.ClusterNode;
+import org.apache.ignite.network.MessagingService;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Compute API internal messaging service.
+ */
+public class ComputeMessaging {
+    private static final long NETWORK_TIMEOUT_MILLIS = Long.MAX_VALUE;
+
+    private final ComputeMessagesFactory messagesFactory = new 
ComputeMessagesFactory();
+
+    private final MessagingService messagingService;
+
+    private final IgniteSpinBusyLock busyLock = new IgniteSpinBusyLock();
+
+    public ComputeMessaging(MessagingService messagingService) {
+        this.messagingService = messagingService;
+    }
+
+    /**
+     * Start messaging service.
+     *
+     * @param starter Compute job starter.
+     */
+    public void start(JobStarter starter) {
+        messagingService.addMessageHandler(ComputeMessageTypes.class, 
(message, senderConsistentId, correlationId) -> {
+            assert correlationId != null;
+
+            if (!busyLock.enterBusy()) {
+                sendExecuteResponse(
+                        null,
+                        new IgniteInternalException(NODE_STOPPING_ERR, new 
NodeStoppingException()),
+                        senderConsistentId,
+                        correlationId
+                );
+                return;
+            }
+
+            try {
+                if (message instanceof ExecuteRequest) {
+                    processExecuteRequest(starter, (ExecuteRequest) message, 
senderConsistentId, correlationId);
+                }
+            } finally {
+                busyLock.leaveBusy();
+            }
+        });
+    }
+
+    /**
+     * Stop messaging service. Once stopped, this service is not usable anymore.
+     */
+    public void stop() {
+        busyLock.block();
+    }
+
+    /**
+     * Submit Compute job for execution on remote node.
+     *
+     * @param options Job execution options.
+     * @param remoteNode The job will be executed on this node.
+     * @param units Deployment units. Can be empty.
+     * @param jobClassName Name of the job class to execute.
+     * @param args Arguments of the job.
+     * @param <R> Job result type.
+     * @return Future of the job result.
+     */
+    public <R> CompletableFuture<R> remoteExecuteRequest(
+            ExecutionOptions options,
+            ClusterNode remoteNode,
+            List<DeploymentUnit> units,
+            String jobClassName,
+            Object[] args
+    ) {
+        List<DeploymentUnitMsg> deploymentUnitMsgs = units.stream()
+                .map(ComputeUtils::toDeploymentUnitMsg)
+                .collect(Collectors.toList());
+
+        ExecuteRequest executeRequest = messagesFactory.executeRequest()
+                .executeOptions(options)
+                .deploymentUnits(deploymentUnitMsgs)
+                .jobClassName(jobClassName)
+                .args(args)
+                .build();
+
+        return messagingService.invoke(remoteNode, executeRequest, 
NETWORK_TIMEOUT_MILLIS)
+                .thenCompose(message -> 
resultFromExecuteResponse((ExecuteResponse) message));
+    }
+
+    private void processExecuteRequest(JobStarter starter, ExecuteRequest 
executeRequest, String senderConsistentId, long correlationId) {
+        List<DeploymentUnit> units = 
toDeploymentUnit(executeRequest.deploymentUnits());
+
+        starter.start(executeRequest.executeOptions(), units, 
executeRequest.jobClassName(), executeRequest.args())
+                .whenComplete((result, err) ->
+                        sendExecuteResponse(result, err, senderConsistentId, 
correlationId));
+    }
+
+    private void sendExecuteResponse(@Nullable Object result, @Nullable 
Throwable ex, String senderConsistentId, Long correlationId) {
+        ExecuteResponse executeResponse = messagesFactory.executeResponse()
+                .result(result)
+                .throwable(ex)
+                .build();
+
+        messagingService.respond(senderConsistentId, executeResponse, 
correlationId);
+    }
+}
diff --git 
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/queue/PriorityQueueExecutor.java
 
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/queue/PriorityQueueExecutor.java
index db5dff110d..c8f8568c7e 100644
--- 
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/queue/PriorityQueueExecutor.java
+++ 
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/queue/PriorityQueueExecutor.java
@@ -21,12 +21,14 @@ import static 
java.util.concurrent.CompletableFuture.failedFuture;
 import static org.apache.ignite.lang.ErrorGroups.Compute.QUEUE_OVERFLOW_ERR;
 
 import java.util.Objects;
+import java.util.UUID;
 import java.util.concurrent.Callable;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import org.apache.ignite.internal.compute.configuration.ComputeConfiguration;
+import org.apache.ignite.internal.compute.state.ComputeStateMachine;
 import org.apache.ignite.internal.util.IgniteUtils;
 import org.apache.ignite.lang.IgniteException;
 
@@ -40,6 +42,8 @@ public class PriorityQueueExecutor {
 
     private final ThreadPoolExecutor executor;
 
+    private final ComputeStateMachine stateMachine;
+
     /**
      * Constructor.
      *
@@ -48,9 +52,11 @@ public class PriorityQueueExecutor {
      */
     public PriorityQueueExecutor(
             ComputeConfiguration configuration,
-            ThreadFactory threadFactory
+            ThreadFactory threadFactory,
+            ComputeStateMachine stateMachine
     ) {
         this.configuration = configuration;
+        this.stateMachine = stateMachine;
         executor = new ThreadPoolExecutor(
                 configuration.threadPoolSize().value(),
                 configuration.threadPoolSize().value(),
@@ -72,13 +78,25 @@ public class PriorityQueueExecutor {
     public <R> CompletableFuture<R> submit(Callable<R> job, int priority) {
         Objects.requireNonNull(job);
 
-        QueueEntry<R> queueEntry = new QueueEntry<>(job, priority);
+        UUID jobId = stateMachine.initJob();
+        QueueEntry<R> queueEntry = new QueueEntry<>(() -> {
+            stateMachine.executeJob(jobId);
+            return job.call();
+        }, priority);
+
         try {
             executor.execute(queueEntry);
         } catch (QueueOverflowException e) {
             return failedFuture(new IgniteException(QUEUE_OVERFLOW_ERR, e));
         }
-        return queueEntry.toFuture();
+        return queueEntry.toFuture()
+                .whenComplete((r, throwable) -> {
+                    if (throwable != null) {
+                        stateMachine.failJob(jobId);
+                    } else {
+                        stateMachine.completeJob(jobId);
+                    }
+                });
     }
 
     /**
diff --git 
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/queue/QueueEntry.java
 
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/queue/QueueEntry.java
index 7753fe42ca..dcf0764ce5 100644
--- 
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/queue/QueueEntry.java
+++ 
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/queue/QueueEntry.java
@@ -35,7 +35,7 @@ public class QueueEntry<R> implements Runnable, 
Comparable<QueueEntry<R>> {
 
     private final CompletableFuture<R> future = new CompletableFuture<>();
 
-    private final Callable<R> job;
+    private final Callable<R> jobAction;
 
     private final int priority;
 
@@ -44,11 +44,11 @@ public class QueueEntry<R> implements Runnable, 
Comparable<QueueEntry<R>> {
     /**
      * Constructor.
      *
-     * @param job Compute job callable.
+     * @param jobAction Compute job callable.
      * @param priority Job priority.
      */
-    public QueueEntry(Callable<R> job, int priority) {
-        this.job = job;
+    public QueueEntry(Callable<R> jobAction, int priority) {
+        this.jobAction = jobAction;
         this.priority = priority;
         seqNum = seq.getAndIncrement();
     }
@@ -56,7 +56,7 @@ public class QueueEntry<R> implements Runnable, 
Comparable<QueueEntry<R>> {
     @Override
     public void run() {
         try {
-            future.complete(job.call());
+            future.complete(jobAction.call());
         } catch (Exception e) {
             future.completeExceptionally(e);
         }
diff --git 
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/state/ComputeStateMachine.java
 
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/state/ComputeStateMachine.java
new file mode 100644
index 0000000000..f4b1666ddf
--- /dev/null
+++ 
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/state/ComputeStateMachine.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.compute.state;
+
+import java.util.UUID;
+import org.apache.ignite.compute.JobState;
+
+/**
+ * State machine of Compute Jobs.
+ */
+public interface ComputeStateMachine {
+    /**
+     * Start Compute jobs state machine. The instance can't be used before it 
is started.
+     */
+    void start();
+
+    /**
+     * Stop Compute jobs state machine. The instance can't be used after it is 
stopped.
+     */
+    void stop();
+
+    /**
+     * Initialize Compute job in state machine. The new job has state 
{@link JobState#QUEUED}.
+     *
+     * @return Compute job identifier.
+     */
+    UUID initJob();
+
+    /**
+     * Tries to transfer Compute Job to complete state.
+     *
+     * @param jobId Compute job identifier.
+     * @throws IllegalJobStateTransition in case when job can't be transferred 
to complete state.
+     */
+    void completeJob(UUID jobId);
+
+    /**
+     * Tries to transfer Compute Job to execute state.
+     *
+     * @param jobId Compute job identifier.
+     * @throws IllegalJobStateTransition in case when job can't be transferred 
to execute state.
+     */
+    void executeJob(UUID jobId);
+
+    /**
+     * Tries to transfer Compute Job to canceling state, which means that 
execution may continue.
+     *
+     * @param jobId Compute job identifier.
+     * @throws IllegalJobStateTransition in case when job can't be transferred 
to canceling state.
+     */
+    void cancelingJob(UUID jobId);
+
+    /**
+     * Tries to transfer Compute Job to canceled state, which means that execution 
was canceled.
+     *
+     * @param jobId Compute job identifier.
+     * @throws IllegalJobStateTransition in case when job can't be transferred 
to canceled state.
+     */
+    void cancelJob(UUID jobId);
+
+    /**
+     * Tries to transfer Compute Job to fail state.
+     *
+     * @param jobId Compute job identifier.
+     * @throws IllegalJobStateTransition in case when job can't be transferred 
to failed state.
+     */
+    void failJob(UUID jobId);
+
+    /**
+     * Returns current state of Compute Job.
+     *
+     * @param jobId Compute job identifier.
+     * @return Current state of Compute Job or {@code null} if the job 
with provided identifier doesn't exist.
+     */
+    JobState currentState(UUID jobId);
+}
diff --git 
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/state/IllegalJobStateTransition.java
 
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/state/IllegalJobStateTransition.java
new file mode 100644
index 0000000000..002bcfcff0
--- /dev/null
+++ 
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/state/IllegalJobStateTransition.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.compute.state;
+
+import static 
org.apache.ignite.lang.ErrorGroups.Compute.COMPUTE_JOB_STATE_TRANSITION_ERR;
+
+import java.util.UUID;
+import org.apache.ignite.compute.JobState;
+import org.apache.ignite.internal.lang.IgniteInternalException;
+
+/**
+ * Thrown from Compute Jobs state machine {@link ComputeStateMachine} when job 
state transition is illegal.
+ */
+public class IllegalJobStateTransition extends IgniteInternalException {
+    public IllegalJobStateTransition(UUID jobId) {
+        super(COMPUTE_JOB_STATE_TRANSITION_ERR, "Failed to transfer job state 
for nonexistent job " + jobId + ".");
+    }
+
+    public IllegalJobStateTransition(UUID jobId, JobState prevState, JobState 
newState) {
+        super(COMPUTE_JOB_STATE_TRANSITION_ERR, message(jobId, prevState, 
newState));
+    }
+
+    private static String message(UUID jobId, JobState prevState, JobState 
newState) {
+        return "Failed to transfer job " + jobId
+                + " from state " + prevState
+                + " to state " + newState;
+    }
+}
diff --git 
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/state/InMemoryComputeStateMachine.java
 
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/state/InMemoryComputeStateMachine.java
new file mode 100644
index 0000000000..b16cdeea5e
--- /dev/null
+++ 
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/state/InMemoryComputeStateMachine.java
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.compute.state;
+
+import static org.apache.ignite.compute.JobState.CANCELED;
+import static org.apache.ignite.compute.JobState.CANCELING;
+import static org.apache.ignite.compute.JobState.COMPLETED;
+import static org.apache.ignite.compute.JobState.EXECUTING;
+import static org.apache.ignite.compute.JobState.FAILED;
+import static org.apache.ignite.compute.JobState.QUEUED;
+import static 
org.apache.ignite.internal.util.IgniteUtils.shutdownAndAwaitTermination;
+
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
+import org.apache.ignite.compute.JobState;
+import org.apache.ignite.internal.compute.configuration.ComputeConfiguration;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.thread.NamedThreadFactory;
+
+/**
+ * In memory implementation of {@link ComputeStateMachine}.
+ */
+public class InMemoryComputeStateMachine implements ComputeStateMachine {
+    private static final IgniteLogger LOG = 
Loggers.forClass(InMemoryComputeStateMachine.class);
+
+    private final ComputeConfiguration configuration;
+
+    private ExecutorService cleaner;
+
+    private final Set<UUID> toRemove = new HashSet<>();
+
+    private final Set<UUID> waitToRemove = ConcurrentHashMap.newKeySet();
+
+    private final Map<UUID, JobState> states = new ConcurrentHashMap<>();
+
+    public InMemoryComputeStateMachine(ComputeConfiguration configuration) {
+        this.configuration = configuration;
+    }
+
+    @Override
+    public void start() {
+        Long lifetime = configuration.statesLifetimeMillis().value();
+        ScheduledExecutorService executor = 
Executors.newSingleThreadScheduledExecutor(
+                new NamedThreadFactory("InMemoryComputeStateMachine-pool", LOG)
+        );
+        executor.scheduleAtFixedRate(() -> {
+            Set<UUID> nextToRemove = Set.of(waitToRemove.toArray(UUID[]::new));
+            this.waitToRemove.removeAll(nextToRemove);
+
+            for (UUID jobId : toRemove) {
+                states.remove(jobId);
+            }
+            toRemove.clear();
+            toRemove.addAll(nextToRemove);
+        }, lifetime, lifetime, TimeUnit.MILLISECONDS);
+        cleaner = executor;
+    }
+
+    @Override
+    public void stop() {
+        shutdownAndAwaitTermination(cleaner, 1000, TimeUnit.MILLISECONDS);
+    }
+
+    @Override
+    public JobState currentState(UUID jobId) {
+        return states.get(jobId);
+    }
+
+    @Override
+    public UUID initJob() {
+        UUID uuid = UUID.randomUUID();
+        JobState prevValue = states.putIfAbsent(uuid, QUEUED);
+        if (prevValue != null) {
+            LOG.info("UUID collision detected! UUID: {}", uuid);
+            return initJob();
+        }
+
+        return uuid;
+    }
+
+    @Override
+    public void executeJob(UUID jobId) {
+        changeState(jobId, EXECUTING, QUEUED);
+    }
+
+    @Override
+    public void failJob(UUID jobId) {
+        changeState(jobId, FAILED, EXECUTING, CANCELING);
+        waitToRemove.add(jobId);
+    }
+
+    @Override
+    public void completeJob(UUID jobId) {
+        changeState(jobId, COMPLETED, EXECUTING, CANCELING);
+        waitToRemove.add(jobId);
+    }
+
+    @Override
+    public void cancelingJob(UUID jobId) {
+        changeState(jobId, currentState -> {
+            if (currentState == QUEUED) {
+                waitToRemove.add(jobId);
+                return CANCELED;
+            } else if (currentState == EXECUTING) {
+                return CANCELING;
+            }
+
+            throw new IllegalJobStateTransition(jobId, currentState, 
CANCELING);
+        });
+    }
+
+    @Override
+    public void cancelJob(UUID jobId) {
+        changeState(jobId, CANCELED, QUEUED, CANCELING);
+        waitToRemove.add(jobId);
+    }
+
+    private void changeState(UUID jobId, JobState newState, JobState... 
requiredStates) {
+        changeState(jobId, currentState -> {
+            for (JobState requiredState : requiredStates) {
+                if (currentState == requiredState) {
+                    return newState;
+                }
+            }
+
+            throw new IllegalJobStateTransition(jobId, currentState, newState);
+        });
+    }
+
+    private void changeState(
+            UUID jobId,
+            Function<JobState, JobState> newStateFunction
+    ) {
+        JobState prevValue = states.computeIfPresent(jobId,
+                (uuid, currentState) -> newStateFunction.apply(currentState)
+        );
+
+        if (prevValue == null) {
+            throw new IllegalJobStateTransition(jobId);
+        }
+    }
+
+}
diff --git 
a/modules/compute/src/test/java/org/apache/ignite/internal/compute/ComputeComponentImplTest.java
 
b/modules/compute/src/test/java/org/apache/ignite/internal/compute/ComputeComponentImplTest.java
index 5ad714c539..120c7c7a06 100644
--- 
a/modules/compute/src/test/java/org/apache/ignite/internal/compute/ComputeComponentImplTest.java
+++ 
b/modules/compute/src/test/java/org/apache/ignite/internal/compute/ComputeComponentImplTest.java
@@ -18,6 +18,9 @@
 package org.apache.ignite.internal.compute;
 
 import static org.apache.ignite.internal.compute.ExecutionOptions.DEFAULT;
+import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureExceptionMatcher.willThrow;
+import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureExceptionMatcher.willThrowWithCauseOrSuppressed;
+import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willBe;
 import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.containsString;
@@ -57,22 +60,23 @@ import org.apache.ignite.compute.DeploymentUnit;
 import org.apache.ignite.compute.JobExecutionContext;
 import org.apache.ignite.compute.version.Version;
 import org.apache.ignite.internal.compute.configuration.ComputeConfiguration;
+import org.apache.ignite.internal.compute.executor.ComputeExecutor;
+import org.apache.ignite.internal.compute.executor.ComputeExecutorImpl;
 import org.apache.ignite.internal.compute.loader.JobClassLoader;
 import org.apache.ignite.internal.compute.loader.JobContext;
 import org.apache.ignite.internal.compute.loader.JobContextManager;
 import org.apache.ignite.internal.compute.message.ExecuteRequest;
 import org.apache.ignite.internal.compute.message.ExecuteResponse;
-import org.apache.ignite.internal.compute.queue.ComputeExecutor;
-import org.apache.ignite.internal.compute.queue.ComputeExecutorImpl;
+import org.apache.ignite.internal.compute.state.InMemoryComputeStateMachine;
 import 
org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
 import 
org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
 import org.apache.ignite.internal.deployunit.DeploymentStatus;
 import 
org.apache.ignite.internal.deployunit.exception.DeploymentUnitNotFoundException;
 import 
org.apache.ignite.internal.deployunit.exception.DeploymentUnitUnavailableException;
+import org.apache.ignite.internal.lang.IgniteInternalException;
 import org.apache.ignite.internal.lang.NodeStoppingException;
 import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
 import org.apache.ignite.internal.testframework.IgniteTestUtils;
-import 
org.apache.ignite.internal.testframework.matchers.CompletableFutureExceptionMatcher;
 import org.apache.ignite.internal.thread.NamedThreadFactory;
 import org.apache.ignite.network.ClusterNode;
 import org.apache.ignite.network.ClusterNodeImpl;
@@ -143,8 +147,8 @@ class ComputeComponentImplTest extends 
BaseIgniteAbstractTest {
                 willCompleteSuccessfully()
         );
 
+        computeExecutor = new ComputeExecutorImpl(ignite, new 
InMemoryComputeStateMachine(computeConfiguration), computeConfiguration);
 
-        computeExecutor = new ComputeExecutorImpl(ignite, 
computeConfiguration);
         computeComponent = new ComputeComponentImpl(messagingService, 
jobContextManager, computeExecutor);
 
         computeComponent.start();
@@ -156,10 +160,10 @@ class ComputeComponentImplTest extends 
BaseIgniteAbstractTest {
     }
 
     @Test
-    void executesLocally() throws Exception {
-        String result = computeComponent.<String>executeLocally(List.of(), 
SimpleJob.class.getName(), "a", 42).get();
+    void executesLocally() {
+        CompletableFuture<String> result = 
computeComponent.executeLocally(List.of(), SimpleJob.class.getName(), "a", 42);
 
-        assertThat(result, is("jobResponse"));
+        assertThat(result, willBe("jobResponse"));
 
         assertThatExecuteRequestWasNotSent();
     }
@@ -266,11 +270,9 @@ class ComputeComponentImplTest extends 
BaseIgniteAbstractTest {
     void stoppedComponentReturnsExceptionOnLocalExecutionAttempt() throws 
Exception {
         computeComponent.stop();
 
-        Object result = computeComponent.executeLocally(List.of(), 
SimpleJob.class.getName())
-                .handle((s, ex) -> ex != null ? ex : s)
-                .get();
+        CompletableFuture<Object> result = 
computeComponent.executeLocally(List.of(), SimpleJob.class.getName());
 
-        assertThat(result, is(instanceOf(NodeStoppingException.class)));
+        assertThat(result, 
willThrowWithCauseOrSuppressed(NodeStoppingException.class));
     }
 
     @Test
@@ -284,11 +286,9 @@ class ComputeComponentImplTest extends 
BaseIgniteAbstractTest {
     void stoppedComponentReturnsExceptionOnRemoteExecutionAttempt() throws 
Exception {
         computeComponent.stop();
 
-        Object result = computeComponent.executeRemotely(remoteNode, 
List.of(), SimpleJob.class.getName())
-                .handle((s, ex) -> ex != null ? ex : s)
-                .get();
+        CompletableFuture<Object> result = 
computeComponent.executeRemotely(remoteNode, List.of(), 
SimpleJob.class.getName());
 
-        assertThat(result, is(instanceOf(NodeStoppingException.class)));
+        assertThat(result, 
willThrowWithCauseOrSuppressed(NodeStoppingException.class));
     }
 
     @Test
@@ -327,7 +327,8 @@ class ComputeComponentImplTest extends 
BaseIgniteAbstractTest {
         ExecuteResponse response = executeResponseCaptor.getValue();
 
         assertThat(response.result(), is(nullValue()));
-        assertThat(response.throwable(), 
is(instanceOf(NodeStoppingException.class)));
+        assertThat(response.throwable(), 
is(instanceOf(IgniteInternalException.class)));
+        assertThat(response.throwable().getCause(), 
is(instanceOf(NodeStoppingException.class)));
     }
 
     @Test
@@ -348,23 +349,24 @@ class ComputeComponentImplTest extends 
BaseIgniteAbstractTest {
         assertThat(computeConfiguration.change(computeChange -> 
computeChange.changeThreadPoolStopTimeoutMillis(100)),
                 willCompleteSuccessfully());
 
-        computeComponent = new ComputeComponentImpl(messagingService, 
jobContextManager, computeExecutor);
+        computeComponent = new ComputeComponentImpl(
+                messagingService,
+                jobContextManager,
+                computeExecutor
+        );
         computeComponent.start();
 
         // take the only executor thread
         computeComponent.executeLocally(List.of(), LongJob.class.getName());
 
         // the corresponding task goes to work queue
-        CompletableFuture<Object> resultFuture = 
computeComponent.executeLocally(List.of(), SimpleJob.class.getName())
-                .handle((res, ex) -> ex != null ? ex : res);
+        CompletableFuture<Object> resultFuture = 
computeComponent.executeLocally(List.of(), SimpleJob.class.getName());
 
         computeComponent.stop();
 
         // now work queue is dropped to the floor, so the future should be 
resolved with a cancellation
 
-        Exception result = (Exception) resultFuture.get(3, TimeUnit.SECONDS);
-
-        assertThat(result.getCause(), 
is(instanceOf(CancellationException.class)));
+        assertThat(resultFuture, willThrow(CancellationException.class));
     }
 
     @Test
@@ -414,7 +416,7 @@ class ComputeComponentImplTest extends 
BaseIgniteAbstractTest {
 
         assertThat(
                 computeComponent.executeLocally(units, "com.example.Maim"),
-                
CompletableFutureExceptionMatcher.willThrow(ClassNotFoundException.class)
+                willThrow(ClassNotFoundException.class)
         );
     }
 
@@ -432,7 +434,7 @@ class ComputeComponentImplTest extends 
BaseIgniteAbstractTest {
 
         assertThat(
                 computeComponent.executeLocally(units, "com.example.Maim"),
-                
CompletableFutureExceptionMatcher.willThrow(ClassNotFoundException.class)
+                willThrow(ClassNotFoundException.class)
         );
     }
 
diff --git 
a/modules/compute/src/test/java/org/apache/ignite/internal/compute/queue/PriorityQueueExecutorTest.java
 
b/modules/compute/src/test/java/org/apache/ignite/internal/compute/queue/PriorityQueueExecutorTest.java
index 10c1b9c6e1..3dd1d6310c 100644
--- 
a/modules/compute/src/test/java/org/apache/ignite/internal/compute/queue/PriorityQueueExecutorTest.java
+++ 
b/modules/compute/src/test/java/org/apache/ignite/internal/compute/queue/PriorityQueueExecutorTest.java
@@ -27,6 +27,7 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import org.apache.ignite.internal.compute.configuration.ComputeConfiguration;
+import org.apache.ignite.internal.compute.state.InMemoryComputeStateMachine;
 import 
org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
 import 
org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
 import org.apache.ignite.internal.logger.IgniteLogger;
@@ -65,7 +66,8 @@ public class PriorityQueueExecutorTest extends 
BaseIgniteAbstractTest {
 
         priorityQueueExecutor = new PriorityQueueExecutor(
                 configuration,
-                new 
NamedThreadFactory(NamedThreadFactory.threadPrefix("testNode", "compute"), LOG)
+                new 
NamedThreadFactory(NamedThreadFactory.threadPrefix("testNode", "compute"), LOG),
+                new InMemoryComputeStateMachine(configuration)
         );
     }
 
diff --git 
a/modules/compute/src/test/java/org/apache/ignite/internal/compute/state/InMemoryComputeStateMachineTest.java
 
b/modules/compute/src/test/java/org/apache/ignite/internal/compute/state/InMemoryComputeStateMachineTest.java
new file mode 100644
index 0000000000..820685b1a5
--- /dev/null
+++ 
b/modules/compute/src/test/java/org/apache/ignite/internal/compute/state/InMemoryComputeStateMachineTest.java
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.compute.state;
+
+import static org.apache.ignite.compute.JobState.CANCELED;
+import static org.apache.ignite.compute.JobState.CANCELING;
+import static org.apache.ignite.compute.JobState.COMPLETED;
+import static org.apache.ignite.compute.JobState.EXECUTING;
+import static org.apache.ignite.compute.JobState.FAILED;
+import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.notNullValue;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+import java.util.UUID;
+import org.apache.ignite.internal.compute.configuration.ComputeConfiguration;
+import 
org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
+import 
org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
+import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
+import org.apache.ignite.internal.testframework.IgniteTestUtils;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+
+/**
+ * Test suite for {@link InMemoryComputeStateMachine}.
+ */
+@ExtendWith(ConfigurationExtension.class)
+public class InMemoryComputeStateMachineTest extends BaseIgniteAbstractTest {
+    private ComputeStateMachine stateMachine;
+
+    @InjectConfiguration
+    private ComputeConfiguration configuration;
+
+    private UUID jobId;
+
+    @BeforeEach
+    public void setup() {
+        stateMachine = new InMemoryComputeStateMachine(configuration);
+        stateMachine.start();
+        jobId = stateMachine.initJob();
+    }
+
+    @AfterEach
+    public void clean() {
+        stateMachine.stop();
+    }
+
+    @Test
+    public void testSubmit() {
+        assertThat(jobId, is(notNullValue()));
+    }
+
+    @Test
+    public void testCompleteWay() {
+        executeJob(false);
+        completeJob(false);
+    }
+
+    @Test
+    public void testCancel() {
+        stateMachine.cancelJob(jobId);
+        assertThat(stateMachine.currentState(jobId), is(CANCELED));
+    }
+
+    @Test
+    public void testCancelFromExecuting() {
+        executeJob(false);
+        cancelingJob(false);
+        cancelJob(false);
+    }
+
+    @Test
+    public void testCompleteCanceling() {
+        executeJob(false);
+        cancelingJob(false);
+        completeJob(false);
+    }
+
+    @Test
+    public void testFailCanceling() {
+        executeJob(false);
+        cancelingJob(false);
+        failJob(false);
+    }
+
+    @Test
+    public void testFailExecuting() {
+        executeJob(false);
+        failJob(false);
+    }
+
+    @Test
+    public void testCompleteExecution() {
+        executeJob(false);
+        completeJob(false);
+    }
+
+    @Test
+    public void testDoubleExecution() {
+        executeJob(false);
+        executeJob(true);
+    }
+
+    @Test
+    public void testDoubleComplete() {
+        executeJob(false);
+
+        completeJob(false);
+        completeJob(true);
+    }
+
+    @Test
+    public void testDoubleFail() {
+        executeJob(false);
+
+        failJob(false);
+        failJob(true);
+    }
+
+    /**
+     * Checks that the state of a job that reached a final state (completed, failed or canceled) is
+     * eventually dropped: with the states lifetime configured to 100 ms, {@code currentState} is expected
+     * to start returning {@code null} for such a job.
+     *
+     * @throws InterruptedException If the waiting thread was interrupted.
+     */
+    @Test
+    public void testCleanStates() throws InterruptedException {
+        assertThat(configuration.change(change -> change.changeStatesLifetimeMillis(100)), willCompleteSuccessfully());
+
+        // Stop the instance started in setup() before replacing it, so it is not left running.
+        stateMachine.stop();
+
+        stateMachine = new InMemoryComputeStateMachine(configuration);
+        stateMachine.start();
+
+        // The result of waitForCondition must be asserted, otherwise the test passes even if the state is never
+        // removed. The timeout is deliberately larger than the configured lifetime to leave room for the cleanup.
+        jobId = stateMachine.initJob();
+        executeJob(false);
+        completeJob(false);
+        assertThat(IgniteTestUtils.waitForCondition(() -> stateMachine.currentState(jobId) == null, 1000), is(true));
+
+        jobId = stateMachine.initJob();
+        executeJob(false);
+        failJob(false);
+        assertThat(IgniteTestUtils.waitForCondition(() -> stateMachine.currentState(jobId) == null, 1000), is(true));
+
+        jobId = stateMachine.initJob();
+        cancelJob(false);
+        assertThat(IgniteTestUtils.waitForCondition(() -> stateMachine.currentState(jobId) == null, 1000), is(true));
+
+        jobId = stateMachine.initJob();
+        executeJob(false);
+        cancelingJob(false);
+        cancelJob(false);
+        assertThat(IgniteTestUtils.waitForCondition(() -> stateMachine.currentState(jobId) == null, 1000), is(true));
+
+        stateMachine.stop();
+    }
+
+    private void cancelJob(boolean shouldFail) {
+        if (!shouldFail) {
+            stateMachine.cancelJob(jobId);
+            assertThat(stateMachine.currentState(jobId), is(CANCELED));
+        } else {
+            assertThrows(IllegalJobStateTransition.class, () -> 
stateMachine.cancelJob(jobId));
+        }
+    }
+
+    /**
+     * Moves the current job to the {@code CANCELING} state and verifies the outcome.
+     *
+     * @param shouldFail If {@code true}, the transition is expected to be rejected with
+     *      {@link IllegalJobStateTransition}; otherwise the job must end up in {@code CANCELING}.
+     */
+    private void cancelingJob(boolean shouldFail) {
+        if (!shouldFail) {
+            stateMachine.cancelingJob(jobId);
+            assertThat(stateMachine.currentState(jobId), is(CANCELING));
+        } else {
+            // Must exercise the same transition as the success branch (cancelingJob, not cancelJob).
+            assertThrows(IllegalJobStateTransition.class, () -> stateMachine.cancelingJob(jobId));
+        }
+    }
+
+    private void executeJob(boolean shouldFail) {
+        if (!shouldFail) {
+            stateMachine.executeJob(jobId);
+            assertThat(stateMachine.currentState(jobId), is(EXECUTING));
+        } else {
+            assertThrows(IllegalJobStateTransition.class, () -> 
stateMachine.executeJob(jobId));
+        }
+    }
+
+    private void completeJob(boolean shouldFail) {
+        if (!shouldFail) {
+            stateMachine.completeJob(jobId);
+            assertThat(stateMachine.currentState(jobId), is(COMPLETED));
+        } else {
+            assertThrows(IllegalJobStateTransition.class, () -> 
stateMachine.completeJob(jobId));
+        }
+    }
+
+    private void failJob(boolean shouldFail) {
+        if (!shouldFail) {
+            stateMachine.failJob(jobId);
+            assertThat(stateMachine.currentState(jobId), is(FAILED));
+        } else {
+            assertThrows(IllegalJobStateTransition.class, () -> 
stateMachine.failJob(jobId));
+        }
+    }
+}
diff --git 
a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java 
b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
index 9bd157e530..f92cf11b4a 100644
--- 
a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
+++ 
b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
@@ -68,9 +68,10 @@ import org.apache.ignite.internal.compute.ComputeComponent;
 import org.apache.ignite.internal.compute.ComputeComponentImpl;
 import org.apache.ignite.internal.compute.IgniteComputeImpl;
 import org.apache.ignite.internal.compute.configuration.ComputeConfiguration;
+import org.apache.ignite.internal.compute.executor.ComputeExecutorImpl;
 import org.apache.ignite.internal.compute.loader.JobClassLoaderFactory;
 import org.apache.ignite.internal.compute.loader.JobContextManager;
-import org.apache.ignite.internal.compute.queue.ComputeExecutorImpl;
+import org.apache.ignite.internal.compute.state.InMemoryComputeStateMachine;
 import 
org.apache.ignite.internal.configuration.ConfigurationDynamicDefaultsPatcherImpl;
 import org.apache.ignite.internal.configuration.ConfigurationManager;
 import org.apache.ignite.internal.configuration.ConfigurationModules;
@@ -643,10 +644,12 @@ public class IgniteImpl implements Ignite {
         );
         deploymentManager = deploymentManagerImpl;
 
+        ComputeConfiguration computeCfg = 
nodeConfigRegistry.getConfiguration(ComputeConfiguration.KEY);
+        InMemoryComputeStateMachine stateMachine = new 
InMemoryComputeStateMachine(computeCfg);
         computeComponent = new ComputeComponentImpl(
                 clusterSvc.messagingService(),
                 new JobContextManager(deploymentManagerImpl, 
deploymentManagerImpl.deploymentUnitAccessor(), new JobClassLoaderFactory()),
-                new ComputeExecutorImpl(this, 
nodeConfigRegistry.getConfiguration(ComputeConfiguration.KEY))
+                new ComputeExecutorImpl(this, stateMachine, computeCfg)
         );
 
         compute = new IgniteComputeImpl(clusterSvc.topologyService(), 
distributedTblMgr, computeComponent);

Reply via email to