This is an automated email from the ASF dual-hosted git repository.
dmvk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new a539eb2e84c [FLINK-31469] Allow setting JobResourceRequirements
through REST API.
a539eb2e84c is described below
commit a539eb2e84cccb21f046902026d4178d864fba7d
Author: David Moravek <[email protected]>
AuthorDate: Mon Feb 27 19:08:03 2023 +0100
[FLINK-31469] Allow setting JobResourceRequirements through REST API.
Signed-off-by: David Moravek <[email protected]>
---
.../shortcodes/generated/rest_v1_dispatcher.html | 126 +++++++++++++++++++++
docs/static/generated/rest_v1_dispatcher.yml | 54 +++++++++
.../src/test/resources/rest_api_v1.snapshot | 50 +++++++-
.../runtime/dispatcher/DispatcherRestEndpoint.java | 16 +++
.../job/JobResourceRequirementsHandler.java | 65 +++++++++++
.../job/JobResourceRequirementsUpdateHandler.java | 81 +++++++++++++
.../messages/job/JobResourceRequirementsBody.java | 89 +++++++++++++++
.../job/JobResourceRequirementsHeaders.java | 73 ++++++++++++
.../job/JobResourcesRequirementsUpdateHeaders.java | 78 +++++++++++++
.../job/JobResourceRequirementsBodyTest.java | 88 ++++++++++++++
10 files changed, 716 insertions(+), 4 deletions(-)
diff --git a/docs/layouts/shortcodes/generated/rest_v1_dispatcher.html
b/docs/layouts/shortcodes/generated/rest_v1_dispatcher.html
index 922810b9c76..70ef8a1c5bc 100644
--- a/docs/layouts/shortcodes/generated/rest_v1_dispatcher.html
+++ b/docs/layouts/shortcodes/generated/rest_v1_dispatcher.html
@@ -3086,6 +3086,132 @@ Using 'curl' you can upload a jar via 'curl -X POST -H
"Expect:" -F "jarfile=@pa
</tr>
</tbody>
</table>
+<table class="rest-api table table-bordered">
+ <tbody>
+ <tr>
+ <td class="text-left"
colspan="2"><h5><strong>/jobs/:jobid/resource-requirements</strong></h5></td>
+ </tr>
+ <tr>
+ <td class="text-left" style="width: 20%">Verb: <code>GET</code></td>
+ <td class="text-left">Response code: <code>200 OK</code></td>
+ </tr>
+ <tr>
+ <td colspan="2">Request details on the job's resource requirements.</td>
+ </tr>
+ <tr>
+ <td colspan="2">Path parameters</td>
+ </tr>
+ <tr>
+ <td colspan="2">
+ <ul>
+<li><code>jobid</code> - 32-character hexadecimal string value that identifies
a job.</li>
+ </ul>
+ </td>
+ </tr>
+ <tr>
+ <td colspan="2">
+ <label>
+ <details>
+ <summary>Request</summary>
+ <pre><code>{}</code></pre>
+ </label>
+ </td>
+ </tr>
+ <tr>
+ <td colspan="2">
+ <label>
+ <details>
+ <summary>Response</summary>
+ <pre><code>{
+ "type" : "object",
+ "id" :
"urn:jsonschema:org:apache:flink:runtime:rest:messages:job:JobResourceRequirementsBody",
+ "additionalProperties" : {
+ "type" : "object",
+ "id" :
"urn:jsonschema:org:apache:flink:runtime:jobmaster:JobVertexResourceRequirements",
+ "properties" : {
+ "parallelism" : {
+ "type" : "object",
+ "id" :
"urn:jsonschema:org:apache:flink:runtime:jobmaster:JobVertexResourceRequirements:Parallelism",
+ "properties" : {
+ "lowerBound" : {
+ "type" : "integer"
+ },
+ "upperBound" : {
+ "type" : "integer"
+ }
+ }
+ }
+ }
+ }
+}</code></pre>
+ </label>
+ </td>
+ </tr>
+ </tbody>
+</table>
+<table class="rest-api table table-bordered">
+ <tbody>
+ <tr>
+ <td class="text-left"
colspan="2"><h5><strong>/jobs/:jobid/resource-requirements</strong></h5></td>
+ </tr>
+ <tr>
+ <td class="text-left" style="width: 20%">Verb: <code>PUT</code></td>
+ <td class="text-left">Response code: <code>200 OK</code></td>
+ </tr>
+ <tr>
+ <td colspan="2">Request to update job's resource requirements.</td>
+ </tr>
+ <tr>
+ <td colspan="2">Path parameters</td>
+ </tr>
+ <tr>
+ <td colspan="2">
+ <ul>
+<li><code>jobid</code> - 32-character hexadecimal string value that identifies
a job.</li>
+ </ul>
+ </td>
+ </tr>
+ <tr>
+ <td colspan="2">
+ <label>
+ <details>
+ <summary>Request</summary>
+ <pre><code>{
+ "type" : "object",
+ "id" :
"urn:jsonschema:org:apache:flink:runtime:rest:messages:job:JobResourceRequirementsBody",
+ "additionalProperties" : {
+ "type" : "object",
+ "id" :
"urn:jsonschema:org:apache:flink:runtime:jobmaster:JobVertexResourceRequirements",
+ "properties" : {
+ "parallelism" : {
+ "type" : "object",
+ "id" :
"urn:jsonschema:org:apache:flink:runtime:jobmaster:JobVertexResourceRequirements:Parallelism",
+ "properties" : {
+ "lowerBound" : {
+ "type" : "integer"
+ },
+ "upperBound" : {
+ "type" : "integer"
+ }
+ }
+ }
+ }
+ }
+}</code></pre>
+ </label>
+ </td>
+ </tr>
+ <tr>
+ <td colspan="2">
+ <label>
+ <details>
+ <summary>Response</summary>
+ <pre><code>{}</code></pre>
+ </label>
+ </td>
+ </tr>
+ </tbody>
+</table>
<table class="rest-api table table-bordered">
<tbody>
<tr>
diff --git a/docs/static/generated/rest_v1_dispatcher.yml
b/docs/static/generated/rest_v1_dispatcher.yml
index c8ece02b72a..71c00a0ddec 100644
--- a/docs/static/generated/rest_v1_dispatcher.yml
+++ b/docs/static/generated/rest_v1_dispatcher.yml
@@ -852,6 +852,42 @@ paths:
application/json:
schema:
$ref: '#/components/schemas/AsynchronousOperationResult'
+ /jobs/{jobid}/resource-requirements:
+ get:
+ description: Request details on the job's resource requirements.
+ operationId: getJobResourceRequirements
+ parameters:
+ - name: jobid
+ in: path
+ description: 32-character hexadecimal string value that identifies a
job.
+ required: true
+ schema:
+ $ref: '#/components/schemas/JobID'
+ responses:
+ "200":
+ description: The request was successful.
+ content:
+ application/json:
+ schema:
+ $ref: '#/components/schemas/JobResourceRequirementsBody'
+ put:
+ description: Request to update job's resource requirements.
+ operationId: updateJobResourceRequirements
+ parameters:
+ - name: jobid
+ in: path
+ description: 32-character hexadecimal string value that identifies a
job.
+ required: true
+ schema:
+ $ref: '#/components/schemas/JobID'
+ requestBody:
+ content:
+ application/json:
+ schema:
+ $ref: '#/components/schemas/JobResourceRequirementsBody'
+ responses:
+ "200":
+ description: The request was successful.
/jobs/{jobid}/savepoints:
post:
description: "Triggers a savepoint, and optionally cancels the job
afterwards.\
@@ -2344,6 +2380,10 @@ components:
properties:
plan:
$ref: '#/components/schemas/RawJson'
+ JobResourceRequirementsBody:
+ type: object
+ additionalProperties:
+ $ref: '#/components/schemas/JobVertexResourceRequirements'
JobResult:
type: object
properties:
@@ -2447,6 +2487,11 @@ components:
JobVertexID:
pattern: "[0-9a-f]{32}"
type: string
+ JobVertexResourceRequirements:
+ type: object
+ properties:
+ parallelism:
+ $ref: '#/components/schemas/Parallelism'
JobVertexTaskManagersInfo:
type: object
properties:
@@ -2530,6 +2575,15 @@ components:
value:
type: integer
format: int32
+ Parallelism:
+ type: object
+ properties:
+ lowerBound:
+ type: integer
+ format: int32
+ upperBound:
+ type: integer
+ format: int32
PendingCheckpointStatistics:
type: object
allOf:
diff --git a/flink-runtime-web/src/test/resources/rest_api_v1.snapshot
b/flink-runtime-web/src/test/resources/rest_api_v1.snapshot
index d4b2937e1c6..25d20a764b3 100644
--- a/flink-runtime-web/src/test/resources/rest_api_v1.snapshot
+++ b/flink-runtime-web/src/test/resources/rest_api_v1.snapshot
@@ -2300,6 +2300,48 @@
}
}
}
+ }, {
+ "url" : "/jobs/:jobid/resource-requirements",
+ "method" : "GET",
+ "status-code" : "200 OK",
+ "file-upload" : false,
+ "path-parameters" : {
+ "pathParameters" : [ {
+ "key" : "jobid"
+ } ]
+ },
+ "query-parameters" : {
+ "queryParameters" : [ ]
+ },
+ "request" : {
+ "type" : "object",
+ "id" :
"urn:jsonschema:org:apache:flink:runtime:rest:messages:EmptyRequestBody"
+ },
+ "response" : {
+ "type" : "object",
+ "id" :
"urn:jsonschema:org:apache:flink:runtime:rest:messages:job:JobResourceRequirementsBody"
+ }
+ }, {
+ "url" : "/jobs/:jobid/resource-requirements",
+ "method" : "PUT",
+ "status-code" : "200 OK",
+ "file-upload" : false,
+ "path-parameters" : {
+ "pathParameters" : [ {
+ "key" : "jobid"
+ } ]
+ },
+ "query-parameters" : {
+ "queryParameters" : [ ]
+ },
+ "request" : {
+ "type" : "object",
+ "id" :
"urn:jsonschema:org:apache:flink:runtime:rest:messages:job:JobResourceRequirementsBody"
+ },
+ "response" : {
+ "type" : "object",
+ "id" :
"urn:jsonschema:org:apache:flink:runtime:rest:messages:EmptyResponseBody"
+ }
}, {
"url" : "/jobs/:jobid/savepoints",
"method" : "POST",
@@ -2782,14 +2824,14 @@
},
"response" : {
"type" : "object",
- "id" :
"urn:jsonschema:org:apache:flink:runtime:webmonitor:threadinfo:JobVertexFlameGraph",
+ "id" :
"urn:jsonschema:org:apache:flink:runtime:webmonitor:threadinfo:VertexFlameGraph",
"properties" : {
"endTimestamp" : {
"type" : "integer"
},
"data" : {
"type" : "object",
- "id" :
"urn:jsonschema:org:apache:flink:runtime:webmonitor:threadinfo:JobVertexFlameGraph:Node",
+ "id" :
"urn:jsonschema:org:apache:flink:runtime:webmonitor:threadinfo:VertexFlameGraph:Node",
"properties" : {
"name" : {
"type" : "string"
@@ -2801,7 +2843,7 @@
"type" : "array",
"items" : {
"type" : "object",
- "$ref" :
"urn:jsonschema:org:apache:flink:runtime:webmonitor:threadinfo:JobVertexFlameGraph:Node"
+ "$ref" :
"urn:jsonschema:org:apache:flink:runtime:webmonitor:threadinfo:VertexFlameGraph:Node"
}
}
}
@@ -4063,4 +4105,4 @@
}
}
} ]
-}
+}
\ No newline at end of file
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java
index e2414378702..fc2d24b0dd9 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java
@@ -26,6 +26,8 @@ import
org.apache.flink.runtime.leaderelection.LeaderElectionService;
import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
import org.apache.flink.runtime.rest.handler.RestHandlerConfiguration;
import org.apache.flink.runtime.rest.handler.RestHandlerSpecification;
+import
org.apache.flink.runtime.rest.handler.job.JobResourceRequirementsHandler;
+import
org.apache.flink.runtime.rest.handler.job.JobResourceRequirementsUpdateHandler;
import org.apache.flink.runtime.rest.handler.job.JobSubmitHandler;
import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache;
import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcher;
@@ -96,6 +98,20 @@ public class DispatcherRestEndpoint extends
WebMonitorEndpoint<DispatcherGateway
handlers.add(Tuple2.of(jobSubmitHandler.getMessageHeaders(),
jobSubmitHandler));
+ final JobResourceRequirementsHandler jobResourceRequirementsHandler =
+ new JobResourceRequirementsHandler(leaderRetriever, timeout,
responseHeaders);
+ handlers.add(
+ Tuple2.of(
+ jobResourceRequirementsHandler.getMessageHeaders(),
+ jobResourceRequirementsHandler));
+
+ final JobResourceRequirementsUpdateHandler
jobResourceRequirementsUpdateHandler =
+ new JobResourceRequirementsUpdateHandler(leaderRetriever,
timeout, responseHeaders);
+ handlers.add(
+ Tuple2.of(
+
jobResourceRequirementsUpdateHandler.getMessageHeaders(),
+ jobResourceRequirementsUpdateHandler));
+
return handlers;
}
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobResourceRequirementsHandler.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobResourceRequirementsHandler.java
new file mode 100644
index 00000000000..931e8039763
--- /dev/null
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobResourceRequirementsHandler.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.job;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.dispatcher.DispatcherGateway;
+import org.apache.flink.runtime.rest.handler.AbstractRestHandler;
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import org.apache.flink.runtime.rest.handler.RestHandlerException;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.JobIDPathParameter;
+import org.apache.flink.runtime.rest.messages.JobMessageParameters;
+import org.apache.flink.runtime.rest.messages.job.JobResourceRequirementsBody;
+import
org.apache.flink.runtime.rest.messages.job.JobResourceRequirementsHeaders;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+
+import javax.annotation.Nonnull;
+
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Rest handler for reading current {@link
org.apache.flink.runtime.jobgraph.JobResourceRequirements
+ * resource requirements} of a given job.
+ */
+public class JobResourceRequirementsHandler
+ extends AbstractRestHandler<
+ DispatcherGateway,
+ EmptyRequestBody,
+ JobResourceRequirementsBody,
+ JobMessageParameters> {
+
+ public JobResourceRequirementsHandler(
+ GatewayRetriever<? extends DispatcherGateway> leaderRetriever,
+ Time timeout,
+ Map<String, String> responseHeaders) {
+ super(leaderRetriever, timeout, responseHeaders,
JobResourceRequirementsHeaders.INSTANCE);
+ }
+
+ @Override
+ protected CompletableFuture<JobResourceRequirementsBody> handleRequest(
+ @Nonnull HandlerRequest<EmptyRequestBody> request, @Nonnull
DispatcherGateway gateway)
+ throws RestHandlerException {
+ final JobID jobId = request.getPathParameter(JobIDPathParameter.class);
+ return gateway.requestJobResourceRequirements(jobId)
+ .thenApply(JobResourceRequirementsBody::new);
+ }
+}
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobResourceRequirementsUpdateHandler.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobResourceRequirementsUpdateHandler.java
new file mode 100644
index 00000000000..d762dd5baa2
--- /dev/null
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobResourceRequirementsUpdateHandler.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.job;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.dispatcher.DispatcherGateway;
+import org.apache.flink.runtime.jobgraph.JobResourceRequirements;
+import org.apache.flink.runtime.rest.handler.AbstractRestHandler;
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import org.apache.flink.runtime.rest.handler.RestHandlerException;
+import org.apache.flink.runtime.rest.messages.EmptyResponseBody;
+import org.apache.flink.runtime.rest.messages.JobIDPathParameter;
+import org.apache.flink.runtime.rest.messages.JobMessageParameters;
+import org.apache.flink.runtime.rest.messages.job.JobResourceRequirementsBody;
+import
org.apache.flink.runtime.rest.messages.job.JobResourcesRequirementsUpdateHeaders;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+
+import
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+
+import javax.annotation.Nonnull;
+
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Rest handler for updating {@link
org.apache.flink.runtime.jobgraph.JobResourceRequirements
+ * resource requirements} of a given job.
+ */
+public class JobResourceRequirementsUpdateHandler
+ extends AbstractRestHandler<
+ DispatcherGateway,
+ JobResourceRequirementsBody,
+ EmptyResponseBody,
+ JobMessageParameters> {
+
+ public JobResourceRequirementsUpdateHandler(
+ GatewayRetriever<? extends DispatcherGateway> leaderRetriever,
+ Time timeout,
+ Map<String, String> responseHeaders) {
+ super(
+ leaderRetriever,
+ timeout,
+ responseHeaders,
+ JobResourcesRequirementsUpdateHeaders.INSTANCE);
+ }
+
+ @Override
+ protected CompletableFuture<EmptyResponseBody> handleRequest(
+ @Nonnull HandlerRequest<JobResourceRequirementsBody> request,
+ @Nonnull DispatcherGateway gateway)
+ throws RestHandlerException {
+ final JobID jobId = request.getPathParameter(JobIDPathParameter.class);
+ final Optional<JobResourceRequirements> maybeJobResourceRequirements =
+ request.getRequestBody().asJobResourceRequirements();
+ if (maybeJobResourceRequirements.isPresent()) {
+ return gateway.updateJobResourceRequirements(jobId,
maybeJobResourceRequirements.get())
+ .thenApply(ignored -> EmptyResponseBody.getInstance());
+ }
+ throw new RestHandlerException(
+ "Request body does not specify resource requirements.",
+ HttpResponseStatus.BAD_REQUEST);
+ }
+}
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/JobResourceRequirementsBody.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/JobResourceRequirementsBody.java
new file mode 100644
index 00000000000..37227a1bf77
--- /dev/null
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/JobResourceRequirementsBody.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages.job;
+
+import org.apache.flink.annotation.docs.FlinkJsonSchema;
+import org.apache.flink.runtime.jobgraph.JobResourceRequirements;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobgraph.JobVertexResourceRequirements;
+import org.apache.flink.runtime.rest.messages.RequestBody;
+import org.apache.flink.runtime.rest.messages.ResponseBody;
+import org.apache.flink.runtime.rest.messages.json.JobVertexIDKeyDeserializer;
+import org.apache.flink.runtime.rest.messages.json.JobVertexIDKeySerializer;
+
+import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonAnyGetter;
+import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonAnySetter;
+import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnore;
+import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.annotation.JsonDeserialize;
+import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.annotation.JsonSerialize;
+
+import javax.annotation.Nullable;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+
+/** Request and response body carrying the per-vertex resource requirements of a job. */
+@FlinkJsonSchema.AdditionalFields(type = JobVertexResourceRequirements.class)
+public class JobResourceRequirementsBody implements RequestBody, ResponseBody {
+
+ @JsonAnySetter
+ @JsonAnyGetter
+ @JsonSerialize(keyUsing = JobVertexIDKeySerializer.class)
+ @JsonDeserialize(keyUsing = JobVertexIDKeyDeserializer.class)
+ private final Map<JobVertexID, JobVertexResourceRequirements>
jobVertexResourceRequirements;
+
+ public JobResourceRequirementsBody() {
+ this(null);
+ }
+
+ public JobResourceRequirementsBody(@Nullable JobResourceRequirements
jobResourceRequirements) {
+ if (jobResourceRequirements != null) {
+ this.jobVertexResourceRequirements =
jobResourceRequirements.getJobVertexParallelisms();
+ } else {
+ this.jobVertexResourceRequirements = new HashMap<>();
+ }
+ }
+
+ @JsonIgnore
+ public Optional<JobResourceRequirements> asJobResourceRequirements() {
+ if (jobVertexResourceRequirements.isEmpty()) {
+ return Optional.empty();
+ }
+ return Optional.of(new
JobResourceRequirements(jobVertexResourceRequirements));
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ final JobResourceRequirementsBody that = (JobResourceRequirementsBody)
o;
+ return Objects.equals(jobVertexResourceRequirements,
that.jobVertexResourceRequirements);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(jobVertexResourceRequirements);
+ }
+}
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/JobResourceRequirementsHeaders.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/JobResourceRequirementsHeaders.java
new file mode 100644
index 00000000000..5bb17c87cc6
--- /dev/null
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/JobResourceRequirementsHeaders.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages.job;
+
+import org.apache.flink.runtime.rest.HttpMethodWrapper;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.JobIDPathParameter;
+import org.apache.flink.runtime.rest.messages.JobMessageParameters;
+import org.apache.flink.runtime.rest.messages.RuntimeMessageHeaders;
+
+import
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+
+/** Headers for REST request to get details on a job's resource requirements. */
+public class JobResourceRequirementsHeaders
+ implements RuntimeMessageHeaders<
+ EmptyRequestBody, JobResourceRequirementsBody,
JobMessageParameters> {
+
+ public static final JobResourceRequirementsHeaders INSTANCE =
+ new JobResourceRequirementsHeaders();
+
+ private static final String URL = "/jobs/:" + JobIDPathParameter.KEY +
"/resource-requirements";
+
+ @Override
+ public HttpMethodWrapper getHttpMethod() {
+ return HttpMethodWrapper.GET;
+ }
+
+ @Override
+ public String getTargetRestEndpointURL() {
+ return URL;
+ }
+
+ @Override
+ public Class<JobResourceRequirementsBody> getResponseClass() {
+ return JobResourceRequirementsBody.class;
+ }
+
+ @Override
+ public HttpResponseStatus getResponseStatusCode() {
+ return HttpResponseStatus.OK;
+ }
+
+ @Override
+ public String getDescription() {
+ return "Request details on the job's resource requirements.";
+ }
+
+ @Override
+ public Class<EmptyRequestBody> getRequestClass() {
+ return EmptyRequestBody.class;
+ }
+
+ @Override
+ public JobMessageParameters getUnresolvedMessageParameters() {
+ return new JobMessageParameters();
+ }
+}
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/JobResourcesRequirementsUpdateHeaders.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/JobResourcesRequirementsUpdateHeaders.java
new file mode 100644
index 00000000000..71056dba029
--- /dev/null
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/JobResourcesRequirementsUpdateHeaders.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages.job;
+
+import org.apache.flink.runtime.rest.HttpMethodWrapper;
+import org.apache.flink.runtime.rest.messages.EmptyResponseBody;
+import org.apache.flink.runtime.rest.messages.JobIDPathParameter;
+import org.apache.flink.runtime.rest.messages.JobMessageParameters;
+import org.apache.flink.runtime.rest.messages.RuntimeMessageHeaders;
+
+import
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+
+/** Headers for REST request to update a job's resource requirements. */
+public class JobResourcesRequirementsUpdateHeaders
+ implements RuntimeMessageHeaders<
+ JobResourceRequirementsBody, EmptyResponseBody,
JobMessageParameters> {
+
+ public static final JobResourcesRequirementsUpdateHeaders INSTANCE =
+ new JobResourcesRequirementsUpdateHeaders();
+
+ private static final String URL = "/jobs/:" + JobIDPathParameter.KEY +
"/resource-requirements";
+
+ @Override
+ public HttpMethodWrapper getHttpMethod() {
+ return HttpMethodWrapper.PUT;
+ }
+
+ @Override
+ public String getTargetRestEndpointURL() {
+ return URL;
+ }
+
+ @Override
+ public Class<EmptyResponseBody> getResponseClass() {
+ return EmptyResponseBody.class;
+ }
+
+ @Override
+ public HttpResponseStatus getResponseStatusCode() {
+ return HttpResponseStatus.OK;
+ }
+
+ @Override
+ public String getDescription() {
+ return "Request to update job's resource requirements.";
+ }
+
+ @Override
+ public Class<JobResourceRequirementsBody> getRequestClass() {
+ return JobResourceRequirementsBody.class;
+ }
+
+ @Override
+ public JobMessageParameters getUnresolvedMessageParameters() {
+ return new JobMessageParameters();
+ }
+
+ @Override
+ public String operationId() {
+ return "updateJobResourceRequirements";
+ }
+}
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/job/JobResourceRequirementsBodyTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/job/JobResourceRequirementsBodyTest.java
new file mode 100644
index 00000000000..cfb2ac8d522
--- /dev/null
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/job/JobResourceRequirementsBodyTest.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages.job;
+
+import org.apache.flink.runtime.jobgraph.JobResourceRequirements;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.rest.messages.RestRequestMarshallingTestBase;
+
+import org.hamcrest.Description;
+import org.hamcrest.TypeSafeMatcher;
+
+import java.util.Optional;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+
+/** Tests for the marshalling of {@link JobResourceRequirementsBody}. */
+public class JobResourceRequirementsBodyTest
+ extends RestRequestMarshallingTestBase<JobResourceRequirementsBody> {
+ @Override
+ protected Class<JobResourceRequirementsBody> getTestRequestClass() {
+ return JobResourceRequirementsBody.class;
+ }
+
+ @Override
+ protected JobResourceRequirementsBody getTestRequestInstance() {
+ return new JobResourceRequirementsBody(
+ JobResourceRequirements.newBuilder()
+ .setParallelismForJobVertex(new JobVertexID(), 1, 42)
+ .setParallelismForJobVertex(new JobVertexID(), 1, 1337)
+ .build());
+ }
+
+ @Override
+ protected void assertOriginalEqualsToUnmarshalled(
+ JobResourceRequirementsBody expected, JobResourceRequirementsBody
actual) {
+ assertThat(expected, equalsChangeJobRequestBody(actual));
+ }
+
+ private EqualityChangeJobRequestBodyMatcher equalsChangeJobRequestBody(
+ JobResourceRequirementsBody actual) {
+ return new EqualityChangeJobRequestBodyMatcher(actual);
+ }
+
+ private static final class EqualityChangeJobRequestBodyMatcher
+ extends TypeSafeMatcher<JobResourceRequirementsBody> {
+
+ private final JobResourceRequirementsBody
actualJobResourceRequirementsBody;
+
+ private EqualityChangeJobRequestBodyMatcher(
+ JobResourceRequirementsBody actualJobResourceRequirementsBody)
{
+ this.actualJobResourceRequirementsBody =
actualJobResourceRequirementsBody;
+ }
+
+ @Override
+ protected boolean matchesSafely(JobResourceRequirementsBody
jobResourceRequirementsBody) {
+ final Optional<JobResourceRequirements>
maybeActualJobResourceRequirements =
+
actualJobResourceRequirementsBody.asJobResourceRequirements();
+ final Optional<JobResourceRequirements>
maybeJobResourceRequirements =
+ jobResourceRequirementsBody.asJobResourceRequirements();
+ if (maybeActualJobResourceRequirements.isPresent()
+ ^ maybeJobResourceRequirements.isPresent()) {
+ return false;
+ }
+ return maybeActualJobResourceRequirements
+ .map(actual ->
actual.equals(maybeJobResourceRequirements.get()))
+ .orElse(true);
+ }
+
+ @Override
+ public void describeTo(Description description) {}
+ }
+}