This is an automated email from the ASF dual-hosted git repository.

weizhong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 283904248a0 [FLINK-38897][Runtime/REST] Introduce the 
/jobs/:jobid/rescales/config endpoint in the REST API (#27580)
283904248a0 is described below

commit 283904248a03023bf3ed8a6fc9f6d575294a3969
Author: Yuepeng Pan <[email protected]>
AuthorDate: Tue Mar 3 18:52:28 2026 +0800

    [FLINK-38897][Runtime/REST] Introduce the /jobs/:jobid/rescales/config 
endpoint in the REST API (#27580)
---
 .../shortcodes/generated/rest_v1_dispatcher.html   |  75 ++++++++++
 docs/static/generated/rest_v1_dispatcher.yml       |  51 +++++++
 .../src/test/resources/rest_api_v1.snapshot        |  51 +++++++
 .../job/rescales/JobRescaleConfigHandler.java      | 109 +++++++++++++++
 .../job/rescales/JobRescaleConfigHeaders.java      |  79 +++++++++++
 .../job/rescales/JobRescaleConfigInfo.java         | 153 +++++++++++++++++++++
 .../runtime/scheduler/ExecutionGraphInfo.java      |  18 ++-
 .../flink/runtime/scheduler/SchedulerBase.java     |   3 +-
 .../scheduler/adaptive/AdaptiveScheduler.java      |  32 ++++-
 .../runtime/webmonitor/WebMonitorEndpoint.java     |  13 ++
 .../job/rescales/JobRescaleConfigHandlerTest.java  |  95 +++++++++++++
 11 files changed, 673 insertions(+), 6 deletions(-)

diff --git a/docs/layouts/shortcodes/generated/rest_v1_dispatcher.html 
b/docs/layouts/shortcodes/generated/rest_v1_dispatcher.html
index 4bcf44204db..2c14469d782 100644
--- a/docs/layouts/shortcodes/generated/rest_v1_dispatcher.html
+++ b/docs/layouts/shortcodes/generated/rest_v1_dispatcher.html
@@ -3486,6 +3486,81 @@ Using 'curl' you can upload a jar via 'curl -X POST -H 
"Expect:" -F "jarfile=@pa
     </tr>
   </tbody>
 </table>
+<table class="rest-api table table-bordered">
+  <tbody>
+    <tr>
+      <td class="text-left" 
colspan="2"><h5><strong>/jobs/:jobid/rescales/config</strong></h5></td>
+    </tr>
+    <tr>
+      <td class="text-left" style="width: 20%">Verb: <code>GET</code></td>
+      <td class="text-left">Response code: <code>200 OK</code></td>
+    </tr>
+    <tr>
+      <td colspan="2">Returns the job rescale configuration.</td>
+    </tr>
+    <tr>
+      <td colspan="2">Path parameters</td>
+    </tr>
+    <tr>
+      <td colspan="2">
+        <ul>
+<li><code>jobid</code> - 32-character hexadecimal string value that identifies 
a job.</li>
+        </ul>
+      </td>
+    </tr>
+    <tr>
+      <td colspan="2">
+        <label>
+          <details>
+          <summary>Request</summary>
+          <pre><code>{}</code></pre>
+        </label>
+      </td>
+    </tr>
+    <tr>
+      <td colspan="2">
+        <label>
+          <details>
+          <summary>Response</summary>
+          <pre><code>{
+  "type" : "object",
+  "id" : 
"urn:jsonschema:org:apache:flink:runtime:rest:messages:job:rescales:JobRescaleConfigInfo",
+  "properties" : {
+    "executingCooldownTimeoutInMillis" : {
+      "type" : "integer"
+    },
+    "executingResourceStabilizationTimeoutInMillis" : {
+      "type" : "integer"
+    },
+    "maximumDelayForTriggeringRescaleInMillis" : {
+      "type" : "integer"
+    },
+    "rescaleHistoryMax" : {
+      "type" : "integer"
+    },
+    "rescaleOnFailedCheckpointCount" : {
+      "type" : "integer"
+    },
+    "schedulerExecutionMode" : {
+      "type" : "string",
+      "enum" : [ "REACTIVE" ]
+    },
+    "slotIdleTimeoutInMillis" : {
+      "type" : "integer"
+    },
+    "submissionResourceStabilizationTimeoutInMillis" : {
+      "type" : "integer"
+    },
+    "submissionResourceWaitTimeoutInMillis" : {
+      "type" : "integer"
+    }
+  }
+}</code></pre>
+        </label>
+      </td>
+    </tr>
+  </tbody>
+</table>
 <table class="rest-api table table-bordered">
   <tbody>
     <tr>
diff --git a/docs/static/generated/rest_v1_dispatcher.yml 
b/docs/static/generated/rest_v1_dispatcher.yml
index 33c6b292c19..3c0c3aae4ea 100644
--- a/docs/static/generated/rest_v1_dispatcher.yml
+++ b/docs/static/generated/rest_v1_dispatcher.yml
@@ -898,6 +898,24 @@ paths:
             application/json:
               schema:
                 $ref: "#/components/schemas/JobPlanInfo"
+  /jobs/{jobid}/rescales/config:
+    get:
+      description: Returns the job rescale configuration.
+      operationId: getJobRescaleConfig
+      parameters:
+        - name: jobid
+          in: path
+          description: 32-character hexadecimal string value that identifies a 
job.
+          required: true
+          schema:
+            $ref: "#/components/schemas/JobID"
+      responses:
+        "200":
+          description: The request was successful.
+          content:
+            application/json:
+              schema:
+                $ref: "#/components/schemas/JobRescaleConfigInfo"
   /jobs/{jobid}/rescaling:
     patch:
       description: Triggers the rescaling of a job. This async operation would 
return
@@ -2679,6 +2697,35 @@ components:
       properties:
         plan:
           $ref: "#/components/schemas/Plan"
+    JobRescaleConfigInfo:
+      type: object
+      properties:
+        executingCooldownTimeoutInMillis:
+          type: integer
+          format: int64
+        executingResourceStabilizationTimeoutInMillis:
+          type: integer
+          format: int64
+        maximumDelayForTriggeringRescaleInMillis:
+          type: integer
+          format: int64
+        rescaleHistoryMax:
+          type: integer
+          format: int32
+        rescaleOnFailedCheckpointCount:
+          type: integer
+          format: int32
+        schedulerExecutionMode:
+          $ref: "#/components/schemas/SchedulerExecutionMode"
+        slotIdleTimeoutInMillis:
+          type: integer
+          format: int64
+        submissionResourceStabilizationTimeoutInMillis:
+          type: integer
+          format: int64
+        submissionResourceWaitTimeoutInMillis:
+          type: integer
+          format: int64
     JobResourceRequirementsBody:
       type: object
       additionalProperties:
@@ -3071,6 +3118,10 @@ components:
         - Default
         - Adaptive
         - AdaptiveBatch
+    SchedulerExecutionMode:
+      type: string
+      enum:
+      - REACTIVE
     SerializedThrowable:
       type: object
       properties:
diff --git a/flink-runtime-web/src/test/resources/rest_api_v1.snapshot 
b/flink-runtime-web/src/test/resources/rest_api_v1.snapshot
index 6a84ac6d888..38cafeea1b9 100644
--- a/flink-runtime-web/src/test/resources/rest_api_v1.snapshot
+++ b/flink-runtime-web/src/test/resources/rest_api_v1.snapshot
@@ -4781,5 +4781,56 @@
         }
       }
     }
