This is an automated email from the ASF dual-hosted git repository.
apkhmv pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 7f98bd2b1a IGNITE-20843 Introduce Compute Job states. (#2894)
7f98bd2b1a is described below
commit 7f98bd2b1aa86b8047aaa58cd7d1f8f16d87fb8b
Author: Mikhail <[email protected]>
AuthorDate: Wed Dec 6 14:01:10 2023 +0300
IGNITE-20843 Introduce Compute Job states. (#2894)
---
.../org/apache/ignite/compute/IgniteCompute.java | 20 +-
.../java/org/apache/ignite/compute/JobState.java} | 40 +++-
.../java/org/apache/ignite/lang/ErrorGroups.java | 3 +
.../internal/compute/ComputeComponentImpl.java | 186 +++++-------------
.../ignite/internal/compute/ComputeUtils.java | 52 +++++
.../ComputeExecutor.java => JobStarter.java} | 31 ++-
.../configuration/ComputeConfigurationSchema.java | 8 +
.../{queue => executor}/ComputeExecutor.java | 2 +-
.../{queue => executor}/ComputeExecutorImpl.java | 31 ++-
.../compute/messaging/ComputeMessaging.java | 144 ++++++++++++++
.../compute/queue/PriorityQueueExecutor.java | 24 ++-
.../ignite/internal/compute/queue/QueueEntry.java | 10 +-
.../compute/state/ComputeStateMachine.java | 91 +++++++++
.../compute/state/IllegalJobStateTransition.java | 43 +++++
.../compute/state/InMemoryComputeStateMachine.java | 167 ++++++++++++++++
.../internal/compute/ComputeComponentImplTest.java | 50 ++---
.../compute/queue/PriorityQueueExecutorTest.java | 4 +-
.../state/InMemoryComputeStateMachineTest.java | 212 +++++++++++++++++++++
.../org/apache/ignite/internal/app/IgniteImpl.java | 7 +-
19 files changed, 912 insertions(+), 213 deletions(-)
diff --git
a/modules/api/src/main/java/org/apache/ignite/compute/IgniteCompute.java
b/modules/api/src/main/java/org/apache/ignite/compute/IgniteCompute.java
index 085c6de970..f52ee20122 100644
--- a/modules/api/src/main/java/org/apache/ignite/compute/IgniteCompute.java
+++ b/modules/api/src/main/java/org/apache/ignite/compute/IgniteCompute.java
@@ -35,11 +35,11 @@ public interface IgniteCompute {
/**
* Executes a {@link ComputeJob} of the given class on a single node from
a set of candidate nodes.
*
- * @param nodes Candidate nodes; the job will be executed on one of
them.
- * @param units Deployment units. Can be empty.
+ * @param nodes Candidate nodes; the job will be executed on one of them.
+ * @param units Deployment units. Can be empty.
* @param jobClassName Name of the job class to execute.
- * @param args Arguments of the job.
- * @param <R> Job result type
+ * @param args Arguments of the job.
+ * @param <R> Job result type.
* @return CompletableFuture Job result.
*/
<R> CompletableFuture<R> executeAsync(
@@ -52,11 +52,11 @@ public interface IgniteCompute {
/**
* Executes a {@link ComputeJob} of the given class on a single node from
a set of candidate nodes.
*
- * @param nodes Candidate nodes; the job will be executed on one of
them.
- * @param units Deployment units. Can be empty.
+ * @param nodes Candidate nodes; the job will be executed on one of them.
+ * @param units Deployment units. Can be empty.
* @param jobClassName Name of the job class to execute.
- * @param args Arguments of the job.
- * @param <R> Job result type
+ * @param args Arguments of the job.
+ * @param <R> Job result type.
* @return Job result.
*/
<R> R execute(
@@ -153,8 +153,8 @@ public interface IgniteCompute {
* @param nodes Nodes to execute the job on.
* @param units Deployment units. Can be empty.
* @param jobClassName Name of the job class to execute.
- * @param args Arguments of the job.
- * @param <R> Job result type.
+ * @param args Arguments of the job.
+ * @param <R> Job result type.
* @return Map from node to job result future.
*/
<R> Map<ClusterNode, CompletableFuture<R>> broadcastAsync(
diff --git
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/queue/ComputeExecutor.java
b/modules/api/src/main/java/org/apache/ignite/compute/JobState.java
similarity index 56%
copy from
modules/compute/src/main/java/org/apache/ignite/internal/compute/queue/ComputeExecutor.java
copy to modules/api/src/main/java/org/apache/ignite/compute/JobState.java
index 7d6ad4c645..c89a3f866a 100644
---
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/queue/ComputeExecutor.java
+++ b/modules/api/src/main/java/org/apache/ignite/compute/JobState.java
@@ -15,19 +15,39 @@
* limitations under the License.
*/
-package org.apache.ignite.internal.compute.queue;
-
-import java.util.concurrent.CompletableFuture;
-import org.apache.ignite.compute.ComputeJob;
-import org.apache.ignite.internal.compute.ExecutionOptions;
+package org.apache.ignite.compute;
/**
- * Executor of Compute jobs.
+ * Compute job's state enum.
*/
-public interface ComputeExecutor {
- <R> CompletableFuture<R> executeJob(ExecutionOptions options,
Class<ComputeJob<R>> jobClass, Object[] args);
+public enum JobState {
+ /**
+ * The job is submitted and waiting for an execution start.
+ */
+ QUEUED,
+
+ /**
+ * The job is being executed.
+ */
+ EXECUTING,
+
+ /**
+ * The job was unexpectedly terminated during execution.
+ */
+ FAILED,
+
+ /**
+ * The job was executed successfully and the execution result was returned.
+ */
+ COMPLETED,
- void start();
+ /**
+ * The job has received the cancel command, but it is still running.
+ */
+ CANCELING,
- void stop();
+ /**
+ * The job was successfully canceled.
+ */
+ CANCELED
}
diff --git a/modules/api/src/main/java/org/apache/ignite/lang/ErrorGroups.java
b/modules/api/src/main/java/org/apache/ignite/lang/ErrorGroups.java
index d8721a12f6..d3623b2131 100755
--- a/modules/api/src/main/java/org/apache/ignite/lang/ErrorGroups.java
+++ b/modules/api/src/main/java/org/apache/ignite/lang/ErrorGroups.java
@@ -516,6 +516,9 @@ public class ErrorGroups {
/** Compute execution queue overflow error. */
public static final int QUEUE_OVERFLOW_ERR =
COMPUTE_ERR_GROUP.registerErrorCode((short) 4);
+
+ /** Compute job state transition error. */
+ public static final int COMPUTE_JOB_STATE_TRANSITION_ERR =
COMPUTE_ERR_GROUP.registerErrorCode((short) 5);
}
/** Catalog error group. */
diff --git
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeComponentImpl.java
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeComponentImpl.java
index e4f7c62133..af211cbd61 100644
---
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeComponentImpl.java
+++
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeComponentImpl.java
@@ -17,39 +17,29 @@
package org.apache.ignite.internal.compute;
-import static java.util.concurrent.CompletableFuture.completedFuture;
import static java.util.concurrent.CompletableFuture.failedFuture;
import static
org.apache.ignite.internal.compute.ClassLoaderExceptionsMapper.mapClassLoaderExceptions;
-import static org.apache.ignite.internal.compute.ComputeUtils.jobClass;
+import static org.apache.ignite.lang.ErrorGroups.Common.NODE_STOPPING_ERR;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.stream.Collectors;
-import org.apache.ignite.compute.ComputeJob;
import org.apache.ignite.compute.DeploymentUnit;
-import org.apache.ignite.compute.version.Version;
+import org.apache.ignite.internal.compute.executor.ComputeExecutor;
import org.apache.ignite.internal.compute.loader.JobContext;
import org.apache.ignite.internal.compute.loader.JobContextManager;
-import org.apache.ignite.internal.compute.message.DeploymentUnitMsg;
-import org.apache.ignite.internal.compute.message.ExecuteRequest;
-import org.apache.ignite.internal.compute.message.ExecuteResponse;
-import org.apache.ignite.internal.compute.queue.ComputeExecutor;
+import org.apache.ignite.internal.compute.messaging.ComputeMessaging;
import org.apache.ignite.internal.future.InFlightFutures;
+import org.apache.ignite.internal.lang.IgniteInternalException;
import org.apache.ignite.internal.lang.NodeStoppingException;
import org.apache.ignite.internal.util.IgniteSpinBusyLock;
import org.apache.ignite.network.ClusterNode;
import org.apache.ignite.network.MessagingService;
-import org.jetbrains.annotations.Nullable;
/**
* Implementation of {@link ComputeComponent}.
*/
public class ComputeComponentImpl implements ComputeComponent {
- private static final long NETWORK_TIMEOUT_MILLIS = Long.MAX_VALUE;
-
- private final ComputeMessagesFactory messagesFactory = new
ComputeMessagesFactory();
-
/** Busy lock to stop synchronously. */
private final IgniteSpinBusyLock busyLock = new IgniteSpinBusyLock();
@@ -58,12 +48,12 @@ public class ComputeComponentImpl implements
ComputeComponent {
private final InFlightFutures inFlightFutures = new InFlightFutures();
- private final MessagingService messagingService;
-
private final JobContextManager jobContextManager;
private final ComputeExecutor executor;
+ private final ComputeMessaging messaging;
+
/**
* Creates a new instance.
*/
@@ -72,37 +62,9 @@ public class ComputeComponentImpl implements
ComputeComponent {
JobContextManager jobContextManager,
ComputeExecutor executor
) {
- this.messagingService = messagingService;
this.jobContextManager = jobContextManager;
this.executor = executor;
- }
-
- /** {@inheritDoc} */
- @Override
- public void start() {
- executor.start();
-
- messagingService.addMessageHandler(ComputeMessageTypes.class,
(message, senderConsistentId, correlationId) -> {
- assert correlationId != null;
-
- if (message instanceof ExecuteRequest) {
- processExecuteRequest((ExecuteRequest) message,
senderConsistentId, correlationId);
- }
- });
- }
-
- /** {@inheritDoc} */
- @Override
- public void stop() throws Exception {
- if (!stopGuard.compareAndSet(false, true)) {
- return;
- }
-
- busyLock.block();
-
- executor.stop();
-
- inFlightFutures.cancelInFlightFutures();
+ messaging = new ComputeMessaging(messagingService);
}
/** {@inheritDoc} */
@@ -113,30 +75,7 @@ public class ComputeComponentImpl implements
ComputeComponent {
String jobClassName,
Object... args
) {
- if (!busyLock.enterBusy()) {
- return failedFuture(new NodeStoppingException());
- }
-
- try {
- return mapClassLoaderExceptions(jobClassLoader(units),
jobClassName)
- .thenCompose(context ->
- doExecuteLocally(options,
ComputeUtils.<R>jobClass(context.classLoader(), jobClassName), args)
- .whenComplete((r, e) -> context.close())
- );
- } finally {
- busyLock.leaveBusy();
- }
- }
-
- private <R> CompletableFuture<R> doExecuteLocally(
- ExecutionOptions options,
- Class<ComputeJob<R>> jobClass,
- Object[] args
- ) {
- CompletableFuture<R> future = executor.executeJob(options, jobClass,
args);
- inFlightFutures.registerFuture(future);
-
- return future;
+ return start(options, units, jobClassName, args);
}
/** {@inheritDoc} */
@@ -149,106 +88,67 @@ public class ComputeComponentImpl implements
ComputeComponent {
Object... args
) {
if (!busyLock.enterBusy()) {
- return failedFuture(new NodeStoppingException());
+ return failedFuture(new IgniteInternalException(NODE_STOPPING_ERR,
new NodeStoppingException()));
}
try {
- return doExecuteRemotely(options, remoteNode, units, jobClassName,
args);
+ CompletableFuture<R> future =
messaging.remoteExecuteRequest(options, remoteNode, units, jobClassName, args);
+ inFlightFutures.registerFuture(future);
+ return future;
} finally {
busyLock.leaveBusy();
}
}
- private <R> CompletableFuture<R> doExecuteRemotely(
+ private <R> CompletableFuture<R> start(
ExecutionOptions options,
- ClusterNode remoteNode,
List<DeploymentUnit> units,
String jobClassName,
- Object[] args
+ Object... args
) {
- List<DeploymentUnitMsg> deploymentUnitMsgs = units.stream()
- .map(this::toDeploymentUnitMsg)
- .collect(Collectors.toList());
-
- ExecuteRequest executeRequest = messagesFactory.executeRequest()
- .executeOptions(options)
- .deploymentUnits(deploymentUnitMsgs)
- .jobClassName(jobClassName)
- .args(args)
- .build();
-
- CompletableFuture<R> future = messagingService.invoke(remoteNode,
executeRequest, NETWORK_TIMEOUT_MILLIS)
- .thenCompose(message ->
resultFromExecuteResponse((ExecuteResponse) message));
- inFlightFutures.registerFuture(future);
- return future;
- }
-
- private void processExecuteRequest(ExecuteRequest executeRequest, String
senderConsistentId, long correlationId) {
if (!busyLock.enterBusy()) {
- sendExecuteResponse(null, new NodeStoppingException(),
senderConsistentId, correlationId);
- return;
+ return failedFuture(new IgniteInternalException(NODE_STOPPING_ERR,
new NodeStoppingException()));
}
try {
- List<DeploymentUnit> units =
toDeploymentUnit(executeRequest.deploymentUnits());
-
- mapClassLoaderExceptions(jobClassLoader(units),
executeRequest.jobClassName())
- .whenComplete((context, err) -> {
- if (err != null) {
- if (context != null) {
- context.close();
- }
-
- sendExecuteResponse(null, err, senderConsistentId,
correlationId);
- }
-
- doExecuteLocally(
- executeRequest.executeOptions(),
- jobClass(context.classLoader(),
executeRequest.jobClassName()),
- executeRequest.args()
- ).whenComplete((r, e) -> context.close())
- .handle((result, ex) ->
sendExecuteResponse(result, ex, senderConsistentId, correlationId));
- });
+ CompletableFuture<R> future =
mapClassLoaderExceptions(jobContextManager.acquireClassLoader(units),
jobClassName)
+ .thenCompose(context -> this.<R>exec(context, options,
jobClassName, args)
+ .whenComplete((r, e) -> context.close()));
+
+ inFlightFutures.registerFuture(future);
+
+ return future;
} finally {
busyLock.leaveBusy();
}
}
- @Nullable
- private Object sendExecuteResponse(@Nullable Object result, @Nullable
Throwable ex, String senderConsistentId, Long correlationId) {
- ExecuteResponse executeResponse = messagesFactory.executeResponse()
- .result(result)
- .throwable(ex)
- .build();
-
- messagingService.respond(senderConsistentId, executeResponse,
correlationId);
-
- return null;
+ /** {@inheritDoc} */
+ @Override
+ public void start() {
+ executor.start();
+ messaging.start(this::start);
}
- private CompletableFuture<JobContext> jobClassLoader(List<DeploymentUnit>
units) {
- return jobContextManager.acquireClassLoader(units);
- }
+ /** {@inheritDoc} */
+ @Override
+ public void stop() throws Exception {
+ if (!stopGuard.compareAndSet(false, true)) {
+ return;
+ }
- private DeploymentUnitMsg toDeploymentUnitMsg(DeploymentUnit unit) {
- return messagesFactory.deploymentUnitMsg()
- .name(unit.name())
- .version(unit.version().toString())
- .build();
- }
+ busyLock.block();
+ inFlightFutures.cancelInFlightFutures();
- private static List<DeploymentUnit>
toDeploymentUnit(List<DeploymentUnitMsg> unitMsgs) {
- return unitMsgs.stream()
- .map(it -> new DeploymentUnit(it.name(),
Version.parseVersion(it.version())))
- .collect(Collectors.toList());
+ messaging.stop();
+ executor.stop();
}
- private static <R> CompletableFuture<R>
resultFromExecuteResponse(ExecuteResponse executeResponse) {
- Throwable throwable = executeResponse.throwable();
- if (throwable != null) {
- return failedFuture(throwable);
- }
-
- return completedFuture((R) executeResponse.result());
+ private <R> CompletableFuture<R> exec(JobContext context, ExecutionOptions
options, String jobClassName, Object[] args) {
+ return executor.executeJob(
+ options,
+ ComputeUtils.jobClass(context.classLoader(), jobClassName),
+ args
+ );
}
}
diff --git
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeUtils.java
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeUtils.java
index 59c0eb71f8..9b7368a9c3 100644
---
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeUtils.java
+++
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeUtils.java
@@ -17,10 +17,19 @@
package org.apache.ignite.internal.compute;
+import static java.util.concurrent.CompletableFuture.completedFuture;
+import static java.util.concurrent.CompletableFuture.failedFuture;
import static
org.apache.ignite.lang.ErrorGroups.Compute.CLASS_INITIALIZATION_ERR;
import java.lang.reflect.Constructor;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
import org.apache.ignite.compute.ComputeJob;
+import org.apache.ignite.compute.DeploymentUnit;
+import org.apache.ignite.compute.version.Version;
+import org.apache.ignite.internal.compute.message.DeploymentUnitMsg;
+import org.apache.ignite.internal.compute.message.ExecuteResponse;
import org.apache.ignite.internal.lang.IgniteInternalException;
import org.apache.ignite.lang.IgniteException;
@@ -28,6 +37,8 @@ import org.apache.ignite.lang.IgniteException;
* Utility class for compute.
*/
public class ComputeUtils {
+ private static final ComputeMessagesFactory MESSAGES_FACTORY = new
ComputeMessagesFactory();
+
/**
* Instantiate compute job via provided class loader by provided job class
name.
*
@@ -79,4 +90,45 @@ public class ComputeUtils {
);
}
}
+
+ /**
+ * Transform deployment unit object to message {@link DeploymentUnitMsg}.
+ *
+ * @param unit Deployment unit.
+ * @return Deployment unit message.
+ */
+ public static DeploymentUnitMsg toDeploymentUnitMsg(DeploymentUnit unit) {
+ return MESSAGES_FACTORY.deploymentUnitMsg()
+ .name(unit.name())
+ .version(unit.version().toString())
+ .build();
+ }
+
+ /**
+ * Extract Compute job result from execute response.
+ *
+ * @param executeResponse Execution message response.
+ * @param <R> Compute job return type.
+ * @return Completable future with result.
+ */
+ public static <R> CompletableFuture<R>
resultFromExecuteResponse(ExecuteResponse executeResponse) {
+ Throwable throwable = executeResponse.throwable();
+ if (throwable != null) {
+ return failedFuture(throwable);
+ }
+
+ return completedFuture((R) executeResponse.result());
+ }
+
+ /**
+ * Transform list of deployment unit messages to list of deployment units.
+ *
+ * @param unitMsgs Deployment units messages.
+ * @return Deployment units.
+ */
+ public static List<DeploymentUnit>
toDeploymentUnit(List<DeploymentUnitMsg> unitMsgs) {
+ return unitMsgs.stream()
+ .map(it -> new DeploymentUnit(it.name(),
Version.parseVersion(it.version())))
+ .collect(Collectors.toList());
+ }
}
diff --git
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/queue/ComputeExecutor.java
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/JobStarter.java
similarity index 55%
copy from
modules/compute/src/main/java/org/apache/ignite/internal/compute/queue/ComputeExecutor.java
copy to
modules/compute/src/main/java/org/apache/ignite/internal/compute/JobStarter.java
index 7d6ad4c645..4f01d9bda8 100644
---
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/queue/ComputeExecutor.java
+++
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/JobStarter.java
@@ -15,19 +15,30 @@
* limitations under the License.
*/
-package org.apache.ignite.internal.compute.queue;
+package org.apache.ignite.internal.compute;
+import java.util.List;
import java.util.concurrent.CompletableFuture;
-import org.apache.ignite.compute.ComputeJob;
-import org.apache.ignite.internal.compute.ExecutionOptions;
+import org.apache.ignite.compute.DeploymentUnit;
/**
- * Executor of Compute jobs.
+ * Compute job starter interface.
*/
-public interface ComputeExecutor {
- <R> CompletableFuture<R> executeJob(ExecutionOptions options,
Class<ComputeJob<R>> jobClass, Object[] args);
-
- void start();
-
- void stop();
+public interface JobStarter {
+ /**
+ * Start compute job.
+ *
+ * @param options Compute job execution options.
+ * @param units Deployment units. Can be empty.
+ * @param jobClassName Name of the job class to execute.
+ * @param args Arguments of the job.
+ * @param <R> Job result type.
+ * @return CompletableFuture Job result.
+ */
+ <R> CompletableFuture<R> start(
+ ExecutionOptions options,
+ List<DeploymentUnit> units,
+ String jobClassName,
+ Object... args
+ );
}
diff --git
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/configuration/ComputeConfigurationSchema.java
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/configuration/ComputeConfigurationSchema.java
index 525a56be9f..5237c211e8 100644
---
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/configuration/ComputeConfigurationSchema.java
+++
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/configuration/ComputeConfigurationSchema.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.compute.configuration;
import static java.lang.Math.max;
+import java.util.concurrent.TimeUnit;
import org.apache.ignite.configuration.annotation.ConfigurationRoot;
import org.apache.ignite.configuration.annotation.ConfigurationType;
import org.apache.ignite.configuration.annotation.Value;
@@ -46,4 +47,11 @@ public class ComputeConfigurationSchema {
@Range(min = 1)
@Value(hasDefault = true)
public final int queueMaxSize = Integer.MAX_VALUE;
+
+ /**
+ * The lifetime of job states in milliseconds after the Compute job
finishes.
+ */
+ @Range(min = 0)
+ @Value(hasDefault = true)
+ public final long statesLifetimeMillis = TimeUnit.MINUTES.toMillis(1);
}
diff --git
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/queue/ComputeExecutor.java
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/executor/ComputeExecutor.java
similarity index 95%
rename from
modules/compute/src/main/java/org/apache/ignite/internal/compute/queue/ComputeExecutor.java
rename to
modules/compute/src/main/java/org/apache/ignite/internal/compute/executor/ComputeExecutor.java
index 7d6ad4c645..91f9b8fe01 100644
---
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/queue/ComputeExecutor.java
+++
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/executor/ComputeExecutor.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.ignite.internal.compute.queue;
+package org.apache.ignite.internal.compute.executor;
import java.util.concurrent.CompletableFuture;
import org.apache.ignite.compute.ComputeJob;
diff --git
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/queue/ComputeExecutorImpl.java
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/executor/ComputeExecutorImpl.java
similarity index 71%
rename from
modules/compute/src/main/java/org/apache/ignite/internal/compute/queue/ComputeExecutorImpl.java
rename to
modules/compute/src/main/java/org/apache/ignite/internal/compute/executor/ComputeExecutorImpl.java
index 40fdacfb4a..99966fac09 100644
---
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/queue/ComputeExecutorImpl.java
+++
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/executor/ComputeExecutorImpl.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.ignite.internal.compute.queue;
+package org.apache.ignite.internal.compute.executor;
import java.util.concurrent.CompletableFuture;
import org.apache.ignite.Ignite;
@@ -25,6 +25,8 @@ import org.apache.ignite.internal.compute.ComputeUtils;
import org.apache.ignite.internal.compute.ExecutionOptions;
import org.apache.ignite.internal.compute.JobExecutionContextImpl;
import org.apache.ignite.internal.compute.configuration.ComputeConfiguration;
+import org.apache.ignite.internal.compute.queue.PriorityQueueExecutor;
+import org.apache.ignite.internal.compute.state.ComputeStateMachine;
import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
import org.apache.ignite.internal.thread.NamedThreadFactory;
@@ -39,15 +41,33 @@ public class ComputeExecutorImpl implements ComputeExecutor
{
private final ComputeConfiguration configuration;
+ private final ComputeStateMachine stateMachine;
+
private PriorityQueueExecutor executorService;
- public ComputeExecutorImpl(Ignite ignite, ComputeConfiguration
configuration) {
+ /**
+ * Constructor.
+ *
+ * @param ignite Ignite instance for public API access.
+ * @param stateMachine Compute jobs state machine.
+ * @param configuration Compute configuration.
+ */
+ public ComputeExecutorImpl(
+ Ignite ignite,
+ ComputeStateMachine stateMachine,
+ ComputeConfiguration configuration
+ ) {
this.ignite = ignite;
this.configuration = configuration;
+ this.stateMachine = stateMachine;
}
@Override
- public <R> CompletableFuture<R> executeJob(ExecutionOptions options,
Class<ComputeJob<R>> jobClass, Object[] args) {
+ public <R> CompletableFuture<R> executeJob(
+ ExecutionOptions options,
+ Class<ComputeJob<R>> jobClass,
+ Object[] args
+ ) {
assert executorService != null;
JobExecutionContext context = new JobExecutionContextImpl(ignite);
@@ -57,14 +77,17 @@ public class ComputeExecutorImpl implements ComputeExecutor
{
@Override
public void start() {
+ stateMachine.start();
executorService = new PriorityQueueExecutor(
configuration,
- new
NamedThreadFactory(NamedThreadFactory.threadPrefix(ignite.name(), "compute"),
LOG)
+ new
NamedThreadFactory(NamedThreadFactory.threadPrefix(ignite.name(), "compute"),
LOG),
+ stateMachine
);
}
@Override
public void stop() {
+ stateMachine.stop();
executorService.shutdown();
}
}
diff --git
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/messaging/ComputeMessaging.java
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/messaging/ComputeMessaging.java
new file mode 100644
index 0000000000..66141f7d0d
--- /dev/null
+++
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/messaging/ComputeMessaging.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.compute.messaging;
+
+import static
org.apache.ignite.internal.compute.ComputeUtils.resultFromExecuteResponse;
+import static org.apache.ignite.internal.compute.ComputeUtils.toDeploymentUnit;
+import static org.apache.ignite.lang.ErrorGroups.Common.NODE_STOPPING_ERR;
+
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
+import org.apache.ignite.compute.DeploymentUnit;
+import org.apache.ignite.internal.compute.ComputeMessageTypes;
+import org.apache.ignite.internal.compute.ComputeMessagesFactory;
+import org.apache.ignite.internal.compute.ComputeUtils;
+import org.apache.ignite.internal.compute.ExecutionOptions;
+import org.apache.ignite.internal.compute.JobStarter;
+import org.apache.ignite.internal.compute.message.DeploymentUnitMsg;
+import org.apache.ignite.internal.compute.message.ExecuteRequest;
+import org.apache.ignite.internal.compute.message.ExecuteResponse;
+import org.apache.ignite.internal.lang.IgniteInternalException;
+import org.apache.ignite.internal.lang.NodeStoppingException;
+import org.apache.ignite.internal.util.IgniteSpinBusyLock;
+import org.apache.ignite.network.ClusterNode;
+import org.apache.ignite.network.MessagingService;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Compute API internal messaging service.
+ */
+public class ComputeMessaging {
+ private static final long NETWORK_TIMEOUT_MILLIS = Long.MAX_VALUE;
+
+ private final ComputeMessagesFactory messagesFactory = new
ComputeMessagesFactory();
+
+ private final MessagingService messagingService;
+
+ private final IgniteSpinBusyLock busyLock = new IgniteSpinBusyLock();
+
+ public ComputeMessaging(MessagingService messagingService) {
+ this.messagingService = messagingService;
+ }
+
+ /**
+ * Start messaging service.
+ *
+ * @param starter Compute job starter.
+ */
+ public void start(JobStarter starter) {
+ messagingService.addMessageHandler(ComputeMessageTypes.class,
(message, senderConsistentId, correlationId) -> {
+ assert correlationId != null;
+
+ if (!busyLock.enterBusy()) {
+ sendExecuteResponse(
+ null,
+ new IgniteInternalException(NODE_STOPPING_ERR, new
NodeStoppingException()),
+ senderConsistentId,
+ correlationId
+ );
+ return;
+ }
+
+ try {
+ if (message instanceof ExecuteRequest) {
+ processExecuteRequest(starter, (ExecuteRequest) message,
senderConsistentId, correlationId);
+ }
+ } finally {
+ busyLock.leaveBusy();
+ }
+ });
+ }
+
+ /**
+ * Stop messaging service. After stopping, this service is no longer usable.
+ */
+ public void stop() {
+ busyLock.block();
+ }
+
+ /**
+ * Submit a Compute job for execution on a remote node.
+ *
+ * @param options Job execution options.
+ * @param remoteNode The job will be executed on this node.
+ * @param units Deployment units. Can be empty.
+ * @param jobClassName Name of the job class to execute.
+ * @param args Arguments of the job.
+ * @param <R> Job result type.
+ * @return Job result.
+ */
+ public <R> CompletableFuture<R> remoteExecuteRequest(
+ ExecutionOptions options,
+ ClusterNode remoteNode,
+ List<DeploymentUnit> units,
+ String jobClassName,
+ Object[] args
+ ) {
+ List<DeploymentUnitMsg> deploymentUnitMsgs = units.stream()
+ .map(ComputeUtils::toDeploymentUnitMsg)
+ .collect(Collectors.toList());
+
+ ExecuteRequest executeRequest = messagesFactory.executeRequest()
+ .executeOptions(options)
+ .deploymentUnits(deploymentUnitMsgs)
+ .jobClassName(jobClassName)
+ .args(args)
+ .build();
+
+ return messagingService.invoke(remoteNode, executeRequest,
NETWORK_TIMEOUT_MILLIS)
+ .thenCompose(message ->
resultFromExecuteResponse((ExecuteResponse) message));
+ }
+
+ private void processExecuteRequest(JobStarter starter, ExecuteRequest
executeRequest, String senderConsistentId, long correlationId) {
+ List<DeploymentUnit> units =
toDeploymentUnit(executeRequest.deploymentUnits());
+
+ starter.start(executeRequest.executeOptions(), units,
executeRequest.jobClassName(), executeRequest.args())
+ .whenComplete((result, err) ->
+ sendExecuteResponse(result, err, senderConsistentId,
correlationId));
+ }
+
+ private void sendExecuteResponse(@Nullable Object result, @Nullable
Throwable ex, String senderConsistentId, Long correlationId) {
+ ExecuteResponse executeResponse = messagesFactory.executeResponse()
+ .result(result)
+ .throwable(ex)
+ .build();
+
+ messagingService.respond(senderConsistentId, executeResponse,
correlationId);
+ }
+}
diff --git
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/queue/PriorityQueueExecutor.java
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/queue/PriorityQueueExecutor.java
index db5dff110d..c8f8568c7e 100644
---
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/queue/PriorityQueueExecutor.java
+++
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/queue/PriorityQueueExecutor.java
@@ -21,12 +21,14 @@ import static
java.util.concurrent.CompletableFuture.failedFuture;
import static org.apache.ignite.lang.ErrorGroups.Compute.QUEUE_OVERFLOW_ERR;
import java.util.Objects;
+import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.ignite.internal.compute.configuration.ComputeConfiguration;
+import org.apache.ignite.internal.compute.state.ComputeStateMachine;
import org.apache.ignite.internal.util.IgniteUtils;
import org.apache.ignite.lang.IgniteException;
@@ -40,6 +42,8 @@ public class PriorityQueueExecutor {
private final ThreadPoolExecutor executor;
+ private final ComputeStateMachine stateMachine;
+
/**
* Constructor.
*
@@ -48,9 +52,11 @@ public class PriorityQueueExecutor {
*/
public PriorityQueueExecutor(
ComputeConfiguration configuration,
- ThreadFactory threadFactory
+ ThreadFactory threadFactory,
+ ComputeStateMachine stateMachine
) {
this.configuration = configuration;
+ this.stateMachine = stateMachine;
executor = new ThreadPoolExecutor(
configuration.threadPoolSize().value(),
configuration.threadPoolSize().value(),
@@ -72,13 +78,25 @@ public class PriorityQueueExecutor {
public <R> CompletableFuture<R> submit(Callable<R> job, int priority) {
Objects.requireNonNull(job);
- QueueEntry<R> queueEntry = new QueueEntry<>(job, priority);
+ // Register the job in the state machine before it is handed to the executor.
+ UUID jobId = stateMachine.initJob();
+ QueueEntry<R> queueEntry = new QueueEntry<>(() -> {
+ // Executed when the entry is run, right before the job body: move the job to the executing state.
+ stateMachine.executeJob(jobId);
+ return job.call();
+ }, priority);
+
try {
executor.execute(queueEntry);
} catch (QueueOverflowException e) {
+ // The entry was rejected, so it will never run and nothing else would ever move this job out of
+ // its initial state. Cancel it so that the state machine does not keep a stale entry forever.
+ stateMachine.cancelJob(jobId);
return failedFuture(new IgniteException(QUEUE_OVERFLOW_ERR, e));
}
- return queueEntry.toFuture();
+ // Once the entry's future is resolved, record whether the job failed or completed.
+ return queueEntry.toFuture()
+ .whenComplete((r, throwable) -> {
+ if (throwable != null) {
+ stateMachine.failJob(jobId);
+ } else {
+ stateMachine.completeJob(jobId);
+ }
+ });
}
/**
diff --git
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/queue/QueueEntry.java
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/queue/QueueEntry.java
index 7753fe42ca..dcf0764ce5 100644
---
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/queue/QueueEntry.java
+++
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/queue/QueueEntry.java
@@ -35,7 +35,7 @@ public class QueueEntry<R> implements Runnable,
Comparable<QueueEntry<R>> {
private final CompletableFuture<R> future = new CompletableFuture<>();
- private final Callable<R> job;
+ private final Callable<R> jobAction;
private final int priority;
@@ -44,11 +44,11 @@ public class QueueEntry<R> implements Runnable,
Comparable<QueueEntry<R>> {
/**
* Constructor.
*
- * @param job Compute job callable.
+ * @param jobAction Compute job callable.
* @param priority Job priority.
*/
- public QueueEntry(Callable<R> job, int priority) {
- this.job = job;
+ public QueueEntry(Callable<R> jobAction, int priority) {
+ this.jobAction = jobAction;
this.priority = priority;
seqNum = seq.getAndIncrement();
}
@@ -56,7 +56,7 @@ public class QueueEntry<R> implements Runnable,
Comparable<QueueEntry<R>> {
@Override
public void run() {
try {
- future.complete(job.call());
+ future.complete(jobAction.call());
} catch (Exception e) {
future.completeExceptionally(e);
}
diff --git
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/state/ComputeStateMachine.java
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/state/ComputeStateMachine.java
new file mode 100644
index 0000000000..f4b1666ddf
--- /dev/null
+++
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/state/ComputeStateMachine.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.compute.state;
+
+import java.util.UUID;
+import org.apache.ignite.compute.JobState;
+
+/**
+ * State machine that tracks the state of Compute jobs by their identifiers.
+ */
+public interface ComputeStateMachine {
+    // ---- Lifecycle of the state machine itself. ----
+
+    /**
+     * Starts the state machine. The instance must not be used before this method is called.
+     */
+    void start();
+
+    /**
+     * Stops the state machine. The instance must not be used after this method is called.
+     */
+    void stop();
+
+    // ---- Job registration and lookup. ----
+
+    /**
+     * Registers a new Compute job in the state machine. The registered job is expected to be in the
+     * {@link JobState#QUEUED} state.
+     *
+     * @return Identifier of the registered Compute job.
+     */
+    UUID initJob();
+
+    /**
+     * Looks up the current state of a Compute job.
+     *
+     * @param jobId Compute job identifier.
+     * @return Current state of the job, or {@code null} if there is no job with the given identifier.
+     */
+    JobState currentState(UUID jobId);
+
+    // ---- Job state transitions. ----
+
+    /**
+     * Attempts to move the Compute job to the execute state.
+     *
+     * @param jobId Compute job identifier.
+     * @throws IllegalJobStateTransition If the job can't be moved to the execute state.
+     */
+    void executeJob(UUID jobId);
+
+    /**
+     * Attempts to move the Compute job to the canceling state. In this state the execution of the job may
+     * still continue.
+     *
+     * @param jobId Compute job identifier.
+     * @throws IllegalJobStateTransition If the job can't be moved to the canceling state.
+     */
+    void cancelingJob(UUID jobId);
+
+    /**
+     * Attempts to move the Compute job to the cancel state, which means that the execution has been canceled.
+     *
+     * @param jobId Compute job identifier.
+     * @throws IllegalJobStateTransition If the job can't be moved to the canceled state.
+     */
+    void cancelJob(UUID jobId);
+
+    /**
+     * Attempts to move the Compute job to the complete state.
+     *
+     * @param jobId Compute job identifier.
+     * @throws IllegalJobStateTransition If the job can't be moved to the complete state.
+     */
+    void completeJob(UUID jobId);
+
+    /**
+     * Attempts to move the Compute job to the fail state.
+     *
+     * @param jobId Compute job identifier.
+     * @throws IllegalJobStateTransition If the job can't be moved to the failed state.
+     */
+    void failJob(UUID jobId);
+}
diff --git
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/state/IllegalJobStateTransition.java
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/state/IllegalJobStateTransition.java
new file mode 100644
index 0000000000..002bcfcff0
--- /dev/null
+++
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/state/IllegalJobStateTransition.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.compute.state;
+
+import static
org.apache.ignite.lang.ErrorGroups.Compute.COMPUTE_JOB_STATE_TRANSITION_ERR;
+
+import java.util.UUID;
+import org.apache.ignite.compute.JobState;
+import org.apache.ignite.internal.lang.IgniteInternalException;
+
+/**
+ * Exception thrown by the Compute jobs state machine ({@link ComputeStateMachine}) when a requested job state
+ * transition is illegal.
+ */
+public class IllegalJobStateTransition extends IgniteInternalException {
+    /**
+     * Creates an exception for a transition requested on a job that does not exist.
+     *
+     * @param jobId Compute job identifier.
+     */
+    public IllegalJobStateTransition(UUID jobId) {
+        super(
+                COMPUTE_JOB_STATE_TRANSITION_ERR,
+                "Failed to transfer job state for nonexistent job " + jobId + "."
+        );
+    }
+
+    /**
+     * Creates an exception for a transition that is not allowed from the job's current state.
+     *
+     * @param jobId Compute job identifier.
+     * @param prevState State the job was in when the transition was requested.
+     * @param newState State the job was supposed to be moved to.
+     */
+    public IllegalJobStateTransition(UUID jobId, JobState prevState, JobState newState) {
+        super(
+                COMPUTE_JOB_STATE_TRANSITION_ERR,
+                "Failed to transfer job " + jobId + " from state " + prevState + " to state " + newState
+        );
+    }
+}
diff --git
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/state/InMemoryComputeStateMachine.java
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/state/InMemoryComputeStateMachine.java
new file mode 100644
index 0000000000..b16cdeea5e
--- /dev/null
+++
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/state/InMemoryComputeStateMachine.java
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.compute.state;
+
+import static org.apache.ignite.compute.JobState.CANCELED;
+import static org.apache.ignite.compute.JobState.CANCELING;
+import static org.apache.ignite.compute.JobState.COMPLETED;
+import static org.apache.ignite.compute.JobState.EXECUTING;
+import static org.apache.ignite.compute.JobState.FAILED;
+import static org.apache.ignite.compute.JobState.QUEUED;
+import static
org.apache.ignite.internal.util.IgniteUtils.shutdownAndAwaitTermination;
+
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
+import org.apache.ignite.compute.JobState;
+import org.apache.ignite.internal.compute.configuration.ComputeConfiguration;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.thread.NamedThreadFactory;
+
+/**
+ * In-memory implementation of {@link ComputeStateMachine}.
+ *
+ * <p>Job states are kept in a concurrent map. Identifiers of jobs that reached a final state are remembered, and
+ * their states are dropped by a periodic cleanup task that is scheduled in {@link #start()} with the period taken
+ * from {@code ComputeConfiguration.statesLifetimeMillis()}.
+ */
+public class InMemoryComputeStateMachine implements ComputeStateMachine {
+    private static final IgniteLogger LOG = Loggers.forClass(InMemoryComputeStateMachine.class);
+
+    private final ComputeConfiguration configuration;
+
+    /** Executor that runs the periodic cleanup task; {@code null} until {@link #start()} is called. */
+    private ExecutorService cleaner;
+
+    /** Jobs whose states will be dropped by the next cleanup run. Used only inside the cleanup task. */
+    private final Set<UUID> toRemove = new HashSet<>();
+
+    /** Jobs that reached a final state and have not been picked up by a cleanup run yet. */
+    private final Set<UUID> waitToRemove = ConcurrentHashMap.newKeySet();
+
+    /** Current state of every tracked job. */
+    private final Map<UUID, JobState> states = new ConcurrentHashMap<>();
+
+    /**
+     * Constructor.
+     *
+     * @param configuration Compute configuration, used to read the lifetime of final job states.
+     */
+    public InMemoryComputeStateMachine(ComputeConfiguration configuration) {
+        this.configuration = configuration;
+    }
+
+    /** Creates the single-threaded cleaner and schedules the cleanup task with the configured period. */
+    @Override
+    public void start() {
+        long lifetime = configuration.statesLifetimeMillis().value();
+
+        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(
+                new NamedThreadFactory("InMemoryComputeStateMachine-pool", LOG)
+        );
+        executor.scheduleAtFixedRate(this::cleanStates, lifetime, lifetime, TimeUnit.MILLISECONDS);
+
+        cleaner = executor;
+    }
+
+    /** Shuts the cleaner down, waiting up to one second for it to terminate. Does nothing if it was never started. */
+    @Override
+    public void stop() {
+        if (cleaner != null) {
+            shutdownAndAwaitTermination(cleaner, 1000, TimeUnit.MILLISECONDS);
+        }
+    }
+
+    @Override
+    public JobState currentState(UUID jobId) {
+        return states.get(jobId);
+    }
+
+    /**
+     * Registers a new job in the {@link JobState#QUEUED} state under a freshly generated random identifier.
+     * If the identifier happens to be taken already, the collision is logged and another one is generated.
+     *
+     * @return Identifier of the registered job.
+     */
+    @Override
+    public UUID initJob() {
+        while (true) {
+            UUID uuid = UUID.randomUUID();
+
+            if (states.putIfAbsent(uuid, QUEUED) == null) {
+                return uuid;
+            }
+
+            LOG.info("UUID collision detected! UUID: {}", uuid);
+        }
+    }
+
+    /** Moves the job from {@code QUEUED} to {@code EXECUTING}. */
+    @Override
+    public void executeJob(UUID jobId) {
+        changeState(jobId, EXECUTING, QUEUED);
+    }
+
+    /** Moves the job from {@code EXECUTING} or {@code CANCELING} to {@code FAILED} and schedules its state for removal. */
+    @Override
+    public void failJob(UUID jobId) {
+        changeState(jobId, FAILED, EXECUTING, CANCELING);
+        waitToRemove.add(jobId);
+    }
+
+    /** Moves the job from {@code EXECUTING} or {@code CANCELING} to {@code COMPLETED} and schedules its state for removal. */
+    @Override
+    public void completeJob(UUID jobId) {
+        changeState(jobId, COMPLETED, EXECUTING, CANCELING);
+        waitToRemove.add(jobId);
+    }
+
+    /**
+     * Starts cancellation of the job. A job that is still {@code QUEUED} goes straight to {@code CANCELED} and is
+     * scheduled for removal; an {@code EXECUTING} job becomes {@code CANCELING}. Any other state is rejected.
+     */
+    @Override
+    public void cancelingJob(UUID jobId) {
+        changeState(jobId, currentState -> {
+            if (currentState == QUEUED) {
+                waitToRemove.add(jobId);
+                return CANCELED;
+            } else if (currentState == EXECUTING) {
+                return CANCELING;
+            }
+
+            throw new IllegalJobStateTransition(jobId, currentState, CANCELING);
+        });
+    }
+
+    /** Moves the job from {@code QUEUED} or {@code CANCELING} to {@code CANCELED} and schedules its state for removal. */
+    @Override
+    public void cancelJob(UUID jobId) {
+        changeState(jobId, CANCELED, QUEUED, CANCELING);
+        waitToRemove.add(jobId);
+    }
+
+    /**
+     * Single run of the periodic cleanup. Works in two phases: states of the jobs collected by the previous run are
+     * dropped now, and the jobs finalized since then are dropped by the next run. This way a final state stays
+     * readable for at least one full period after the job was finalized.
+     */
+    private void cleanStates() {
+        // A plain copy is used for the snapshot instead of Set.of(...): the latter throws on duplicate elements,
+        // and an exception escaping a fixed-rate task suppresses all of its subsequent runs.
+        Set<UUID> nextToRemove = new HashSet<>(waitToRemove);
+        waitToRemove.removeAll(nextToRemove);
+
+        toRemove.forEach(states::remove);
+        toRemove.clear();
+        toRemove.addAll(nextToRemove);
+    }
+
+    /**
+     * Moves the job to {@code newState} if its current state is one of {@code requiredStates}.
+     *
+     * @param jobId Compute job identifier.
+     * @param newState Target state.
+     * @param requiredStates States from which the transition is allowed.
+     * @throws IllegalJobStateTransition If the job doesn't exist or is in a state that is not allowed.
+     */
+    private void changeState(UUID jobId, JobState newState, JobState... requiredStates) {
+        changeState(jobId, currentState -> {
+            for (JobState requiredState : requiredStates) {
+                if (currentState == requiredState) {
+                    return newState;
+                }
+            }
+
+            throw new IllegalJobStateTransition(jobId, currentState, newState);
+        });
+    }
+
+    /**
+     * Atomically replaces the state of the job with the result of {@code newStateFunction} applied to its current
+     * state. An exception thrown by the function is propagated to the caller.
+     *
+     * @param jobId Compute job identifier.
+     * @param newStateFunction Function computing the new state from the current one.
+     * @throws IllegalJobStateTransition If there is no job with the given identifier.
+     */
+    private void changeState(
+            UUID jobId,
+            Function<JobState, JobState> newStateFunction
+    ) {
+        JobState newValue = states.computeIfPresent(
+                jobId,
+                (uuid, currentState) -> newStateFunction.apply(currentState)
+        );
+
+        // None of the functions passed here returns null, so a null result means the job is not tracked.
+        if (newValue == null) {
+            throw new IllegalJobStateTransition(jobId);
+        }
+    }
+}
diff --git
a/modules/compute/src/test/java/org/apache/ignite/internal/compute/ComputeComponentImplTest.java
b/modules/compute/src/test/java/org/apache/ignite/internal/compute/ComputeComponentImplTest.java
index 5ad714c539..120c7c7a06 100644
---
a/modules/compute/src/test/java/org/apache/ignite/internal/compute/ComputeComponentImplTest.java
+++
b/modules/compute/src/test/java/org/apache/ignite/internal/compute/ComputeComponentImplTest.java
@@ -18,6 +18,9 @@
package org.apache.ignite.internal.compute;
import static org.apache.ignite.internal.compute.ExecutionOptions.DEFAULT;
+import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureExceptionMatcher.willThrow;
+import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureExceptionMatcher.willThrowWithCauseOrSuppressed;
+import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willBe;
import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsString;
@@ -57,22 +60,23 @@ import org.apache.ignite.compute.DeploymentUnit;
import org.apache.ignite.compute.JobExecutionContext;
import org.apache.ignite.compute.version.Version;
import org.apache.ignite.internal.compute.configuration.ComputeConfiguration;
+import org.apache.ignite.internal.compute.executor.ComputeExecutor;
+import org.apache.ignite.internal.compute.executor.ComputeExecutorImpl;
import org.apache.ignite.internal.compute.loader.JobClassLoader;
import org.apache.ignite.internal.compute.loader.JobContext;
import org.apache.ignite.internal.compute.loader.JobContextManager;
import org.apache.ignite.internal.compute.message.ExecuteRequest;
import org.apache.ignite.internal.compute.message.ExecuteResponse;
-import org.apache.ignite.internal.compute.queue.ComputeExecutor;
-import org.apache.ignite.internal.compute.queue.ComputeExecutorImpl;
+import org.apache.ignite.internal.compute.state.InMemoryComputeStateMachine;
import
org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
import
org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
import org.apache.ignite.internal.deployunit.DeploymentStatus;
import
org.apache.ignite.internal.deployunit.exception.DeploymentUnitNotFoundException;
import
org.apache.ignite.internal.deployunit.exception.DeploymentUnitUnavailableException;
+import org.apache.ignite.internal.lang.IgniteInternalException;
import org.apache.ignite.internal.lang.NodeStoppingException;
import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
import org.apache.ignite.internal.testframework.IgniteTestUtils;
-import
org.apache.ignite.internal.testframework.matchers.CompletableFutureExceptionMatcher;
import org.apache.ignite.internal.thread.NamedThreadFactory;
import org.apache.ignite.network.ClusterNode;
import org.apache.ignite.network.ClusterNodeImpl;
@@ -143,8 +147,8 @@ class ComputeComponentImplTest extends
BaseIgniteAbstractTest {
willCompleteSuccessfully()
);
+ computeExecutor = new ComputeExecutorImpl(ignite, new
InMemoryComputeStateMachine(computeConfiguration), computeConfiguration);
- computeExecutor = new ComputeExecutorImpl(ignite,
computeConfiguration);
computeComponent = new ComputeComponentImpl(messagingService,
jobContextManager, computeExecutor);
computeComponent.start();
@@ -156,10 +160,10 @@ class ComputeComponentImplTest extends
BaseIgniteAbstractTest {
}
@Test
- void executesLocally() throws Exception {
- String result = computeComponent.<String>executeLocally(List.of(),
SimpleJob.class.getName(), "a", 42).get();
+ void executesLocally() {
+ CompletableFuture<String> result =
computeComponent.executeLocally(List.of(), SimpleJob.class.getName(), "a", 42);
- assertThat(result, is("jobResponse"));
+ assertThat(result, willBe("jobResponse"));
assertThatExecuteRequestWasNotSent();
}
@@ -266,11 +270,9 @@ class ComputeComponentImplTest extends
BaseIgniteAbstractTest {
void stoppedComponentReturnsExceptionOnLocalExecutionAttempt() throws
Exception {
computeComponent.stop();
- Object result = computeComponent.executeLocally(List.of(),
SimpleJob.class.getName())
- .handle((s, ex) -> ex != null ? ex : s)
- .get();
+ CompletableFuture<Object> result =
computeComponent.executeLocally(List.of(), SimpleJob.class.getName());
- assertThat(result, is(instanceOf(NodeStoppingException.class)));
+ assertThat(result,
willThrowWithCauseOrSuppressed(NodeStoppingException.class));
}
@Test
@@ -284,11 +286,9 @@ class ComputeComponentImplTest extends
BaseIgniteAbstractTest {
void stoppedComponentReturnsExceptionOnRemoteExecutionAttempt() throws
Exception {
computeComponent.stop();
- Object result = computeComponent.executeRemotely(remoteNode,
List.of(), SimpleJob.class.getName())
- .handle((s, ex) -> ex != null ? ex : s)
- .get();
+ CompletableFuture<Object> result =
computeComponent.executeRemotely(remoteNode, List.of(),
SimpleJob.class.getName());
- assertThat(result, is(instanceOf(NodeStoppingException.class)));
+ assertThat(result,
willThrowWithCauseOrSuppressed(NodeStoppingException.class));
}
@Test
@@ -327,7 +327,8 @@ class ComputeComponentImplTest extends
BaseIgniteAbstractTest {
ExecuteResponse response = executeResponseCaptor.getValue();
assertThat(response.result(), is(nullValue()));
- assertThat(response.throwable(),
is(instanceOf(NodeStoppingException.class)));
+ assertThat(response.throwable(),
is(instanceOf(IgniteInternalException.class)));
+ assertThat(response.throwable().getCause(),
is(instanceOf(NodeStoppingException.class)));
}
@Test
@@ -348,23 +349,24 @@ class ComputeComponentImplTest extends
BaseIgniteAbstractTest {
assertThat(computeConfiguration.change(computeChange ->
computeChange.changeThreadPoolStopTimeoutMillis(100)),
willCompleteSuccessfully());
- computeComponent = new ComputeComponentImpl(messagingService,
jobContextManager, computeExecutor);
+ computeComponent = new ComputeComponentImpl(
+ messagingService,
+ jobContextManager,
+ computeExecutor
+ );
computeComponent.start();
// take the only executor thread
computeComponent.executeLocally(List.of(), LongJob.class.getName());
// the corresponding task goes to work queue
- CompletableFuture<Object> resultFuture =
computeComponent.executeLocally(List.of(), SimpleJob.class.getName())
- .handle((res, ex) -> ex != null ? ex : res);
+ CompletableFuture<Object> resultFuture =
computeComponent.executeLocally(List.of(), SimpleJob.class.getName());
computeComponent.stop();
// now work queue is dropped to the floor, so the future should be
resolved with a cancellation
- Exception result = (Exception) resultFuture.get(3, TimeUnit.SECONDS);
-
- assertThat(result.getCause(),
is(instanceOf(CancellationException.class)));
+ assertThat(resultFuture, willThrow(CancellationException.class));
}
@Test
@@ -414,7 +416,7 @@ class ComputeComponentImplTest extends
BaseIgniteAbstractTest {
assertThat(
computeComponent.executeLocally(units, "com.example.Maim"),
-
CompletableFutureExceptionMatcher.willThrow(ClassNotFoundException.class)
+ willThrow(ClassNotFoundException.class)
);
}
@@ -432,7 +434,7 @@ class ComputeComponentImplTest extends
BaseIgniteAbstractTest {
assertThat(
computeComponent.executeLocally(units, "com.example.Maim"),
-
CompletableFutureExceptionMatcher.willThrow(ClassNotFoundException.class)
+ willThrow(ClassNotFoundException.class)
);
}
diff --git
a/modules/compute/src/test/java/org/apache/ignite/internal/compute/queue/PriorityQueueExecutorTest.java
b/modules/compute/src/test/java/org/apache/ignite/internal/compute/queue/PriorityQueueExecutorTest.java
index 10c1b9c6e1..3dd1d6310c 100644
---
a/modules/compute/src/test/java/org/apache/ignite/internal/compute/queue/PriorityQueueExecutorTest.java
+++
b/modules/compute/src/test/java/org/apache/ignite/internal/compute/queue/PriorityQueueExecutorTest.java
@@ -27,6 +27,7 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.ignite.internal.compute.configuration.ComputeConfiguration;
+import org.apache.ignite.internal.compute.state.InMemoryComputeStateMachine;
import
org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
import
org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
import org.apache.ignite.internal.logger.IgniteLogger;
@@ -65,7 +66,8 @@ public class PriorityQueueExecutorTest extends
BaseIgniteAbstractTest {
priorityQueueExecutor = new PriorityQueueExecutor(
configuration,
- new
NamedThreadFactory(NamedThreadFactory.threadPrefix("testNode", "compute"), LOG)
+ new
NamedThreadFactory(NamedThreadFactory.threadPrefix("testNode", "compute"), LOG),
+ new InMemoryComputeStateMachine(configuration)
);
}
diff --git
a/modules/compute/src/test/java/org/apache/ignite/internal/compute/state/InMemoryComputeStateMachineTest.java
b/modules/compute/src/test/java/org/apache/ignite/internal/compute/state/InMemoryComputeStateMachineTest.java
new file mode 100644
index 0000000000..820685b1a5
--- /dev/null
+++
b/modules/compute/src/test/java/org/apache/ignite/internal/compute/state/InMemoryComputeStateMachineTest.java
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.compute.state;
+
+import static org.apache.ignite.compute.JobState.CANCELED;
+import static org.apache.ignite.compute.JobState.CANCELING;
+import static org.apache.ignite.compute.JobState.COMPLETED;
+import static org.apache.ignite.compute.JobState.EXECUTING;
+import static org.apache.ignite.compute.JobState.FAILED;
+import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.notNullValue;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+import java.util.UUID;
+import org.apache.ignite.internal.compute.configuration.ComputeConfiguration;
+import
org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
+import
org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
+import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
+import org.apache.ignite.internal.testframework.IgniteTestUtils;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+
+/**
+ * Test suite for {@link InMemoryComputeStateMachine}.
+ */
+@ExtendWith(ConfigurationExtension.class)
+public class InMemoryComputeStateMachineTest extends BaseIgniteAbstractTest {
+    /**
+     * Upper bound for waiting until a final state is dropped. Removal takes two runs of the cleanup task, so the
+     * bound has to be noticeably longer than the states lifetime configured in {@link #testCleanStates()}.
+     */
+    private static final long CLEAN_TIMEOUT_MILLIS = 1000;
+
+    private ComputeStateMachine stateMachine;
+
+    @InjectConfiguration
+    private ComputeConfiguration configuration;
+
+    /** Identifier of the job the helper methods below operate on. */
+    private UUID jobId;
+
+    @BeforeEach
+    public void setup() {
+        stateMachine = new InMemoryComputeStateMachine(configuration);
+        stateMachine.start();
+        jobId = stateMachine.initJob();
+    }
+
+    @AfterEach
+    public void clean() {
+        stateMachine.stop();
+    }
+
+    @Test
+    public void testSubmit() {
+        assertThat(jobId, is(notNullValue()));
+    }
+
+    @Test
+    public void testCompleteWay() {
+        executeJob(false);
+        completeJob(false);
+    }
+
+    @Test
+    public void testCancel() {
+        cancelJob(false);
+    }
+
+    @Test
+    public void testCancelFromExecuting() {
+        executeJob(false);
+        cancelingJob(false);
+        cancelJob(false);
+    }
+
+    @Test
+    public void testCompleteCanceling() {
+        executeJob(false);
+        cancelingJob(false);
+        completeJob(false);
+    }
+
+    @Test
+    public void testFailCanceling() {
+        executeJob(false);
+        cancelingJob(false);
+        failJob(false);
+    }
+
+    @Test
+    public void testFailExecuting() {
+        executeJob(false);
+        failJob(false);
+    }
+
+    @Test
+    public void testCompleteExecution() {
+        executeJob(false);
+        completeJob(false);
+    }
+
+    @Test
+    public void testDoubleExecution() {
+        executeJob(false);
+        executeJob(true);
+    }
+
+    @Test
+    public void testDoubleComplete() {
+        executeJob(false);
+
+        completeJob(false);
+        completeJob(true);
+    }
+
+    @Test
+    public void testDoubleFail() {
+        executeJob(false);
+
+        failJob(false);
+        failJob(true);
+    }
+
+    /** Checks that the state of a job is dropped after the job reaches any of the final states. */
+    @Test
+    public void testCleanStates() throws InterruptedException {
+        assertThat(configuration.change(change -> change.changeStatesLifetimeMillis(100)), willCompleteSuccessfully());
+
+        // Replace the machine created in setup() with one started after the lifetime change. The old one is
+        // stopped first so that its cleaner thread is not leaked; the new one is stopped by clean().
+        stateMachine.stop();
+        stateMachine = new InMemoryComputeStateMachine(configuration);
+        stateMachine.start();
+
+        jobId = stateMachine.initJob();
+        executeJob(false);
+        completeJob(false);
+        assertStateCleaned();
+
+        jobId = stateMachine.initJob();
+        executeJob(false);
+        failJob(false);
+        assertStateCleaned();
+
+        jobId = stateMachine.initJob();
+        cancelJob(false);
+        assertStateCleaned();
+
+        jobId = stateMachine.initJob();
+        executeJob(false);
+        cancelingJob(false);
+        cancelJob(false);
+        assertStateCleaned();
+    }
+
+    /** Waits until the state of the current job disappears and fails the test if that does not happen in time. */
+    private void assertStateCleaned() throws InterruptedException {
+        assertThat(
+                IgniteTestUtils.waitForCondition(() -> stateMachine.currentState(jobId) == null, CLEAN_TIMEOUT_MILLIS),
+                is(true)
+        );
+    }
+
+    /** Cancels the current job and checks either the resulting state or the expected transition failure. */
+    private void cancelJob(boolean shouldFail) {
+        if (shouldFail) {
+            assertThrows(IllegalJobStateTransition.class, () -> stateMachine.cancelJob(jobId));
+        } else {
+            stateMachine.cancelJob(jobId);
+            assertThat(stateMachine.currentState(jobId), is(CANCELED));
+        }
+    }
+
+    /** Starts canceling the current job and checks either the resulting state or the expected transition failure. */
+    private void cancelingJob(boolean shouldFail) {
+        if (shouldFail) {
+            // The transition under test here is cancelingJob, not cancelJob.
+            assertThrows(IllegalJobStateTransition.class, () -> stateMachine.cancelingJob(jobId));
+        } else {
+            stateMachine.cancelingJob(jobId);
+            assertThat(stateMachine.currentState(jobId), is(CANCELING));
+        }
+    }
+
+    /** Moves the current job to execution and checks either the resulting state or the expected transition failure. */
+    private void executeJob(boolean shouldFail) {
+        if (shouldFail) {
+            assertThrows(IllegalJobStateTransition.class, () -> stateMachine.executeJob(jobId));
+        } else {
+            stateMachine.executeJob(jobId);
+            assertThat(stateMachine.currentState(jobId), is(EXECUTING));
+        }
+    }
+
+    /** Completes the current job and checks either the resulting state or the expected transition failure. */
+    private void completeJob(boolean shouldFail) {
+        if (shouldFail) {
+            assertThrows(IllegalJobStateTransition.class, () -> stateMachine.completeJob(jobId));
+        } else {
+            stateMachine.completeJob(jobId);
+            assertThat(stateMachine.currentState(jobId), is(COMPLETED));
+        }
+    }
+
+    /** Fails the current job and checks either the resulting state or the expected transition failure. */
+    private void failJob(boolean shouldFail) {
+        if (shouldFail) {
+            assertThrows(IllegalJobStateTransition.class, () -> stateMachine.failJob(jobId));
+        } else {
+            stateMachine.failJob(jobId);
+            assertThat(stateMachine.currentState(jobId), is(FAILED));
+        }
+    }
+}
diff --git
a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
index 9bd157e530..f92cf11b4a 100644
---
a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
+++
b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
@@ -68,9 +68,10 @@ import org.apache.ignite.internal.compute.ComputeComponent;
import org.apache.ignite.internal.compute.ComputeComponentImpl;
import org.apache.ignite.internal.compute.IgniteComputeImpl;
import org.apache.ignite.internal.compute.configuration.ComputeConfiguration;
+import org.apache.ignite.internal.compute.executor.ComputeExecutorImpl;
import org.apache.ignite.internal.compute.loader.JobClassLoaderFactory;
import org.apache.ignite.internal.compute.loader.JobContextManager;
-import org.apache.ignite.internal.compute.queue.ComputeExecutorImpl;
+import org.apache.ignite.internal.compute.state.InMemoryComputeStateMachine;
import
org.apache.ignite.internal.configuration.ConfigurationDynamicDefaultsPatcherImpl;
import org.apache.ignite.internal.configuration.ConfigurationManager;
import org.apache.ignite.internal.configuration.ConfigurationModules;
@@ -643,10 +644,12 @@ public class IgniteImpl implements Ignite {
);
deploymentManager = deploymentManagerImpl;
+ ComputeConfiguration computeCfg =
nodeConfigRegistry.getConfiguration(ComputeConfiguration.KEY);
+ InMemoryComputeStateMachine stateMachine = new
InMemoryComputeStateMachine(computeCfg);
computeComponent = new ComputeComponentImpl(
clusterSvc.messagingService(),
new JobContextManager(deploymentManagerImpl,
deploymentManagerImpl.deploymentUnitAccessor(), new JobClassLoaderFactory()),
- new ComputeExecutorImpl(this,
nodeConfigRegistry.getConfiguration(ComputeConfiguration.KEY))
+ new ComputeExecutorImpl(this, stateMachine, computeCfg)
);
compute = new IgniteComputeImpl(clusterSvc.topologyService(),
distributedTblMgr, computeComponent);