This is an automated email from the ASF dual-hosted git repository.

zhuzh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 3a13f8dbb90 [FLINK-38978][runtime] Expose JM config for applications
3a13f8dbb90 is described below

commit 3a13f8dbb905e7780f81811b0a1e9f3eb698b03e
Author: Yi Zhang <[email protected]>
AuthorDate: Tue Feb 24 12:15:49 2026 +0800

    [FLINK-38978][runtime] Expose JM config for applications
---
 .../src/test/resources/rest_api_v1.snapshot        | 32 ++++++++
 .../JobManagerApplicationConfigurationHandler.java | 85 ++++++++++++++++++++++
 .../JobManagerApplicationConfigurationHeaders.java | 84 +++++++++++++++++++++
 .../runtime/webmonitor/WebMonitorEndpoint.java     | 14 ++++
 ...ManagerApplicationConfigurationHandlerTest.java | 80 ++++++++++++++++++++
 5 files changed, 295 insertions(+)

diff --git a/flink-runtime-web/src/test/resources/rest_api_v1.snapshot 
b/flink-runtime-web/src/test/resources/rest_api_v1.snapshot
index b8d8f19359e..970bd84a287 100644
--- a/flink-runtime-web/src/test/resources/rest_api_v1.snapshot
+++ b/flink-runtime-web/src/test/resources/rest_api_v1.snapshot
@@ -161,6 +161,38 @@
       "type" : "object",
       "id" : 
"urn:jsonschema:org:apache:flink:runtime:rest:messages:EmptyResponseBody"
     }