+  }, {
+    "url" : "/jobs/:jobid/rescales/config",
+    "method" : "GET",
+    "status-code" : "200 OK",
+    "file-upload" : false,
+    "path-parameters" : {
+      "pathParameters" : [ {
+        "key" : "jobid"
+      } ]
+    },
+    "query-parameters" : {
+      "queryParameters" : [ ]
+    },
+    "request" : {
+      "type" : "object",
+      "id" : 
"urn:jsonschema:org:apache:flink:runtime:rest:messages:EmptyRequestBody"
+    },
+    "response" : {
+      "type" : "object",
+      "id" : 
"urn:jsonschema:org:apache:flink:runtime:rest:messages:job:rescales:JobRescaleConfigInfo",
+      "properties" : {
+        "executingCooldownTimeoutInMillis" : {
+          "type" : "integer"
+        },
+        "executingResourceStabilizationTimeoutInMillis" : {
+          "type" : "integer"
+        },
+        "maximumDelayForTriggeringRescaleInMillis" : {
+          "type" : "integer"
+        },
+        "rescaleHistoryMax" : {
+          "type" : "integer"
+        },
+        "rescaleOnFailedCheckpointCount" : {
+          "type" : "integer"
+        },
+        "schedulerExecutionMode" : {
+          "type" : "string",
+          "enum" : [ "REACTIVE" ]
+        },
+        "slotIdleTimeoutInMillis" : {
+          "type" : "integer"
+        },
+        "submissionResourceStabilizationTimeoutInMillis" : {
+          "type" : "integer"
+        },
+        "submissionResourceWaitTimeoutInMillis" : {
+          "type" : "integer"
+        }
+      }
+    }
   } ]
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/rescales/JobRescaleConfigHandler.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/rescales/JobRescaleConfigHandler.java
new file mode 100644
index 00000000000..fa4fab08df0
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/rescales/JobRescaleConfigHandler.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.job.rescales;
+
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import org.apache.flink.runtime.rest.handler.RestHandlerException;
+import org.apache.flink.runtime.rest.handler.job.AbstractExecutionGraphHandler;
+import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.ErrorResponseBody;
+import org.apache.flink.runtime.rest.messages.JobIDPathParameter;
+import org.apache.flink.runtime.rest.messages.JobMessageParameters;
+import org.apache.flink.runtime.rest.messages.MessageHeaders;
+import org.apache.flink.runtime.rest.messages.ResponseBody;
+import 
org.apache.flink.runtime.rest.messages.job.rescales.JobRescaleConfigHeaders;
+import 
org.apache.flink.runtime.rest.messages.job.rescales.JobRescaleConfigInfo;
+import org.apache.flink.runtime.scheduler.ExecutionGraphInfo;
+import org.apache.flink.runtime.webmonitor.RestfulGateway;
+import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
+import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+
+import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Map;
+import java.util.concurrent.Executor;
+
+/** The handler for job rescale configuration. */
+public class JobRescaleConfigHandler
+        extends AbstractExecutionGraphHandler<JobRescaleConfigInfo, 
JobMessageParameters>
+        implements JsonArchivist {
+
+    public JobRescaleConfigHandler(
+            GatewayRetriever<? extends RestfulGateway> leaderRetriever,
+            Duration timeout,
+            Map<String, String> responseHeaders,
+            MessageHeaders<EmptyRequestBody, JobRescaleConfigInfo, 
JobMessageParameters>
+                    messageHeaders,
+            ExecutionGraphCache executionGraphCache,
+            Executor executor) {
+        super(
+                leaderRetriever,
+                timeout,
+                responseHeaders,
+                messageHeaders,
+                executionGraphCache,
+                executor);
+    }
+
+    @Override
+    protected JobRescaleConfigInfo handleRequest(
+            HandlerRequest<EmptyRequestBody> request, ExecutionGraphInfo 
executionGraphInfo)
+            throws RestHandlerException {
+        return getJobRescaleConfigInfo(executionGraphInfo);
+    }
+
+    private JobRescaleConfigInfo getJobRescaleConfigInfo(ExecutionGraphInfo 
executionGraphInfo)
+            throws RestHandlerException {
+        if (executionGraphInfo.getJobRescaleConfigInfo() == null) {
+            throw new RestHandlerException(
+                    "AdaptiveScheduler is not enabled for this job ("
+                            + executionGraphInfo.getJobId()
+                            + ").",
+                    HttpResponseStatus.NOT_FOUND,
+                    RestHandlerException.LoggingBehavior.IGNORE);
+        }
+        return executionGraphInfo.getJobRescaleConfigInfo();
+    }
+
+    @Override
+    public Collection<ArchivedJson> archiveJsonWithPath(ExecutionGraphInfo 
executionGraphInfo)
+            throws IOException {
+
+        ResponseBody response;
+        try {
+            response = getJobRescaleConfigInfo(executionGraphInfo);
+        } catch (RestHandlerException rhe) {
+            response = new ErrorResponseBody(rhe.getMessage());
+        }
+        return Collections.singletonList(
+                new ArchivedJson(
+                        JobRescaleConfigHeaders.getInstance()
+                                .getTargetRestEndpointURL()
+                                .replace(
+                                        ':' + JobIDPathParameter.KEY,
+                                        
executionGraphInfo.getJobId().toString()),
+                        response));
+    }
+}
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/rescales/JobRescaleConfigHeaders.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/rescales/JobRescaleConfigHeaders.java
new file mode 100644
index 00000000000..fa97a493df9
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/rescales/JobRescaleConfigHeaders.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages.job.rescales;
+
+import org.apache.flink.runtime.rest.HttpMethodWrapper;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.JobMessageParameters;
+import org.apache.flink.runtime.rest.messages.RuntimeMessageHeaders;
+
+import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+
+/**
+ * Header information related to rescale configuration for jobs with the 
adaptive scheduler enabled.
+ */
+public class JobRescaleConfigHeaders
+        implements RuntimeMessageHeaders<
+                EmptyRequestBody, JobRescaleConfigInfo, JobMessageParameters> {
+
+    private static final JobRescaleConfigHeaders INSTANCE = new 
JobRescaleConfigHeaders();
+
+    public static final String JOB_RESCALE_CONFIG_PATH = 
"/jobs/:jobid/rescales/config";
+
+    private JobRescaleConfigHeaders() {}
+
+    @Override
+    public Class<JobRescaleConfigInfo> getResponseClass() {
+        return JobRescaleConfigInfo.class;
+    }
+
+    @Override
+    public HttpResponseStatus getResponseStatusCode() {
+        return HttpResponseStatus.OK;
+    }
+
+    @Override
+    public String getDescription() {
+        return "Returns the job rescale configuration.";
+    }
+
+    @Override
+    public Class<EmptyRequestBody> getRequestClass() {
+        return EmptyRequestBody.class;
+    }
+
+    @Override
+    public JobMessageParameters getUnresolvedMessageParameters() {
+        return new JobMessageParameters();
+    }
+
+    @Override
+    public HttpMethodWrapper getHttpMethod() {
+        return HttpMethodWrapper.GET;
+    }
+
+    @Override
+    public String getTargetRestEndpointURL() {
+        return JOB_RESCALE_CONFIG_PATH;
+    }
+
+    public static JobRescaleConfigHeaders getInstance() {
+        return INSTANCE;
+    }
+}
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/rescales/JobRescaleConfigInfo.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/rescales/JobRescaleConfigInfo.java
new file mode 100644
index 00000000000..4ee4f0677be
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/rescales/JobRescaleConfigInfo.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages.job.rescales;
+
+import org.apache.flink.configuration.SchedulerExecutionMode;
+import org.apache.flink.runtime.rest.messages.ResponseBody;
+
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+
+import io.swagger.v3.oas.annotations.media.Schema;
+
+import java.io.Serializable;
+import java.util.Objects;
+
+/** Configuration information related to rescaling for jobs with the adaptive 
scheduler enabled. */
+@Schema(name = "JobRescaleConfigInfo")
+public class JobRescaleConfigInfo implements ResponseBody, Serializable {
+
+    private static final long serialVersionUID = 1L;
+
+    public static final String FIELD_NAME_RESCALE_HISTORY_MAX = 
"rescaleHistoryMax";
+    public static final String FIELD_NAME_SCHEDULER_EXECUTION_MODE = 
"schedulerExecutionMode";
+    public static final String FIELD_NAME_SUBMISSION_RESOURCE_WAIT_TIMEOUT =
+            "submissionResourceWaitTimeoutInMillis";
+    public static final String 
FIELD_NAME_SUBMISSION_RESOURCE_STABILIZATION_TIMEOUT =
+            "submissionResourceStabilizationTimeoutInMillis";
+    public static final String FIELD_NAME_SLOT_IDLE_TIMEOUT = 
"slotIdleTimeoutInMillis";
+    public static final String FIELD_NAME_EXECUTING_COOLDOWN_TIMEOUT =
+            "executingCooldownTimeoutInMillis";
+    public static final String 
FIELD_NAME_EXECUTING_RESOURCE_STABILIZATION_TIMEOUT =
+            "executingResourceStabilizationTimeoutInMillis";
+    public static final String FIELD_NAME_MAXIMUM_DELAY_FOR_TRIGGERING_RESCALE 
=
+            "maximumDelayForTriggeringRescaleInMillis";
+    public static final String FIELD_NAME_RESCALE_ON_FAILED_CHECKPOINT_COUNT =
+            "rescaleOnFailedCheckpointCount";
+
+    @JsonProperty(FIELD_NAME_RESCALE_HISTORY_MAX)
+    private final Integer rescaleHistoryMax;
+
+    @JsonProperty(FIELD_NAME_SCHEDULER_EXECUTION_MODE)
+    private final SchedulerExecutionMode schedulerExecutionMode;
+
+    @JsonProperty(FIELD_NAME_SUBMISSION_RESOURCE_WAIT_TIMEOUT)
+    private final Long submissionResourceWaitTimeoutInMillis;
+
+    @JsonProperty(FIELD_NAME_SUBMISSION_RESOURCE_STABILIZATION_TIMEOUT)
+    private final Long submissionResourceStabilizationTimeoutInMillis;
+
+    @JsonProperty(FIELD_NAME_SLOT_IDLE_TIMEOUT)
+    private final Long slotIdleTimeoutInMillis;
+
+    @JsonProperty(FIELD_NAME_EXECUTING_COOLDOWN_TIMEOUT)
+    private final Long executingCooldownTimeoutInMillis;
+
+    @JsonProperty(FIELD_NAME_EXECUTING_RESOURCE_STABILIZATION_TIMEOUT)
+    private final Long executingResourceStabilizationTimeoutInMillis;
+
+    @JsonProperty(FIELD_NAME_MAXIMUM_DELAY_FOR_TRIGGERING_RESCALE)
+    private final Long maximumDelayForTriggeringRescaleInMillis;
+
+    @JsonProperty(FIELD_NAME_RESCALE_ON_FAILED_CHECKPOINT_COUNT)
+    private final Integer rescaleOnFailedCheckpointCount;
+
+    @JsonCreator
+    public JobRescaleConfigInfo(
+            @JsonProperty(FIELD_NAME_RESCALE_HISTORY_MAX) Integer 
rescaleHistoryMax,
+            @JsonProperty(FIELD_NAME_SCHEDULER_EXECUTION_MODE)
+                    SchedulerExecutionMode schedulerExecutionMode,
+            @JsonProperty(FIELD_NAME_SUBMISSION_RESOURCE_WAIT_TIMEOUT)
+                    Long submissionResourceWaitTimeoutInMillis,
+            @JsonProperty(FIELD_NAME_SUBMISSION_RESOURCE_STABILIZATION_TIMEOUT)
+                    Long submissionResourceStabilizationTimeoutInMillis,
+            @JsonProperty(FIELD_NAME_SLOT_IDLE_TIMEOUT) Long 
slotIdleTimeoutInMillis,
+            @JsonProperty(FIELD_NAME_EXECUTING_COOLDOWN_TIMEOUT)
+                    Long executingCooldownTimeoutInMillis,
+            @JsonProperty(FIELD_NAME_EXECUTING_RESOURCE_STABILIZATION_TIMEOUT)
+                    Long executingResourceStabilizationTimeoutInMillis,
+            @JsonProperty(FIELD_NAME_MAXIMUM_DELAY_FOR_TRIGGERING_RESCALE)
+                    Long maximumDelayForTriggeringRescaleInMillis,
+            @JsonProperty(FIELD_NAME_RESCALE_ON_FAILED_CHECKPOINT_COUNT)
+                    Integer rescaleOnFailedCheckpointCount) {
+        this.rescaleHistoryMax = rescaleHistoryMax;
+        this.schedulerExecutionMode = schedulerExecutionMode;
+        this.submissionResourceWaitTimeoutInMillis = 
submissionResourceWaitTimeoutInMillis;
+        this.submissionResourceStabilizationTimeoutInMillis =
+                submissionResourceStabilizationTimeoutInMillis;
+        this.slotIdleTimeoutInMillis = slotIdleTimeoutInMillis;
+        this.executingCooldownTimeoutInMillis = 
executingCooldownTimeoutInMillis;
+        this.executingResourceStabilizationTimeoutInMillis =
+                executingResourceStabilizationTimeoutInMillis;
+        this.maximumDelayForTriggeringRescaleInMillis = 
maximumDelayForTriggeringRescaleInMillis;
+        this.rescaleOnFailedCheckpointCount = rescaleOnFailedCheckpointCount;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+
+        JobRescaleConfigInfo that = (JobRescaleConfigInfo) o;
+        return Objects.equals(rescaleHistoryMax, that.rescaleHistoryMax)
+                && Objects.equals(schedulerExecutionMode, 
that.schedulerExecutionMode)
+                && Objects.equals(
+                        submissionResourceWaitTimeoutInMillis,
+                        that.submissionResourceWaitTimeoutInMillis)
+                && Objects.equals(
+                        submissionResourceStabilizationTimeoutInMillis,
+                        that.submissionResourceStabilizationTimeoutInMillis)
+                && Objects.equals(slotIdleTimeoutInMillis, 
that.slotIdleTimeoutInMillis)
+                && Objects.equals(
+                        executingCooldownTimeoutInMillis, 
that.executingCooldownTimeoutInMillis)
+                && Objects.equals(
+                        executingResourceStabilizationTimeoutInMillis,
+                        that.executingResourceStabilizationTimeoutInMillis)
+                && Objects.equals(
+                        maximumDelayForTriggeringRescaleInMillis,
+                        that.maximumDelayForTriggeringRescaleInMillis)
+                && Objects.equals(
+                        rescaleOnFailedCheckpointCount, 
that.rescaleOnFailedCheckpointCount);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(
+                rescaleHistoryMax,
+                schedulerExecutionMode,
+                submissionResourceWaitTimeoutInMillis,
+                submissionResourceStabilizationTimeoutInMillis,
+                slotIdleTimeoutInMillis,
+                executingCooldownTimeoutInMillis,
+                executingResourceStabilizationTimeoutInMillis,
+                maximumDelayForTriggeringRescaleInMillis,
+                rescaleOnFailedCheckpointCount);
+    }
+}
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionGraphInfo.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionGraphInfo.java
index 1280b625d0c..7e6680f498e 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionGraphInfo.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionGraphInfo.java
@@ -23,6 +23,7 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
 import org.apache.flink.runtime.executiongraph.ExecutionGraph;
