This is an automated email from the ASF dual-hosted git repository. chesnay pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 9cc3771577327d1f5eb7e2a44cd2b550aa602418 Author: Chesnay Schepler <[email protected]> AuthorDate: Wed Mar 23 16:08:48 2022 +0100 [FLINK-21585][metrics] Add options for in-/excluding metrics --- .../shortcodes/generated/metric_configuration.html | 12 ++ .../generated/metric_reporters_section.html | 12 ++ .../flink/configuration/ConfigurationUtils.java | 2 +- .../apache/flink/configuration/MetricOptions.java | 91 +++++++++ .../metrics/prometheus/PrometheusReporterTest.java | 4 +- .../flink/runtime/metrics/MetricRegistryImpl.java | 20 ++ .../flink/runtime/metrics/ReporterSetup.java | 37 +++- .../metrics/filter/DefaultMetricFilter.java | 129 +++++++++++++ .../flink/runtime/metrics/filter/MetricFilter.java | 37 ++++ .../metrics/groups/ReporterScopedSettings.java | 9 + .../runtime/metrics/MetricRegistryImplTest.java | 40 ++++ .../metrics/filter/DefaultMetricFilterTest.java | 215 +++++++++++++++++++++ .../metrics/groups/FrontMetricGroupTest.java | 14 +- .../runtime/metrics/groups/MetricGroupTest.java | 2 +- 14 files changed, 616 insertions(+), 8 deletions(-) diff --git a/docs/layouts/shortcodes/generated/metric_configuration.html b/docs/layouts/shortcodes/generated/metric_configuration.html index 4fabd04..e8e3dd1 100644 --- a/docs/layouts/shortcodes/generated/metric_configuration.html +++ b/docs/layouts/shortcodes/generated/metric_configuration.html @@ -69,6 +69,18 @@ <td>The reporter factory class to use for the reporter named <name>.</td> </tr> <tr> + <td><h5>metrics.reporter.<name>.filter.excludes</h5></td> + <td style="word-wrap: break-word;"></td> + <td>List<String></td> + <td>The metrics that should be excluded for the reporter named <name>. The format is identical to <code class="highlighter-rouge">filter.includes</code><br /></td> + </tr> + <tr> + <td><h5>metrics.reporter.<name>.filter.includes</h5></td> + <td style="word-wrap: break-word;">"*:*:*"</td> + <td>List<String></td> + <td>The metrics that should be included for the reporter named <name>. Filters are specified as a list, with each filter following this format:<br /><code class="highlighter-rouge"><scope>[:<name>[,<name>][:<type>[,<type>]]]</code><br />A metric matches a filter if the scope pattern and at least one of the name patterns and at least one of the types match.<br /><ul><li>scope: Filters based on the logical scope.<br />Specified as a pattern w [...] + </tr> + <tr> <td><h5>metrics.reporter.<name>.interval</h5></td> <td style="word-wrap: break-word;">10 s</td> <td>Duration</td> diff --git a/docs/layouts/shortcodes/generated/metric_reporters_section.html b/docs/layouts/shortcodes/generated/metric_reporters_section.html index bc4068c..1cc11c7 100644 --- a/docs/layouts/shortcodes/generated/metric_reporters_section.html +++ b/docs/layouts/shortcodes/generated/metric_reporters_section.html @@ -45,6 +45,18 @@ <td>The set of variables that should be excluded for the reporter named <name>. Only applicable to tag-based reporters (e.g., PRometheus, InfluxDB).</td> </tr> <tr> + <td><h5>metrics.reporter.<name>.filter.includes</h5></td> + <td style="word-wrap: break-word;">"*:*:*"</td> + <td>List<String></td> + <td>The metrics that should be included for the reporter named <name>. Filters are specified as a list, with each filter following this format:<br /><code class="highlighter-rouge"><scope>[:<name>[,<name>][:<type>[,<type>]]]</code><br />A metric matches a filter if the scope pattern and at least one of the name patterns and at least one of the types match.<br /><ul><li>scope: Filters based on the logical scope.<br />Specified as a pattern w [...] + </tr> + <tr> + <td><h5>metrics.reporter.<name>.filter.excludes</h5></td> + <td style="word-wrap: break-word;"></td> + <td>List<String></td> + <td>The metrics that should be excluded for the reporter named <name>. The format is identical to <code class="highlighter-rouge">filter.includes</code><br /></td> + </tr> + <tr> <td><h5>metrics.reporter.<name>.<parameter></h5></td> <td style="word-wrap: break-word;">(none)</td> <td>String</td> diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ConfigurationUtils.java b/flink-core/src/main/java/org/apache/flink/configuration/ConfigurationUtils.java index a2bce40..7975f34 100755 --- a/flink-core/src/main/java/org/apache/flink/configuration/ConfigurationUtils.java +++ b/flink-core/src/main/java/org/apache/flink/configuration/ConfigurationUtils.java @@ -371,7 +371,7 @@ public class ConfigurationUtils { } @SuppressWarnings("unchecked") - static <E extends Enum<?>> E convertToEnum(Object o, Class<E> clazz) { + public static <E extends Enum<?>> E convertToEnum(Object o, Class<E> clazz) { if (o.getClass().equals(clazz)) { return (E) o; } diff --git a/flink-core/src/main/java/org/apache/flink/configuration/MetricOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/MetricOptions.java index 19e7bb9..12ab390 100644 --- a/flink-core/src/main/java/org/apache/flink/configuration/MetricOptions.java +++ b/flink-core/src/main/java/org/apache/flink/configuration/MetricOptions.java @@ -30,6 +30,8 @@ import java.util.List; import java.util.Map; import static org.apache.flink.configuration.ConfigOptions.key; +import static org.apache.flink.configuration.description.LineBreakElement.linebreak; +import static org.apache.flink.configuration.description.TextElement.code; import static org.apache.flink.configuration.description.TextElement.text; /** Configuration options for metrics and metric reporters. */ @@ -119,6 +121,95 @@ public class MetricOptions { @Documentation.SuffixOption(NAMED_REPORTER_CONFIG_PREFIX) @Documentation.Section(value = Documentation.Sections.METRIC_REPORTERS, position = 4) + public static final ConfigOption<List<String>> REPORTER_INCLUDES = + key("filter.includes") + .stringType() + .asList() + .defaultValues("*:*:*") + .withDescription( + Description.builder() + .text( + "The metrics that should be included for the reporter named <name>." + + " Filters are specified as a list, with each filter following this format:") + .linebreak() + .text("%s", code("<scope>[:<name>[,<name>][:<type>[,<type>]]]")) + .linebreak() + .text( + "A metric matches a filter if the scope pattern and at least one of the name patterns and at least one of the types match.") + .linebreak() + .list( + text( + "scope: Filters based on the logical scope.%s" + + "Specified as a pattern where %s matches any sequence of characters and %s separates scope components.%s%s" + + "For example:%s" + + " \"%s\" matches any job-related metrics on the JobManager,%s" + + " \"%s\" matches all job-related metrics and%s" + + " \"%s\" matches all metrics below the job-level (i.e., task/operator metrics etc.).%s%s", + linebreak(), + code("*"), + code("."), + linebreak(), + linebreak(), + linebreak(), + code("jobmanager.job"), + linebreak(), + code("*.job"), + linebreak(), + code("*.job.*"), + linebreak(), + linebreak()), + text( + "name: Filters based on the metric name.%s" + + "Specified as a comma-separate list of patterns where %s matches any sequence of characters.%s%s" + + "For example, \"%s\" matches any metrics where the name contains %s.%s%s", + linebreak(), + code("*"), + linebreak(), + linebreak(), + code("*Records*,*Bytes*"), + code("\"Records\" or \"Bytes\""), + linebreak(), + linebreak()), + text( + "type: Filters based on the metric type. Specified as a comma-separated list of metric types: %s", + code("[counter, meter, gauge, histogram]"))) + .text("Examples:") + .list( + text( + "\"%s\" Matches metrics like %s.", + code("*:numRecords*"), code("numRecordsIn")), + text( + "\"%s\" Matches metrics like %s on the operator level.", + code("*.job.task.operator:numRecords*"), + code("numRecordsIn")), + text( + "\"%s\" Matches meter metrics like %s on the operator level.", + code("*.job.task.operator:numRecords*:meter"), + code("numRecordsInPerSecond")), + text( + "\"%s\" Matches all counter/meter metrics like or %s.", + code("*:numRecords*,numBytes*:counter,meter"), + code("numRecordsInPerSecond"), + code("numBytesOut"))) + .build()); + + @Documentation.SuffixOption(NAMED_REPORTER_CONFIG_PREFIX) + @Documentation.Section(value = Documentation.Sections.METRIC_REPORTERS, position = 5) + public static final ConfigOption<List<String>> REPORTER_EXCLUDES = + key("filter.excludes") + .stringType() + .asList() + .defaultValues() + .withDescription( + Description.builder() + .text( + "The metrics that should be excluded for the reporter named <name>. The format is identical to %s", + code(REPORTER_INCLUDES.key())) + .linebreak() + .build()); + + @Documentation.SuffixOption(NAMED_REPORTER_CONFIG_PREFIX) + @Documentation.Section(value = Documentation.Sections.METRIC_REPORTERS, position = 6) public static final ConfigOption<String> REPORTER_CONFIG_PARAMETER = key("<parameter>") .stringType() diff --git a/flink-metrics/flink-metrics-prometheus/src/test/java/org/apache/flink/metrics/prometheus/PrometheusReporterTest.java b/flink-metrics/flink-metrics-prometheus/src/test/java/org/apache/flink/metrics/prometheus/PrometheusReporterTest.java index 45d2b19..d45c14d 100644 --- a/flink-metrics/flink-metrics-prometheus/src/test/java/org/apache/flink/metrics/prometheus/PrometheusReporterTest.java +++ b/flink-metrics/flink-metrics-prometheus/src/test/java/org/apache/flink/metrics/prometheus/PrometheusReporterTest.java @@ -32,6 +32,7 @@ import org.apache.flink.runtime.clusterframework.types.ResourceID; import org.apache.flink.runtime.metrics.MetricRegistryImpl; import org.apache.flink.runtime.metrics.MetricRegistryTestUtils; import org.apache.flink.runtime.metrics.ReporterSetup; +import org.apache.flink.runtime.metrics.filter.MetricFilter; import org.apache.flink.runtime.metrics.groups.FrontMetricGroup; import org.apache.flink.runtime.metrics.groups.JobManagerJobMetricGroup; import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup; @@ -340,6 +341,7 @@ class PrometheusReporterTest { } private static ReporterScopedSettings createReporterScopedSettings() { - return new ReporterScopedSettings(0, ',', Collections.emptySet(), Collections.emptyMap()); + return new ReporterScopedSettings( + 0, ',', MetricFilter.NO_OP_FILTER, Collections.emptySet(), Collections.emptyMap()); } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistryImpl.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistryImpl.java index e8108b3..8d26119 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistryImpl.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistryImpl.java @@ -21,6 +21,7 @@ package org.apache.flink.runtime.metrics; import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.api.common.time.Time; import org.apache.flink.configuration.MetricOptions; +import org.apache.flink.metrics.CharacterFilter; import org.apache.flink.metrics.Metric; import org.apache.flink.metrics.MetricGroup; import org.apache.flink.metrics.View; @@ -172,6 +173,7 @@ public class MetricRegistryImpl implements MetricRegistry { new ReporterScopedSettings( reporters.size(), delimiterForReporter.charAt(0), + reporterSetup.getFilter(), reporterSetup.getExcludedVariables(), reporterSetup.getAdditionalVariables()))); } catch (Throwable t) { @@ -380,6 +382,12 @@ public class MetricRegistryImpl implements MetricRegistry { LOG.warn( "Cannot register metric, because the MetricRegistry has already been shut down."); } else { + if (LOG.isDebugEnabled()) { + LOG.debug( + "Registering metric {}.{}.", + group.getLogicalScope(CharacterFilter.NO_OP_FILTER), + metricName); + } if (reporters != null) { forAllReporters(MetricReporter::notifyOfAddedMetric, metric, metricName, group); } @@ -445,6 +453,18 @@ public class MetricRegistryImpl implements MetricRegistry { try { ReporterAndSettings reporterAndSettings = reporters.get(i); if (reporterAndSettings != null) { + final String logicalScope = group.getLogicalScope(CharacterFilter.NO_OP_FILTER); + if (!reporterAndSettings + .settings + .getFilter() + .filter(metric, metricName, logicalScope)) { + LOG.trace( + "Ignoring metric {}.{} for reporter #{} due to filter rules.", + logicalScope, + metricName, + i); + continue; + } FrontMetricGroup front = new FrontMetricGroup<AbstractMetricGroup<?>>( reporterAndSettings.getSettings(), group); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/ReporterSetup.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/ReporterSetup.java index c1e48df..7d6c6a9 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/ReporterSetup.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/ReporterSetup.java @@ -29,6 +29,8 @@ import org.apache.flink.metrics.reporter.InstantiateViaFactory; import org.apache.flink.metrics.reporter.InterceptInstantiationViaReflection; import org.apache.flink.metrics.reporter.MetricReporter; import org.apache.flink.metrics.reporter.MetricReporterFactory; +import org.apache.flink.runtime.metrics.filter.DefaultMetricFilter; +import org.apache.flink.runtime.metrics.filter.MetricFilter; import org.apache.flink.runtime.metrics.scope.ScopeFormat; import org.apache.flink.shaded.guava30.com.google.common.collect.Iterators; @@ -85,16 +87,19 @@ public final class ReporterSetup { private final String name; private final MetricConfig configuration; private final MetricReporter reporter; + private final MetricFilter filter; private final Map<String, String> additionalVariables; public ReporterSetup( final String name, final MetricConfig configuration, MetricReporter reporter, + final MetricFilter filter, final Map<String, String> additionalVariables) { this.name = name; this.configuration = configuration; this.reporter = reporter; + this.filter = filter; this.additionalVariables = additionalVariables; } @@ -122,6 +127,10 @@ public final class ReporterSetup { } } + public MetricFilter getFilter() { + return filter; + } + public Map<String, String> getAdditionalVariables() { return additionalVariables; } @@ -142,23 +151,41 @@ public final class ReporterSetup { @VisibleForTesting public static ReporterSetup forReporter(String reporterName, MetricReporter reporter) { return createReporterSetup( - reporterName, new MetricConfig(), reporter, Collections.emptyMap()); + reporterName, + new MetricConfig(), + reporter, + MetricFilter.NO_OP_FILTER, + Collections.emptyMap()); } @VisibleForTesting public static ReporterSetup forReporter( String reporterName, MetricConfig metricConfig, MetricReporter reporter) { - return createReporterSetup(reporterName, metricConfig, reporter, Collections.emptyMap()); + return createReporterSetup( + reporterName, + metricConfig, + reporter, + MetricFilter.NO_OP_FILTER, + Collections.emptyMap()); + } + + @VisibleForTesting + public static ReporterSetup forReporter( + String reporterName, MetricFilter metricFilter, MetricReporter reporter) { + return createReporterSetup( + reporterName, new MetricConfig(), reporter, metricFilter, Collections.emptyMap()); } private static ReporterSetup createReporterSetup( String reporterName, MetricConfig metricConfig, MetricReporter reporter, + MetricFilter metricFilter, Map<String, String> additionalVariables) { reporter.open(metricConfig); - return new ReporterSetup(reporterName, metricConfig, reporter, additionalVariables); + return new ReporterSetup( + reporterName, metricConfig, reporter, metricFilter, additionalVariables); } public static List<ReporterSetup> fromConfiguration( @@ -299,6 +326,9 @@ public final class ReporterSetup { Optional<MetricReporter> metricReporterOptional = loadReporter(reporterName, reporterConfig, reporterFactories); + final MetricFilter metricFilter = + DefaultMetricFilter.fromConfiguration(reporterConfig); + // massage user variables keys into scope format for parity to variable exclusion Map<String, String> additionalVariables = reporterConfig.get(MetricOptions.REPORTER_ADDITIONAL_VARIABLES).entrySet() @@ -317,6 +347,7 @@ public final class ReporterSetup { reporterName, metricConfig, reporter, + metricFilter, additionalVariables)); }); } catch (Throwable t) { diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/filter/DefaultMetricFilter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/filter/DefaultMetricFilter.java new file mode 100644 index 0000000..b31a30e --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/filter/DefaultMetricFilter.java @@ -0,0 +1,129 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.metrics.filter; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.ConfigurationUtils; +import org.apache.flink.configuration.MetricOptions; +import org.apache.flink.metrics.Metric; +import org.apache.flink.metrics.MetricType; + +import java.util.Arrays; +import java.util.EnumSet; +import java.util.List; +import java.util.regex.Pattern; +import java.util.stream.Collectors; + +/** + * Default {@link MetricFilter} implementation that filters metrics based on {@link + * MetricOptions#REPORTER_INCLUDES}/{@link MetricOptions#REPORTER_EXCLUDES}. + */ +public class DefaultMetricFilter implements MetricFilter { + + private static final EnumSet<MetricType> ALL_METRIC_TYPES = EnumSet.allOf(MetricType.class); + @VisibleForTesting static final String LIST_DELIMITER = ","; + + private final List<FilterSpec> includes; + private final List<FilterSpec> excludes; + + private DefaultMetricFilter(List<FilterSpec> includes, List<FilterSpec> excludes) { + this.includes = includes; + this.excludes = excludes; + } + + @Override + public boolean filter(Metric metric, String name, String logicalScope) { + for (FilterSpec exclude : excludes) { + if (exclude.namePattern.matcher(name).matches() + && exclude.scopePattern.matcher(logicalScope).matches() + && exclude.types.contains(metric.getMetricType())) { + return false; + } + } + for (FilterSpec include : includes) { + if (include.namePattern.matcher(name).matches() + && include.scopePattern.matcher(logicalScope).matches() + && include.types.contains(metric.getMetricType())) { + return true; + } + } + return false; + } + + public static MetricFilter fromConfiguration(Configuration configuration) { + final List<String> includes = configuration.get(MetricOptions.REPORTER_INCLUDES); + final List<String> excludes = configuration.get(MetricOptions.REPORTER_EXCLUDES); + + final List<FilterSpec> includeFilters = + includes.stream().map(i -> parse(i)).collect(Collectors.toList()); + final List<FilterSpec> excludeFilters = + excludes.stream().map(e -> parse(e)).collect(Collectors.toList()); + + return new DefaultMetricFilter(includeFilters, excludeFilters); + } + + private static FilterSpec parse(String filter) { + final String[] split = filter.split(":"); + final Pattern scope = convertToPattern(split[0]); + final Pattern name = split.length > 1 ? convertToPattern(split[1]) : convertToPattern("*"); + final EnumSet<MetricType> type = + split.length > 2 ? parseMetricTypes(split[2]) : ALL_METRIC_TYPES; + + return new FilterSpec(scope, name, type); + } + + @VisibleForTesting + static Pattern convertToPattern(String scopeOrNameComponent) { + final String[] split = scopeOrNameComponent.split(LIST_DELIMITER); + + final String rawPattern = + Arrays.stream(split) + .map(s -> s.replaceAll("\\.", "\\.")) + .map(s -> s.replaceAll("\\*", ".*")) + .collect(Collectors.joining("|", "(", ")")); + + return Pattern.compile(rawPattern); + } + + @VisibleForTesting + static EnumSet<MetricType> parseMetricTypes(String typeComponent) { + final String[] split = typeComponent.split(LIST_DELIMITER); + + if (split.length == 1 && split[0].equals("*")) { + return ALL_METRIC_TYPES; + } + + return EnumSet.copyOf( + Arrays.stream(split) + .map(s -> ConfigurationUtils.convertToEnum(s, MetricType.class)) + .collect(Collectors.toSet())); + } + + private static class FilterSpec { + private final Pattern scopePattern; + private final Pattern namePattern; + private final EnumSet<MetricType> types; + + private FilterSpec(Pattern scopePattern, Pattern namePattern, EnumSet<MetricType> types) { + this.scopePattern = scopePattern; + this.namePattern = namePattern; + this.types = types; + } + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/filter/MetricFilter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/filter/MetricFilter.java new file mode 100644 index 0000000..47a4781 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/filter/MetricFilter.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.metrics.filter; + +import org.apache.flink.metrics.Metric; + +/** A filter for metrics. */ +public interface MetricFilter { + + /** Filter that accepts every metric. */ + MetricFilter NO_OP_FILTER = (metric, name, scope) -> true; + + /** + * Filters a given metric. + * + * @param metric the metric to filter + * @param name the name of the metric + * @param logicalScope the logical scope of the metric + * @return true, if the metric matches, false otherwise + */ + boolean filter(Metric metric, String name, String logicalScope); +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/ReporterScopedSettings.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/ReporterScopedSettings.java index 5fbea42..1a9c81c 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/ReporterScopedSettings.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/ReporterScopedSettings.java @@ -17,6 +17,7 @@ package org.apache.flink.runtime.metrics.groups; +import org.apache.flink.runtime.metrics.filter.MetricFilter; import org.apache.flink.util.Preconditions; import java.util.Map; @@ -31,17 +32,21 @@ public class ReporterScopedSettings { private final Set<String> excludedVariables; + private final MetricFilter filter; + private final Map<String, String> additionalVariables; public ReporterScopedSettings( int reporterIndex, char delimiter, + MetricFilter filter, Set<String> excludedVariables, Map<String, String> additionalVariables) { this.excludedVariables = excludedVariables; Preconditions.checkArgument(reporterIndex >= 0); this.reporterIndex = reporterIndex; this.delimiter = delimiter; + this.filter = filter; this.additionalVariables = additionalVariables; } @@ -53,6 +58,10 @@ public class ReporterScopedSettings { return delimiter; } + public MetricFilter getFilter() { + return filter; + } + public Set<String> getExcludedVariables() { return excludedVariables; } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/MetricRegistryImplTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/MetricRegistryImplTest.java index 4105a69..8748c39 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/MetricRegistryImplTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/MetricRegistryImplTest.java @@ -27,11 +27,14 @@ import org.apache.flink.metrics.MetricConfig; import org.apache.flink.metrics.MetricGroup; import org.apache.flink.metrics.SimpleCounter; import org.apache.flink.metrics.reporter.Scheduled; +import org.apache.flink.metrics.util.TestCounter; +import org.apache.flink.metrics.util.TestMeter; import org.apache.flink.runtime.clusterframework.types.ResourceID; import org.apache.flink.runtime.concurrent.ManuallyTriggeredScheduledExecutorService; import org.apache.flink.runtime.metrics.CollectingMetricsReporter.MetricGroupAndName; import org.apache.flink.runtime.metrics.dump.MetricDumpSerialization; import org.apache.flink.runtime.metrics.dump.MetricQueryService; +import org.apache.flink.runtime.metrics.filter.DefaultMetricFilter; import org.apache.flink.runtime.metrics.groups.MetricGroupTest; import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup; import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups; @@ -462,4 +465,41 @@ class MetricRegistryImplTest { throw new RuntimeException(); } } + + @Test + void testMetricFiltering() { + final String excludedMetricName = "excluded"; + final NotificationCapturingReporter reporter = new NotificationCapturingReporter(); + + final Configuration reporterConfig = new Configuration(); + reporterConfig.set(MetricOptions.REPORTER_INCLUDES, Arrays.asList("*:*:counter")); + reporterConfig.set( + MetricOptions.REPORTER_EXCLUDES, Arrays.asList("*:" + excludedMetricName)); + + MetricRegistryImpl registry = + new MetricRegistryImpl( + MetricRegistryTestUtils.defaultMetricRegistryConfiguration(), + Arrays.asList( + ReporterSetup.forReporter( + "test", + DefaultMetricFilter.fromConfiguration(reporterConfig), + reporter))); + + registry.register( + new TestMeter(), "", new MetricGroupTest.DummyAbstractMetricGroup(registry)); + + assertThat(reporter.getLastAddedMetric()).isEmpty(); + + registry.register( + new TestCounter(), + excludedMetricName, + new MetricGroupTest.DummyAbstractMetricGroup(registry)); + + assertThat(reporter.getLastAddedMetric()).isEmpty(); + + registry.register( + new TestCounter(), "foo", new MetricGroupTest.DummyAbstractMetricGroup(registry)); + + assertThat(reporter.getLastAddedMetric()).isNotEmpty(); + } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/filter/DefaultMetricFilterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/filter/DefaultMetricFilterTest.java new file mode 100644 index 0000000..1d32521 --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/filter/DefaultMetricFilterTest.java @@ -0,0 +1,215 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.metrics.filter; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.MetricOptions; +import org.apache.flink.metrics.Counter; +import org.apache.flink.metrics.Gauge; +import org.apache.flink.metrics.Meter; +import org.apache.flink.metrics.MetricType; +import org.apache.flink.metrics.util.TestCounter; +import org.apache.flink.metrics.util.TestMeter; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.parallel.Execution; +import org.junit.jupiter.api.parallel.ExecutionMode; + +import java.util.Arrays; +import java.util.Collections; +import java.util.EnumSet; +import java.util.regex.Pattern; + +import static org.assertj.core.api.Assertions.assertThat; + +@Execution(ExecutionMode.CONCURRENT) +class DefaultMetricFilterTest { + + private static final Counter COUNTER = new TestCounter(); + private static final Meter METER = new TestMeter(); + private static final Gauge<Integer> GAUGE = () -> 4; + + @Test + void testConvertToPatternWithoutWildcards() { + final Pattern pattern = DefaultMetricFilter.convertToPattern("numRecordsIn"); + assertThat(pattern.toString()).isEqualTo("(numRecordsIn)"); + assertThat(pattern.matcher("numRecordsIn").matches()).isEqualTo(true); + assertThat(pattern.matcher("numBytesOut").matches()).isEqualTo(false); + } + + @Test + void testConvertToPatternSingle() { + final Pattern pattern = DefaultMetricFilter.convertToPattern("numRecords*"); + assertThat(pattern.toString()).isEqualTo("(numRecords.*)"); + assertThat(pattern.matcher("numRecordsIn").matches()).isEqualTo(true); + assertThat(pattern.matcher("numBytesOut").matches()).isEqualTo(false); + } + + @Test + void testConvertToPatternMultiple() { + final Pattern pattern = DefaultMetricFilter.convertToPattern("numRecords*,numBytes*"); + assertThat(pattern.toString()).isEqualTo("(numRecords.*|numBytes.*)"); + assertThat(pattern.matcher("numRecordsIn").matches()).isEqualTo(true); + assertThat(pattern.matcher("numBytesOut").matches()).isEqualTo(true); + assertThat(pattern.matcher("numBytes").matches()).isEqualTo(true); + assertThat(pattern.matcher("hello").matches()).isEqualTo(false); + } + + @Test + void testParseMetricTypesSingle() { + final EnumSet<MetricType> types = DefaultMetricFilter.parseMetricTypes("meter"); + assertThat(types).containsExactly(MetricType.METER); + } + + @Test + void testParseMetricTypesMultiple() { + final EnumSet<MetricType> types = DefaultMetricFilter.parseMetricTypes("meter,counter"); + assertThat(types).containsExactlyInAnyOrder(MetricType.METER, MetricType.COUNTER); + } + + @Test + void testParseMetricTypesCaseIgnored() { + final EnumSet<MetricType> types = DefaultMetricFilter.parseMetricTypes("meter,CoUnTeR"); + assertThat(types).containsExactlyInAnyOrder(MetricType.METER, MetricType.COUNTER); + } + + @Test + void testFromConfigurationIncludeByScope() { + Configuration configuration = new Configuration(); + configuration.set( + MetricOptions.REPORTER_INCLUDES, Arrays.asList("include1:*:*", "include2.*:*:*")); + configuration.set(MetricOptions.REPORTER_EXCLUDES, Collections.emptyList()); + + final MetricFilter metricFilter = DefaultMetricFilter.fromConfiguration(configuration); + + assertThat(metricFilter.filter(COUNTER, "name", "include1")).isEqualTo(true); + assertThat(metricFilter.filter(COUNTER, "name", "include1.bar")).isEqualTo(false); + assertThat(metricFilter.filter(COUNTER, "name", "include2")).isEqualTo(false); + assertThat(metricFilter.filter(COUNTER, "name", "include2.bar")).isEqualTo(true); + } + + @Test + void testFromConfigurationIncludeByName() { + Configuration configuration = new Configuration(); + configuration.set(MetricOptions.REPORTER_INCLUDES, Arrays.asList("*:name:*")); + configuration.set(MetricOptions.REPORTER_EXCLUDES, Collections.emptyList()); + + final MetricFilter metricFilter = DefaultMetricFilter.fromConfiguration(configuration); + + assertThat(metricFilter.filter(COUNTER, "name", "bar")).isEqualTo(true); + assertThat(metricFilter.filter(COUNTER, "foo", "bar")).isEqualTo(false); + } + + @Test + void testFromConfigurationIncludeByType() { + Configuration configuration = new Configuration(); + configuration.set(MetricOptions.REPORTER_INCLUDES, Arrays.asList("*:*:counter")); + configuration.set(MetricOptions.REPORTER_EXCLUDES, Collections.emptyList()); + + final MetricFilter metricFilter = DefaultMetricFilter.fromConfiguration(configuration); + + assertThat(metricFilter.filter(COUNTER, "foo", "bar")).isEqualTo(true); + assertThat(metricFilter.filter(METER, "foo", "bar")).isEqualTo(false); + } + + @Test + void testFromConfigurationExcludeByScope() { + Configuration configuration = new Configuration(); + configuration.set(MetricOptions.REPORTER_INCLUDES, Arrays.asList("*:*:*")); + configuration.set(MetricOptions.REPORTER_EXCLUDES, Arrays.asList("include1", "include2.*")); + + final MetricFilter metricFilter = DefaultMetricFilter.fromConfiguration(configuration); + + assertThat(metricFilter.filter(COUNTER, "name", "include1")).isEqualTo(false); + assertThat(metricFilter.filter(COUNTER, "name", "include1.bar")).isEqualTo(true); + assertThat(metricFilter.filter(COUNTER, "name", "include2")).isEqualTo(true); + assertThat(metricFilter.filter(COUNTER, "name", "include2.bar")).isEqualTo(false); + } + + @Test + void testFromConfigurationExcludeByName() { + Configuration configuration = new Configuration(); + configuration.set(MetricOptions.REPORTER_INCLUDES, Arrays.asList("*:*:*")); + configuration.set(MetricOptions.REPORTER_EXCLUDES, Arrays.asList("*:faa*", "*:foo")); + + final MetricFilter metricFilter = DefaultMetricFilter.fromConfiguration(configuration); + + assertThat(metricFilter.filter(COUNTER, "name", "bar")).isEqualTo(true); + assertThat(metricFilter.filter(COUNTER, "foo", "bar")).isEqualTo(false); + assertThat(metricFilter.filter(COUNTER, "foob", "bar")).isEqualTo(true); + assertThat(metricFilter.filter(COUNTER, "faab", "bar")).isEqualTo(false); + } + + @Test + void testFromConfigurationExcludeByType() { + Configuration configuration = new Configuration(); + configuration.set(MetricOptions.REPORTER_INCLUDES, Arrays.asList("*:*:*")); + configuration.set(MetricOptions.REPORTER_EXCLUDES, Arrays.asList("*:*:meter")); + + final MetricFilter metricFilter = DefaultMetricFilter.fromConfiguration(configuration); + + assertThat(metricFilter.filter(COUNTER, "foo", "bar")).isEqualTo(true); + assertThat(metricFilter.filter(METER, "foo", "bar")).isEqualTo(false); + } + + @Test + void testFromConfigurationIncludeDefault() { + Configuration configuration = new Configuration(); + configuration.set(MetricOptions.REPORTER_EXCLUDES, Arrays.asList("*:*:meter")); + + final MetricFilter metricFilter = DefaultMetricFilter.fromConfiguration(configuration); + + assertThat(metricFilter.filter(COUNTER, "foo", "hello")).isEqualTo(true); + assertThat(metricFilter.filter(METER, "foo", "hello")).isEqualTo(false); + } + + @Test + void testFromConfigurationExcludeDefault() { + Configuration configuration = new Configuration(); + configuration.set(MetricOptions.REPORTER_INCLUDES, Arrays.asList("*:*:*")); + + final MetricFilter metricFilter = DefaultMetricFilter.fromConfiguration(configuration); + + assertThat(metricFilter.filter(COUNTER, "foo", "bar")).isEqualTo(true); + } + + @Test + void testFromConfigurationAllDefault() { + Configuration configuration = new Configuration(); + + final MetricFilter metricFilter = DefaultMetricFilter.fromConfiguration(configuration); + + assertThat(metricFilter.filter(COUNTER, "foo", "bar")).isEqualTo(true); + assertThat(metricFilter.filter(METER, "foo", "bar")).isEqualTo(true); + } + + @Test + void testFromConfigurationMultiplePatterns() { + Configuration configuration = new Configuration(); + + configuration.set(MetricOptions.REPORTER_EXCLUDES, Arrays.asList("*:*:*")); + configuration.setString( + MetricOptions.REPORTER_EXCLUDES.key(), "*:foo,bar:meter;*:foo,bar:gauge"); + + final MetricFilter metricFilter = DefaultMetricFilter.fromConfiguration(configuration); + + assertThat(metricFilter.filter(COUNTER, "foo", "bar")).isEqualTo(true); + assertThat(metricFilter.filter(METER, "foo", "bar")).isEqualTo(false); + assertThat(metricFilter.filter(GAUGE, "foo", "bar")).isEqualTo(false); + } +} diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/FrontMetricGroupTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/FrontMetricGroupTest.java index a1208fb..2cd655e 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/FrontMetricGroupTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/FrontMetricGroupTest.java @@ -19,6 +19,7 @@ package org.apache.flink.runtime.metrics.groups; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.MetricOptions; +import org.apache.flink.runtime.metrics.filter.MetricFilter; import org.apache.flink.runtime.metrics.scope.ScopeFormat; import org.apache.flink.runtime.metrics.scope.ScopeFormats; import org.apache.flink.runtime.metrics.util.TestingMetricRegistry; @@ -48,7 +49,11 @@ public class FrontMetricGroupTest { final FrontMetricGroup<?> frontMetricGroup = new FrontMetricGroup<>( new ReporterScopedSettings( - 0, delimiter, Collections.emptySet(), Collections.emptyMap()), + 0, + delimiter, + MetricFilter.NO_OP_FILTER, + Collections.emptySet(), + Collections.emptyMap()), new ProcessMetricGroup( TestingMetricRegistry.builder() .setScopeFormats(ScopeFormats.fromConfig(config)) @@ -79,7 +84,11 @@ public class FrontMetricGroupTest { final FrontMetricGroup<?> frontMetricGroup = new FrontMetricGroup<>( new ReporterScopedSettings( - 0, delimiter, Collections.emptySet(), Collections.emptyMap()), + 0, + delimiter, + MetricFilter.NO_OP_FILTER, + Collections.emptySet(), + Collections.emptyMap()), new ProcessMetricGroup( TestingMetricRegistry.builder() .setScopeFormats(ScopeFormats.fromConfig(config)) @@ -107,6 +116,7 @@ public class FrontMetricGroupTest { new ReporterScopedSettings( 0, '.', + MetricFilter.NO_OP_FILTER, Collections.emptySet(), ImmutableMap.of(ScopeFormat.asVariable("foo"), "bar")), new ProcessMetricGroup(TestingMetricRegistry.builder().build(), "host")); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/MetricGroupTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/MetricGroupTest.java index 754766d..f205559 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/MetricGroupTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/MetricGroupTest.java @@ -402,7 +402,7 @@ public class MetricGroupTest extends TestLogger { @Override protected String getGroupName(CharacterFilter filter) { - return ""; + return "foo"; } @Override
