Github user zentol commented on a diff in the pull request:
https://github.com/apache/flink/pull/4757#discussion_r142689803
--- Diff:
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/JobMetricsHandler.java
---
@@ -18,38 +18,95 @@
package org.apache.flink.runtime.rest.handler.legacy.metrics;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.dispatcher.DispatcherGateway;
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import org.apache.flink.runtime.rest.handler.LegacyRestHandler;
+import
org.apache.flink.runtime.rest.handler.legacy.messages.JobMetricEntry;
+import
org.apache.flink.runtime.rest.handler.legacy.messages.JobMetricEntryList;
+import
org.apache.flink.runtime.rest.handler.legacy.messages.JobMetricIdList;
+import
org.apache.flink.runtime.rest.handler.legacy.messages.JobMetricsOverview;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.JobIDPathParameter;
+import org.apache.flink.runtime.rest.messages.JobIDQueryParameter;
+import org.apache.flink.runtime.rest.messages.JobMessageParameters;
+import org.apache.flink.runtime.rest.messages.JobMetricsHeaders;
+
+import java.util.Arrays;
+import java.util.List;
import java.util.Map;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
+import java.util.stream.Collectors;
/**
* Request handler that returns for a given job a list of all available
metrics or the values for a set of metrics.
*
* <p>If the query parameters do not contain a "get" parameter the list of
all metrics is returned.
* {@code {"available": [ { "name" : "X", "id" : "X" } ] } }
*
- * <p>If the query parameters do contain a "get" parameter a
comma-separate list of metric names is expected as a value.
- * {@code /get?X,Y}
+ * <p>If the query parameters do contain a "get" parameter, a
comma-separate list of metric names is expected as a value.
+ * {@code ?get=X,Y}
* The handler will then return a list containing the values of the
requested metrics.
* {@code [ { "id" : "X", "value" : "S" }, { "id" : "Y", "value" : "T" } ]
}
*/
-public class JobMetricsHandler extends AbstractMetricsHandler {
- public static final String PARAMETER_JOB_ID = "jobid";
- private static final String JOB_METRICS_REST_PATH =
"/jobs/:jobid/metrics";
+public class JobMetricsHandler extends AbstractMetricsHandler
+ implements LegacyRestHandler<DispatcherGateway,
JobMetricsOverview, JobMessageParameters> {
+
+ private final MetricStore metricStore;
public JobMetricsHandler(Executor executor, MetricFetcher fetcher) {
super(executor, fetcher);
+ metricStore = fetcher.getMetricStore();
}
@Override
public String[] getPaths() {
- return new String[]{JOB_METRICS_REST_PATH};
+ return new String[]{JobMetricsHeaders.JOB_METRICS_REST_PATH};
}
@Override
protected Map<String, String> getMapFor(Map<String, String> pathParams,
MetricStore metrics) {
- MetricStore.JobMetricStore job =
metrics.getJobMetricStore(pathParams.get(PARAMETER_JOB_ID));
- return job != null
- ? job.metrics
- : null;
+ MetricStore.JobMetricStore job =
metrics.getJobMetricStore(pathParams.get(JobMetricsHeaders.PARAMETER_JOB_ID));
+ return job != null ? job.metrics : null;
+ }
+
+ protected Map<String, String> getMetricsMapByJobId(JobID jobID,
MetricStore metrics) {
+ MetricStore.JobMetricStore job =
metrics.getJobMetricStore(jobID);
+ return job != null ? job.metrics : null;
+ }
+
+ @Override
+ public CompletableFuture<JobMetricsOverview>
handleRequest(HandlerRequest<EmptyRequestBody, JobMessageParameters> request,
DispatcherGateway gateway) {
--- End diff --
we can keep this logic in the AbstractMetricsHandler after having it
implement the LegacyHandlerAdapter interface
---