+import 
org.apache.flink.runtime.rest.messages.job.rescales.JobRescaleConfigInfo;
 import 
org.apache.flink.runtime.scheduler.exceptionhistory.RootExceptionHistoryEntry;
 
 import javax.annotation.Nullable;
@@ -43,6 +44,12 @@ public class ExecutionGraphInfo implements Serializable {
     private final Iterable<RootExceptionHistoryEntry> exceptionHistory;
     @Nullable private final JobManagerOptions.SchedulerType schedulerType;
 
+    /**
+     * The value is null when the job is not enabled {@link
+     * org.apache.flink.runtime.scheduler.adaptive.AdaptiveScheduler}.
+     */
+    @Nullable private final JobRescaleConfigInfo jobRescaleConfigInfo;
+
     public ExecutionGraphInfo(ArchivedExecutionGraph executionGraph) {
         this(
                 executionGraph,
@@ -56,16 +63,18 @@ public class ExecutionGraphInfo implements Serializable {
     public ExecutionGraphInfo(
             ArchivedExecutionGraph executionGraph,
             Iterable<RootExceptionHistoryEntry> exceptionHistory) {
-        this(executionGraph, exceptionHistory, null);
+        this(executionGraph, exceptionHistory, null, null);
     }
 
     public ExecutionGraphInfo(
             ArchivedExecutionGraph executionGraph,
             Iterable<RootExceptionHistoryEntry> exceptionHistory,
-            JobManagerOptions.SchedulerType schedulerType) {
+            @Nullable JobManagerOptions.SchedulerType schedulerType,
+            @Nullable JobRescaleConfigInfo jobRescaleConfigInfo) {
         this.executionGraph = executionGraph;
         this.exceptionHistory = exceptionHistory;
         this.schedulerType = schedulerType;
+        this.jobRescaleConfigInfo = jobRescaleConfigInfo;
     }
 
     public JobID getJobId() {
@@ -80,6 +89,11 @@ public class ExecutionGraphInfo implements Serializable {
         return exceptionHistory;
     }
 
+    @Nullable
+    public JobRescaleConfigInfo getJobRescaleConfigInfo() {
+        return jobRescaleConfigInfo;
+    }
+
     public Optional<ApplicationID> getApplicationId() {
         return executionGraph.getApplicationId();
     }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
index 1b50f86600a..50778dc9478 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
@@ -872,7 +872,8 @@ public abstract class SchedulerBase implements SchedulerNG, 
CheckpointScheduling
         return new ExecutionGraphInfo(
                 ArchivedExecutionGraph.createFrom(executionGraph),
                 getExceptionHistory(),
-                getSchedulerType());
+                getSchedulerType(),
+                null);
     }
 
     @Override
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java
index 813b6132f0d..f89342d4b45 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java
@@ -94,6 +94,7 @@ import 
org.apache.flink.runtime.operators.coordination.OperatorEvent;
 import org.apache.flink.runtime.operators.coordination.TaskNotRunningException;
 import org.apache.flink.runtime.query.KvStateLocation;
 import org.apache.flink.runtime.query.UnknownKvStateLocation;
+import 
org.apache.flink.runtime.rest.messages.job.rescales.JobRescaleConfigInfo;
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.scheduler.CoordinatorNotExistException;
 import org.apache.flink.runtime.scheduler.DefaultVertexParallelismInfo;
@@ -304,7 +305,9 @@ public class AdaptiveScheduler
                     configuration.get(
                             SCHEDULER_RESCALE_TRIGGER_MAX_DELAY,
                             maximumDelayForRescaleTriggerDefault),
-                    rescaleOnFailedCheckpointsCount);
+                    rescaleOnFailedCheckpointsCount,
+                    // TODO: The parameter passing link will be implemented 
after FLIP-495.
+                    -1);
         }
 
         private final SchedulerExecutionMode executionMode;