+  }, {
+    "url" : "/applications/:applicationid/jobmanager/config",
+    "method" : "GET",
+    "status-code" : "200 OK",
+    "file-upload" : false,
+    "path-parameters" : {
+      "pathParameters" : [ {
+        "key" : "applicationid"
+      } ]
+    },
+    "query-parameters" : {
+      "queryParameters" : [ ]
+    },
+    "request" : {
+      "type" : "object",
+      "id" : 
"urn:jsonschema:org:apache:flink:runtime:rest:messages:EmptyRequestBody"
+    },
+    "response" : {
+      "type" : "array",
+      "items" : {
+        "type" : "object",
+        "id" : 
"urn:jsonschema:org:apache:flink:runtime:rest:messages:ConfigurationInfoEntry",
+        "properties" : {
+          "key" : {
+            "type" : "string"
+          },
+          "value" : {
+            "type" : "string"
+          }
+        }
+      }
+    }
   }, {
     "url" : "/cluster",
     "method" : "DELETE",
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/application/JobManagerApplicationConfigurationHandler.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/application/JobManagerApplicationConfigurationHandler.java
new file mode 100644
index 00000000000..3525d5870a7
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/application/JobManagerApplicationConfigurationHandler.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.application;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.application.ArchivedApplication;
+import org.apache.flink.runtime.rest.handler.AbstractRestHandler;
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import org.apache.flink.runtime.rest.messages.ApplicationIDPathParameter;
+import org.apache.flink.runtime.rest.messages.ApplicationMessageParameters;
+import org.apache.flink.runtime.rest.messages.ConfigurationInfo;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.MessageHeaders;
+import 
org.apache.flink.runtime.rest.messages.application.JobManagerApplicationConfigurationHeaders;
+import org.apache.flink.runtime.webmonitor.RestfulGateway;
+import org.apache.flink.runtime.webmonitor.history.ApplicationJsonArchivist;
+import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nonnull;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Handler which serves the jobmanager's configuration of a specific application.
+ *
+ * <p>The constructor converts the supplied {@link Configuration} into a {@link ConfigurationInfo}
+ * once and keeps that value in a final field. {@code handleRequest} answers with an already
+ * completed future holding that stored value; it reads neither the request (so the application id
+ * path parameter does not influence the response) nor the gateway.
+ *
+ * <p>{@code archiveApplicationWithPath} returns a single {@link ArchivedJson} that pairs the same
+ * stored value with the target REST URL of {@link JobManagerApplicationConfigurationHeaders}, in
+ * which the {@code ':' + ApplicationIDPathParameter.KEY} placeholder has been replaced by the hex
+ * string of the archived application's id.
+ */
+public class JobManagerApplicationConfigurationHandler
+        extends AbstractRestHandler<
+                RestfulGateway, EmptyRequestBody, ConfigurationInfo, 
ApplicationMessageParameters>
+        implements ApplicationJsonArchivist {
+
+    private final ConfigurationInfo jobConfig; // built once in the constructor from its Configuration argument
+
+    public JobManagerApplicationConfigurationHandler(
+            GatewayRetriever<? extends RestfulGateway> leaderRetriever,
+            Duration timeout,
+            Map<String, String> responseHeaders,
+            MessageHeaders<EmptyRequestBody, ConfigurationInfo, 
ApplicationMessageParameters>
+                    messageHeaders,
+            Configuration configuration) {
+        super(leaderRetriever, timeout, responseHeaders, messageHeaders);
+
+        Preconditions.checkNotNull(configuration); // null check before the conversion below
+        this.jobConfig = ConfigurationInfo.from(configuration);
+    }
+
+    @Override
+    protected CompletableFuture<ConfigurationInfo> handleRequest(
+            @Nonnull HandlerRequest<EmptyRequestBody> request, @Nonnull 
RestfulGateway gateway) {
+        return CompletableFuture.completedFuture(jobConfig); // stored value; request and gateway are not read
+    }
+
+    @Override
+    public Collection<ArchivedJson> archiveApplicationWithPath(
+            ArchivedApplication archivedApplication) throws IOException {
+        return Collections.singletonList(
+                new ArchivedJson(
+                        JobManagerApplicationConfigurationHeaders.getInstance()
+                                .getTargetRestEndpointURL()
+                                .replace( // fill the path placeholder with the archived application's id
+                                        ':' + ApplicationIDPathParameter.KEY,
+                                        
archivedApplication.getApplicationId().toHexString()),
+                        jobConfig));
+    }
+}
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/application/JobManagerApplicationConfigurationHeaders.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/application/JobManagerApplicationConfigurationHeaders.java
new file mode 100644
index 00000000000..48b426112c8
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/application/JobManagerApplicationConfigurationHeaders.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages.application;
+
+import org.apache.flink.runtime.rest.HttpMethodWrapper;
+import org.apache.flink.runtime.rest.messages.ApplicationIDPathParameter;
+import org.apache.flink.runtime.rest.messages.ApplicationMessageParameters;
+import org.apache.flink.runtime.rest.messages.ConfigurationInfo;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.RuntimeMessageHeaders;
+
+import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+
+/**
+ * Message headers for the {@link
+ * org.apache.flink.runtime.rest.handler.application.JobManagerApplicationConfigurationHandler}.
+ *
+ * <p>Describes a {@code GET} endpoint at {@link #JOBMANAGER_APPLICATION_CONFIG_REST_PATH} that
+ * takes an {@link EmptyRequestBody}, answers with a {@link ConfigurationInfo} and reports {@link
+ * HttpResponseStatus#OK}. The constructor is private; the one shared instance is obtained through
+ * {@link #getInstance()}, while {@link #getUnresolvedMessageParameters()} hands out a new {@link
+ * ApplicationMessageParameters} on every call.
+ */
+public class JobManagerApplicationConfigurationHeaders
+        implements RuntimeMessageHeaders<
+                EmptyRequestBody, ConfigurationInfo, 
ApplicationMessageParameters> {
+
+    private static final JobManagerApplicationConfigurationHeaders INSTANCE =
+            new JobManagerApplicationConfigurationHeaders();
+
+    public static final String JOBMANAGER_APPLICATION_CONFIG_REST_PATH =
+            "/applications/:" + ApplicationIDPathParameter.KEY + 
"/jobmanager/config";
+
+    private JobManagerApplicationConfigurationHeaders() {} // instances only via INSTANCE / getInstance()
+
+    @Override
+    public Class<EmptyRequestBody> getRequestClass() {
+        return EmptyRequestBody.class;
+    }
+
+    @Override
+    public HttpMethodWrapper getHttpMethod() {
+        return HttpMethodWrapper.GET;
+    }
+
+    @Override
+    public String getTargetRestEndpointURL() {
+        return JOBMANAGER_APPLICATION_CONFIG_REST_PATH;
+    }
+
+    @Override
+    public Class<ConfigurationInfo> getResponseClass() {
+        return ConfigurationInfo.class;
+    }
+
+    @Override
+    public HttpResponseStatus getResponseStatusCode() {
+        return HttpResponseStatus.OK;
+    }
+
+    @Override
+    public ApplicationMessageParameters getUnresolvedMessageParameters() {
+        return new ApplicationMessageParameters(); // new parameters object per call
+    }
+
+    public static JobManagerApplicationConfigurationHeaders getInstance() {
+        return INSTANCE;
+    }
+
+    @Override
+    public String getDescription() {
+        return "Returns the jobmanager's configuration of a specific 
application.";
+    }
+}
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
index 6e3926d6a2c..2ce3206d69a 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
@@ -37,6 +37,7 @@ import 
org.apache.flink.runtime.rest.handler.RestHandlerSpecification;
 import 
org.apache.flink.runtime.rest.handler.application.ApplicationCancellationHandler;
 import 
org.apache.flink.runtime.rest.handler.application.ApplicationDetailsHandler;
 import 
org.apache.flink.runtime.rest.handler.application.ApplicationsOverviewHandler;
+import 
org.apache.flink.runtime.rest.handler.application.JobManagerApplicationConfigurationHandler;
 import org.apache.flink.runtime.rest.handler.cluster.ClusterConfigHandler;
 import org.apache.flink.runtime.rest.handler.cluster.ClusterOverviewHandler;
 import org.apache.flink.runtime.rest.handler.cluster.DashboardConfigHandler;
@@ -137,6 +138,7 @@ import 
org.apache.flink.runtime.rest.messages.TerminationModeQueryParameter;
 import org.apache.flink.runtime.rest.messages.YarnCancelJobTerminationHeaders;
 import org.apache.flink.runtime.rest.messages.YarnStopJobTerminationHeaders;
 import 
org.apache.flink.runtime.rest.messages.application.ApplicationDetailsHeaders;
+import 
org.apache.flink.runtime.rest.messages.application.JobManagerApplicationConfigurationHeaders;
 import 
org.apache.flink.runtime.rest.messages.checkpoints.CheckpointConfigHeaders;
 import 
org.apache.flink.runtime.rest.messages.checkpoints.CheckpointStatisticDetailsHeaders;
 import 
org.apache.flink.runtime.rest.messages.checkpoints.CheckpointingStatisticsHeaders;
@@ -506,6 +508,14 @@ public class WebMonitorEndpoint<T extends RestfulGateway> 
extends RestServerEndp
                         responseHeaders,
                         ApplicationDetailsHeaders.getInstance());
 
