This is an automated email from the ASF dual-hosted git repository.
zhuzh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 3a13f8dbb90 [FLINK-38978][runtime] Expose JM config for applications
3a13f8dbb90 is described below
commit 3a13f8dbb905e7780f81811b0a1e9f3eb698b03e
Author: Yi Zhang <[email protected]>
AuthorDate: Tue Feb 24 12:15:49 2026 +0800
[FLINK-38978][runtime] Expose JM config for applications
---
.../src/test/resources/rest_api_v1.snapshot | 32 ++++++++
.../JobManagerApplicationConfigurationHandler.java | 85 ++++++++++++++++++++++
.../JobManagerApplicationConfigurationHeaders.java | 84 +++++++++++++++++++++
.../runtime/webmonitor/WebMonitorEndpoint.java | 14 ++++
...ManagerApplicationConfigurationHandlerTest.java | 80 ++++++++++++++++++++
5 files changed, 295 insertions(+)
diff --git a/flink-runtime-web/src/test/resources/rest_api_v1.snapshot
b/flink-runtime-web/src/test/resources/rest_api_v1.snapshot
index b8d8f19359e..970bd84a287 100644
--- a/flink-runtime-web/src/test/resources/rest_api_v1.snapshot
+++ b/flink-runtime-web/src/test/resources/rest_api_v1.snapshot
@@ -161,6 +161,38 @@
"type" : "object",
"id" :
"urn:jsonschema:org:apache:flink:runtime:rest:messages:EmptyResponseBody"
}
+ }, {
+ "url" : "/applications/:applicationid/jobmanager/config",
+ "method" : "GET",
+ "status-code" : "200 OK",
+ "file-upload" : false,
+ "path-parameters" : {
+ "pathParameters" : [ {
+ "key" : "applicationid"
+ } ]
+ },
+ "query-parameters" : {
+ "queryParameters" : [ ]
+ },
+ "request" : {
+ "type" : "object",
+ "id" :
"urn:jsonschema:org:apache:flink:runtime:rest:messages:EmptyRequestBody"
+ },
+ "response" : {
+ "type" : "array",
+ "items" : {
+ "type" : "object",
+ "id" :
"urn:jsonschema:org:apache:flink:runtime:rest:messages:ConfigurationInfoEntry",
+ "properties" : {
+ "key" : {
+ "type" : "string"
+ },
+ "value" : {
+ "type" : "string"
+ }
+ }
+ }
+ }
}, {
"url" : "/cluster",
"method" : "DELETE",
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/application/JobManagerApplicationConfigurationHandler.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/application/JobManagerApplicationConfigurationHandler.java
new file mode 100644
index 00000000000..3525d5870a7
--- /dev/null
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/application/JobManagerApplicationConfigurationHandler.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.application;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.application.ArchivedApplication;
+import org.apache.flink.runtime.rest.handler.AbstractRestHandler;
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import org.apache.flink.runtime.rest.messages.ApplicationIDPathParameter;
+import org.apache.flink.runtime.rest.messages.ApplicationMessageParameters;
+import org.apache.flink.runtime.rest.messages.ConfigurationInfo;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.MessageHeaders;
+import
org.apache.flink.runtime.rest.messages.application.JobManagerApplicationConfigurationHeaders;
+import org.apache.flink.runtime.webmonitor.RestfulGateway;
+import org.apache.flink.runtime.webmonitor.history.ApplicationJsonArchivist;
+import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nonnull;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Handler which serves the jobmanager's configuration of a specific application.
+ *
+ * <p>The {@link Configuration} handed to the constructor is converted into a {@link
+ * ConfigurationInfo} once, when the handler is created. That same object is returned for every
+ * request and is also what gets written when an application is archived. The application id is
+ * only used to build the archive path; it does not select which configuration is returned.
+ */
+public class JobManagerApplicationConfigurationHandler
+ extends AbstractRestHandler<
+ RestfulGateway, EmptyRequestBody, ConfigurationInfo,
ApplicationMessageParameters>
+ implements ApplicationJsonArchivist {
+
+ private final ConfigurationInfo jobConfig; // built once in the constructor from the given Configuration
+
+ public JobManagerApplicationConfigurationHandler(
+ GatewayRetriever<? extends RestfulGateway> leaderRetriever,
+ Duration timeout,
+ Map<String, String> responseHeaders,
+ MessageHeaders<EmptyRequestBody, ConfigurationInfo,
ApplicationMessageParameters>
+ messageHeaders,
+ Configuration configuration) {
+ super(leaderRetriever, timeout, responseHeaders, messageHeaders);
+
+ Preconditions.checkNotNull(configuration);
+ this.jobConfig = ConfigurationInfo.from(configuration); // converted here, not per request
+ }
+
+ @Override
+ protected CompletableFuture<ConfigurationInfo> handleRequest(
+ @Nonnull HandlerRequest<EmptyRequestBody> request, @Nonnull
RestfulGateway gateway) {
+ return CompletableFuture.completedFuture(jobConfig); // neither request nor gateway is consulted
+ }
+
+ @Override
+ public Collection<ArchivedJson> archiveApplicationWithPath(
+ ArchivedApplication archivedApplication) throws IOException {
+ return Collections.singletonList( // exactly one archive entry per application
+ new ArchivedJson(
+ JobManagerApplicationConfigurationHeaders.getInstance()
+ .getTargetRestEndpointURL()
+ .replace( // swap the ":<key>" placeholder for the application id's hex string
+ ':' + ApplicationIDPathParameter.KEY,
+
archivedApplication.getApplicationId().toHexString()),
+ jobConfig));
+ }
+}
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/application/JobManagerApplicationConfigurationHeaders.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/application/JobManagerApplicationConfigurationHeaders.java
new file mode 100644
index 00000000000..48b426112c8
--- /dev/null
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/application/JobManagerApplicationConfigurationHeaders.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages.application;
+
+import org.apache.flink.runtime.rest.HttpMethodWrapper;
+import org.apache.flink.runtime.rest.messages.ApplicationIDPathParameter;
+import org.apache.flink.runtime.rest.messages.ApplicationMessageParameters;
+import org.apache.flink.runtime.rest.messages.ConfigurationInfo;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.RuntimeMessageHeaders;
+
+import
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+
+/**
+ * Message headers for the {@link
+ * org.apache.flink.runtime.rest.handler.application.JobManagerApplicationConfigurationHandler}.
+ *
+ * <p>Describes a {@code GET} on {@link #JOBMANAGER_APPLICATION_CONFIG_REST_PATH} that takes an
+ * {@link EmptyRequestBody} and answers with {@code 200 OK} and a {@link ConfigurationInfo}.
+ * The constructor is private; the shared instance is obtained through {@link #getInstance()}.
+ */
+public class JobManagerApplicationConfigurationHeaders
+ implements RuntimeMessageHeaders<
+ EmptyRequestBody, ConfigurationInfo,
ApplicationMessageParameters> {
+
+ private static final JobManagerApplicationConfigurationHeaders INSTANCE =
+ new JobManagerApplicationConfigurationHeaders();
+
+ public static final String JOBMANAGER_APPLICATION_CONFIG_REST_PATH = // URL template; the ":"-prefixed segment is the application id placeholder
+ "/applications/:" + ApplicationIDPathParameter.KEY +
"/jobmanager/config";
+
+ private JobManagerApplicationConfigurationHeaders() {} // private: callers go through getInstance()
+
+ @Override
+ public Class<EmptyRequestBody> getRequestClass() {
+ return EmptyRequestBody.class;
+ }
+
+ @Override
+ public HttpMethodWrapper getHttpMethod() {
+ return HttpMethodWrapper.GET;
+ }
+
+ @Override
+ public String getTargetRestEndpointURL() {
+ return JOBMANAGER_APPLICATION_CONFIG_REST_PATH;
+ }
+
+ @Override
+ public Class<ConfigurationInfo> getResponseClass() {
+ return ConfigurationInfo.class;
+ }
+
+ @Override
+ public HttpResponseStatus getResponseStatusCode() {
+ return HttpResponseStatus.OK;
+ }
+
+ @Override
+ public ApplicationMessageParameters getUnresolvedMessageParameters() {
+ return new ApplicationMessageParameters(); // a new parameters object on every call
+ }
+
+ public static JobManagerApplicationConfigurationHeaders getInstance() {
+ return INSTANCE;
+ }
+
+ @Override
+ public String getDescription() {
+ return "Returns the jobmanager's configuration of a specific
application.";
+ }
+}
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
index 6e3926d6a2c..2ce3206d69a 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
@@ -37,6 +37,7 @@ import
org.apache.flink.runtime.rest.handler.RestHandlerSpecification;
import
org.apache.flink.runtime.rest.handler.application.ApplicationCancellationHandler;
import
org.apache.flink.runtime.rest.handler.application.ApplicationDetailsHandler;
import
org.apache.flink.runtime.rest.handler.application.ApplicationsOverviewHandler;
+import
org.apache.flink.runtime.rest.handler.application.JobManagerApplicationConfigurationHandler;
import org.apache.flink.runtime.rest.handler.cluster.ClusterConfigHandler;
import org.apache.flink.runtime.rest.handler.cluster.ClusterOverviewHandler;
import org.apache.flink.runtime.rest.handler.cluster.DashboardConfigHandler;
@@ -137,6 +138,7 @@ import
org.apache.flink.runtime.rest.messages.TerminationModeQueryParameter;
import org.apache.flink.runtime.rest.messages.YarnCancelJobTerminationHeaders;
import org.apache.flink.runtime.rest.messages.YarnStopJobTerminationHeaders;
import
org.apache.flink.runtime.rest.messages.application.ApplicationDetailsHeaders;
+import
org.apache.flink.runtime.rest.messages.application.JobManagerApplicationConfigurationHeaders;
import
org.apache.flink.runtime.rest.messages.checkpoints.CheckpointConfigHeaders;
import
org.apache.flink.runtime.rest.messages.checkpoints.CheckpointStatisticDetailsHeaders;
import
org.apache.flink.runtime.rest.messages.checkpoints.CheckpointingStatisticsHeaders;
@@ -506,6 +508,14 @@ public class WebMonitorEndpoint<T extends RestfulGateway>
extends RestServerEndp
responseHeaders,
ApplicationDetailsHeaders.getInstance());
+ JobManagerApplicationConfigurationHandler
jobManagerApplicationConfigurationHandler =
+ new JobManagerApplicationConfigurationHandler(
+ leaderRetriever,
+ timeout,
+ responseHeaders,
+
JobManagerApplicationConfigurationHeaders.getInstance(),
+ clusterConfiguration);
+
JobAccumulatorsHandler jobAccumulatorsHandler =
new JobAccumulatorsHandler(
leaderRetriever,
@@ -818,6 +828,10 @@ public class WebMonitorEndpoint<T extends RestfulGateway>
extends RestServerEndp
handlers.add(
Tuple2.of(
applicationDetailsHandler.getMessageHeaders(),
applicationDetailsHandler));
+ handlers.add(
+ Tuple2.of(
+
jobManagerApplicationConfigurationHandler.getMessageHeaders(),
+ jobManagerApplicationConfigurationHandler));
handlers.add(Tuple2.of(jobAccumulatorsHandler.getMessageHeaders(),
jobAccumulatorsHandler));
handlers.add(Tuple2.of(taskManagersHandler.getMessageHeaders(),
taskManagersHandler));
handlers.add(
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/application/JobManagerApplicationConfigurationHandlerTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/application/JobManagerApplicationConfigurationHandlerTest.java
new file mode 100644
index 00000000000..0205d5b9835
--- /dev/null
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/application/JobManagerApplicationConfigurationHandlerTest.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.application;
+
+import org.apache.flink.api.common.ApplicationID;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import org.apache.flink.runtime.rest.handler.HandlerRequestException;
+import org.apache.flink.runtime.rest.messages.ApplicationIDPathParameter;
+import org.apache.flink.runtime.rest.messages.ApplicationMessageParameters;
+import org.apache.flink.runtime.rest.messages.ConfigurationInfo;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import
org.apache.flink.runtime.rest.messages.application.JobManagerApplicationConfigurationHeaders;
+import org.apache.flink.runtime.webmonitor.TestingRestfulGateway;
+import org.apache.flink.testutils.TestingUtils;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * Test for the {@link JobManagerApplicationConfigurationHandler}.
+ *
+ * <p>Builds a handler from a {@link Configuration} holding a single explicitly set option, calls
+ * {@code handleRequest} directly and checks that the option's key and value appear in the
+ * returned {@link ConfigurationInfo}. The archiving path is not exercised here.
+ */
+class JobManagerApplicationConfigurationHandlerTest {
+
+ private static HandlerRequest<EmptyRequestBody>
createRequest(ApplicationID applicationId)
+ throws HandlerRequestException {
+ Map<String, String> pathParameters = new HashMap<>();
+ pathParameters.put(ApplicationIDPathParameter.KEY,
applicationId.toString());
+ return HandlerRequest.resolveParametersAndCreate( // empty body; only the application id path parameter is supplied
+ EmptyRequestBody.getInstance(),
+ new ApplicationMessageParameters(),
+ pathParameters,
+ Collections.emptyMap(),
+ Collections.emptyList());
+ }
+
+ @Test
+ void testRequestConfiguration() throws Exception {
+ final Configuration configuration = new Configuration();
+ configuration.set(JobManagerOptions.ADDRESS, "address"); // the only option set in this test
+
+ final JobManagerApplicationConfigurationHandler handler =
+ new JobManagerApplicationConfigurationHandler(
+ () -> null, // retriever stub; the gateway is passed to handleRequest directly below
+ TestingUtils.TIMEOUT,
+ Collections.emptyMap(),
+
JobManagerApplicationConfigurationHeaders.getInstance(),
+ configuration);
+
+ final ApplicationID applicationId = ApplicationID.generate();
+ final HandlerRequest<EmptyRequestBody> handlerRequest =
createRequest(applicationId);
+
+ final ConfigurationInfo configurationInfo =
+ handler.handleRequest(handlerRequest, new
TestingRestfulGateway.Builder().build())
+ .get();
+
+
assertThat(configurationInfo.get(0).getKey()).isEqualTo(JobManagerOptions.ADDRESS.key());
+ assertThat(configurationInfo.get(0).getValue()).isEqualTo("address"); // index 0 assumes ConfigurationInfo.from adds no entries beyond the one set above (TODO confirm)
+ }
+}