@@ -315,6 +318,7 @@ public class AdaptiveScheduler
         private final Duration executingResourceStabilizationTimeout;
         private final Duration maximumDelayForTriggeringRescale;
         private final int rescaleOnFailedCheckpointCount;
+        private final int rescaleHistoryMax;
 
         private Settings(
                 SchedulerExecutionMode executionMode,
@@ -324,7 +328,8 @@ public class AdaptiveScheduler
                 Duration executingCooldownTimeout,
                 Duration executingResourceStabilizationTimeout,
                 Duration maximumDelayForTriggeringRescale,
-                int rescaleOnFailedCheckpointCount) {
+                int rescaleOnFailedCheckpointCount,
+                int rescaleHistoryMax) {
             this.executionMode = executionMode;
             this.submissionResourceWaitTimeout = submissionResourceWaitTimeout;
             this.submissionResourceStabilizationTimeout = 
submissionResourceStabilizationTimeout;
@@ -333,6 +338,7 @@ public class AdaptiveScheduler
             this.executingResourceStabilizationTimeout = 
executingResourceStabilizationTimeout;
             this.maximumDelayForTriggeringRescale = 
maximumDelayForTriggeringRescale;
             this.rescaleOnFailedCheckpointCount = 
rescaleOnFailedCheckpointCount;
+            this.rescaleHistoryMax = rescaleHistoryMax;
         }
 
         public SchedulerExecutionMode getExecutionMode() {
@@ -366,6 +372,23 @@ public class AdaptiveScheduler
         public int getRescaleOnFailedCheckpointCount() {
             return rescaleOnFailedCheckpointCount;
         }
+
+        public int getRescaleHistoryMax() {
+            return rescaleHistoryMax;
+        }
+
+        public JobRescaleConfigInfo toJobRescaleConfigInfo() {
+            return new JobRescaleConfigInfo(
+                    rescaleHistoryMax,
+                    executionMode,
+                    submissionResourceWaitTimeout.toMillis(),
+                    submissionResourceStabilizationTimeout.toMillis(),
+                    slotIdleTimeout.toMillis(),
+                    executingCooldownTimeout.toMillis(),
+                    executingResourceStabilizationTimeout.toMillis(),
+                    maximumDelayForTriggeringRescale.toMillis(),
+                    rescaleOnFailedCheckpointCount);
+        }
     }
 
     private final Settings settings;