+        JobManagerApplicationConfigurationHandler 
jobManagerApplicationConfigurationHandler =
+                new JobManagerApplicationConfigurationHandler(
+                        leaderRetriever,
+                        timeout,
+                        responseHeaders,
+                        
JobManagerApplicationConfigurationHeaders.getInstance(),
+                        clusterConfiguration);
+
         JobAccumulatorsHandler jobAccumulatorsHandler =
                 new JobAccumulatorsHandler(
                         leaderRetriever,
@@ -818,6 +828,10 @@ public class WebMonitorEndpoint<T extends RestfulGateway> 
extends RestServerEndp
         handlers.add(
                 Tuple2.of(
                         applicationDetailsHandler.getMessageHeaders(), 
applicationDetailsHandler));
+        handlers.add(
+                Tuple2.of(
+                        
jobManagerApplicationConfigurationHandler.getMessageHeaders(),
+                        jobManagerApplicationConfigurationHandler));
         handlers.add(Tuple2.of(jobAccumulatorsHandler.getMessageHeaders(), 
jobAccumulatorsHandler));
         handlers.add(Tuple2.of(taskManagersHandler.getMessageHeaders(), 
taskManagersHandler));
         handlers.add(
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/application/JobManagerApplicationConfigurationHandlerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/application/JobManagerApplicationConfigurationHandlerTest.java
new file mode 100644
index 00000000000..0205d5b9835
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/application/JobManagerApplicationConfigurationHandlerTest.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.application;
+
+import org.apache.flink.api.common.ApplicationID;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import org.apache.flink.runtime.rest.handler.HandlerRequestException;
+import org.apache.flink.runtime.rest.messages.ApplicationIDPathParameter;
+import org.apache.flink.runtime.rest.messages.ApplicationMessageParameters;
+import org.apache.flink.runtime.rest.messages.ConfigurationInfo;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import 
org.apache.flink.runtime.rest.messages.application.JobManagerApplicationConfigurationHeaders;
+import org.apache.flink.runtime.webmonitor.TestingRestfulGateway;
+import org.apache.flink.testutils.TestingUtils;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * Test for the {@link JobManagerApplicationConfigurationHandler}.
+ *
+ * <p>The single test builds a handler from a {@link Configuration} on which only {@link
+ * JobManagerOptions#ADDRESS} has been set, calls {@code handleRequest} directly with a request
+ * for a generated application id, and checks that the first returned entry carries that option's
+ * key and the value {@code "address"}.
+ */
+class JobManagerApplicationConfigurationHandlerTest {
+
+    private static HandlerRequest<EmptyRequestBody> 
createRequest(ApplicationID applicationId)
+            throws HandlerRequestException {
+        Map<String, String> pathParameters = new HashMap<>();
+        pathParameters.put(ApplicationIDPathParameter.KEY, 
applicationId.toString());
+        return HandlerRequest.resolveParametersAndCreate(
+                EmptyRequestBody.getInstance(),
+                new ApplicationMessageParameters(),
+                pathParameters,
+                Collections.emptyMap(), // second map argument: left empty
+                Collections.emptyList()); // trailing collection argument: left empty
+    }
+
+    @Test
+    void testRequestConfiguration() throws Exception {
+        final Configuration configuration = new Configuration();
+        configuration.set(JobManagerOptions.ADDRESS, "address"); // the only entry set; asserted below
+
+        final JobManagerApplicationConfigurationHandler handler =
+                new JobManagerApplicationConfigurationHandler(
+                        () -> null, // leader retriever stub; the gateway is passed to handleRequest directly
+                        TestingUtils.TIMEOUT,
+                        Collections.emptyMap(),
+                        
JobManagerApplicationConfigurationHeaders.getInstance(),
+                        configuration);
+
+        final ApplicationID applicationId = ApplicationID.generate();
+        final HandlerRequest<EmptyRequestBody> handlerRequest = 
createRequest(applicationId);
+
+        final ConfigurationInfo configurationInfo =
+                handler.handleRequest(handlerRequest, new 
TestingRestfulGateway.Builder().build())
+                        .get();
+
+        
assertThat(configurationInfo.get(0).getKey()).isEqualTo(JobManagerOptions.ADDRESS.key());
+        assertThat(configurationInfo.get(0).getValue()).isEqualTo("address");
+    }
+}

Reply via email to