This is an automated email from the ASF dual-hosted git repository.
apkhmv pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new e02967e095 IGNITE-20848 Introduce management REST API for Compute
(#3062)
e02967e095 is described below
commit e02967e095d2468a54903674ff1484e573941113
Author: Ivan Gagarkin <[email protected]>
AuthorDate: Wed Jan 31 12:55:58 2024 +0100
IGNITE-20848 Introduce management REST API for Compute (#3062)
---
.../ignite/internal/compute/ComputeComponent.java | 8 +
.../internal/compute/ComputeComponentImpl.java | 6 +
.../internal/compute/ComputeMessageTypes.java | 10 +
.../ignite/internal/compute/ComputeUtils.java | 17 +
.../ignite/internal/compute/ExecutionManager.java | 22 ++
.../compute/message/JobStatusesRequest.java | 29 ++
.../compute/message/JobStatusesResponse.java | 40 ++
.../compute/messaging/ComputeMessaging.java | 84 ++++-
modules/rest-api/openapi/openapi.yaml | 169 +++++++++
.../internal/rest/api/compute/ComputeApi.java | 136 +++++++
.../ignite/internal/rest/api/compute/JobState.java | 53 +++
.../internal/rest/api/compute/JobStatus.java | 115 ++++++
.../rest/api/compute/UpdateJobPriorityBody.java | 46 +++
modules/rest/build.gradle | 7 +
.../rest/compute/ItComputeControllerTest.java | 410 +++++++++++++++++++++
.../internal/rest/compute/ComputeController.java | 101 +++++
.../internal/rest/compute/ComputeRestFactory.java | 47 +++
.../exception/ComputeJobNotFoundException.java | 36 ++
.../exception/ComputeJobStateException.java | 37 ++
.../ComputeJobNotFoundExceptionHandler.java | 44 +++
.../handler/ComputeJobStateExceptionHandler.java | 43 +++
.../rest/matcher/MicronautHttpResponseMatcher.java | 119 ++++++
.../internal/rest/matcher/ProblemMatcher.java | 169 +++++++++
.../rest/matcher/RestJobStatusMatcher.java | 220 +++++++++++
.../org/apache/ignite/internal/app/IgniteImpl.java | 7 +-
25 files changed, 1972 insertions(+), 3 deletions(-)
diff --git
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeComponent.java
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeComponent.java
index 22a0737a6a..679000389e 100644
---
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeComponent.java
+++
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeComponent.java
@@ -17,6 +17,7 @@
package org.apache.ignite.internal.compute;
+import java.util.Collection;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
@@ -103,6 +104,13 @@ public interface ComputeComponent extends IgniteComponent {
return executeRemotely(ExecutionOptions.DEFAULT, remoteNode, units,
jobClassName, args);
}
+ /**
+ * Retrieves the current status of all jobs on all nodes in the cluster.
+ *
+ * @return The collection of job statuses.
+ */
+ CompletableFuture<Collection<JobStatus>> statusesAsync();
+
/**
* Retrieves the current status of the job on any node in the cluster. The
job status may be deleted and thus return {@code null} if the
* time for retaining job status has been exceeded.
diff --git
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeComponentImpl.java
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeComponentImpl.java
index 1dc4598500..19762aa0fc 100644
---
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeComponentImpl.java
+++
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeComponentImpl.java
@@ -22,6 +22,7 @@ import static
org.apache.ignite.internal.compute.ClassLoaderExceptionsMapper.map
import static
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
import static org.apache.ignite.lang.ErrorGroups.Common.NODE_STOPPING_ERR;
+import java.util.Collection;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
@@ -145,6 +146,11 @@ public class ComputeComponentImpl implements
ComputeComponent {
}
}
+ @Override
+ public CompletableFuture<Collection<JobStatus>> statusesAsync() {
+ return messaging.broadcastStatusesAsync();
+ }
+
@Override
public CompletableFuture<@Nullable JobStatus> statusAsync(UUID jobId) {
return messaging.broadcastStatusAsync(jobId);
diff --git
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeMessageTypes.java
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeMessageTypes.java
index 8a38cb32e4..d3da0d352f 100644
---
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeMessageTypes.java
+++
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeMessageTypes.java
@@ -20,12 +20,16 @@ package org.apache.ignite.internal.compute;
import org.apache.ignite.internal.compute.message.DeploymentUnitMsg;
import org.apache.ignite.internal.compute.message.ExecuteRequest;
import org.apache.ignite.internal.compute.message.ExecuteResponse;
+import org.apache.ignite.internal.compute.message.JobCancelRequest;
+import org.apache.ignite.internal.compute.message.JobCancelResponse;
import org.apache.ignite.internal.compute.message.JobChangePriorityRequest;
import org.apache.ignite.internal.compute.message.JobChangePriorityResponse;
import org.apache.ignite.internal.compute.message.JobResultRequest;
import org.apache.ignite.internal.compute.message.JobResultResponse;
import org.apache.ignite.internal.compute.message.JobStatusRequest;
import org.apache.ignite.internal.compute.message.JobStatusResponse;
+import org.apache.ignite.internal.compute.message.JobStatusesRequest;
+import org.apache.ignite.internal.compute.message.JobStatusesResponse;
import org.apache.ignite.network.annotations.MessageGroup;
/**
@@ -71,4 +75,10 @@ public class ComputeMessageTypes {
/** Type for {@link JobChangePriorityResponse}. */
public static final short JOB_CHANGE_PRIORITY_RESPONSE = 10;
+
+ /** Type for {@link JobStatusesRequest}. */
+ public static final short JOB_STATUSES_REQUEST = 11;
+
+ /** Type for {@link JobStatusesResponse}. */
+ public static final short JOB_STATUSES_RESPONSE = 12;
}
diff --git
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeUtils.java
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeUtils.java
index 5d1f62d420..cf63bab9cc 100644
---
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeUtils.java
+++
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeUtils.java
@@ -22,6 +22,7 @@ import static
java.util.concurrent.CompletableFuture.failedFuture;
import static
org.apache.ignite.lang.ErrorGroups.Compute.CLASS_INITIALIZATION_ERR;
import java.lang.reflect.Constructor;
+import java.util.Collection;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
@@ -37,6 +38,7 @@ import
org.apache.ignite.internal.compute.message.JobCancelResponse;
import org.apache.ignite.internal.compute.message.JobChangePriorityResponse;
import org.apache.ignite.internal.compute.message.JobResultResponse;
import org.apache.ignite.internal.compute.message.JobStatusResponse;
+import org.apache.ignite.internal.compute.message.JobStatusesResponse;
import org.jetbrains.annotations.Nullable;
/**
@@ -141,6 +143,21 @@ public class ComputeUtils {
return completedFuture((R) jobResultResponse.result());
}
+ /**
+ * Extract compute job statuses from statuses response.
+ *
+ * @param jobStatusesResponse Job statuses result message response.
+ * @return Completable future with result.
+ */
+ public static CompletableFuture<Collection<JobStatus>>
statusesFromJobStatusesResponse(JobStatusesResponse jobStatusesResponse) {
+ Throwable throwable = jobStatusesResponse.throwable();
+ if (throwable != null) {
+ return failedFuture(throwable);
+ }
+
+ return completedFuture(jobStatusesResponse.statuses());
+ }
+
/**
* Extract compute job status from status response.
*
diff --git
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/ExecutionManager.java
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/ExecutionManager.java
index 4a7d20423f..064f9fa3ac 100644
---
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/ExecutionManager.java
+++
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/ExecutionManager.java
@@ -18,10 +18,14 @@
package org.apache.ignite.internal.compute;
import static java.util.concurrent.CompletableFuture.failedFuture;
+import static java.util.stream.Collectors.toSet;
import static
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
import static org.apache.ignite.lang.ErrorGroups.Compute.RESULT_NOT_FOUND_ERR;
+import java.util.Arrays;
import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
@@ -29,6 +33,7 @@ import org.apache.ignite.compute.ComputeException;
import org.apache.ignite.compute.JobExecution;
import org.apache.ignite.compute.JobStatus;
import org.apache.ignite.internal.compute.configuration.ComputeConfiguration;
+import org.apache.ignite.internal.compute.messaging.RemoteJobExecution;
import org.apache.ignite.network.TopologyService;
import org.jetbrains.annotations.Nullable;
@@ -90,6 +95,23 @@ public class ExecutionManager {
return failedFuture(new ComputeException(RESULT_NOT_FOUND_ERR, "Job
result not found for the job with ID: " + jobId));
}
+ /**
+ * Retrieves the current status of all jobs in the local node.
+ *
+ * @return The set of all job statuses.
+ */
+ public CompletableFuture<Set<JobStatus>> localStatusesAsync() {
+ CompletableFuture<JobStatus>[] statuses = executions.values().stream()
+ .filter(it -> !(it instanceof RemoteJobExecution))
+ .map(JobExecution::statusAsync)
+ .toArray(CompletableFuture[]::new);
+
+ return CompletableFuture.allOf(statuses)
+ .thenApply(ignored ->
Arrays.stream(statuses).map(CompletableFuture::join)
+ .filter(Objects::nonNull)
+ .collect(toSet()));
+ }
+
/**
* Retrieves the current status of the job.
*
diff --git
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/message/JobStatusesRequest.java
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/message/JobStatusesRequest.java
new file mode 100644
index 0000000000..94dab7502d
--- /dev/null
+++
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/message/JobStatusesRequest.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.compute.message;
+
+import org.apache.ignite.internal.compute.ComputeMessageTypes;
+import org.apache.ignite.network.NetworkMessage;
+import org.apache.ignite.network.annotations.Transferable;
+
+/**
+ * Remote job statuses request.
+ */
+@Transferable(ComputeMessageTypes.JOB_STATUSES_REQUEST)
+public interface JobStatusesRequest extends NetworkMessage {
+}
diff --git
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/message/JobStatusesResponse.java
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/message/JobStatusesResponse.java
new file mode 100644
index 0000000000..cd8c98ac31
--- /dev/null
+++
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/message/JobStatusesResponse.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.compute.message;
+
+import java.util.Collection;
+import org.apache.ignite.compute.JobStatus;
+import org.apache.ignite.internal.compute.ComputeMessageTypes;
+import org.apache.ignite.network.NetworkMessage;
+import org.apache.ignite.network.annotations.Marshallable;
+import org.apache.ignite.network.annotations.Transferable;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Remote job statuses response.
+ */
+@Transferable(ComputeMessageTypes.JOB_STATUSES_RESPONSE)
+public interface JobStatusesResponse extends NetworkMessage {
+ @Nullable
+ @Marshallable
+ Collection<JobStatus> statuses();
+
+ @Nullable
+ @Marshallable
+ Throwable throwable();
+}
diff --git
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/messaging/ComputeMessaging.java
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/messaging/ComputeMessaging.java
index cbe1f2285c..03d821746b 100644
---
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/messaging/ComputeMessaging.java
+++
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/messaging/ComputeMessaging.java
@@ -18,22 +18,25 @@
package org.apache.ignite.internal.compute.messaging;
import static java.util.concurrent.CompletableFuture.allOf;
+import static java.util.stream.Collectors.toList;
import static
org.apache.ignite.internal.compute.ComputeUtils.cancelFromJobCancelResponse;
import static
org.apache.ignite.internal.compute.ComputeUtils.changePriorityFromJobChangePriorityResponse;
import static
org.apache.ignite.internal.compute.ComputeUtils.jobIdFromExecuteResponse;
import static
org.apache.ignite.internal.compute.ComputeUtils.resultFromJobResultResponse;
import static
org.apache.ignite.internal.compute.ComputeUtils.statusFromJobStatusResponse;
+import static
org.apache.ignite.internal.compute.ComputeUtils.statusesFromJobStatusesResponse;
import static org.apache.ignite.internal.compute.ComputeUtils.toDeploymentUnit;
import static org.apache.ignite.lang.ErrorGroups.Common.NODE_STOPPING_ERR;
import static org.apache.ignite.lang.ErrorGroups.Compute.CANCELLING_ERR;
import static
org.apache.ignite.lang.ErrorGroups.Compute.CHANGE_JOB_PRIORITY_ERR;
import static
org.apache.ignite.lang.ErrorGroups.Compute.FAIL_TO_GET_JOB_STATUS_ERR;
+import java.util.Collection;
import java.util.List;
+import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
-import java.util.stream.Collectors;
import org.apache.ignite.compute.ComputeException;
import org.apache.ignite.compute.DeploymentUnit;
import org.apache.ignite.compute.JobExecution;
@@ -55,8 +58,11 @@ import
org.apache.ignite.internal.compute.message.JobResultRequest;
import org.apache.ignite.internal.compute.message.JobResultResponse;
import org.apache.ignite.internal.compute.message.JobStatusRequest;
import org.apache.ignite.internal.compute.message.JobStatusResponse;
+import org.apache.ignite.internal.compute.message.JobStatusesRequest;
+import org.apache.ignite.internal.compute.message.JobStatusesResponse;
import org.apache.ignite.internal.lang.IgniteInternalException;
import org.apache.ignite.internal.lang.NodeStoppingException;
+import org.apache.ignite.internal.util.CompletableFutures;
import org.apache.ignite.internal.util.IgniteSpinBusyLock;
import org.apache.ignite.network.ClusterNode;
import org.apache.ignite.network.MessagingService;
@@ -128,6 +134,8 @@ public class ComputeMessaging {
sendExecuteResponse(null, ex, senderConsistentId, correlationId);
} else if (message instanceof JobResultRequest) {
sendJobResultResponse(null, ex, senderConsistentId, correlationId);
+ } else if (message instanceof JobStatusesRequest) {
+ sendJobStatusesResponse(null, ex, senderConsistentId,
correlationId);
} else if (message instanceof JobStatusRequest) {
sendJobStatusResponse(null, ex, senderConsistentId, correlationId);
} else if (message instanceof JobCancelRequest) {
@@ -147,6 +155,8 @@ public class ComputeMessaging {
processExecuteRequest(starter, (ExecuteRequest) message,
senderConsistentId, correlationId);
} else if (message instanceof JobResultRequest) {
processJobResultRequest((JobResultRequest) message,
senderConsistentId, correlationId);
+ } else if (message instanceof JobStatusesRequest) {
+ processJobStatusesRequest((JobStatusesRequest) message,
senderConsistentId, correlationId);
} else if (message instanceof JobStatusRequest) {
processJobStatusRequest((JobStatusRequest) message,
senderConsistentId, correlationId);
} else if (message instanceof JobCancelRequest) {
@@ -182,7 +192,7 @@ public class ComputeMessaging {
) {
List<DeploymentUnitMsg> deploymentUnitMsgs = units.stream()
.map(ComputeUtils::toDeploymentUnitMsg)
- .collect(Collectors.toList());
+ .collect(toList());
ExecuteRequest executeRequest = messagesFactory.executeRequest()
.executeOptions(options)
@@ -247,6 +257,33 @@ public class ComputeMessaging {
messagingService.respond(senderConsistentId, jobResultResponse,
correlationId);
}
+ CompletableFuture<Collection<JobStatus>> remoteStatusesAsync(ClusterNode
remoteNode) {
+ JobStatusesRequest jobStatusRequest =
messagesFactory.jobStatusesRequest()
+ .build();
+
+ return messagingService.invoke(remoteNode, jobStatusRequest,
NETWORK_TIMEOUT_MILLIS)
+ .thenCompose(networkMessage ->
statusesFromJobStatusesResponse((JobStatusesResponse) networkMessage));
+ }
+
+ private void processJobStatusesRequest(JobStatusesRequest message, String
senderConsistentId, long correlationId) {
+ executionManager.localStatusesAsync()
+ .whenComplete((statuses, throwable) ->
sendJobStatusesResponse(statuses, throwable, senderConsistentId,
correlationId));
+ }
+
+ private void sendJobStatusesResponse(
+ @Nullable Collection<JobStatus> statuses,
+ @Nullable Throwable throwable,
+ String senderConsistentId,
+ Long correlationId
+ ) {
+ JobStatusesResponse jobStatusResponse =
messagesFactory.jobStatusesResponse()
+ .statuses(statuses)
+ .throwable(throwable)
+ .build();
+
+ messagingService.respond(senderConsistentId, jobStatusResponse,
correlationId);
+ }
+
/**
* Gets compute job status from the remote node.
*
@@ -360,6 +397,26 @@ public class ComputeMessaging {
messagingService.respond(senderConsistentId,
jobChangePriorityResponse, correlationId);
}
+ /**
+ * Broadcasts job statuses request to all nodes in the cluster.
+ *
+ * @return The future which will be completed with the collection of
statuses from all nodes.
+ */
+ public CompletableFuture<Collection<JobStatus>> broadcastStatusesAsync() {
+ return broadcastAsyncAndCollect(
+ node -> remoteStatusesAsync(node),
+ throwable -> new ComputeException(
+ FAIL_TO_GET_JOB_STATUS_ERR,
+ "Failed to retrieve statuses",
+ throwable
+ )).thenApply(statuses -> {
+ return statuses.stream()
+ .flatMap(Collection::stream)
+ .filter(Objects::nonNull)
+ .collect(toList());
+ });
+ }
+
/**
* Broadcasts job status request to all nodes in the cluster.
*
@@ -448,6 +505,29 @@ public class ComputeMessaging {
result.completeExceptionally(error.apply(throwable));
}
});
+
+ return result;
+ }
+
+ private <R> CompletableFuture<Collection<R>> broadcastAsyncAndCollect(
+ Function<ClusterNode, CompletableFuture<@Nullable R>> request,
+ Function<Throwable, Throwable> error
+ ) {
+ CompletableFuture<Collection<R>> result = new CompletableFuture<>();
+
+ CompletableFuture<R>[] futures = topologyService.allMembers()
+ .stream()
+ .map(request::apply)
+ .toArray(CompletableFuture[]::new);
+
+ CompletableFutures.allOf(futures).whenComplete((collection, throwable)
-> {
+ if (throwable == null) {
+ result.complete(collection);
+ } else {
+ result.completeExceptionally(error.apply(throwable));
+ }
+ });
+
return result;
}
}
diff --git a/modules/rest-api/openapi/openapi.yaml
b/modules/rest-api/openapi/openapi.yaml
index 0e691aacf7..2cd3bc2e4d 100644
--- a/modules/rest-api/openapi/openapi.yaml
+++ b/modules/rest-api/openapi/openapi.yaml
@@ -118,6 +118,124 @@ paths:
application/problem+json:
schema:
$ref: '#/components/schemas/Problem'
+ /management/v1/compute/jobs:
+ get:
+ tags:
+ - compute
+ summary: Retrieve all job statuses
+ description: Fetches the current statuses of all compute jobs.
+ operationId: jobStatuses
+ responses:
+ "200":
+ description: Successful retrieval of job statuses.
+ content:
+ application/json:
+ schema:
+ type: array
+ items:
+ $ref: '#/components/schemas/JobStatus'
+ /management/v1/compute/jobs/{jobId}:
+ get:
+ tags:
+ - compute
+ summary: Retrieve a job status
+ description: Fetches the current status of a specific compute job
identified
+ by jobId.
+ operationId: jobStatus
+ parameters:
+ - name: jobId
+ in: path
+ description: The unique identifier of the compute job.
+ required: true
+ schema:
+ type: string
+ description: The unique identifier of the compute job.
+ format: uuid
+ responses:
+ "200":
+ description: Successful retrieval of the job status.
+ content:
+ application/json:
+ schema:
+ $ref: '#/components/schemas/JobStatus'
+ "404":
+ description: Compute job not found.
+ content:
+ application/json:
+ schema:
+ $ref: '#/components/schemas/Problem'
+ delete:
+ tags:
+ - compute
+ summary: Cancel a job
+ description: Cancels a specific compute job identified by jobId.
+ operationId: cancelJob
+ parameters:
+ - name: jobId
+ in: path
+ description: The unique identifier of the compute job.
+ required: true
+ schema:
+ type: string
+ description: The unique identifier of the compute job.
+ format: uuid
+ responses:
+ "200":
+ description: Successful cancellation of the job.
+ content:
+ application/json: {}
+ "404":
+ description: Compute job not found.
+ content:
+ application/json:
+ schema:
+ $ref: '#/components/schemas/Problem'
+ "409":
+ description: Compute job is in an illegal state.
+ content:
+ application/json:
+ schema:
+ $ref: '#/components/schemas/Problem'
+ /management/v1/compute/jobs/{jobId}/priority:
+ put:
+ tags:
+ - compute
+ summary: Update a job's priority
+ description: Updates the priority of a specific compute job identified
by jobId.
+ operationId: updatePriority
+ parameters:
+ - name: jobId
+ in: path
+ description: The unique identifier of the compute job.
+ required: true
+ schema:
+ type: string
+ description: The unique identifier of the compute job.
+ format: uuid
+ requestBody:
+ description: The new priority data for the job.
+ content:
+ application/json:
+ schema:
+ $ref: '#/components/schemas/UpdateJobPriorityBody'
+ required: true
+ responses:
+ "200":
+ description: Successful update of the job priority.
+ content:
+ application/json: {}
+ "404":
+ description: Compute job not found.
+ content:
+ application/json:
+ schema:
+ $ref: '#/components/schemas/Problem'
+ "409":
+ description: Compute job is in an illegal state.
+ content:
+ application/json:
+ schema:
+ $ref: '#/components/schemas/Problem'
/management/v1/configuration/cluster:
get:
tags:
@@ -818,6 +936,47 @@ components:
type: string
description: The issue with the parameter.
description: Information about invalid request parameter.
+ JobState:
+ type: string
+ description: Job state.
+ enum:
+ - QUEUED
+ - EXECUTING
+ - FAILED
+ - COMPLETED
+ - CANCELING
+ - CANCELED
+ JobStatus:
+ required:
+ - createTime
+ - id
+ - state
+ type: object
+ properties:
+ id:
+ type: string
+ description: Job ID.
+ format: uuid
+ state:
+ description: Job state.
+ allOf:
+ - $ref: '#/components/schemas/JobState'
+ - {}
+ createTime:
+ type: string
+ description: Job create time.
+ format: date-time
+ startTime:
+ type: string
+ description: Job start time.
+ format: date-time
+ nullable: true
+ finishTime:
+ type: string
+ description: Job finish time.
+ format: date-time
+ nullable: true
+ description: Rest representation of org.apache.ignite.compute.JobStatus.
Metric:
type: object
properties:
@@ -982,6 +1141,16 @@ components:
- $ref: '#/components/schemas/DeploymentStatus'
- description: Unit status.
description: Unit version and status.
+ UpdateJobPriorityBody:
+ required:
+ - priority
+ type: object
+ properties:
+ priority:
+ type: integer
+ description: Priority.
+ format: int32
+ description: DTO of update job priority request body.
deployMode:
type: string
description: Initial set of nodes to deploy.
diff --git
a/modules/rest-api/src/main/java/org/apache/ignite/internal/rest/api/compute/ComputeApi.java
b/modules/rest-api/src/main/java/org/apache/ignite/internal/rest/api/compute/ComputeApi.java
new file mode 100644
index 0000000000..41f9b08fac
--- /dev/null
+++
b/modules/rest-api/src/main/java/org/apache/ignite/internal/rest/api/compute/ComputeApi.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.rest.api.compute;
+
+import static io.swagger.v3.oas.annotations.media.Schema.RequiredMode.REQUIRED;
+import static
org.apache.ignite.internal.rest.constants.MediaType.APPLICATION_JSON;
+
+import io.micronaut.http.annotation.Body;
+import io.micronaut.http.annotation.Controller;
+import io.micronaut.http.annotation.Delete;
+import io.micronaut.http.annotation.Get;
+import io.micronaut.http.annotation.Put;
+import io.swagger.v3.oas.annotations.Operation;
+import io.swagger.v3.oas.annotations.media.ArraySchema;
+import io.swagger.v3.oas.annotations.media.Content;
+import io.swagger.v3.oas.annotations.media.Schema;
+import io.swagger.v3.oas.annotations.responses.ApiResponse;
+import io.swagger.v3.oas.annotations.tags.Tag;
+import java.util.Collection;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.internal.rest.api.Problem;
+
+/**
+ * API for managing compute tasks.
+ */
+@Controller("/management/v1/compute/")
+@Tag(name = "compute")
+public interface ComputeApi {
+ /**
+ * Retrieves the statuses of all compute jobs.
+ *
+ * @return A collection of compute job statuses.
+ */
+ @Operation(summary = "Retrieve all job statuses", description = "Fetches
the current statuses of all compute jobs.")
+ @ApiResponse(
+ responseCode = "200",
+ description = "Successful retrieval of job statuses.",
+ content = @Content(mediaType = APPLICATION_JSON, array =
@ArraySchema(schema = @Schema(implementation = JobStatus.class)))
+ )
+ @Get("jobs")
+ CompletableFuture<Collection<JobStatus>> jobStatuses();
+
+ /**
+ * Retrieves the status of a specific compute job.
+ *
+ * @param jobId The unique identifier of the compute job.
+ * @return The status of the specified compute job.
+ */
+ @Operation(summary = "Retrieve a job status", description = "Fetches the
current status of a specific compute job identified by jobId.")
+ @ApiResponse(
+ responseCode = "200",
+ description = "Successful retrieval of the job status.",
+ content = @Content(mediaType = APPLICATION_JSON, schema =
@Schema(implementation = JobStatus.class))
+ )
+ @ApiResponse(
+ responseCode = "404",
+ description = "Compute job not found.",
+ content = @Content(mediaType = APPLICATION_JSON, schema =
@Schema(implementation = Problem.class))
+ )
+ @Get("jobs/{jobId}")
+ CompletableFuture<JobStatus> jobStatus(
+ @Schema(name = "jobId", description = "The unique identifier of
the compute job.", requiredMode = REQUIRED) UUID jobId
+ );
+
+ /**
+ * Updates the priority of a compute job.
+ *
+ * @param jobId The unique identifier of the compute job.
+ * @param updateJobPriorityBody The new priority data for the job.
+ * @return The result of the operation.
+ */
+ @Operation(summary = "Update a job's priority", description = "Updates the
priority of a specific compute job identified by jobId.")
+ @ApiResponse(
+ responseCode = "200",
+ description = "Successful update of the job priority.",
+ content = @Content(mediaType = APPLICATION_JSON)
+ )
+ @ApiResponse(
+ responseCode = "404",
+ description = "Compute job not found.",
+ content = @Content(mediaType = APPLICATION_JSON, schema =
@Schema(implementation = Problem.class))
+ )
+ @ApiResponse(
+ responseCode = "409",
+ description = "Compute job is in an illegal state.",
+ content = @Content(mediaType = APPLICATION_JSON, schema =
@Schema(implementation = Problem.class))
+ )
+ @Put("jobs/{jobId}/priority")
+ CompletableFuture<Void> updatePriority(
+ @Schema(name = "jobId", description = "The unique identifier of
the compute job.", requiredMode = REQUIRED) UUID jobId,
+ @Body UpdateJobPriorityBody updateJobPriorityBody
+ );
+
+ /**
+ * Cancels a specific compute job.
+ *
+ * @param jobId The unique identifier of the compute job.
+ * @return The result of the cancellation operation.
+ */
+ @Operation(summary = "Cancel a job", description = "Cancels a specific
compute job identified by jobId.")
+ @ApiResponse(
+ responseCode = "200",
+ description = "Successful cancellation of the job.",
+ content = @Content(mediaType = APPLICATION_JSON)
+ )
+ @ApiResponse(
+ responseCode = "404",
+ description = "Compute job not found.",
+ content = @Content(mediaType = APPLICATION_JSON, schema =
@Schema(implementation = Problem.class))
+ )
+ @ApiResponse(
+ responseCode = "409",
+ description = "Compute job is in an illegal state.",
+ content = @Content(mediaType = APPLICATION_JSON, schema =
@Schema(implementation = Problem.class))
+ )
+ @Delete("jobs/{jobId}")
+ CompletableFuture<Void> cancelJob(
+ @Schema(name = "jobId", description = "The unique identifier of
the compute job.", requiredMode = REQUIRED) UUID jobId
+ );
+}
diff --git
a/modules/rest-api/src/main/java/org/apache/ignite/internal/rest/api/compute/JobState.java
b/modules/rest-api/src/main/java/org/apache/ignite/internal/rest/api/compute/JobState.java
new file mode 100644
index 0000000000..531e14bf5b
--- /dev/null
+++
b/modules/rest-api/src/main/java/org/apache/ignite/internal/rest/api/compute/JobState.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.rest.api.compute;
+
+/**
+ * Rest representation of {@link org.apache.ignite.compute.JobState}.
+ */
+public enum JobState {
+ /**
+ * The job is submitted and waiting for an execution start.
+ */
+ QUEUED,
+
+ /**
+ * The job is being executed.
+ */
+ EXECUTING,
+
+ /**
+ * The job was unexpectedly terminated during execution.
+ */
+ FAILED,
+
+ /**
+ * The job was executed successfully and the execution result was returned.
+ */
+ COMPLETED,
+
+ /**
+ * The job has received the cancel command, but it is still running.
+ */
+ CANCELING,
+
+ /**
+ * The job was successfully cancelled.
+ */
+ CANCELED;
+}
diff --git
a/modules/rest-api/src/main/java/org/apache/ignite/internal/rest/api/compute/JobStatus.java
b/modules/rest-api/src/main/java/org/apache/ignite/internal/rest/api/compute/JobStatus.java
new file mode 100644
index 0000000000..f7d17008dd
--- /dev/null
+++
b/modules/rest-api/src/main/java/org/apache/ignite/internal/rest/api/compute/JobStatus.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.rest.api.compute;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import io.swagger.v3.oas.annotations.media.Schema;
+import io.swagger.v3.oas.annotations.media.Schema.RequiredMode;
+import java.time.Instant;
+import java.util.UUID;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Rest representation of {@link org.apache.ignite.compute.JobStatus}.
+ */
+@Schema(name = "JobStatus")
+public class JobStatus {
+ /**
+ * Job ID.
+ */
+ @Schema(description = "Job ID.", requiredMode = RequiredMode.REQUIRED)
+ private final UUID id;
+
+ /**
+ * Job state.
+ */
+ @Schema(description = "Job state.", requiredMode = RequiredMode.REQUIRED)
+ private final JobState state;
+
+ /**
+ * Job create time.
+ */
+ @Schema(description = "Job create time.", requiredMode =
RequiredMode.REQUIRED)
+ private final Instant createTime;
+
+ /**
+ * Job start time.
+ */
+ @Schema(description = "Job start time.", requiredMode =
RequiredMode.NOT_REQUIRED)
+ @Nullable
+ private final Instant startTime;
+
+ /**
+ * Job finish time.
+ */
+ @Schema(description = "Job finish time.", requiredMode =
RequiredMode.NOT_REQUIRED)
+ @Nullable
+ private final Instant finishTime;
+
+ /**
+ * Constructor.
+ *
+ * @param id Job ID.
+ * @param state Job state.
+ * @param createTime Job create time.
+ * @param startTime Job start time.
+ * @param finishTime Job finish time.
+ */
+ @JsonCreator
+ public JobStatus(
+ @JsonProperty("id") UUID id,
+ @JsonProperty("state") JobState state,
+ @JsonProperty("createTime") Instant createTime,
+ @JsonProperty("startTime") @Nullable Instant startTime,
+ @JsonProperty("finishTime") @Nullable Instant finishTime
+ ) {
+ this.id = id;
+ this.state = state;
+ this.createTime = createTime;
+ this.startTime = startTime;
+ this.finishTime = finishTime;
+ }
+
+ @JsonProperty("id")
+ public UUID id() {
+ return id;
+ }
+
+ @JsonProperty("state")
+ public JobState state() {
+ return state;
+ }
+
+ @JsonProperty("createTime")
+ public Instant createTime() {
+ return createTime;
+ }
+
+ @Nullable
+ @JsonProperty("startTime")
+ public Instant startTime() {
+ return startTime;
+ }
+
+ @Nullable
+ @JsonProperty("finishTime")
+ public Instant finishTime() {
+ return finishTime;
+ }
+}
diff --git
a/modules/rest-api/src/main/java/org/apache/ignite/internal/rest/api/compute/UpdateJobPriorityBody.java
b/modules/rest-api/src/main/java/org/apache/ignite/internal/rest/api/compute/UpdateJobPriorityBody.java
new file mode 100644
index 0000000000..fc05fe86cb
--- /dev/null
+++
b/modules/rest-api/src/main/java/org/apache/ignite/internal/rest/api/compute/UpdateJobPriorityBody.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.rest.api.compute;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonGetter;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import io.swagger.v3.oas.annotations.media.Schema;
+import io.swagger.v3.oas.annotations.media.Schema.RequiredMode;
+
+/**
+ * DTO of update job priority request body.
+ */
+@Schema(name = "UpdateJobPriorityBody")
+public class UpdateJobPriorityBody {
+ /**
+ * Priority.
+ */
+ @Schema(description = "Priority.", requiredMode = RequiredMode.REQUIRED)
+ private final int priority;
+
+ @JsonCreator
+ public UpdateJobPriorityBody(@JsonProperty("priority") int priority) {
+ this.priority = priority;
+ }
+
+ @JsonGetter("priority")
+ public int priority() {
+ return priority;
+ }
+}
diff --git a/modules/rest/build.gradle b/modules/rest/build.gradle
index 4810db7969..f156360b91 100644
--- a/modules/rest/build.gradle
+++ b/modules/rest/build.gradle
@@ -38,6 +38,7 @@ dependencies {
implementation project(':ignite-metrics')
implementation project(':ignite-code-deployment')
implementation project(':ignite-security-api')
+ implementation project(':ignite-compute')
implementation libs.jetbrains.annotations
implementation libs.micronaut.inject
implementation libs.micronaut.http.server.netty
@@ -78,6 +79,7 @@ dependencies {
integrationTestImplementation
testFixtures(project(':ignite-cluster-management'))
integrationTestImplementation
testFixtures(project(':ignite-configuration'))
integrationTestImplementation testFixtures(project(":ignite-api"))
+ integrationTestImplementation testFixtures(project(":ignite-rest"))
integrationTestImplementation libs.awaitility
integrationTestImplementation libs.micronaut.junit5
integrationTestImplementation libs.micronaut.test
@@ -90,4 +92,9 @@ dependencies {
//So, exclude asm-core transitive dependency to protect of jar-hell.
exclude group: 'org.ow2.asm', module: 'asm'
}
+
+ testFixturesImplementation(project(":ignite-rest-api"))
+ testFixturesImplementation testFixtures(project(":ignite-core"))
+ testFixturesImplementation libs.hamcrest.core
+ testFixturesImplementation libs.micronaut.http.core
}
diff --git
a/modules/rest/src/integrationTest/java/org/apache/ignite/internal/rest/compute/ItComputeControllerTest.java
b/modules/rest/src/integrationTest/java/org/apache/ignite/internal/rest/compute/ItComputeControllerTest.java
new file mode 100644
index 0000000000..023e4c8dc0
--- /dev/null
+++
b/modules/rest/src/integrationTest/java/org/apache/ignite/internal/rest/compute/ItComputeControllerTest.java
@@ -0,0 +1,410 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.rest.compute;
+
+import static io.micronaut.http.HttpRequest.DELETE;
+import static io.micronaut.http.HttpRequest.PUT;
+import static org.apache.ignite.internal.rest.matcher.ProblemMatcher.isProblem;
+import static
org.apache.ignite.internal.rest.matcher.RestJobStatusMatcher.canceled;
+import static
org.apache.ignite.internal.rest.matcher.RestJobStatusMatcher.completed;
+import static
org.apache.ignite.internal.rest.matcher.RestJobStatusMatcher.executing;
+import static
org.apache.ignite.internal.rest.matcher.RestJobStatusMatcher.queued;
+import static org.awaitility.Awaitility.await;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+import io.micronaut.core.type.Argument;
+import io.micronaut.http.HttpRequest;
+import io.micronaut.http.client.HttpClient;
+import io.micronaut.http.client.annotation.Client;
+import io.micronaut.http.client.exceptions.HttpClientResponseException;
+import io.micronaut.test.extensions.junit5.annotation.MicronautTest;
+import jakarta.inject.Inject;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.stream.Collectors;
+import org.apache.ignite.compute.ComputeJob;
+import org.apache.ignite.compute.JobExecution;
+import org.apache.ignite.compute.JobExecutionContext;
+import org.apache.ignite.internal.ClusterPerClassIntegrationTest;
+import org.apache.ignite.internal.app.IgniteImpl;
+import org.apache.ignite.internal.rest.api.Problem;
+import org.apache.ignite.internal.rest.api.compute.JobStatus;
+import org.apache.ignite.internal.rest.api.compute.UpdateJobPriorityBody;
+import org.apache.ignite.internal.rest.matcher.MicronautHttpResponseMatcher;
+import org.apache.ignite.network.ClusterNode;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Integration tests for {@link ComputeController}.
+ */
+@MicronautTest
+public class ItComputeControllerTest extends ClusterPerClassIntegrationTest {
+ private static final String COMPUTE_URL = "/management/v1/compute/";
+
+ private static final Object LOCK = new Object();
+
+ @Inject
+ @Client("http://localhost:10300" + COMPUTE_URL)
+ HttpClient client0;
+
+ @Inject
+ @Client("http://localhost:10301" + COMPUTE_URL)
+ HttpClient client1;
+
+ @Inject
+ @Client("http://localhost:10303" + COMPUTE_URL)
+ HttpClient client2;
+
+ @Override
+ protected String getNodeBootstrapConfigTemplate() {
+ return "{\n"
+ + " network: {\n"
+ + " port: {},\n"
+ + " nodeFinder: {\n"
+ + " netClusterNodes: [ {} ]\n"
+ + " }\n"
+ + " },\n"
+ + " clientConnector: { port:{} },\n"
+ + " rest.port: {},\n"
+ + " compute.threadPoolSize: 1 \n"
+ + "}";
+ }
+
+ @AfterEach
+ void tearDown() {
+ // Cancel all jobs.
+ getJobStatuses(client0).values().stream()
+ .filter(it -> it.finishTime() == null)
+ .map(JobStatus::id)
+ .forEach(jobId -> cancelJob(client0, jobId));
+
+ // Wait for all jobs to complete.
+ await().until(() -> {
+ Collection<JobStatus> statuses = getJobStatuses(client0).values();
+
+ for (JobStatus status : statuses) {
+ if (status.finishTime() == null) {
+ return false;
+ }
+ }
+
+ return true;
+ });
+ }
+
+ @Test
+ void shouldReturnStatusesOfAllJobs() {
+ IgniteImpl entryNode = CLUSTER.node(0);
+
+ JobExecution<String> localExecution = runBlockingJob(entryNode,
Set.of(entryNode.node()));
+
+ JobExecution<String> remoteExecution = runBlockingJob(entryNode,
Set.of(CLUSTER.node(1).node()));
+
+ UUID localJobId = localExecution.idAsync().join();
+ UUID remoteJobId = remoteExecution.idAsync().join();
+
+ await().untilAsserted(() -> {
+ Map<UUID, JobStatus> statuses = getJobStatuses(client0);
+
+ assertThat(statuses.get(localJobId), executing(localJobId));
+ assertThat(statuses.get(remoteJobId), executing(remoteJobId));
+ });
+ }
+
+ @Test
+ void shouldReturnStatusOfLocalJob() {
+ IgniteImpl entryNode = CLUSTER.node(0);
+
+ JobExecution<String> execution = runBlockingJob(entryNode,
Set.of(entryNode.node()));
+
+ UUID jobId = execution.idAsync().join();
+
+ await().until(() -> getJobStatus(client0, jobId), executing(jobId));
+
+ unblockJob();
+
+ await().until(() -> getJobStatus(client0, jobId), completed(jobId));
+ }
+
+ @Test
+ void shouldReturnStatusOfRemoteJob() {
+ IgniteImpl entryNode = CLUSTER.node(0);
+
+ JobExecution<String> execution = runBlockingJob(entryNode,
Set.of(CLUSTER.node(1).node()));
+
+ UUID jobId = execution.idAsync().join();
+
+ await().until(() -> getJobStatus(client0, jobId), executing(jobId));
+
+ unblockJob();
+
+ await().until(() -> getJobStatus(client0, jobId), completed(jobId));
+ }
+
+ @Test
+ void shouldReturnProblemIfStatusOfNonExistingJob() {
+ UUID jobId = UUID.randomUUID();
+
+ HttpClientResponseException httpClientResponseException = assertThrows(
+ HttpClientResponseException.class,
+ () -> getJobStatus(client0, jobId)
+ );
+
+ assertThat(
+ httpClientResponseException.getResponse(),
+ MicronautHttpResponseMatcher.<Problem>hasStatusCode(404)
+
.withBody(isProblem().withStatus(404).withDetail("Compute job not found
[jobId=" + jobId + "]"), Problem.class)
+ );
+ }
+
+ @Test
+ void shouldCancelJobLocally() {
+ IgniteImpl entryNode = CLUSTER.node(0);
+
+ JobExecution<String> execution = runBlockingJob(entryNode,
Set.of(entryNode.node()));
+
+ UUID jobId = execution.idAsync().join();
+
+ await().until(() -> getJobStatus(client0, jobId), executing(jobId));
+
+ cancelJob(client0, jobId);
+
+ await().until(() -> getJobStatus(client0, jobId), canceled(jobId,
true));
+ }
+
+ @Test
+ void shouldCancelJobRemotely() {
+ IgniteImpl entryNode = CLUSTER.node(0);
+
+ JobExecution<String> execution = runBlockingJob(entryNode,
Set.of(CLUSTER.node(1).node()));
+
+ UUID jobId = execution.idAsync().join();
+
+ await().until(() -> getJobStatus(client0, jobId), executing(jobId));
+
+ cancelJob(client0, jobId);
+
+ await().until(() -> getJobStatus(client0, jobId), canceled(jobId,
true));
+ }
+
+ @Test
+ void shouldReturnProblemIfCancelNonExistingJob() {
+ UUID jobId = UUID.randomUUID();
+
+ HttpClientResponseException httpClientResponseException = assertThrows(
+ HttpClientResponseException.class,
+ () -> cancelJob(client0, jobId)
+ );
+
+ assertThat(
+ httpClientResponseException.getResponse(),
+ MicronautHttpResponseMatcher.<Problem>hasStatusCode(404)
+
.withBody(isProblem().withStatus(404).withDetail("Compute job not found
[jobId=" + jobId + "]"), Problem.class)
+ );
+ }
+
+ @Test
+ void shouldReturnFalseIfCancelCompletedJob() {
+ IgniteImpl entryNode = CLUSTER.node(0);
+
+ JobExecution<String> execution = runBlockingJob(entryNode,
Set.of(entryNode.node()));
+
+ UUID jobId = execution.idAsync().join();
+
+ await().until(() -> getJobStatus(client0, jobId), executing(jobId));
+
+ unblockJob();
+
+ await().until(() -> getJobStatus(client0, jobId), completed(jobId));
+
+ HttpClientResponseException httpClientResponseException = assertThrows(
+ HttpClientResponseException.class,
+ () -> cancelJob(client0, jobId)
+ );
+
+ assertThat(
+ httpClientResponseException.getResponse(),
+ MicronautHttpResponseMatcher.<Problem>hasStatusCode(409)
+ .withBody(isProblem().withStatus(409)
+ .withDetail("Compute job is in illegal state
[jobId=" + jobId + ", state=COMPLETED]"), Problem.class)
+ );
+ }
+
+ @Test
+ void shouldUpdatePriorityLocally() {
+ IgniteImpl entryNode = CLUSTER.node(0);
+
+ Set<ClusterNode> nodes = Set.of(entryNode.node());
+
+ JobExecution<String> execution = runBlockingJob(entryNode, nodes);
+
+ UUID jobId = execution.idAsync().join();
+
+ await().until(() -> getJobStatus(client0, jobId), executing(jobId));
+
+ JobExecution<String> execution2 = runBlockingJob(entryNode, nodes);
+
+ UUID jobId2 = execution2.idAsync().join();
+
+ await().until(() -> getJobStatus(client0, jobId2), queued(jobId2));
+
+ updatePriority(client0, jobId2, 1);
+ }
+
+ @Test
+ void shouldUpdatePriorityRemotely() {
+ IgniteImpl entryNode = CLUSTER.node(0);
+
+ Set<ClusterNode> nodes = Set.of(CLUSTER.node(1).node());
+
+ JobExecution<String> execution = runBlockingJob(entryNode, nodes);
+
+ UUID jobId = execution.idAsync().join();
+
+ await().until(() -> getJobStatus(client0, jobId), executing(jobId));
+
+ JobExecution<String> execution2 = runBlockingJob(entryNode, nodes);
+
+ UUID jobId2 = execution2.idAsync().join();
+
+ await().until(() -> getJobStatus(client0, jobId2), queued(jobId2));
+
+ updatePriority(client0, jobId2, 1);
+ }
+
+ @Test
+ void shouldReturnProblemIfUpdatePriorityOfNonExistingJob() {
+ UUID jobId = UUID.randomUUID();
+
+ HttpClientResponseException httpClientResponseException = assertThrows(
+ HttpClientResponseException.class,
+ () -> updatePriority(client0, jobId, 1)
+ );
+
+ assertThat(
+ httpClientResponseException.getResponse(),
+ MicronautHttpResponseMatcher.<Problem>hasStatusCode(404)
+
.withBody(isProblem().withStatus(404).withDetail("Compute job not found
[jobId=" + jobId + "]"), Problem.class)
+ );
+ }
+
+ @Test
+ void shouldReturnFalseIfUpdatePriorityOfRunningJob() {
+ IgniteImpl entryNode = CLUSTER.node(0);
+
+ Set<ClusterNode> nodes = Set.of(entryNode.node());
+
+ JobExecution<String> execution = runBlockingJob(entryNode, nodes);
+
+ UUID jobId = execution.idAsync().join();
+
+ await().until(() -> getJobStatus(client0, jobId), executing(jobId));
+
+ HttpClientResponseException httpClientResponseException = assertThrows(
+ HttpClientResponseException.class,
+ () -> updatePriority(client0, jobId, 1)
+ );
+
+ assertThat(
+ httpClientResponseException.getResponse(),
+ MicronautHttpResponseMatcher.<Problem>hasStatusCode(409)
+ .withBody(isProblem().withStatus(409)
+ .withDetail("Compute job is in illegal state
[jobId=" + jobId + ", state=EXECUTING]"), Problem.class)
+ );
+ }
+
+ @Test
+ void shouldReturnFalseIfUpdatePriorityOfCompletedJob() {
+ IgniteImpl entryNode = CLUSTER.node(0);
+
+ Set<ClusterNode> nodes = Set.of(entryNode.node());
+
+ JobExecution<String> execution = runBlockingJob(entryNode, nodes);
+
+ UUID jobId = execution.idAsync().join();
+
+ await().until(() -> getJobStatus(client0, jobId), executing(jobId));
+
+ unblockJob();
+
+ await().until(() -> getJobStatus(client0, jobId), completed(jobId));
+
+ HttpClientResponseException httpClientResponseException = assertThrows(
+ HttpClientResponseException.class,
+ () -> updatePriority(client0, jobId, 1)
+ );
+
+ assertThat(
+ httpClientResponseException.getResponse(),
+ MicronautHttpResponseMatcher.<Problem>hasStatusCode(409)
+ .withBody(isProblem().withStatus(409)
+ .withDetail("Compute job is in illegal state
[jobId=" + jobId + ", state=COMPLETED]"), Problem.class)
+ );
+ }
+
+ private static JobExecution<String> runBlockingJob(IgniteImpl entryNode,
Set<ClusterNode> nodes) {
+ return entryNode.compute().executeAsync(nodes, List.of(),
BlockingJob.class.getName());
+ }
+
+ private static void unblockJob() {
+ synchronized (LOCK) {
+ LOCK.notifyAll();
+ }
+ }
+
+ private static Map<UUID, JobStatus> getJobStatuses(HttpClient client) {
+ List<JobStatus> statuses = client.toBlocking()
+ .retrieve(HttpRequest.GET("/jobs"),
Argument.listOf(JobStatus.class));
+
+ return statuses.stream().collect(Collectors.toMap(JobStatus::id, s ->
s));
+ }
+
+ private static JobStatus getJobStatus(HttpClient client, UUID jobId) {
+ return client.toBlocking().retrieve("/jobs/" + jobId, JobStatus.class);
+ }
+
+ private static void updatePriority(HttpClient client, UUID jobId, int
priority) {
+ client.toBlocking()
+ .exchange(PUT("/jobs/" + jobId + "/priority", new
UpdateJobPriorityBody(priority)));
+ }
+
+ private static void cancelJob(HttpClient client, UUID jobId) {
+ client.toBlocking().exchange(DELETE("/jobs/" + jobId));
+ }
+
+ private static class BlockingJob implements ComputeJob<String> {
+ /** {@inheritDoc} */
+ @Override
+ public String execute(JobExecutionContext context, Object... args) {
+ synchronized (LOCK) {
+ try {
+ LOCK.wait();
+ } catch (InterruptedException e) {
+ // No-op.
+ }
+ }
+
+ return null;
+ }
+ }
+}
diff --git
a/modules/rest/src/main/java/org/apache/ignite/internal/rest/compute/ComputeController.java
b/modules/rest/src/main/java/org/apache/ignite/internal/rest/compute/ComputeController.java
new file mode 100644
index 0000000000..0909f41a75
--- /dev/null
+++
b/modules/rest/src/main/java/org/apache/ignite/internal/rest/compute/ComputeController.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.rest.compute;
+
+import static java.util.concurrent.CompletableFuture.failedFuture;
+import static java.util.stream.Collectors.toList;
+import static
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
+
+import io.micronaut.http.annotation.Controller;
+import java.util.Collection;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.internal.compute.ComputeComponent;
+import org.apache.ignite.internal.rest.api.compute.ComputeApi;
+import org.apache.ignite.internal.rest.api.compute.JobState;
+import org.apache.ignite.internal.rest.api.compute.JobStatus;
+import org.apache.ignite.internal.rest.api.compute.UpdateJobPriorityBody;
+import
org.apache.ignite.internal.rest.compute.exception.ComputeJobNotFoundException;
+import
org.apache.ignite.internal.rest.compute.exception.ComputeJobStateException;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * REST controller for compute operations.
+ */
+@Controller
+public class ComputeController implements ComputeApi {
+ private final ComputeComponent computeComponent;
+
+ public ComputeController(ComputeComponent computeComponent) {
+ this.computeComponent = computeComponent;
+ }
+
+ @Override
+ public CompletableFuture<Collection<JobStatus>> jobStatuses() {
+ return computeComponent.statusesAsync()
+ .thenApply(statuses ->
statuses.stream().map(ComputeController::toJobStatus).collect(toList()));
+ }
+
+ @Override
+ public CompletableFuture<JobStatus> jobStatus(UUID jobId) {
+ return jobStatus0(jobId);
+ }
+
+ @Override
+ public CompletableFuture<Void> updatePriority(UUID jobId,
UpdateJobPriorityBody updateJobPriorityBody) {
+ return computeComponent.changePriorityAsync(jobId,
updateJobPriorityBody.priority())
+ .thenCompose(result -> handleOperationResult(jobId, result));
+ }
+
+ @Override
+ public CompletableFuture<Void> cancelJob(UUID jobId) {
+ return computeComponent.cancelAsync(jobId)
+ .thenCompose(result -> handleOperationResult(jobId, result));
+ }
+
+ private CompletableFuture<Void> handleOperationResult(UUID jobId,
@Nullable Boolean result) {
+ if (result == null) {
+ return failedFuture(new
ComputeJobNotFoundException(jobId.toString()));
+ } else if (!result) {
+ return jobStatus0(jobId).thenCompose(status -> failedFuture(new
ComputeJobStateException(jobId.toString(), status.state())));
+ } else {
+ return nullCompletedFuture();
+ }
+ }
+
+ private CompletableFuture<JobStatus> jobStatus0(UUID jobId) {
+ return computeComponent.statusAsync(jobId)
+ .thenApply(status -> {
+ if (status == null) {
+ throw new
ComputeJobNotFoundException(jobId.toString());
+ } else {
+ return toJobStatus(status);
+ }
+ });
+ }
+
+ private static JobStatus toJobStatus(org.apache.ignite.compute.JobStatus
jobStatus) {
+ return new JobStatus(
+ jobStatus.id(),
+ JobState.valueOf(jobStatus.state().toString()),
+ jobStatus.createTime(),
+ jobStatus.startTime(),
+ jobStatus.finishTime()
+ );
+ }
+}
diff --git
a/modules/rest/src/main/java/org/apache/ignite/internal/rest/compute/ComputeRestFactory.java
b/modules/rest/src/main/java/org/apache/ignite/internal/rest/compute/ComputeRestFactory.java
new file mode 100644
index 0000000000..97878eb356
--- /dev/null
+++
b/modules/rest/src/main/java/org/apache/ignite/internal/rest/compute/ComputeRestFactory.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.rest.compute;
+
+import io.micronaut.context.annotation.Bean;
+import io.micronaut.context.annotation.Factory;
+import jakarta.inject.Singleton;
+import org.apache.ignite.internal.compute.ComputeComponent;
+import org.apache.ignite.internal.rest.RestFactory;
+
+/**
+ * Factory that creates beans that are needed for {@link ComputeController}.
+ */
+@Factory
+public class ComputeRestFactory implements RestFactory {
+ private ComputeComponent computeComponent;
+
+ public ComputeRestFactory(ComputeComponent computeComponent) {
+ this.computeComponent = computeComponent;
+ }
+
+ @Bean
+ @Singleton
+ public ComputeComponent computeComponent() {
+ return computeComponent;
+ }
+
+ @Override
+ public void cleanResources() {
+ computeComponent = null;
+ }
+}
diff --git
a/modules/rest/src/main/java/org/apache/ignite/internal/rest/compute/exception/ComputeJobNotFoundException.java
b/modules/rest/src/main/java/org/apache/ignite/internal/rest/compute/exception/ComputeJobNotFoundException.java
new file mode 100644
index 0000000000..d43cb47214
--- /dev/null
+++
b/modules/rest/src/main/java/org/apache/ignite/internal/rest/compute/exception/ComputeJobNotFoundException.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.rest.compute.exception;
+
+import static org.apache.ignite.lang.ErrorGroups.Common.ILLEGAL_ARGUMENT_ERR;
+
+import org.apache.ignite.internal.lang.IgniteInternalException;
+
+/**
+ * Thrown when job not found.
+ */
+public class ComputeJobNotFoundException extends IgniteInternalException {
+ /**
+ * Constructor.
+ *
+ * @param jobId Job ID.
+ */
+ public ComputeJobNotFoundException(String jobId) {
+ super(ILLEGAL_ARGUMENT_ERR, "Compute job not found [jobId=" + jobId +
']');
+ }
+}
diff --git
a/modules/rest/src/main/java/org/apache/ignite/internal/rest/compute/exception/ComputeJobStateException.java
b/modules/rest/src/main/java/org/apache/ignite/internal/rest/compute/exception/ComputeJobStateException.java
new file mode 100644
index 0000000000..2d0b1b009e
--- /dev/null
+++
b/modules/rest/src/main/java/org/apache/ignite/internal/rest/compute/exception/ComputeJobStateException.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.rest.compute.exception;
+
+import static org.apache.ignite.lang.ErrorGroups.Common.ILLEGAL_ARGUMENT_ERR;
+
+import org.apache.ignite.internal.lang.IgniteInternalException;
+import org.apache.ignite.internal.rest.api.compute.JobState;
+
+/**
+ * Thrown when compute job is in illegal state.
+ */
+public class ComputeJobStateException extends IgniteInternalException {
+ /**
+ * Constructor.
+ *
+ * @param jobId Job ID.
+ */
+ public ComputeJobStateException(String jobId, JobState state) {
+ super(ILLEGAL_ARGUMENT_ERR, "Compute job is in illegal state [jobId="
+ jobId + ", state=" + state + ']');
+ }
+}
diff --git
a/modules/rest/src/main/java/org/apache/ignite/internal/rest/compute/exception/handler/ComputeJobNotFoundExceptionHandler.java
b/modules/rest/src/main/java/org/apache/ignite/internal/rest/compute/exception/handler/ComputeJobNotFoundExceptionHandler.java
new file mode 100644
index 0000000000..c667e59b66
--- /dev/null
+++
b/modules/rest/src/main/java/org/apache/ignite/internal/rest/compute/exception/handler/ComputeJobNotFoundExceptionHandler.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.rest.compute.exception.handler;
+
+import io.micronaut.context.annotation.Requires;
+import io.micronaut.http.HttpRequest;
+import io.micronaut.http.HttpResponse;
+import io.micronaut.http.server.exceptions.ExceptionHandler;
+import jakarta.inject.Singleton;
+import org.apache.ignite.internal.rest.api.Problem;
+import
org.apache.ignite.internal.rest.compute.exception.ComputeJobNotFoundException;
+import org.apache.ignite.internal.rest.constants.HttpCode;
+import org.apache.ignite.internal.rest.problem.HttpProblemResponse;
+
+/**
+ * REST exception handler for {@link ComputeJobNotFoundExceptionHandler}.
+ */
+@Singleton
+@Requires(classes = {ComputeJobNotFoundExceptionHandler.class,
ExceptionHandler.class})
+public class ComputeJobNotFoundExceptionHandler implements
+ ExceptionHandler<ComputeJobNotFoundException, HttpResponse<? extends
Problem>> {
+ @Override
+ public HttpResponse<? extends Problem> handle(HttpRequest request,
ComputeJobNotFoundException exception) {
+ return HttpProblemResponse.from(
+ Problem.fromHttpCode(HttpCode.NOT_FOUND)
+ .detail(exception.getMessage()).build()
+ );
+ }
+}
diff --git
a/modules/rest/src/main/java/org/apache/ignite/internal/rest/compute/exception/handler/ComputeJobStateExceptionHandler.java
b/modules/rest/src/main/java/org/apache/ignite/internal/rest/compute/exception/handler/ComputeJobStateExceptionHandler.java
new file mode 100644
index 0000000000..f5e345c4e6
--- /dev/null
+++
b/modules/rest/src/main/java/org/apache/ignite/internal/rest/compute/exception/handler/ComputeJobStateExceptionHandler.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.rest.compute.exception.handler;
+
+import io.micronaut.context.annotation.Requires;
+import io.micronaut.http.HttpRequest;
+import io.micronaut.http.HttpResponse;
+import io.micronaut.http.server.exceptions.ExceptionHandler;
+import jakarta.inject.Singleton;
+import org.apache.ignite.internal.rest.api.Problem;
+import
org.apache.ignite.internal.rest.compute.exception.ComputeJobStateException;
+import org.apache.ignite.internal.rest.constants.HttpCode;
+import org.apache.ignite.internal.rest.problem.HttpProblemResponse;
+
+/**
+ * REST exception handler for {@link ComputeJobStateException}.
+ */
+@Singleton
+@Requires(classes = {ComputeJobStateException.class, ExceptionHandler.class})
+public class ComputeJobStateExceptionHandler implements
ExceptionHandler<ComputeJobStateException, HttpResponse<? extends Problem>> {
+ @Override
+ public HttpResponse<? extends Problem> handle(HttpRequest request,
ComputeJobStateException exception) {
+ return HttpProblemResponse.from(
+ Problem.fromHttpCode(HttpCode.CONFLICT)
+ .detail(exception.getMessage()).build()
+ );
+ }
+}
diff --git
a/modules/rest/src/testFixtures/java/org/apache/ignite/internal/rest/matcher/MicronautHttpResponseMatcher.java
b/modules/rest/src/testFixtures/java/org/apache/ignite/internal/rest/matcher/MicronautHttpResponseMatcher.java
new file mode 100644
index 0000000000..d1163b55b7
--- /dev/null
+++
b/modules/rest/src/testFixtures/java/org/apache/ignite/internal/rest/matcher/MicronautHttpResponseMatcher.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.rest.matcher;
+
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.is;
+
+import io.micronaut.http.HttpResponse;
+import io.micronaut.http.HttpStatus;
+import org.hamcrest.Description;
+import org.hamcrest.Matcher;
+import org.hamcrest.TypeSafeMatcher;
+
+/**
+ * Matcher for {@link HttpResponse}.
+ */
+public class MicronautHttpResponseMatcher<T> extends
TypeSafeMatcher<HttpResponse<?>> {
+ private final Matcher<Integer> statusCodeMatcher;
+
+ private Matcher<T> bodyMatcher;
+
+ private Class<T> body;
+
+ private MicronautHttpResponseMatcher(Matcher<Integer> statusCodeMatcher) {
+ this.statusCodeMatcher = statusCodeMatcher;
+ }
+
+ /**
+ * Creates a matcher that matches when the examined {@link HttpResponse}
has a status that matches the specified status.
+ *
+ * @param status Expected status.
+ * @return Matcher.
+ */
+ public static <T> MicronautHttpResponseMatcher<T> hasStatus(HttpStatus
status) {
+ return new MicronautHttpResponseMatcher<>(is(status.getCode()));
+ }
+
+ /**
+ * Creates a matcher that matches when the examined {@link HttpResponse}
has a status code that matches the specified status code.
+ *
+ * @param statusCode Expected status code.
+ * @return Matcher.
+ */
+ public static <T> MicronautHttpResponseMatcher<T> hasStatusCode(int
statusCode) {
+ return new MicronautHttpResponseMatcher<>(is(statusCode));
+ }
+
+ /**
+ * Sets the expected body.
+ *
+ * @param body Body to match.
+ * @return Matcher.
+ */
+ public MicronautHttpResponseMatcher<T> withBody(T body) {
+ this.bodyMatcher = equalTo(body);
+ this.body = (Class<T>) body.getClass();
+ return this;
+ }
+
+ /**
+ * Sets the body matcher.
+ *
+ * @param bodyMatcher Body matcher.
+ * @param body Body class.
+ * @return Matcher.
+ */
+ public MicronautHttpResponseMatcher<T> withBody(Matcher<T> bodyMatcher,
Class<T> body) {
+ this.bodyMatcher = bodyMatcher;
+ this.body = body;
+ return this;
+ }
+
+ @Override
+ protected boolean matchesSafely(HttpResponse<?> httpResponse) {
+ if (!statusCodeMatcher.matches(httpResponse.code())) {
+ return false;
+ }
+
+ if (bodyMatcher != null &&
!bodyMatcher.matches(httpResponse.getBody(body).get())) {
+ return false;
+ }
+
+ return true;
+ }
+
+ @Override
+ public void describeTo(Description description) {
+ if (statusCodeMatcher != null) {
+ description.appendText("an HttpResponse with status code matching
").appendDescriptionOf(statusCodeMatcher);
+ }
+
+ if (bodyMatcher != null) {
+ description.appendText(" and body
").appendDescriptionOf(bodyMatcher);
+ }
+ }
+
+ @Override
+ protected void describeMismatchSafely(HttpResponse<?> item, Description
mismatchDescription) {
+ mismatchDescription.appendText("status code was ")
+ .appendValue(item.code())
+ .appendText(" and body was ")
+ .appendValue(item.getBody(String.class));
+ }
+}
diff --git
a/modules/rest/src/testFixtures/java/org/apache/ignite/internal/rest/matcher/ProblemMatcher.java
b/modules/rest/src/testFixtures/java/org/apache/ignite/internal/rest/matcher/ProblemMatcher.java
new file mode 100644
index 0000000000..4faf8040db
--- /dev/null
+++
b/modules/rest/src/testFixtures/java/org/apache/ignite/internal/rest/matcher/ProblemMatcher.java
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.rest.matcher;
+
+import static org.hamcrest.Matchers.equalTo;
+
+import java.util.Collection;
+import java.util.UUID;
+import org.apache.ignite.internal.rest.api.InvalidParam;
+import org.apache.ignite.internal.rest.api.Problem;
+import org.apache.ignite.internal.testframework.matchers.AnythingMatcher;
+import org.hamcrest.Description;
+import org.hamcrest.Matcher;
+import org.hamcrest.TypeSafeMatcher;
+
+/**
+ * Matcher for {@link Problem}.
+ */
+public class ProblemMatcher extends TypeSafeMatcher<Problem> {
+ private Matcher<String> titleMatcher = AnythingMatcher.anything();
+
+ private Matcher<Integer> statusMatcher = AnythingMatcher.anything();
+
+ private Matcher<String> codeMatcher = AnythingMatcher.anything();
+
+ private Matcher<String> typeMatcher = AnythingMatcher.anything();
+
+ private Matcher<String> detailMatcher = AnythingMatcher.anything();
+
+ private Matcher<String> nodeMatcher = AnythingMatcher.anything();
+
+ private Matcher<UUID> traceIdMatcher = AnythingMatcher.anything();
+
+ private Matcher<Collection<InvalidParam>> invalidParamsMatcher =
AnythingMatcher.anything();
+
+ /**
+ * Creates a matcher for {@link Problem}.
+ *
+ * @return Matcher.
+ */
+ public static ProblemMatcher isProblem() {
+ return new ProblemMatcher();
+ }
+
+ public ProblemMatcher withTitle(String title) {
+ return withTitle(equalTo(title));
+ }
+
+ public ProblemMatcher withTitle(Matcher<String> matcher) {
+ this.titleMatcher = matcher;
+ return this;
+ }
+
+ public ProblemMatcher withStatus(Integer status) {
+ return withStatus(equalTo(status));
+ }
+
+ public ProblemMatcher withStatus(Matcher<Integer> matcher) {
+ this.statusMatcher = matcher;
+ return this;
+ }
+
+ public ProblemMatcher withCode(String code) {
+ return withCode(equalTo(code));
+ }
+
+ public ProblemMatcher withCode(Matcher<String> matcher) {
+ this.codeMatcher = matcher;
+ return this;
+ }
+
+ public ProblemMatcher withType(String type) {
+ return withType(equalTo(type));
+ }
+
+ public ProblemMatcher withType(Matcher<String> matcher) {
+ this.typeMatcher = matcher;
+ return this;
+ }
+
+ public ProblemMatcher withDetail(String detail) {
+ return withDetail(equalTo(detail));
+ }
+
+ public ProblemMatcher withDetail(Matcher<String> matcher) {
+ this.detailMatcher = matcher;
+ return this;
+ }
+
+ public ProblemMatcher withNode(String node) {
+ return withNode(equalTo(node));
+ }
+
+ public ProblemMatcher withNode(Matcher<String> matcher) {
+ this.nodeMatcher = matcher;
+ return this;
+ }
+
+ public ProblemMatcher withTraceId(UUID traceId) {
+ return withTraceId(equalTo(traceId));
+ }
+
+ public ProblemMatcher withTraceId(Matcher<UUID> matcher) {
+ this.traceIdMatcher = matcher;
+ return this;
+ }
+
+ public ProblemMatcher withInvalidParams(Collection<InvalidParam>
invalidParams) {
+ return withInvalidParams(equalTo(invalidParams));
+ }
+
+ public ProblemMatcher withInvalidParams(Matcher<Collection<InvalidParam>>
matcher) {
+ this.invalidParamsMatcher = matcher;
+ return this;
+ }
+
+ @Override
+ protected boolean matchesSafely(Problem problem) {
+ return titleMatcher.matches(problem.title())
+ && statusMatcher.matches(problem.status())
+ && codeMatcher.matches(problem.code())
+ && typeMatcher.matches(problem.type())
+ && detailMatcher.matches(problem.detail())
+ && nodeMatcher.matches(problem.node())
+ && traceIdMatcher.matches(problem.traceId())
+ && invalidParamsMatcher.matches(problem.invalidParams());
+ }
+
+ @Override
+ protected void describeMismatchSafely(Problem item, Description
mismatchDescription) {
+ mismatchDescription.appendText("was a Problem with ")
+ .appendText("title: ").appendValue(item.title())
+ .appendText(", status: ").appendValue(item.status())
+ .appendText(", code: ").appendValue(item.code())
+ .appendText(", type: ").appendValue(item.type())
+ .appendText(", detail: ").appendValue(item.detail())
+ .appendText(", node: ").appendValue(item.node())
+ .appendText(", traceId: ").appendValue(item.traceId())
+ .appendText(", invalidParams:
").appendValue(item.invalidParams());
+ }
+
+ @Override
+ public void describeTo(Description description) {
+ description.appendText("a Problem with ")
+ .appendText("title ").appendDescriptionOf(titleMatcher)
+ .appendText(", status ").appendDescriptionOf(statusMatcher)
+ .appendText(", code ").appendDescriptionOf(codeMatcher)
+ .appendText(", type ").appendDescriptionOf(typeMatcher)
+ .appendText(", detail ").appendDescriptionOf(detailMatcher)
+ .appendText(", node ").appendDescriptionOf(nodeMatcher)
+ .appendText(", traceId ").appendDescriptionOf(traceIdMatcher)
+ .appendText(" and invalidParams
").appendDescriptionOf(invalidParamsMatcher);
+ }
+}
diff --git
a/modules/rest/src/testFixtures/java/org/apache/ignite/internal/rest/matcher/RestJobStatusMatcher.java
b/modules/rest/src/testFixtures/java/org/apache/ignite/internal/rest/matcher/RestJobStatusMatcher.java
new file mode 100644
index 0000000000..1d044f2673
--- /dev/null
+++
b/modules/rest/src/testFixtures/java/org/apache/ignite/internal/rest/matcher/RestJobStatusMatcher.java
@@ -0,0 +1,220 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.rest.matcher;
+
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.notNullValue;
+import static org.hamcrest.Matchers.nullValue;
+
+import java.time.Instant;
+import java.util.UUID;
+import org.apache.ignite.internal.rest.api.compute.JobState;
+import org.apache.ignite.internal.rest.api.compute.JobStatus;
+import org.apache.ignite.internal.testframework.matchers.AnythingMatcher;
+import org.hamcrest.Description;
+import org.hamcrest.Matcher;
+import org.hamcrest.TypeSafeMatcher;
+
+/**
+ * Matcher for {@link JobStatus}.
+ */
+public class RestJobStatusMatcher extends TypeSafeMatcher<JobStatus> {
+ private Matcher<JobState> stateMatcher = AnythingMatcher.anything();
+ private Matcher<UUID> idMatcher = AnythingMatcher.anything();
+ private Matcher<Instant> createTimeMatcher = AnythingMatcher.anything();
+ private Matcher<Instant> startTimeMatcher = AnythingMatcher.anything();
+ private Matcher<Instant> finishTimeMatcher = AnythingMatcher.anything();
+
+ public static RestJobStatusMatcher isJobStatus() {
+ return new RestJobStatusMatcher();
+ }
+
+ public static RestJobStatusMatcher queued(UUID id) {
+ return queued(equalTo(id));
+ }
+
+ /**
+ * Creates a matcher that matches when the examined {@link JobStatus} has
a state of {@link JobState#QUEUED}.
+ *
+ * @param idMatcher Id matcher.
+ * @return Matcher.
+ */
+ public static RestJobStatusMatcher queued(Matcher<UUID> idMatcher) {
+ return isJobStatus().withId(idMatcher)
+ .withState(JobState.QUEUED)
+ .withCreateTime(notNullValue(Instant.class))
+ .withStartTime(nullValue(Instant.class));
+ }
+
+ /**
+ * Creates a matcher that matches when the examined {@link JobStatus} has
a state of {@link JobState#EXECUTING}.
+ *
+ * @param id Id.
+ * @return Matcher.
+ */
+ public static RestJobStatusMatcher executing(UUID id) {
+ return isJobStatus().withId(id)
+ .withState(JobState.EXECUTING)
+ .withCreateTime(notNullValue(Instant.class))
+ .withStartTime(notNullValue(Instant.class))
+ .withFinishTime(nullValue(Instant.class));
+ }
+
+ /**
+ * Creates a matcher that matches when the examined {@link JobStatus} has
a state of {@link JobState#FAILED}.
+ *
+ * @param id Id.
+ * @param wasRunning Whether the job was running before it failed.
+ * @return Matcher.
+ */
+ public static RestJobStatusMatcher failed(UUID id, boolean wasRunning) {
+ return isJobStatus().withId(id)
+ .withState(JobState.FAILED)
+ .withCreateTime(notNullValue(Instant.class))
+ .withStartTime(wasRunning ? notNullValue(Instant.class) :
AnythingMatcher.anything())
+ .withFinishTime(notNullValue(Instant.class));
+ }
+
+ /**
+ * Creates a matcher that matches when the examined {@link JobStatus} has
a state of {@link JobState#COMPLETED}.
+ *
+ * @param id Id.
+ * @return Matcher.
+ */
+ public static RestJobStatusMatcher completed(UUID id) {
+ return isJobStatus().withId(id)
+ .withState(JobState.COMPLETED)
+ .withCreateTime(notNullValue(Instant.class))
+ .withStartTime(notNullValue(Instant.class))
+ .withFinishTime(notNullValue(Instant.class));
+ }
+
+ /**
+ * Creates a matcher that matches when the examined {@link JobStatus} has
a state of {@link JobState#CANCELING}.
+ *
+ * @param id Id.
+ * @param wasRunning Whether the job was running before it was canceled.
+ * @return Matcher.
+ */
+ public static RestJobStatusMatcher canceling(UUID id, boolean wasRunning) {
+ return isJobStatus().withId(id)
+ .withState(JobState.CANCELING)
+ .withCreateTime(notNullValue(Instant.class))
+ .withStartTime(wasRunning ? notNullValue(Instant.class) :
AnythingMatcher.anything())
+ .withFinishTime(notNullValue(Instant.class));
+ }
+
+ /**
+ * Creates a matcher that matches when the examined {@link JobStatus} has
a state of {@link JobState#CANCELED}.
+ *
+ * @param id Id.
+ * @param wasRunning Whether the job was running before it was canceled.
+ * @return Matcher.
+ */
+ public static RestJobStatusMatcher canceled(UUID id, boolean wasRunning) {
+ return isJobStatus().withId(id)
+ .withState(JobState.CANCELED)
+ .withCreateTime(notNullValue(Instant.class))
+ .withStartTime(wasRunning ? notNullValue(Instant.class) :
AnythingMatcher.anything())
+ .withFinishTime(notNullValue(Instant.class));
+ }
+
+
+ public RestJobStatusMatcher withState(JobState state) {
+ return withState(equalTo(state));
+ }
+
+ public RestJobStatusMatcher withState(Matcher<JobState> stateMatcher) {
+ this.stateMatcher = stateMatcher;
+ return this;
+ }
+
+
+ public RestJobStatusMatcher withId(UUID id) {
+ return withId(equalTo(id));
+ }
+
+ public RestJobStatusMatcher withId(Matcher<UUID> idMatcher) {
+ this.idMatcher = idMatcher;
+ return this;
+ }
+
+ public RestJobStatusMatcher withCreateTime(Instant createTime) {
+ return withCreateTime(equalTo(createTime));
+ }
+
+ public RestJobStatusMatcher withCreateTime(Matcher<Instant>
createTimeMatcher) {
+ this.createTimeMatcher = createTimeMatcher;
+ return this;
+ }
+
+ public RestJobStatusMatcher withStartTime(Instant startTime) {
+ return withStartTime(equalTo(startTime));
+ }
+
+ public RestJobStatusMatcher withStartTime(Matcher<Instant>
startTimeMatcher) {
+ this.startTimeMatcher = startTimeMatcher;
+ return this;
+ }
+
+ public RestJobStatusMatcher withFinishTime(Instant finishTime) {
+ return withFinishTime(equalTo(finishTime));
+ }
+
+ public RestJobStatusMatcher withFinishTime(Matcher<Instant>
finishTimeMatcher) {
+ this.finishTimeMatcher = finishTimeMatcher;
+ return this;
+ }
+
+ @Override
+ protected boolean matchesSafely(JobStatus status) {
+ return idMatcher.matches(status.id())
+ && stateMatcher.matches(status.state())
+ && createTimeMatcher.matches(status.createTime())
+ && startTimeMatcher.matches(status.startTime())
+ && finishTimeMatcher.matches(status.finishTime());
+ }
+
+ @Override
+ protected void describeMismatchSafely(JobStatus status, Description
mismatchDescription) {
+ mismatchDescription.appendText("was a JobStatus with id ")
+ .appendValue(status.id())
+ .appendText(", state ")
+ .appendValue(status.state())
+ .appendText(", create time ")
+ .appendValue(status.createTime())
+ .appendText(", start time ")
+ .appendValue(status.startTime())
+ .appendText(" and finish time ")
+ .appendValue(status.finishTime());
+ }
+
+ @Override
+ public void describeTo(Description description) {
+ description.appendText("a JobStatus with id ")
+ .appendDescriptionOf(idMatcher)
+ .appendText(", state ")
+ .appendDescriptionOf(stateMatcher)
+ .appendText(", create time ")
+ .appendDescriptionOf(createTimeMatcher)
+ .appendText(", start time ")
+ .appendDescriptionOf(startTimeMatcher)
+ .appendText(" and finish time ")
+ .appendDescriptionOf(finishTimeMatcher);
+ }
+}
diff --git
a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
index 683bc1bb74..d23bc0dce7 100644
---
a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
+++
b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
@@ -127,6 +127,7 @@ import org.apache.ignite.internal.rest.RestManager;
import org.apache.ignite.internal.rest.RestManagerFactory;
import
org.apache.ignite.internal.rest.authentication.AuthenticationProviderFactory;
import org.apache.ignite.internal.rest.cluster.ClusterManagementRestFactory;
+import org.apache.ignite.internal.rest.compute.ComputeRestFactory;
import org.apache.ignite.internal.rest.configuration.PresentationsFactory;
import org.apache.ignite.internal.rest.configuration.RestConfiguration;
import org.apache.ignite.internal.rest.deployment.CodeDeploymentRestFactory;
@@ -767,6 +768,8 @@ public class IgniteImpl implements Ignite {
Supplier<RestFactory> authProviderFactory = () -> new
AuthenticationProviderFactory(authenticationManager);
Supplier<RestFactory> deploymentCodeRestFactory = () -> new
CodeDeploymentRestFactory(deploymentManager);
Supplier<RestFactory> restManagerFactory = () -> new
RestManagerFactory(restManager);
+ Supplier<RestFactory> computeRestFactory = () -> new
ComputeRestFactory(computeComponent);
+
RestConfiguration restConfiguration =
nodeCfgMgr.configurationRegistry().getConfiguration(RestConfiguration.KEY);
return new RestComponent(
@@ -776,7 +779,9 @@ public class IgniteImpl implements Ignite {
nodeMetricRestFactory,
deploymentCodeRestFactory,
authProviderFactory,
- restManagerFactory),
+ restManagerFactory,
+ computeRestFactory
+ ),
restManager,
restConfiguration
);