@@ -850,7 +873,10 @@ public class AdaptiveScheduler
     @Override
     public ExecutionGraphInfo requestJob() {
         return new ExecutionGraphInfo(
-                state.getJob(), exceptionHistory.toArrayList(), 
getSchedulerType());
+                state.getJob(),
+                exceptionHistory.toArrayList(),
+                getSchedulerType(),
+                settings.toJobRescaleConfigInfo());
     }
 
     @Override
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
index 6c4fd91b3db..211b8749425 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
@@ -96,6 +96,7 @@ import 
org.apache.flink.runtime.rest.handler.job.metrics.JobVertexMetricsHandler
 import 
org.apache.flink.runtime.rest.handler.job.metrics.JobVertexWatermarksHandler;
 import org.apache.flink.runtime.rest.handler.job.metrics.SubtaskMetricsHandler;
 import 
org.apache.flink.runtime.rest.handler.job.metrics.TaskManagerMetricsHandler;
+import 
org.apache.flink.runtime.rest.handler.job.rescales.JobRescaleConfigHandler;
 import org.apache.flink.runtime.rest.handler.job.rescaling.RescalingHandlers;
 import 
org.apache.flink.runtime.rest.handler.job.savepoints.SavepointDisposalHandlers;
 import org.apache.flink.runtime.rest.handler.job.savepoints.SavepointHandlers;
