[FLINK-8370][REST] Port AggregatingMetricsHandler to flip6 This closes #5805.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c0410d80 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c0410d80 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c0410d80 Branch: refs/heads/master Commit: c0410d801e406e77b1e6e7134224f7946906a49f Parents: 4645d3c Author: zentol <ches...@apache.org> Authored: Wed Mar 28 12:52:07 2018 +0200 Committer: zentol <ches...@apache.org> Committed: Mon Apr 16 21:17:54 2018 +0200 ---------------------------------------------------------------------- .../AbstractAggregatingMetricsHandler.java | 300 ++++++++++++++ .../metrics/AggregatingJobsMetricsHandler.java | 77 ++++ .../AggregatingSubtasksMetricsHandler.java | 119 ++++++ .../AggregatingTaskManagersMetricsHandler.java | 77 ++++ .../handler/job/metrics/DoubleAccumulator.java | 257 ++++++++++++ .../AbstractAggregatedMetricsHeaders.java | 50 +++ .../AbstractAggregatedMetricsParameters.java | 48 +++ .../AggregateTaskManagerMetricsParameters.java | 38 ++ .../metrics/AggregatedJobMetricsHeaders.java | 44 +++ .../metrics/AggregatedJobMetricsParameters.java | 39 ++ .../messages/job/metrics/AggregatedMetric.java | 118 ++++++ .../metrics/AggregatedMetricsResponseBody.java | 112 ++++++ .../AggregatedSubtaskMetricsHeaders.java | 47 +++ .../AggregatedSubtaskMetricsParameters.java | 51 +++ .../AggregatedTaskManagerMetricsHeaders.java | 44 +++ .../job/metrics/JobsFilterQueryParameter.java | 48 +++ .../metrics/MetricsAggregationParameter.java | 58 +++ .../metrics/SubtasksFilterQueryParameter.java | 41 ++ .../TaskManagersFilterQueryParameter.java | 42 ++ .../runtime/webmonitor/WebMonitorEndpoint.java | 33 ++ .../AggregatingJobsMetricsHandlerTest.java | 81 ++++ .../AggregatingMetricsHandlerTestBase.java | 389 +++++++++++++++++++ .../AggregatingSubtasksMetricsHandlerTest.java | 93 +++++ ...gregatingTaskManagersMetricsHandlerTest.java | 82 ++++ 24 files changed, 2288 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/c0410d80/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/metrics/AbstractAggregatingMetricsHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/metrics/AbstractAggregatingMetricsHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/metrics/AbstractAggregatingMetricsHandler.java new file mode 100644 index 0000000..338bb46 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/metrics/AbstractAggregatingMetricsHandler.java @@ -0,0 +1,300 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.handler.job.metrics; + +import org.apache.flink.api.common.time.Time; +import org.apache.flink.runtime.rest.handler.AbstractRestHandler; +import org.apache.flink.runtime.rest.handler.HandlerRequest; +import org.apache.flink.runtime.rest.handler.RestHandlerException; +import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcher; +import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricStore; +import org.apache.flink.runtime.rest.messages.EmptyRequestBody; +import org.apache.flink.runtime.rest.messages.job.metrics.AbstractAggregatedMetricsHeaders; +import org.apache.flink.runtime.rest.messages.job.metrics.AbstractAggregatedMetricsParameters; +import org.apache.flink.runtime.rest.messages.job.metrics.AggregatedMetric; +import org.apache.flink.runtime.rest.messages.job.metrics.AggregatedMetricsResponseBody; +import org.apache.flink.runtime.rest.messages.job.metrics.MetricsAggregationParameter; +import org.apache.flink.runtime.rest.messages.job.metrics.MetricsFilterParameter; +import org.apache.flink.runtime.webmonitor.RestfulGateway; +import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever; +import org.apache.flink.util.Preconditions; + +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus; + +import javax.annotation.Nonnull; +import javax.annotation.Nullable; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; +import java.util.concurrent.Executor; +import java.util.stream.Collectors; + +/** + * Abstract request handler for querying aggregated metrics. Subclasses return either a list of all available metrics + * or the aggregated values of them across all/selected entities. + * + * <p>If the query parameters do not contain a "get" parameter the list of all metrics is returned. + * {@code [ { "id" : "X" } ] } + * + * <p>If the query parameters do contain a "get" parameter, a comma-separated list of metric names is expected as a value. + * {@code /metrics?get=X,Y} + * The handler will then return a list containing the values of the requested metrics. + * {@code [ { "id" : "X", "value" : "S" }, { "id" : "Y", "value" : "T" } ] } + * + * <p>The "agg" query parameter is used to define which aggregates should be calculated. Available aggregations are + * "sum", "max", "min" and "avg". If the parameter is not specified, all aggregations will be returned. + * {@code /metrics?get=X,Y&agg=min,max} + * The handler will then return a list of objects containing the aggregations for the requested metrics. + * {@code [ { "id" : "X", "min", "1", "max", "2" }, { "id" : "Y", "min", "4", "max", "10"}]} + */ +public abstract class AbstractAggregatingMetricsHandler<P extends AbstractAggregatedMetricsParameters<?>> extends AbstractRestHandler<RestfulGateway, EmptyRequestBody, AggregatedMetricsResponseBody, P> { + + private final Executor executor; + private final MetricFetcher<?> fetcher; + + protected AbstractAggregatingMetricsHandler( + CompletableFuture<String> localRestAddress, + GatewayRetriever<? extends RestfulGateway> leaderRetriever, + Time timeout, + Map<String, String> responseHeaders, + AbstractAggregatedMetricsHeaders<P> messageHeaders, + Executor executor, + MetricFetcher<?> fetcher) { + super(localRestAddress, leaderRetriever, timeout, responseHeaders, messageHeaders); + this.executor = Preconditions.checkNotNull(executor); + this.fetcher = Preconditions.checkNotNull(fetcher); + } + + @Nonnull + abstract Collection<? extends MetricStore.ComponentMetricStore> getStores(MetricStore store, HandlerRequest<EmptyRequestBody, P> request); + + @Override + protected CompletableFuture<AggregatedMetricsResponseBody> handleRequest(@Nonnull HandlerRequest<EmptyRequestBody, P> request, @Nonnull RestfulGateway gateway) throws RestHandlerException { + return CompletableFuture.supplyAsync( + () -> { + try { + fetcher.update(); + List<String> requestedMetrics = request.getQueryParameter(MetricsFilterParameter.class); + List<MetricsAggregationParameter.AggregationMode> requestedAggregations = request.getQueryParameter(MetricsAggregationParameter.class); + MetricStore store = fetcher.getMetricStore(); + + Collection<? extends MetricStore.ComponentMetricStore> stores = getStores(store, request); + + if (requestedMetrics.isEmpty()) { + Collection<String> list = getAvailableMetrics(stores); + return new AggregatedMetricsResponseBody( + list.stream() + .map(AggregatedMetric::new) + .collect(Collectors.toList()) + ); + } + + DoubleAccumulator.DoubleMinimumFactory minimumFactory = null; + DoubleAccumulator.DoubleMaximumFactory maximumFactory = null; + DoubleAccumulator.DoubleAverageFactory averageFactory = null; + DoubleAccumulator.DoubleSumFactory sumFactory = null; + // by default we return all aggregations + if (requestedAggregations.isEmpty()) { + minimumFactory = DoubleAccumulator.DoubleMinimumFactory.get(); + maximumFactory = DoubleAccumulator.DoubleMaximumFactory.get(); + averageFactory = DoubleAccumulator.DoubleAverageFactory.get(); + sumFactory = DoubleAccumulator.DoubleSumFactory.get(); + } else { + for (MetricsAggregationParameter.AggregationMode aggregation : requestedAggregations) { + switch (aggregation) { + case MIN: + minimumFactory = DoubleAccumulator.DoubleMinimumFactory.get(); + break; + case MAX: + maximumFactory = DoubleAccumulator.DoubleMaximumFactory.get(); + break; + case AVG: + averageFactory = DoubleAccumulator.DoubleAverageFactory.get(); + break; + case SUM: + sumFactory = DoubleAccumulator.DoubleSumFactory.get(); + break; + default: + log.warn("Unsupported aggregation specified: {}", aggregation); + } + } + } + MetricAccumulatorFactory metricAccumulatorFactory = new MetricAccumulatorFactory(minimumFactory, maximumFactory, averageFactory, sumFactory); + + return getAggregatedMetricValues(stores, requestedMetrics, metricAccumulatorFactory); + } catch (Exception e) { + log.warn("Could not retrieve metrics.", e); + throw new CompletionException(new RestHandlerException("Could not retrieve metrics.", HttpResponseStatus.INTERNAL_SERVER_ERROR)); + } + }, + executor); + } + + /** + * Returns a JSON string containing a list of all available metrics in the given stores. Effectively this method maps + * the union of all key-sets to JSON. + * + * @param stores metrics + * @return JSON string containing a list of all available metrics + */ + private static Collection<String> getAvailableMetrics(Collection<? extends MetricStore.ComponentMetricStore> stores) { + Set<String> uniqueMetrics = new HashSet<>(32); + for (MetricStore.ComponentMetricStore store : stores) { + uniqueMetrics.addAll(store.metrics.keySet()); + } + return uniqueMetrics; + } + + /** + * Extracts and aggregates all requested metrics from the given metric stores, and maps the result to a JSON string. + * + * @param stores available metrics + * @param requestedMetrics ids of requested metrics + * @param requestedAggregationsFactories requested aggregations + * @return JSON string containing the requested metrics + */ + private AggregatedMetricsResponseBody getAggregatedMetricValues( + Collection<? extends MetricStore.ComponentMetricStore> stores, + List<String> requestedMetrics, + MetricAccumulatorFactory requestedAggregationsFactories) { + + Collection<AggregatedMetric> aggregatedMetrics = new ArrayList<>(requestedMetrics.size()); + for (String requestedMetric : requestedMetrics) { + final Collection<Double> values = new ArrayList<>(stores.size()); + try { + for (MetricStore.ComponentMetricStore store : stores) { + String stringValue = store.metrics.get(requestedMetric); + if (stringValue != null) { + values.add(Double.valueOf(stringValue)); + } + } + } catch (NumberFormatException nfe) { + log.warn("The metric {} is not numeric and can't be aggregated.", requestedMetric, nfe); + // metric is not numeric so we can't perform aggregations => ignore it + continue; + } + if (!values.isEmpty()) { + + Iterator<Double> valuesIterator = values.iterator(); + MetricAccumulator acc = requestedAggregationsFactories.get(requestedMetric, valuesIterator.next()); + valuesIterator.forEachRemaining(acc::add); + + aggregatedMetrics.add(acc.get()); + } else { + return new AggregatedMetricsResponseBody(Collections.emptyList()); + } + } + return new AggregatedMetricsResponseBody(aggregatedMetrics); + } + + private static class MetricAccumulatorFactory { + + @Nullable + private final DoubleAccumulator.DoubleMinimumFactory minimumFactory; + + @Nullable + private final DoubleAccumulator.DoubleMaximumFactory maximumFactory; + + @Nullable + private final DoubleAccumulator.DoubleAverageFactory averageFactory; + + @Nullable + private final DoubleAccumulator.DoubleSumFactory sumFactory; + + private MetricAccumulatorFactory( + @Nullable DoubleAccumulator.DoubleMinimumFactory minimumFactory, + @Nullable DoubleAccumulator.DoubleMaximumFactory maximumFactory, + @Nullable DoubleAccumulator.DoubleAverageFactory averageFactory, + @Nullable DoubleAccumulator.DoubleSumFactory sumFactory) { + this.minimumFactory = minimumFactory; + this.maximumFactory = maximumFactory; + this.averageFactory = averageFactory; + this.sumFactory = sumFactory; + } + + MetricAccumulator get(String metricName, double init) { + return new MetricAccumulator( + metricName, + minimumFactory == null ? null : minimumFactory.get(init), + maximumFactory == null ? null : maximumFactory.get(init), + averageFactory == null ? null : averageFactory.get(init), + sumFactory == null ? null : sumFactory.get(init) + ); + } + } + + private static class MetricAccumulator { + private final String metricName; + + @Nullable + private final DoubleAccumulator min; + @Nullable + private final DoubleAccumulator max; + @Nullable + private final DoubleAccumulator avg; + @Nullable + private final DoubleAccumulator sum; + + private MetricAccumulator( + String metricName, + @Nullable DoubleAccumulator min, + @Nullable DoubleAccumulator max, + @Nullable DoubleAccumulator avg, + @Nullable DoubleAccumulator sum) { + this.metricName = Preconditions.checkNotNull(metricName); + this.min = min; + this.max = max; + this.avg = avg; + this.sum = sum; + } + + void add(double value) { + if (min != null) { + min.add(value); + } + if (max != null) { + max.add(value); + } + if (avg != null) { + avg.add(value); + } + if (sum != null) { + sum.add(value); + } + } + + AggregatedMetric get() { + return new AggregatedMetric( + metricName, + min == null ? null : min.getValue(), + max == null ? null : max.getValue(), + avg == null ? null : avg.getValue(), + sum == null ? null : sum.getValue() + ); + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/c0410d80/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/metrics/AggregatingJobsMetricsHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/metrics/AggregatingJobsMetricsHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/metrics/AggregatingJobsMetricsHandler.java new file mode 100644 index 0000000..42928a4 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/metrics/AggregatingJobsMetricsHandler.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.handler.job.metrics; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.time.Time; +import org.apache.flink.runtime.rest.handler.HandlerRequest; +import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcher; +import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricStore; +import org.apache.flink.runtime.rest.messages.EmptyRequestBody; +import org.apache.flink.runtime.rest.messages.job.metrics.AggregatedJobMetricsHeaders; +import org.apache.flink.runtime.rest.messages.job.metrics.AggregatedJobMetricsParameters; +import org.apache.flink.runtime.rest.messages.job.metrics.JobsFilterQueryParameter; +import org.apache.flink.runtime.webmonitor.RestfulGateway; +import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever; + +import javax.annotation.Nonnull; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executor; + +/** + * Request handler that returns, aggregated across jobs, a list of all available metrics or the values + * for a set of metrics. + * + * <p>Specific jobs can be selected for aggregation by specifying a comma-separated list of job IDs. + * {@code /metrics?get=X,Y&jobs=A,B} + */ +public class AggregatingJobsMetricsHandler extends AbstractAggregatingMetricsHandler<AggregatedJobMetricsParameters> { + + public AggregatingJobsMetricsHandler( + CompletableFuture<String> localRestAddress, + GatewayRetriever<? extends RestfulGateway> leaderRetriever, + Time timeout, Map<String, String> responseHeaders, + Executor executor, + MetricFetcher<?> fetcher) { + super(localRestAddress, leaderRetriever, timeout, responseHeaders, AggregatedJobMetricsHeaders.getInstance(), executor, fetcher); + } + + @Nonnull + @Override + Collection<? extends MetricStore.ComponentMetricStore> getStores(MetricStore store, HandlerRequest<EmptyRequestBody, AggregatedJobMetricsParameters> request) { + List<JobID> jobs = request.getQueryParameter(JobsFilterQueryParameter.class); + if (jobs.isEmpty()) { + return store.getJobs().values(); + } else { + Collection<MetricStore.ComponentMetricStore> jobStores = new ArrayList<>(jobs.size()); + for (JobID job : jobs) { + MetricStore.ComponentMetricStore jobMetricStore = store.getJobMetricStore(job.toString()); + if (jobMetricStore != null) { + jobStores.add(jobMetricStore); + } + } + return jobStores; + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/c0410d80/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/metrics/AggregatingSubtasksMetricsHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/metrics/AggregatingSubtasksMetricsHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/metrics/AggregatingSubtasksMetricsHandler.java new file mode 100644 index 0000000..f95deaa --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/metrics/AggregatingSubtasksMetricsHandler.java @@ -0,0 +1,119 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.handler.job.metrics; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.time.Time; +import org.apache.flink.runtime.jobgraph.JobVertexID; +import org.apache.flink.runtime.rest.handler.HandlerRequest; +import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcher; +import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricStore; +import org.apache.flink.runtime.rest.messages.EmptyRequestBody; +import org.apache.flink.runtime.rest.messages.JobIDPathParameter; +import org.apache.flink.runtime.rest.messages.JobVertexIdPathParameter; +import org.apache.flink.runtime.rest.messages.job.metrics.AggregatedSubtaskMetricsHeaders; +import org.apache.flink.runtime.rest.messages.job.metrics.AggregatedSubtaskMetricsParameters; +import org.apache.flink.runtime.rest.messages.job.metrics.SubtasksFilterQueryParameter; +import org.apache.flink.runtime.webmonitor.RestfulGateway; +import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever; +import org.apache.flink.util.UnionIterator; + +import javax.annotation.Nonnull; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.Iterator; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executor; +import java.util.stream.IntStream; + +/** + * Request handler that returns, aggregated across subtasks, a list of all available metrics or the values + * for a set of metrics. + * + * <p>Specific subtasks can be selected for aggregation by specifying a comma-separated list of integer ranges. + * {@code /metrics?get=X,Y&subtasks=0-2,4-5} + */ +public class AggregatingSubtasksMetricsHandler extends AbstractAggregatingMetricsHandler<AggregatedSubtaskMetricsParameters> { + + public AggregatingSubtasksMetricsHandler( + CompletableFuture<String> localRestAddress, + GatewayRetriever<? extends RestfulGateway> leaderRetriever, + Time timeout, + Map<String, String> responseHeaders, + Executor executor, + MetricFetcher<?> fetcher) { + super(localRestAddress, leaderRetriever, timeout, responseHeaders, AggregatedSubtaskMetricsHeaders.getInstance(), executor, fetcher); + } + + @Nonnull + @Override + Collection<? extends MetricStore.ComponentMetricStore> getStores(MetricStore store, HandlerRequest<EmptyRequestBody, AggregatedSubtaskMetricsParameters> request) { + JobID jobID = request.getPathParameter(JobIDPathParameter.class); + JobVertexID taskID = request.getPathParameter(JobVertexIdPathParameter.class); + + Collection<String> subtaskRanges = request.getQueryParameter(SubtasksFilterQueryParameter.class); + if (subtaskRanges.isEmpty()) { + MetricStore.TaskMetricStore taskMetricStore = store.getTaskMetricStore(jobID.toString(), taskID.toString()); + if (taskMetricStore != null) { + return taskMetricStore.getAllSubtaskMetricStores(); + } else { + return Collections.emptyList(); + } + } else { + Iterable<Integer> subtasks = getIntegerRangeFromString(subtaskRanges); + Collection<MetricStore.ComponentMetricStore> subtaskStores = new ArrayList<>(8); + for (int subtask : subtasks) { + MetricStore.ComponentMetricStore subtaskMetricStore = store.getSubtaskMetricStore(jobID.toString(), taskID.toString(), subtask); + if (subtaskMetricStore != null) { + subtaskStores.add(subtaskMetricStore); + } + } + return subtaskStores; + } + } + + private Iterable<Integer> getIntegerRangeFromString(Collection<String> ranges) { + UnionIterator<Integer> iterators = new UnionIterator<>(); + + for (String rawRange : ranges) { + try { + Iterator<Integer> rangeIterator; + String range = rawRange.trim(); + int dashIdx = range.indexOf('-'); + if (dashIdx == -1) { + // only one value in range: + rangeIterator = Collections.singleton(Integer.valueOf(range)).iterator(); + } else { + // evaluate range + final int start = Integer.valueOf(range.substring(0, dashIdx)); + final int end = Integer.valueOf(range.substring(dashIdx + 1, range.length())); + rangeIterator = IntStream.rangeClosed(start, end).iterator(); + } + iterators.add(rangeIterator); + } catch (NumberFormatException nfe) { + log.warn("Invalid value {} specified for integer range. Not a number.", rawRange, nfe); + } + } + + return iterators; + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/c0410d80/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/metrics/AggregatingTaskManagersMetricsHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/metrics/AggregatingTaskManagersMetricsHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/metrics/AggregatingTaskManagersMetricsHandler.java new file mode 100644 index 0000000..2e15cac --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/metrics/AggregatingTaskManagersMetricsHandler.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.handler.job.metrics; + +import org.apache.flink.api.common.time.Time; +import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.rest.handler.HandlerRequest; +import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcher; +import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricStore; +import org.apache.flink.runtime.rest.messages.EmptyRequestBody; +import org.apache.flink.runtime.rest.messages.job.metrics.AggregateTaskManagerMetricsParameters; +import org.apache.flink.runtime.rest.messages.job.metrics.AggregatedTaskManagerMetricsHeaders; +import org.apache.flink.runtime.rest.messages.job.metrics.TaskManagersFilterQueryParameter; +import org.apache.flink.runtime.webmonitor.RestfulGateway; +import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever; + +import javax.annotation.Nonnull; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executor; + +/** + * Request handler that returns, aggregated across task managers, a list of all available metrics or the values for + * a set of metrics. + * + * <p>Specific taskmanagers can be selected for aggregation by specifying a comma-separated list of taskmanager IDs. + * {@code /metrics?get=X,Y&taskmanagers=A,B} + */ +public class AggregatingTaskManagersMetricsHandler extends AbstractAggregatingMetricsHandler<AggregateTaskManagerMetricsParameters> { + + public AggregatingTaskManagersMetricsHandler( + CompletableFuture<String> localRestAddress, + GatewayRetriever<? extends RestfulGateway> leaderRetriever, + Time timeout, Map<String, String> responseHeaders, + Executor executor, + MetricFetcher<?> fetcher) { + super(localRestAddress, leaderRetriever, timeout, responseHeaders, AggregatedTaskManagerMetricsHeaders.getInstance(), executor, fetcher); + } + + @Nonnull + @Override + Collection<? extends MetricStore.ComponentMetricStore> getStores(MetricStore store, HandlerRequest<EmptyRequestBody, AggregateTaskManagerMetricsParameters> request) { + List<ResourceID> taskmanagers = request.getQueryParameter(TaskManagersFilterQueryParameter.class); + if (taskmanagers.isEmpty()) { + return store.getTaskManagers().values(); + } else { + Collection<MetricStore.TaskManagerMetricStore> taskmanagerStores = new ArrayList<>(taskmanagers.size()); + for (ResourceID taskmanager : taskmanagers) { + MetricStore.TaskManagerMetricStore taskManagerMetricStore = store.getTaskManagerMetricStore(taskmanager.getResourceIdString()); + if (taskManagerMetricStore != null) { + taskmanagerStores.add(taskManagerMetricStore); + } + } + return taskmanagerStores; + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/c0410d80/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/metrics/DoubleAccumulator.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/metrics/DoubleAccumulator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/metrics/DoubleAccumulator.java new file mode 100644 index 0000000..dc701d9 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/metrics/DoubleAccumulator.java @@ -0,0 +1,257 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.handler.job.metrics; + +/** + * An interface for accumulating double values. + */ +interface DoubleAccumulator { + + /** + * Adds the given value to this accumulator. + * + * @param value value to add + */ + void add(double value); + + /** + * Returns the current value of this accumulator. + * + * @return current value of this accumulator + */ + double getValue(); + + /** + * Returns the name of this accumulator type. This name is used as a suffix for exposed metrics. + * + * @return name of this accumulator type + */ + String getName(); + + /** + * A factory for {@link DoubleAccumulator}s. This allows us to regenerate a new set of accumulators for each metrics + * without re-evaluating the "agg" query parameter or re-using existing accumulators. + * + * @param <A> DoubleAccumulator subclass + */ + interface DoubleAccumulatorFactory<A extends DoubleAccumulator> { + /** + * Creates a new accumulator with the given initial value. + * + * @param init initial value + * @return new accumulator with the given initial value + */ + A get(double init); + } + + /** + * Factory for {@link DoubleMaximum}. + */ + final class DoubleMaximumFactory implements DoubleAccumulatorFactory<DoubleMaximum> { + private static final DoubleMaximumFactory INSTANCE = new DoubleMaximumFactory(); + + private DoubleMaximumFactory(){ + } + + @Override + public DoubleMaximum get(double init) { + return new DoubleMaximum(init); + } + + public static DoubleMaximumFactory get() { + return INSTANCE; + } + } + + /** + * Factory for {@link DoubleMinimum}. + */ + final class DoubleMinimumFactory implements DoubleAccumulatorFactory<DoubleMinimum> { + private static final DoubleMinimumFactory INSTANCE = new DoubleMinimumFactory(); + + private DoubleMinimumFactory(){ + } + + @Override + public DoubleMinimum get(double init) { + return new DoubleMinimum(init); + } + + public static DoubleMinimumFactory get() { + return INSTANCE; + } + } + + /** + * Factory for {@link DoubleSum}. + */ + final class DoubleSumFactory implements DoubleAccumulatorFactory<DoubleSum> { + private static final DoubleSumFactory INSTANCE = new DoubleSumFactory(); + + private DoubleSumFactory(){ + } + + @Override + public DoubleSum get(double init) { + return new DoubleSum(init); + } + + public static DoubleSumFactory get() { + return INSTANCE; + } + } + + /** + * Factory for {@link DoubleAverage}. + */ + final class DoubleAverageFactory implements DoubleAccumulatorFactory<DoubleAverage> { + private static final DoubleAverageFactory INSTANCE = new DoubleAverageFactory(); + + private DoubleAverageFactory(){ + } + + @Override + public DoubleAverage get(double init) { + return new DoubleAverage(init); + } + + public static DoubleAverageFactory get() { + return INSTANCE; + } + } + + /** + * {@link DoubleAccumulator} that returns the maximum value. + */ + final class DoubleMaximum implements DoubleAccumulator { + + public static final String NAME = "max"; + + private double value; + + private DoubleMaximum(double init) { + value = init; + } + + @Override + public void add(double value) { + this.value = Math.max(this.value, value); + } + + @Override + public double getValue() { + return value; + } + + @Override + public String getName() { + return NAME; + } + } + + /** + * {@link DoubleAccumulator} that returns the minimum value. + */ + final class DoubleMinimum implements DoubleAccumulator { + + public static final String NAME = "min"; + + private double value; + + private DoubleMinimum(double init) { + value = init; + } + + @Override + public void add(double value) { + this.value = Math.min(this.value, value); + } + + @Override + public double getValue() { + return value; + } + + @Override + public String getName() { + return NAME; + } + } + + /** + * {@link DoubleAccumulator} that returns the sum of all values. + */ + final class DoubleSum implements DoubleAccumulator { + + public static final String NAME = "sum"; + + private double value; + + private DoubleSum(double init) { + value = init; + } + + @Override + public void add(double value) { + this.value += value; + } + + @Override + public double getValue() { + return value; + } + + @Override + public String getName() { + return NAME; + } + } + + /** + * {@link DoubleAccumulator} that returns the average over all values. + */ + final class DoubleAverage implements DoubleAccumulator { + + public static final String NAME = "avg"; + + private double sum; + private int count; + + private DoubleAverage(double init) { + sum = init; + count = 1; + } + + @Override + public void add(double value) { + this.sum += value; + this.count++; + } + + @Override + public double getValue() { + return sum / count; + } + + @Override + public String getName() { + return NAME; + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/c0410d80/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/AbstractAggregatedMetricsHeaders.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/AbstractAggregatedMetricsHeaders.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/AbstractAggregatedMetricsHeaders.java new file mode 100644 index 0000000..4100802 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/AbstractAggregatedMetricsHeaders.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.messages.job.metrics; + +import org.apache.flink.runtime.rest.HttpMethodWrapper; +import org.apache.flink.runtime.rest.messages.EmptyRequestBody; +import org.apache.flink.runtime.rest.messages.MessageHeaders; + +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus; + +/** + * Based {@link MessageHeaders} class for aggregating metrics. + */ +public abstract class AbstractAggregatedMetricsHeaders<P extends AbstractAggregatedMetricsParameters<?>> implements MessageHeaders<EmptyRequestBody, AggregatedMetricsResponseBody, P> { + @Override + public Class<AggregatedMetricsResponseBody> getResponseClass() { + return AggregatedMetricsResponseBody.class; + } + + @Override + public HttpResponseStatus getResponseStatusCode() { + return HttpResponseStatus.OK; + } + + @Override + public Class<EmptyRequestBody> getRequestClass() { + return EmptyRequestBody.class; + } + + @Override + public HttpMethodWrapper getHttpMethod() { + return HttpMethodWrapper.GET; + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/c0410d80/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/AbstractAggregatedMetricsParameters.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/AbstractAggregatedMetricsParameters.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/AbstractAggregatedMetricsParameters.java new file mode 100644 index 0000000..a07141d --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/AbstractAggregatedMetricsParameters.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.messages.job.metrics; + +import org.apache.flink.runtime.rest.messages.MessageParameters; +import org.apache.flink.runtime.rest.messages.MessageQueryParameter; + +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; + +/** + * Base {@link MessageParameters} class for aggregating metrics. + */ +public abstract class AbstractAggregatedMetricsParameters<M extends MessageQueryParameter<?>> extends MessageParameters { + private final MetricsFilterParameter metrics = new MetricsFilterParameter(); + private final MetricsAggregationParameter aggs = new MetricsAggregationParameter(); + private final M selector; + + AbstractAggregatedMetricsParameters(M selector) { + this.selector = selector; + } + + @Override + public Collection<MessageQueryParameter<?>> getQueryParameters() { + return Collections.unmodifiableCollection(Arrays.asList( + metrics, + aggs, + selector + )); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/c0410d80/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/AggregateTaskManagerMetricsParameters.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/AggregateTaskManagerMetricsParameters.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/AggregateTaskManagerMetricsParameters.java new file mode 100644 index 0000000..0a053e6 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/AggregateTaskManagerMetricsParameters.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.messages.job.metrics; + +import org.apache.flink.runtime.rest.messages.MessagePathParameter; + +import java.util.Collection; +import java.util.Collections; + +/** + * Parameters for aggregating task manager metrics. + */ +public class AggregateTaskManagerMetricsParameters extends AbstractAggregatedMetricsParameters<TaskManagersFilterQueryParameter> { + public AggregateTaskManagerMetricsParameters() { + super(new TaskManagersFilterQueryParameter()); + } + + @Override + public Collection<MessagePathParameter<?>> getPathParameters() { + return Collections.emptyList(); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/c0410d80/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/AggregatedJobMetricsHeaders.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/AggregatedJobMetricsHeaders.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/AggregatedJobMetricsHeaders.java new file mode 100644 index 0000000..265512e --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/AggregatedJobMetricsHeaders.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.messages.job.metrics; + +/** + * Headers for aggregating job metrics. + */ +public class AggregatedJobMetricsHeaders extends AbstractAggregatedMetricsHeaders<AggregatedJobMetricsParameters> { + + private static final AggregatedJobMetricsHeaders INSTANCE = new AggregatedJobMetricsHeaders(); + + private AggregatedJobMetricsHeaders() { + } + + @Override + public String getTargetRestEndpointURL() { + return "/jobs/metrics"; + } + + @Override + public AggregatedJobMetricsParameters getUnresolvedMessageParameters() { + return new AggregatedJobMetricsParameters(); + } + + public static AggregatedJobMetricsHeaders getInstance() { + return INSTANCE; + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/c0410d80/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/AggregatedJobMetricsParameters.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/AggregatedJobMetricsParameters.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/AggregatedJobMetricsParameters.java new file mode 100644 index 0000000..25df609 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/AggregatedJobMetricsParameters.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.messages.job.metrics; + +import org.apache.flink.runtime.rest.messages.MessagePathParameter; + +import java.util.Collection; +import java.util.Collections; + +/** + * Parameters for aggregating job metrics. + */ +public class AggregatedJobMetricsParameters extends AbstractAggregatedMetricsParameters<JobsFilterQueryParameter> { + + public AggregatedJobMetricsParameters() { + super(new JobsFilterQueryParameter()); + } + + @Override + public Collection<MessagePathParameter<?>> getPathParameters() { + return Collections.emptyList(); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/c0410d80/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/AggregatedMetric.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/AggregatedMetric.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/AggregatedMetric.java new file mode 100644 index 0000000..acafc3a --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/AggregatedMetric.java @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.messages.job.metrics; + +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnore; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonInclude; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty; + +import javax.annotation.Nullable; + +import static java.util.Objects.requireNonNull; + +/** + * Response type for aggregated metrics. Contains the metric name and optionally the sum, average, minimum and maximum. + */ +public class AggregatedMetric { + + private static final String FIELD_NAME_ID = "id"; + + private static final String FIELD_NAME_MIN = "min"; + + private static final String FIELD_NAME_MAX = "max"; + + private static final String FIELD_NAME_AVG = "avg"; + + private static final String FIELD_NAME_SUM = "sum"; + + @JsonProperty(value = FIELD_NAME_ID, required = true) + private final String id; + + @JsonInclude(JsonInclude.Include.NON_NULL) + @JsonProperty(FIELD_NAME_MIN) + private final Double min; + + @JsonInclude(JsonInclude.Include.NON_NULL) + @JsonProperty(FIELD_NAME_MAX) + private final Double max; + + @JsonInclude(JsonInclude.Include.NON_NULL) + @JsonProperty(FIELD_NAME_AVG) + private final Double avg; + + @JsonInclude(JsonInclude.Include.NON_NULL) + @JsonProperty(FIELD_NAME_SUM) + private final Double sum; + + @JsonCreator + public AggregatedMetric( + final @JsonProperty(value = FIELD_NAME_ID, required = true) String id, + final @Nullable @JsonProperty(FIELD_NAME_MIN) Double min, + final @Nullable @JsonProperty(FIELD_NAME_MAX) Double max, + final @Nullable @JsonProperty(FIELD_NAME_AVG) Double avg, + final @Nullable @JsonProperty(FIELD_NAME_SUM) Double sum) { + + this.id = requireNonNull(id, "id must not be null"); + this.min = min; + this.max = max; + this.avg = avg; + this.sum = sum; + } + + public AggregatedMetric(final @JsonProperty(value = FIELD_NAME_ID, required = true) String id) { + this(id, null, null, null, null); + } + + @JsonIgnore + public String getId() { + return id; + } + + @JsonIgnore + public Double getMin() { + return min; + } + + @JsonIgnore + public Double getMax() { + return max; + } + + @JsonIgnore + public Double getSum() { + return sum; + } + + @JsonIgnore + public Double getAvg() { + return avg; + } + + @Override + public String toString() { + return "AggregatedMetric{" + + "id='" + id + '\'' + + ", mim='" + min + '\'' + + ", max='" + max + '\'' + + ", avg='" + avg + '\'' + + ", sum='" + sum + '\'' + + '}'; + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/c0410d80/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/AggregatedMetricsResponseBody.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/AggregatedMetricsResponseBody.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/AggregatedMetricsResponseBody.java new file mode 100644 index 0000000..b6b8dcc --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/AggregatedMetricsResponseBody.java @@ -0,0 +1,112 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.messages.job.metrics; + +import org.apache.flink.runtime.rest.messages.ResponseBody; + +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnore; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonGenerator; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonParser; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.type.TypeReference; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.DeserializationContext; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.SerializerProvider; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.annotation.JsonDeserialize; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.annotation.JsonSerialize; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.std.StdDeserializer; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ser.std.StdSerializer; + +import java.io.IOException; +import java.util.Collection; +import java.util.List; + +/** + * Response type for a collection of aggregated metrics. + * + * <p>As JSON this type will be represented as an array of + * metrics, i.e., the field <code>metrics</code> will not show up. For example, a collection with a + * single metric will be represented as follows: + * <pre> + * {@code + * [{"id": "metricName", "min": "1"}] + * } + * </pre> + * + * @see AggregatedMetricsResponseBody.Serializer + * @see AggregatedMetricsResponseBody.Deserializer + * @see org.apache.flink.runtime.rest.handler.legacy.metrics.MetricStore + */ +@JsonSerialize(using = AggregatedMetricsResponseBody.Serializer.class) +@JsonDeserialize(using = AggregatedMetricsResponseBody.Deserializer.class) +public class AggregatedMetricsResponseBody implements ResponseBody { + + private final Collection<AggregatedMetric> metrics; + + public AggregatedMetricsResponseBody(Collection<AggregatedMetric> metrics) { + + this.metrics = metrics; + } + + @JsonIgnore + public Collection<AggregatedMetric> getMetrics() { + return metrics; + } + + /** + * JSON serializer for {@link AggregatedMetricsResponseBody}. + */ + public static class Serializer extends StdSerializer<AggregatedMetricsResponseBody> { + + private static final long serialVersionUID = 1L; + + protected Serializer() { + super(AggregatedMetricsResponseBody.class); + } + + @Override + public void serialize( + AggregatedMetricsResponseBody metricCollectionResponseBody, + JsonGenerator jsonGenerator, + SerializerProvider serializerProvider) throws IOException { + + jsonGenerator.writeObject(metricCollectionResponseBody.getMetrics()); + } + } + + /** + * JSON deserializer for {@link AggregatedMetricsResponseBody}. + */ + public static class Deserializer extends StdDeserializer<AggregatedMetricsResponseBody> { + + private static final long serialVersionUID = 1L; + + protected Deserializer() { + super(AggregatedMetricsResponseBody.class); + } + + @Override + public AggregatedMetricsResponseBody deserialize( + JsonParser jsonParser, + DeserializationContext deserializationContext) throws IOException { + + return new AggregatedMetricsResponseBody(jsonParser.readValueAs( + new TypeReference<List<AggregatedMetric>>() { + })); + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/c0410d80/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/AggregatedSubtaskMetricsHeaders.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/AggregatedSubtaskMetricsHeaders.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/AggregatedSubtaskMetricsHeaders.java new file mode 100644 index 0000000..e1d0790 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/AggregatedSubtaskMetricsHeaders.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.messages.job.metrics; + +import org.apache.flink.runtime.rest.messages.JobIDPathParameter; +import org.apache.flink.runtime.rest.messages.JobVertexIdPathParameter; + +/** + * Headers for aggregating subtask metrics. + */ +public class AggregatedSubtaskMetricsHeaders extends AbstractAggregatedMetricsHeaders<AggregatedSubtaskMetricsParameters> { + + private static final AggregatedSubtaskMetricsHeaders INSTANCE = new AggregatedSubtaskMetricsHeaders(); + + private AggregatedSubtaskMetricsHeaders() { + } + + @Override + public AggregatedSubtaskMetricsParameters getUnresolvedMessageParameters() { + return new AggregatedSubtaskMetricsParameters(); + } + + @Override + public String getTargetRestEndpointURL() { + return "/jobs/" + JobIDPathParameter.KEY + "/vertices/" + JobVertexIdPathParameter.KEY + "/subtasks/metrics"; + } + + public static AggregatedSubtaskMetricsHeaders getInstance() { + return INSTANCE; + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/c0410d80/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/AggregatedSubtaskMetricsParameters.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/AggregatedSubtaskMetricsParameters.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/AggregatedSubtaskMetricsParameters.java new file mode 100644 index 0000000..34e1b52 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/AggregatedSubtaskMetricsParameters.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.messages.job.metrics; + +import org.apache.flink.runtime.rest.messages.JobIDPathParameter; +import org.apache.flink.runtime.rest.messages.JobVertexIdPathParameter; +import org.apache.flink.runtime.rest.messages.MessagePathParameter; +import org.apache.flink.runtime.rest.messages.SubtaskIndexPathParameter; + +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; + +/** + * Parameters for aggregating subtask metrics. + */ +public class AggregatedSubtaskMetricsParameters extends AbstractAggregatedMetricsParameters<SubtasksFilterQueryParameter> { + + private final JobIDPathParameter jobId = new JobIDPathParameter(); + private final JobVertexIdPathParameter vertexId = new JobVertexIdPathParameter(); + private final SubtaskIndexPathParameter subtaskIndex = new SubtaskIndexPathParameter(); + + public AggregatedSubtaskMetricsParameters() { + super(new SubtasksFilterQueryParameter()); + } + + @Override + public Collection<MessagePathParameter<?>> getPathParameters() { + return Collections.unmodifiableCollection(Arrays.asList( + jobId, + vertexId, + subtaskIndex + )); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/c0410d80/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/AggregatedTaskManagerMetricsHeaders.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/AggregatedTaskManagerMetricsHeaders.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/AggregatedTaskManagerMetricsHeaders.java new file mode 100644 index 0000000..5b5fe4c --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/AggregatedTaskManagerMetricsHeaders.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.messages.job.metrics; + +/** + * Headers for aggregating task manager metrics. + */ +public class AggregatedTaskManagerMetricsHeaders extends AbstractAggregatedMetricsHeaders<AggregateTaskManagerMetricsParameters> { + + private static final AggregatedTaskManagerMetricsHeaders INSTANCE = new AggregatedTaskManagerMetricsHeaders(); + + private AggregatedTaskManagerMetricsHeaders() { + } + + @Override + public AggregateTaskManagerMetricsParameters getUnresolvedMessageParameters() { + return new AggregateTaskManagerMetricsParameters(); + } + + @Override + public String getTargetRestEndpointURL() { + return "/taskmanagers/metrics"; + } + + public static AggregatedTaskManagerMetricsHeaders getInstance() { + return INSTANCE; + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/c0410d80/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/JobsFilterQueryParameter.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/JobsFilterQueryParameter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/JobsFilterQueryParameter.java new file mode 100644 index 0000000..fb57f87 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/JobsFilterQueryParameter.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.messages.job.metrics; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.runtime.rest.messages.ConversionException; +import org.apache.flink.runtime.rest.messages.MessageQueryParameter; + + +/** + * {@link MessageQueryParameter} for selecting jobs when aggregating metrics. + */ +public class JobsFilterQueryParameter extends MessageQueryParameter<JobID> { + + JobsFilterQueryParameter() { + super("jobs", MessageParameterRequisiteness.OPTIONAL); + } + + @Override + public JobID convertStringToValue(String value) throws ConversionException { + try { + return JobID.fromHexString(value); + } catch (IllegalArgumentException iae) { + throw new ConversionException("Not a valid job ID: " + value, iae); + } + } + + @Override + public String convertValueToString(JobID value) { + return value.toString(); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/c0410d80/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/MetricsAggregationParameter.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/MetricsAggregationParameter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/MetricsAggregationParameter.java new file mode 100644 index 0000000..1057788 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/MetricsAggregationParameter.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.messages.job.metrics; + +import org.apache.flink.runtime.rest.messages.ConversionException; +import org.apache.flink.runtime.rest.messages.MessageQueryParameter; + +import java.util.Locale; + +/** + * TODO: add javadoc. + */ +public class MetricsAggregationParameter extends MessageQueryParameter<MetricsAggregationParameter.AggregationMode> { + + protected MetricsAggregationParameter() { + super("agg", MessageParameterRequisiteness.OPTIONAL); + } + + @Override + public AggregationMode convertStringToValue(String value) throws ConversionException { + try { + return AggregationMode.valueOf(value.toUpperCase(Locale.ROOT)); + } catch (IllegalArgumentException iae) { + throw new ConversionException("Not a valid aggregation: " + value, iae); + } + } + + @Override + public String convertValueToString(AggregationMode value) { + return value.name().toLowerCase(); + } + + /** + * The available aggregations. + */ + public enum AggregationMode { + MIN, + MAX, + SUM, + AVG + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/c0410d80/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/SubtasksFilterQueryParameter.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/SubtasksFilterQueryParameter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/SubtasksFilterQueryParameter.java new file mode 100644 index 0000000..fe5d37e --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/SubtasksFilterQueryParameter.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.messages.job.metrics; + +import org.apache.flink.runtime.rest.messages.MessageQueryParameter; + +/** + * {@link MessageQueryParameter} for selecting subtasks when aggregating metrics. + */ +public class SubtasksFilterQueryParameter extends MessageQueryParameter<String> { + + SubtasksFilterQueryParameter() { + super("subtasks", MessageParameterRequisiteness.OPTIONAL); + } + + @Override + public String convertStringToValue(String value) { + return value; + } + + @Override + public String convertValueToString(String value) { + return value; + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/c0410d80/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/TaskManagersFilterQueryParameter.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/TaskManagersFilterQueryParameter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/TaskManagersFilterQueryParameter.java new file mode 100644 index 0000000..dcd6934 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/TaskManagersFilterQueryParameter.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.messages.job.metrics; + +import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.rest.messages.MessageQueryParameter; + +/** + * {@link MessageQueryParameter} for selecting task managers when aggregating metrics. + */ +public class TaskManagersFilterQueryParameter extends MessageQueryParameter<ResourceID> { + + TaskManagersFilterQueryParameter() { + super("taskmanagers", MessageParameterRequisiteness.OPTIONAL); + } + + @Override + public ResourceID convertStringToValue(String value) { + return new ResourceID(value); + } + + @Override + public String convertValueToString(ResourceID value) { + return value.getResourceIdString(); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/c0410d80/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java index 7e552de..fb663ad 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java @@ -56,6 +56,9 @@ import org.apache.flink.runtime.rest.handler.job.checkpoints.CheckpointStatistic import org.apache.flink.runtime.rest.handler.job.checkpoints.CheckpointStatsCache; import org.apache.flink.runtime.rest.handler.job.checkpoints.CheckpointingStatisticsHandler; import org.apache.flink.runtime.rest.handler.job.checkpoints.TaskCheckpointStatisticDetailsHandler; +import org.apache.flink.runtime.rest.handler.job.metrics.AggregatingJobsMetricsHandler; +import org.apache.flink.runtime.rest.handler.job.metrics.AggregatingSubtasksMetricsHandler; +import org.apache.flink.runtime.rest.handler.job.metrics.AggregatingTaskManagersMetricsHandler; import org.apache.flink.runtime.rest.handler.job.metrics.JobManagerMetricsHandler; import org.apache.flink.runtime.rest.handler.job.metrics.JobMetricsHandler; import org.apache.flink.runtime.rest.handler.job.metrics.JobVertexMetricsHandler; @@ -393,6 +396,33 @@ public class WebMonitorEndpoint<T extends RestfulGateway> extends RestServerEndp responseHeaders, metricFetcher); + final AggregatingTaskManagersMetricsHandler aggregatingTaskManagersMetricsHandler = new AggregatingTaskManagersMetricsHandler( + restAddressFuture, + leaderRetriever, + timeout, + responseHeaders, + executor, + metricFetcher + ); + + final AggregatingJobsMetricsHandler aggregatingJobsMetricsHandler = new AggregatingJobsMetricsHandler( + restAddressFuture, + leaderRetriever, + timeout, + responseHeaders, + executor, + metricFetcher + ); + + final AggregatingSubtasksMetricsHandler aggregatingSubtasksMetricsHandler = new AggregatingSubtasksMetricsHandler( + restAddressFuture, + leaderRetriever, + timeout, + responseHeaders, + executor, + metricFetcher + ); + final JobVertexTaskManagersHandler jobVertexTaskManagersHandler = new JobVertexTaskManagersHandler( restAddressFuture, leaderRetriever, @@ -553,6 +583,9 @@ public class WebMonitorEndpoint<T extends RestfulGateway> extends RestServerEndp handlers.add(Tuple2.of(SubtaskMetricsHeaders.getInstance(), subtaskMetricsHandler)); handlers.add(Tuple2.of(TaskManagerMetricsHeaders.getInstance(), taskManagerMetricsHandler)); handlers.add(Tuple2.of(JobManagerMetricsHeaders.getInstance(), jobManagerMetricsHandler)); + handlers.add(Tuple2.of(aggregatingTaskManagersMetricsHandler.getMessageHeaders(), aggregatingTaskManagersMetricsHandler)); + handlers.add(Tuple2.of(aggregatingJobsMetricsHandler.getMessageHeaders(), aggregatingJobsMetricsHandler)); + handlers.add(Tuple2.of(aggregatingSubtasksMetricsHandler.getMessageHeaders(), aggregatingSubtasksMetricsHandler)); handlers.add(Tuple2.of(JobExecutionResultHeaders.getInstance(), jobExecutionResultHandler)); handlers.add(Tuple2.of(SavepointTriggerHeaders.getInstance(), savepointTriggerHandler)); handlers.add(Tuple2.of(SavepointStatusHeaders.getInstance(), savepointStatusHandler)); http://git-wip-us.apache.org/repos/asf/flink/blob/c0410d80/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/metrics/AggregatingJobsMetricsHandlerTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/metrics/AggregatingJobsMetricsHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/metrics/AggregatingJobsMetricsHandlerTest.java new file mode 100644 index 0000000..2dac8bf --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/metrics/AggregatingJobsMetricsHandlerTest.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.handler.job.metrics; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.time.Time; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.runtime.metrics.dump.MetricDump; +import org.apache.flink.runtime.metrics.dump.QueryScopeInfo; +import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcher; +import org.apache.flink.runtime.rest.messages.job.metrics.AggregatedJobMetricsParameters; +import org.apache.flink.runtime.webmonitor.RestfulGateway; +import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executor; + +/** + * Tests for the {@link AggregatingJobsMetricsHandler}. + */ +public class AggregatingJobsMetricsHandlerTest extends AggregatingMetricsHandlerTestBase<AggregatingJobsMetricsHandler, AggregatedJobMetricsParameters> { + + private static final JobID JOB_ID_1 = JobID.generate(); + private static final JobID JOB_ID_2 = JobID.generate(); + private static final JobID JOB_ID_3 = JobID.generate(); + + @Override + protected Tuple2<String, List<String>> getFilter() { + return Tuple2.of("jobs", Arrays.asList(JOB_ID_1.toString(), JOB_ID_3.toString())); + } + + @Override + protected Collection<MetricDump> getMetricDumps() { + Collection<MetricDump> dumps = new ArrayList<>(3); + QueryScopeInfo.JobQueryScopeInfo job = new QueryScopeInfo.JobQueryScopeInfo(JOB_ID_1.toString(), "abc"); + MetricDump.CounterDump cd1 = new MetricDump.CounterDump(job, "metric1", 1); + dumps.add(cd1); + + QueryScopeInfo.JobQueryScopeInfo job2 = new QueryScopeInfo.JobQueryScopeInfo(JOB_ID_2.toString(), "abc"); + MetricDump.CounterDump cd2 = new MetricDump.CounterDump(job2, "metric1", 3); + dumps.add(cd2); + + QueryScopeInfo.JobQueryScopeInfo job3 = new QueryScopeInfo.JobQueryScopeInfo(JOB_ID_3.toString(), "abc"); + MetricDump.CounterDump cd3 = new MetricDump.CounterDump(job3, "metric2", 5); + dumps.add(cd3); + return dumps; + } + + @Override + protected AggregatingJobsMetricsHandler getHandler(CompletableFuture<String> localRestAddress, GatewayRetriever<? extends RestfulGateway> leaderRetriever, Time timeout, Map<String, String> responseHeaders, Executor executor, MetricFetcher<?> fetcher) { + return new AggregatingJobsMetricsHandler( + localRestAddress, + leaderRetriever, + timeout, + responseHeaders, + executor, + fetcher + ); + } +}