This is an automated email from the ASF dual-hosted git repository.
weizhong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 283904248a0 [FLINK-38897][Runtime/REST] Introduce the
/jobs/:jobid/rescales/config endpoint in the REST API (#27580)
283904248a0 is described below
commit 283904248a03023bf3ed8a6fc9f6d575294a3969
Author: Yuepeng Pan <[email protected]>
AuthorDate: Tue Mar 3 18:52:28 2026 +0800
[FLINK-38897][Runtime/REST] Introduce the /jobs/:jobid/rescales/config
endpoint in the REST API (#27580)
---
.../shortcodes/generated/rest_v1_dispatcher.html | 75 ++++++++++
docs/static/generated/rest_v1_dispatcher.yml | 51 +++++++
.../src/test/resources/rest_api_v1.snapshot | 51 +++++++
.../job/rescales/JobRescaleConfigHandler.java | 109 +++++++++++++++
.../job/rescales/JobRescaleConfigHeaders.java | 79 +++++++++++
.../job/rescales/JobRescaleConfigInfo.java | 153 +++++++++++++++++++++
.../runtime/scheduler/ExecutionGraphInfo.java | 18 ++-
.../flink/runtime/scheduler/SchedulerBase.java | 3 +-
.../scheduler/adaptive/AdaptiveScheduler.java | 32 ++++-
.../runtime/webmonitor/WebMonitorEndpoint.java | 13 ++
.../job/rescales/JobRescaleConfigHandlerTest.java | 95 +++++++++++++
11 files changed, 673 insertions(+), 6 deletions(-)
diff --git a/docs/layouts/shortcodes/generated/rest_v1_dispatcher.html
b/docs/layouts/shortcodes/generated/rest_v1_dispatcher.html
index 4bcf44204db..2c14469d782 100644
--- a/docs/layouts/shortcodes/generated/rest_v1_dispatcher.html
+++ b/docs/layouts/shortcodes/generated/rest_v1_dispatcher.html
@@ -3486,6 +3486,81 @@ Using 'curl' you can upload a jar via 'curl -X POST -H
"Expect:" -F "jarfile=@pa
</tr>
</tbody>
</table>
+<table class="rest-api table table-bordered">
+ <tbody>
+ <tr>
+ <td class="text-left"
colspan="2"><h5><strong>/jobs/:jobid/rescales/config</strong></h5></td>
+ </tr>
+ <tr>
+ <td class="text-left" style="width: 20%">Verb: <code>GET</code></td>
+ <td class="text-left">Response code: <code>200 OK</code></td>
+ </tr>
+ <tr>
+ <td colspan="2">Returns the job rescale configuration.</td>
+ </tr>
+ <tr>
+ <td colspan="2">Path parameters</td>
+ </tr>
+ <tr>
+ <td colspan="2">
+ <ul>
+<li><code>jobid</code> - 32-character hexadecimal string value that identifies
a job.</li>
+ </ul>
+ </td>
+ </tr>
+ <tr>
+ <td colspan="2">
+ <label>
+ <details>
+ <summary>Request</summary>
+ <pre><code>{}</code></pre>
+ </label>
+ </td>
+ </tr>
+ <tr>
+ <td colspan="2">
+ <label>
+ <details>
+ <summary>Response</summary>
+ <pre><code>{
+ "type" : "object",
+ "id" :
"urn:jsonschema:org:apache:flink:runtime:rest:messages:job:rescales:JobRescaleConfigInfo",
+ "properties" : {
+ "executingCooldownTimeoutInMillis" : {
+ "type" : "integer"
+ },
+ "executingResourceStabilizationTimeoutInMillis" : {
+ "type" : "integer"
+ },
+ "maximumDelayForTriggeringRescaleInMillis" : {
+ "type" : "integer"
+ },
+ "rescaleHistoryMax" : {
+ "type" : "integer"
+ },
+ "rescaleOnFailedCheckpointCount" : {
+ "type" : "integer"
+ },
+ "schedulerExecutionMode" : {
+ "type" : "string",
+ "enum" : [ "REACTIVE" ]
+ },
+ "slotIdleTimeoutInMillis" : {
+ "type" : "integer"
+ },
+ "submissionResourceStabilizationTimeoutInMillis" : {
+ "type" : "integer"
+ },
+ "submissionResourceWaitTimeoutInMillis" : {
+ "type" : "integer"
+ }
+ }
+}</code></pre>
+ </label>
+ </td>
+ </tr>
+ </tbody>
+</table>
<table class="rest-api table table-bordered">
<tbody>
<tr>
diff --git a/docs/static/generated/rest_v1_dispatcher.yml
b/docs/static/generated/rest_v1_dispatcher.yml
index 33c6b292c19..3c0c3aae4ea 100644
--- a/docs/static/generated/rest_v1_dispatcher.yml
+++ b/docs/static/generated/rest_v1_dispatcher.yml
@@ -898,6 +898,24 @@ paths:
application/json:
schema:
$ref: "#/components/schemas/JobPlanInfo"
+ /jobs/{jobid}/rescales/config:
+ get:
+ description: Returns the job rescale configuration.
+ operationId: getJobRescaleConfig
+ parameters:
+ - name: jobid
+ in: path
+ description: 32-character hexadecimal string value that identifies a
job.
+ required: true
+ schema:
+ $ref: "#/components/schemas/JobID"
+ responses:
+ "200":
+ description: The request was successful.
+ content:
+ application/json:
+ schema:
+ $ref: "#/components/schemas/JobRescaleConfigInfo"
/jobs/{jobid}/rescaling:
patch:
description: Triggers the rescaling of a job. This async operation would
return
@@ -2679,6 +2697,35 @@ components:
properties:
plan:
$ref: "#/components/schemas/Plan"
+ JobRescaleConfigInfo:
+ type: object
+ properties:
+ executingCooldownTimeoutInMillis:
+ type: integer
+ format: int64
+ executingResourceStabilizationTimeoutInMillis:
+ type: integer
+ format: int64
+ maximumDelayForTriggeringRescaleInMillis:
+ type: integer
+ format: int64
+ rescaleHistoryMax:
+ type: integer
+ format: int32
+ rescaleOnFailedCheckpointCount:
+ type: integer
+ format: int32
+ schedulerExecutionMode:
+ $ref: "#/components/schemas/SchedulerExecutionMode"
+ slotIdleTimeoutInMillis:
+ type: integer
+ format: int64
+ submissionResourceStabilizationTimeoutInMillis:
+ type: integer
+ format: int64
+ submissionResourceWaitTimeoutInMillis:
+ type: integer
+ format: int64
JobResourceRequirementsBody:
type: object
additionalProperties:
@@ -3071,6 +3118,10 @@ components:
- Default
- Adaptive
- AdaptiveBatch
+ SchedulerExecutionMode:
+ type: string
+ enum:
+ - REACTIVE
SerializedThrowable:
type: object
properties:
diff --git a/flink-runtime-web/src/test/resources/rest_api_v1.snapshot
b/flink-runtime-web/src/test/resources/rest_api_v1.snapshot
index 6a84ac6d888..38cafeea1b9 100644
--- a/flink-runtime-web/src/test/resources/rest_api_v1.snapshot
+++ b/flink-runtime-web/src/test/resources/rest_api_v1.snapshot
@@ -4781,5 +4781,56 @@
}
}
}
+ }, {
+ "url" : "/jobs/:jobid/rescales/config",
+ "method" : "GET",
+ "status-code" : "200 OK",
+ "file-upload" : false,
+ "path-parameters" : {
+ "pathParameters" : [ {
+ "key" : "jobid"
+ } ]
+ },
+ "query-parameters" : {
+ "queryParameters" : [ ]
+ },
+ "request" : {
+ "type" : "object",
+ "id" :
"urn:jsonschema:org:apache:flink:runtime:rest:messages:EmptyRequestBody"
+ },
+ "response" : {
+ "type" : "object",
+ "id" :
"urn:jsonschema:org:apache:flink:runtime:rest:messages:job:rescales:JobRescaleConfigInfo",
+ "properties" : {
+ "executingCooldownTimeoutInMillis" : {
+ "type" : "integer"
+ },
+ "executingResourceStabilizationTimeoutInMillis" : {
+ "type" : "integer"
+ },
+ "maximumDelayForTriggeringRescaleInMillis" : {
+ "type" : "integer"
+ },
+ "rescaleHistoryMax" : {
+ "type" : "integer"
+ },
+ "rescaleOnFailedCheckpointCount" : {
+ "type" : "integer"
+ },
+ "schedulerExecutionMode" : {
+ "type" : "string",
+ "enum" : [ "REACTIVE" ]
+ },
+ "slotIdleTimeoutInMillis" : {
+ "type" : "integer"
+ },
+ "submissionResourceStabilizationTimeoutInMillis" : {
+ "type" : "integer"
+ },
+ "submissionResourceWaitTimeoutInMillis" : {
+ "type" : "integer"
+ }
+ }
+ }
} ]
}
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/rescales/JobRescaleConfigHandler.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/rescales/JobRescaleConfigHandler.java
new file mode 100644
index 00000000000..fa4fab08df0
--- /dev/null
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/rescales/JobRescaleConfigHandler.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.job.rescales;
+
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import org.apache.flink.runtime.rest.handler.RestHandlerException;
+import org.apache.flink.runtime.rest.handler.job.AbstractExecutionGraphHandler;
+import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.ErrorResponseBody;
+import org.apache.flink.runtime.rest.messages.JobIDPathParameter;
+import org.apache.flink.runtime.rest.messages.JobMessageParameters;
+import org.apache.flink.runtime.rest.messages.MessageHeaders;
+import org.apache.flink.runtime.rest.messages.ResponseBody;
+import
org.apache.flink.runtime.rest.messages.job.rescales.JobRescaleConfigHeaders;
+import
org.apache.flink.runtime.rest.messages.job.rescales.JobRescaleConfigInfo;
+import org.apache.flink.runtime.scheduler.ExecutionGraphInfo;
+import org.apache.flink.runtime.webmonitor.RestfulGateway;
+import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
+import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+
+import
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Map;
+import java.util.concurrent.Executor;
+
+/** The handler for job rescale configuration. */
+public class JobRescaleConfigHandler
+ extends AbstractExecutionGraphHandler<JobRescaleConfigInfo,
JobMessageParameters>
+ implements JsonArchivist {
+
+ public JobRescaleConfigHandler(
+ GatewayRetriever<? extends RestfulGateway> leaderRetriever,
+ Duration timeout,
+ Map<String, String> responseHeaders,
+ MessageHeaders<EmptyRequestBody, JobRescaleConfigInfo,
JobMessageParameters>
+ messageHeaders,
+ ExecutionGraphCache executionGraphCache,
+ Executor executor) {
+ super(
+ leaderRetriever,
+ timeout,
+ responseHeaders,
+ messageHeaders,
+ executionGraphCache,
+ executor);
+ }
+
+ @Override
+ protected JobRescaleConfigInfo handleRequest(
+ HandlerRequest<EmptyRequestBody> request, ExecutionGraphInfo
executionGraphInfo)
+ throws RestHandlerException {
+ return getJobRescaleConfigInfo(executionGraphInfo);
+ }
+
+ private JobRescaleConfigInfo getJobRescaleConfigInfo(ExecutionGraphInfo
executionGraphInfo)
+ throws RestHandlerException {
+ if (executionGraphInfo.getJobRescaleConfigInfo() == null) {
+ throw new RestHandlerException(
+ "AdaptiveScheduler is not enabled for this job ("
+ + executionGraphInfo.getJobId()
+ + ").",
+ HttpResponseStatus.NOT_FOUND,
+ RestHandlerException.LoggingBehavior.IGNORE);
+ }
+ return executionGraphInfo.getJobRescaleConfigInfo();
+ }
+
+ @Override
+ public Collection<ArchivedJson> archiveJsonWithPath(ExecutionGraphInfo
executionGraphInfo)
+ throws IOException {
+
+ ResponseBody response;
+ try {
+ response = getJobRescaleConfigInfo(executionGraphInfo);
+ } catch (RestHandlerException rhe) {
+ response = new ErrorResponseBody(rhe.getMessage());
+ }
+ return Collections.singletonList(
+ new ArchivedJson(
+ JobRescaleConfigHeaders.getInstance()
+ .getTargetRestEndpointURL()
+ .replace(
+ ':' + JobIDPathParameter.KEY,
+
executionGraphInfo.getJobId().toString()),
+ response));
+ }
+}
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/rescales/JobRescaleConfigHeaders.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/rescales/JobRescaleConfigHeaders.java
new file mode 100644
index 00000000000..fa97a493df9
--- /dev/null
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/rescales/JobRescaleConfigHeaders.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages.job.rescales;
+
+import org.apache.flink.runtime.rest.HttpMethodWrapper;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.JobMessageParameters;
+import org.apache.flink.runtime.rest.messages.RuntimeMessageHeaders;
+
+import
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+
+/**
+ * Header information related to rescale configuration for jobs with the
adaptive scheduler enabled.
+ */
+public class JobRescaleConfigHeaders
+ implements RuntimeMessageHeaders<
+ EmptyRequestBody, JobRescaleConfigInfo, JobMessageParameters> {
+
+ private static final JobRescaleConfigHeaders INSTANCE = new
JobRescaleConfigHeaders();
+
+ public static final String JOB_RESCALE_CONFIG_PATH =
"/jobs/:jobid/rescales/config";
+
+ private JobRescaleConfigHeaders() {}
+
+ @Override
+ public Class<JobRescaleConfigInfo> getResponseClass() {
+ return JobRescaleConfigInfo.class;
+ }
+
+ @Override
+ public HttpResponseStatus getResponseStatusCode() {
+ return HttpResponseStatus.OK;
+ }
+
+ @Override
+ public String getDescription() {
+ return "Returns the job rescale configuration.";
+ }
+
+ @Override
+ public Class<EmptyRequestBody> getRequestClass() {
+ return EmptyRequestBody.class;
+ }
+
+ @Override
+ public JobMessageParameters getUnresolvedMessageParameters() {
+ return new JobMessageParameters();
+ }
+
+ @Override
+ public HttpMethodWrapper getHttpMethod() {
+ return HttpMethodWrapper.GET;
+ }
+
+ @Override
+ public String getTargetRestEndpointURL() {
+ return JOB_RESCALE_CONFIG_PATH;
+ }
+
+ public static JobRescaleConfigHeaders getInstance() {
+ return INSTANCE;
+ }
+}
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/rescales/JobRescaleConfigInfo.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/rescales/JobRescaleConfigInfo.java
new file mode 100644
index 00000000000..4ee4f0677be
--- /dev/null
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/rescales/JobRescaleConfigInfo.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages.job.rescales;
+
+import org.apache.flink.configuration.SchedulerExecutionMode;
+import org.apache.flink.runtime.rest.messages.ResponseBody;
+
+import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+
+import io.swagger.v3.oas.annotations.media.Schema;
+
+import java.io.Serializable;
+import java.util.Objects;
+
+/** Configuration information related to rescaling for jobs with the adaptive
scheduler enabled. */
+@Schema(name = "JobRescaleConfigInfo")
+public class JobRescaleConfigInfo implements ResponseBody, Serializable {
+
+ private static final long serialVersionUID = 1L;
+
+ public static final String FIELD_NAME_RESCALE_HISTORY_MAX =
"rescaleHistoryMax";
+ public static final String FIELD_NAME_SCHEDULER_EXECUTION_MODE =
"schedulerExecutionMode";
+ public static final String FIELD_NAME_SUBMISSION_RESOURCE_WAIT_TIMEOUT =
+ "submissionResourceWaitTimeoutInMillis";
+ public static final String
FIELD_NAME_SUBMISSION_RESOURCE_STABILIZATION_TIMEOUT =
+ "submissionResourceStabilizationTimeoutInMillis";
+ public static final String FIELD_NAME_SLOT_IDLE_TIMEOUT =
"slotIdleTimeoutInMillis";
+ public static final String FIELD_NAME_EXECUTING_COOLDOWN_TIMEOUT =
+ "executingCooldownTimeoutInMillis";
+ public static final String
FIELD_NAME_EXECUTING_RESOURCE_STABILIZATION_TIMEOUT =
+ "executingResourceStabilizationTimeoutInMillis";
+ public static final String FIELD_NAME_MAXIMUM_DELAY_FOR_TRIGGERING_RESCALE
=
+ "maximumDelayForTriggeringRescaleInMillis";
+ public static final String FIELD_NAME_RESCALE_ON_FAILED_CHECKPOINT_COUNT =
+ "rescaleOnFailedCheckpointCount";
+
+ @JsonProperty(FIELD_NAME_RESCALE_HISTORY_MAX)
+ private final Integer rescaleHistoryMax;
+
+ @JsonProperty(FIELD_NAME_SCHEDULER_EXECUTION_MODE)
+ private final SchedulerExecutionMode schedulerExecutionMode;
+
+ @JsonProperty(FIELD_NAME_SUBMISSION_RESOURCE_WAIT_TIMEOUT)
+ private final Long submissionResourceWaitTimeoutInMillis;
+
+ @JsonProperty(FIELD_NAME_SUBMISSION_RESOURCE_STABILIZATION_TIMEOUT)
+ private final Long submissionResourceStabilizationTimeoutInMillis;
+
+ @JsonProperty(FIELD_NAME_SLOT_IDLE_TIMEOUT)
+ private final Long slotIdleTimeoutInMillis;
+
+ @JsonProperty(FIELD_NAME_EXECUTING_COOLDOWN_TIMEOUT)
+ private final Long executingCooldownTimeoutInMillis;
+
+ @JsonProperty(FIELD_NAME_EXECUTING_RESOURCE_STABILIZATION_TIMEOUT)
+ private final Long executingResourceStabilizationTimeoutInMillis;
+
+ @JsonProperty(FIELD_NAME_MAXIMUM_DELAY_FOR_TRIGGERING_RESCALE)
+ private final Long maximumDelayForTriggeringRescaleInMillis;
+
+ @JsonProperty(FIELD_NAME_RESCALE_ON_FAILED_CHECKPOINT_COUNT)
+ private final Integer rescaleOnFailedCheckpointCount;
+
+ @JsonCreator
+ public JobRescaleConfigInfo(
+ @JsonProperty(FIELD_NAME_RESCALE_HISTORY_MAX) Integer
rescaleHistoryMax,
+ @JsonProperty(FIELD_NAME_SCHEDULER_EXECUTION_MODE)
+ SchedulerExecutionMode schedulerExecutionMode,
+ @JsonProperty(FIELD_NAME_SUBMISSION_RESOURCE_WAIT_TIMEOUT)
+ Long submissionResourceWaitTimeoutInMillis,
+ @JsonProperty(FIELD_NAME_SUBMISSION_RESOURCE_STABILIZATION_TIMEOUT)
+ Long submissionResourceStabilizationTimeoutInMillis,
+ @JsonProperty(FIELD_NAME_SLOT_IDLE_TIMEOUT) Long
slotIdleTimeoutInMillis,
+ @JsonProperty(FIELD_NAME_EXECUTING_COOLDOWN_TIMEOUT)
+ Long executingCooldownTimeoutInMillis,
+ @JsonProperty(FIELD_NAME_EXECUTING_RESOURCE_STABILIZATION_TIMEOUT)
+ Long executingResourceStabilizationTimeoutInMillis,
+ @JsonProperty(FIELD_NAME_MAXIMUM_DELAY_FOR_TRIGGERING_RESCALE)
+ Long maximumDelayForTriggeringRescaleInMillis,
+ @JsonProperty(FIELD_NAME_RESCALE_ON_FAILED_CHECKPOINT_COUNT)
+ Integer rescaleOnFailedCheckpointCount) {
+ this.rescaleHistoryMax = rescaleHistoryMax;
+ this.schedulerExecutionMode = schedulerExecutionMode;
+ this.submissionResourceWaitTimeoutInMillis =
submissionResourceWaitTimeoutInMillis;
+ this.submissionResourceStabilizationTimeoutInMillis =
+ submissionResourceStabilizationTimeoutInMillis;
+ this.slotIdleTimeoutInMillis = slotIdleTimeoutInMillis;
+ this.executingCooldownTimeoutInMillis =
executingCooldownTimeoutInMillis;
+ this.executingResourceStabilizationTimeoutInMillis =
+ executingResourceStabilizationTimeoutInMillis;
+ this.maximumDelayForTriggeringRescaleInMillis =
maximumDelayForTriggeringRescaleInMillis;
+ this.rescaleOnFailedCheckpointCount = rescaleOnFailedCheckpointCount;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+
+ JobRescaleConfigInfo that = (JobRescaleConfigInfo) o;
+ return Objects.equals(rescaleHistoryMax, that.rescaleHistoryMax)
+ && Objects.equals(schedulerExecutionMode,
that.schedulerExecutionMode)
+ && Objects.equals(
+ submissionResourceWaitTimeoutInMillis,
+ that.submissionResourceWaitTimeoutInMillis)
+ && Objects.equals(
+ submissionResourceStabilizationTimeoutInMillis,
+ that.submissionResourceStabilizationTimeoutInMillis)
+ && Objects.equals(slotIdleTimeoutInMillis,
that.slotIdleTimeoutInMillis)
+ && Objects.equals(
+ executingCooldownTimeoutInMillis,
that.executingCooldownTimeoutInMillis)
+ && Objects.equals(
+ executingResourceStabilizationTimeoutInMillis,
+ that.executingResourceStabilizationTimeoutInMillis)
+ && Objects.equals(
+ maximumDelayForTriggeringRescaleInMillis,
+ that.maximumDelayForTriggeringRescaleInMillis)
+ && Objects.equals(
+ rescaleOnFailedCheckpointCount,
that.rescaleOnFailedCheckpointCount);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(
+ rescaleHistoryMax,
+ schedulerExecutionMode,
+ submissionResourceWaitTimeoutInMillis,
+ submissionResourceStabilizationTimeoutInMillis,
+ slotIdleTimeoutInMillis,
+ executingCooldownTimeoutInMillis,
+ executingResourceStabilizationTimeoutInMillis,
+ maximumDelayForTriggeringRescaleInMillis,
+ rescaleOnFailedCheckpointCount);
+ }
+}
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionGraphInfo.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionGraphInfo.java
index 1280b625d0c..7e6680f498e 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionGraphInfo.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionGraphInfo.java
@@ -23,6 +23,7 @@ import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
import org.apache.flink.runtime.executiongraph.ExecutionGraph;
+import
org.apache.flink.runtime.rest.messages.job.rescales.JobRescaleConfigInfo;
import
org.apache.flink.runtime.scheduler.exceptionhistory.RootExceptionHistoryEntry;
import javax.annotation.Nullable;
@@ -43,6 +44,12 @@ public class ExecutionGraphInfo implements Serializable {
private final Iterable<RootExceptionHistoryEntry> exceptionHistory;
@Nullable private final JobManagerOptions.SchedulerType schedulerType;
+ /**
+ * The value is null when the job is not enabled {@link
+ * org.apache.flink.runtime.scheduler.adaptive.AdaptiveScheduler}.
+ */
+ @Nullable private final JobRescaleConfigInfo jobRescaleConfigInfo;
+
public ExecutionGraphInfo(ArchivedExecutionGraph executionGraph) {
this(
executionGraph,
@@ -56,16 +63,18 @@ public class ExecutionGraphInfo implements Serializable {
public ExecutionGraphInfo(
ArchivedExecutionGraph executionGraph,
Iterable<RootExceptionHistoryEntry> exceptionHistory) {
- this(executionGraph, exceptionHistory, null);
+ this(executionGraph, exceptionHistory, null, null);
}
public ExecutionGraphInfo(
ArchivedExecutionGraph executionGraph,
Iterable<RootExceptionHistoryEntry> exceptionHistory,
- JobManagerOptions.SchedulerType schedulerType) {
+ @Nullable JobManagerOptions.SchedulerType schedulerType,
+ @Nullable JobRescaleConfigInfo jobRescaleConfigInfo) {
this.executionGraph = executionGraph;
this.exceptionHistory = exceptionHistory;
this.schedulerType = schedulerType;
+ this.jobRescaleConfigInfo = jobRescaleConfigInfo;
}
public JobID getJobId() {
@@ -80,6 +89,11 @@ public class ExecutionGraphInfo implements Serializable {
return exceptionHistory;
}
+ @Nullable
+ public JobRescaleConfigInfo getJobRescaleConfigInfo() {
+ return jobRescaleConfigInfo;
+ }
+
public Optional<ApplicationID> getApplicationId() {
return executionGraph.getApplicationId();
}
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
index 1b50f86600a..50778dc9478 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
@@ -872,7 +872,8 @@ public abstract class SchedulerBase implements SchedulerNG,
CheckpointScheduling
return new ExecutionGraphInfo(
ArchivedExecutionGraph.createFrom(executionGraph),
getExceptionHistory(),
- getSchedulerType());
+ getSchedulerType(),
+ null);
}
@Override
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java
index 813b6132f0d..f89342d4b45 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java
@@ -94,6 +94,7 @@ import
org.apache.flink.runtime.operators.coordination.OperatorEvent;
import org.apache.flink.runtime.operators.coordination.TaskNotRunningException;
import org.apache.flink.runtime.query.KvStateLocation;
import org.apache.flink.runtime.query.UnknownKvStateLocation;
+import
org.apache.flink.runtime.rest.messages.job.rescales.JobRescaleConfigInfo;
import org.apache.flink.runtime.rpc.FatalErrorHandler;
import org.apache.flink.runtime.scheduler.CoordinatorNotExistException;
import org.apache.flink.runtime.scheduler.DefaultVertexParallelismInfo;
@@ -304,7 +305,9 @@ public class AdaptiveScheduler
configuration.get(
SCHEDULER_RESCALE_TRIGGER_MAX_DELAY,
maximumDelayForRescaleTriggerDefault),
- rescaleOnFailedCheckpointsCount);
+ rescaleOnFailedCheckpointsCount,
+ // TODO: The parameter passing link will be implemented
after FLIP-495.
+ -1);
}
private final SchedulerExecutionMode executionMode;
@@ -315,6 +318,7 @@ public class AdaptiveScheduler
private final Duration executingResourceStabilizationTimeout;
private final Duration maximumDelayForTriggeringRescale;
private final int rescaleOnFailedCheckpointCount;
+ private final int rescaleHistoryMax;
private Settings(
SchedulerExecutionMode executionMode,
@@ -324,7 +328,8 @@ public class AdaptiveScheduler
Duration executingCooldownTimeout,
Duration executingResourceStabilizationTimeout,
Duration maximumDelayForTriggeringRescale,
- int rescaleOnFailedCheckpointCount) {
+ int rescaleOnFailedCheckpointCount,
+ int rescaleHistoryMax) {
this.executionMode = executionMode;
this.submissionResourceWaitTimeout = submissionResourceWaitTimeout;
this.submissionResourceStabilizationTimeout =
submissionResourceStabilizationTimeout;
@@ -333,6 +338,7 @@ public class AdaptiveScheduler
this.executingResourceStabilizationTimeout =
executingResourceStabilizationTimeout;
this.maximumDelayForTriggeringRescale =
maximumDelayForTriggeringRescale;
this.rescaleOnFailedCheckpointCount =
rescaleOnFailedCheckpointCount;
+ this.rescaleHistoryMax = rescaleHistoryMax;
}
public SchedulerExecutionMode getExecutionMode() {
@@ -366,6 +372,23 @@ public class AdaptiveScheduler
public int getRescaleOnFailedCheckpointCount() {
return rescaleOnFailedCheckpointCount;
}
+
+ public int getRescaleHistoryMax() {
+ return rescaleHistoryMax;
+ }
+
+ public JobRescaleConfigInfo toJobRescaleConfigInfo() {
+ return new JobRescaleConfigInfo(
+ rescaleHistoryMax,
+ executionMode,
+ submissionResourceWaitTimeout.toMillis(),
+ submissionResourceStabilizationTimeout.toMillis(),
+ slotIdleTimeout.toMillis(),
+ executingCooldownTimeout.toMillis(),
+ executingResourceStabilizationTimeout.toMillis(),
+ maximumDelayForTriggeringRescale.toMillis(),
+ rescaleOnFailedCheckpointCount);
+ }
}
private final Settings settings;
@@ -850,7 +873,10 @@ public class AdaptiveScheduler
@Override
public ExecutionGraphInfo requestJob() {
return new ExecutionGraphInfo(
- state.getJob(), exceptionHistory.toArrayList(),
getSchedulerType());
+ state.getJob(),
+ exceptionHistory.toArrayList(),
+ getSchedulerType(),
+ settings.toJobRescaleConfigInfo());
}
@Override
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
index 6c4fd91b3db..211b8749425 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
@@ -96,6 +96,7 @@ import
org.apache.flink.runtime.rest.handler.job.metrics.JobVertexMetricsHandler
import
org.apache.flink.runtime.rest.handler.job.metrics.JobVertexWatermarksHandler;
import org.apache.flink.runtime.rest.handler.job.metrics.SubtaskMetricsHandler;
import
org.apache.flink.runtime.rest.handler.job.metrics.TaskManagerMetricsHandler;
+import
org.apache.flink.runtime.rest.handler.job.rescales.JobRescaleConfigHandler;
import org.apache.flink.runtime.rest.handler.job.rescaling.RescalingHandlers;
import
org.apache.flink.runtime.rest.handler.job.savepoints.SavepointDisposalHandlers;
import org.apache.flink.runtime.rest.handler.job.savepoints.SavepointHandlers;
@@ -162,6 +163,7 @@ import
org.apache.flink.runtime.rest.messages.job.SubtaskCurrentAttemptDetailsHe
import
org.apache.flink.runtime.rest.messages.job.SubtaskExecutionAttemptAccumulatorsHeaders;
import
org.apache.flink.runtime.rest.messages.job.SubtaskExecutionAttemptDetailsHeaders;
import
org.apache.flink.runtime.rest.messages.job.coordination.ClientCoordinationHeaders;
+import
org.apache.flink.runtime.rest.messages.job.rescales.JobRescaleConfigHeaders;
import
org.apache.flink.runtime.rest.messages.taskmanager.TaskManagerCustomLogHeaders;
import
org.apache.flink.runtime.rest.messages.taskmanager.TaskManagerDetailsHeaders;
import
org.apache.flink.runtime.rest.messages.taskmanager.TaskManagerLogFileHeaders;
@@ -1199,6 +1201,17 @@ public class WebMonitorEndpoint<T extends
RestfulGateway> extends RestServerEndp
jobResourceRequirementsUpdateHandler.getMessageHeaders(),
jobResourceRequirementsUpdateHandler));
+ final JobRescaleConfigHandler jobRescaleConfigHandler =
+ new JobRescaleConfigHandler(
+ leaderRetriever,
+ timeout,
+ responseHeaders,
+ JobRescaleConfigHeaders.getInstance(),
+ executionGraphCache,
+ executor);
+ handlers.add(
+ Tuple2.of(jobRescaleConfigHandler.getMessageHeaders(),
jobRescaleConfigHandler));
+
handlers.stream()
.map(tuple -> tuple.f1)
.filter(handler -> handler instanceof JsonArchivist)
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/rescales/JobRescaleConfigHandlerTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/rescales/JobRescaleConfigHandlerTest.java
new file mode 100644
index 00000000000..3fb7c10f170
--- /dev/null
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/rescales/JobRescaleConfigHandlerTest.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.job.rescales;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.SchedulerExecutionMode;
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import org.apache.flink.runtime.rest.handler.HandlerRequestException;
+import org.apache.flink.runtime.rest.handler.RestHandlerException;
+import org.apache.flink.runtime.rest.handler.legacy.DefaultExecutionGraphCache;
+import
org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedExecutionGraphBuilder;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.JobIDPathParameter;
+import org.apache.flink.runtime.rest.messages.JobMessageParameters;
+import
org.apache.flink.runtime.rest.messages.job.rescales.JobRescaleConfigHeaders;
+import
org.apache.flink.runtime.rest.messages.job.rescales.JobRescaleConfigInfo;
+import org.apache.flink.runtime.scheduler.ExecutionGraphInfo;
+import org.apache.flink.testutils.TestingUtils;
+import org.apache.flink.util.concurrent.Executors;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/** Test for {@link JobRescaleConfigHandler}. */
+class JobRescaleConfigHandlerTest {
+
+ @Test
+ void testGetJobRescaleConfigInfo() throws HandlerRequestException,
RestHandlerException {
+ JobRescaleConfigHandler testInstance =
+ new JobRescaleConfigHandler(
+ CompletableFuture::new,
+ TestingUtils.TIMEOUT,
+ Collections.emptyMap(),
+ JobRescaleConfigHeaders.getInstance(),
+ new DefaultExecutionGraphCache(TestingUtils.TIMEOUT,
TestingUtils.TIMEOUT),
+ Executors.directExecutor());
+ final JobID jobId = new JobID();
+ final Map<String, String> pathParameters = new HashMap<>();
+ pathParameters.put(JobIDPathParameter.KEY, jobId.toString());
+ final HandlerRequest<EmptyRequestBody> request =
+ HandlerRequest.resolveParametersAndCreate(
+ EmptyRequestBody.getInstance(),
+ new JobMessageParameters(),
+ pathParameters,
+ Collections.emptyMap(),
+ Collections.emptyList());
+
+ // Test for null on JobRescaleConfigInfo.
+ ExecutionGraphInfo executionGraphInfo =
+ new ExecutionGraphInfo(
+ new ArchivedExecutionGraphBuilder().build(),
+ Collections.emptyList(),
+ null,
+ null);
+ final ExecutionGraphInfo finalExecutionGraphInfo = executionGraphInfo;
+ assertThatThrownBy(() -> testInstance.handleRequest(request,
finalExecutionGraphInfo))
+ .isInstanceOf(RestHandlerException.class);
+
+ // Test for nonnull on JobRescaleConfigInfo.
+ JobRescaleConfigInfo jobRescaleConfigInfo =
+ new JobRescaleConfigInfo(
+ 1, SchedulerExecutionMode.REACTIVE, 1L, 1L, 1L, 1L,
1L, 1L, 1);
+ executionGraphInfo =
+ new ExecutionGraphInfo(
+ new ArchivedExecutionGraphBuilder().build(),
+ Collections.emptyList(),
+ null,
+ jobRescaleConfigInfo);
+ assertThat(testInstance.handleRequest(request, executionGraphInfo))
+ .isEqualTo(jobRescaleConfigInfo);
+ }
+}