@@ -162,6 +163,7 @@ import 
org.apache.flink.runtime.rest.messages.job.SubtaskCurrentAttemptDetailsHe
 import 
org.apache.flink.runtime.rest.messages.job.SubtaskExecutionAttemptAccumulatorsHeaders;
 import 
org.apache.flink.runtime.rest.messages.job.SubtaskExecutionAttemptDetailsHeaders;
 import 
org.apache.flink.runtime.rest.messages.job.coordination.ClientCoordinationHeaders;
+import 
org.apache.flink.runtime.rest.messages.job.rescales.JobRescaleConfigHeaders;
 import 
org.apache.flink.runtime.rest.messages.taskmanager.TaskManagerCustomLogHeaders;
 import 
org.apache.flink.runtime.rest.messages.taskmanager.TaskManagerDetailsHeaders;
 import 
org.apache.flink.runtime.rest.messages.taskmanager.TaskManagerLogFileHeaders;
@@ -1199,6 +1201,17 @@ public class WebMonitorEndpoint<T extends 
RestfulGateway> extends RestServerEndp
                         
jobResourceRequirementsUpdateHandler.getMessageHeaders(),
                         jobResourceRequirementsUpdateHandler));
 
+        final JobRescaleConfigHandler jobRescaleConfigHandler =
+                new JobRescaleConfigHandler(
+                        leaderRetriever,
+                        timeout,
+                        responseHeaders,
+                        JobRescaleConfigHeaders.getInstance(),
+                        executionGraphCache,
+                        executor);
+        handlers.add(
+                Tuple2.of(jobRescaleConfigHandler.getMessageHeaders(), 
jobRescaleConfigHandler));
+
         handlers.stream()
                 .map(tuple -> tuple.f1)
                 .filter(handler -> handler instanceof JsonArchivist)
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/rescales/JobRescaleConfigHandlerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/rescales/JobRescaleConfigHandlerTest.java
new file mode 100644
index 00000000000..3fb7c10f170
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/rescales/JobRescaleConfigHandlerTest.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.job.rescales;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.SchedulerExecutionMode;
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import org.apache.flink.runtime.rest.handler.HandlerRequestException;
+import org.apache.flink.runtime.rest.handler.RestHandlerException;
+import org.apache.flink.runtime.rest.handler.legacy.DefaultExecutionGraphCache;
+import 
org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedExecutionGraphBuilder;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.JobIDPathParameter;
+import org.apache.flink.runtime.rest.messages.JobMessageParameters;
+import 
org.apache.flink.runtime.rest.messages.job.rescales.JobRescaleConfigHeaders;
+import 
org.apache.flink.runtime.rest.messages.job.rescales.JobRescaleConfigInfo;
+import org.apache.flink.runtime.scheduler.ExecutionGraphInfo;
+import org.apache.flink.testutils.TestingUtils;
+import org.apache.flink.util.concurrent.Executors;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/** Test for {@link JobRescaleConfigHandler}. */
+class JobRescaleConfigHandlerTest {
+
+    @Test
+    void testGetJobRescaleConfigInfo() throws HandlerRequestException, 
RestHandlerException {
+        JobRescaleConfigHandler testInstance =
+                new JobRescaleConfigHandler(
+                        CompletableFuture::new,
+                        TestingUtils.TIMEOUT,
+                        Collections.emptyMap(),
+                        JobRescaleConfigHeaders.getInstance(),
+                        new DefaultExecutionGraphCache(TestingUtils.TIMEOUT, 
TestingUtils.TIMEOUT),
+                        Executors.directExecutor());
+        final JobID jobId = new JobID();
+        final Map<String, String> pathParameters = new HashMap<>();
+        pathParameters.put(JobIDPathParameter.KEY, jobId.toString());
+        final HandlerRequest<EmptyRequestBody> request =
+                HandlerRequest.resolveParametersAndCreate(
+                        EmptyRequestBody.getInstance(),
+                        new JobMessageParameters(),
+                        pathParameters,
+                        Collections.emptyMap(),
+                        Collections.emptyList());
+
+        // Test for null on JobRescaleConfigInfo.
+        ExecutionGraphInfo executionGraphInfo =
+                new ExecutionGraphInfo(
+                        new ArchivedExecutionGraphBuilder().build(),
+                        Collections.emptyList(),
+                        null,
+                        null);
+        final ExecutionGraphInfo finalExecutionGraphInfo = executionGraphInfo;
+        assertThatThrownBy(() -> testInstance.handleRequest(request, 
finalExecutionGraphInfo))
+                .isInstanceOf(RestHandlerException.class);
+
+        // Test for nonnull on JobRescaleConfigInfo.
+        JobRescaleConfigInfo jobRescaleConfigInfo =
+                new JobRescaleConfigInfo(
+                        1, SchedulerExecutionMode.REACTIVE, 1L, 1L, 1L, 1L, 
1L, 1L, 1);
+        executionGraphInfo =
+                new ExecutionGraphInfo(
+                        new ArchivedExecutionGraphBuilder().build(),
+                        Collections.emptyList(),
+                        null,
+                        jobRescaleConfigInfo);
+        assertThat(testInstance.handleRequest(request, executionGraphInfo))
+                .isEqualTo(jobRescaleConfigInfo);
+    }
+}


Reply via email to