This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 9cc3771577327d1f5eb7e2a44cd2b550aa602418
Author: Chesnay Schepler <[email protected]>
AuthorDate: Wed Mar 23 16:08:48 2022 +0100

    [FLINK-21585][metrics] Add options for in-/excluding metrics
---
 .../shortcodes/generated/metric_configuration.html |  12 ++
 .../generated/metric_reporters_section.html        |  12 ++
 .../flink/configuration/ConfigurationUtils.java    |   2 +-
 .../apache/flink/configuration/MetricOptions.java  |  91 +++++++++
 .../metrics/prometheus/PrometheusReporterTest.java |   4 +-
 .../flink/runtime/metrics/MetricRegistryImpl.java  |  20 ++
 .../flink/runtime/metrics/ReporterSetup.java       |  37 +++-
 .../metrics/filter/DefaultMetricFilter.java        | 129 +++++++++++++
 .../flink/runtime/metrics/filter/MetricFilter.java |  37 ++++
 .../metrics/groups/ReporterScopedSettings.java     |   9 +
 .../runtime/metrics/MetricRegistryImplTest.java    |  40 ++++
 .../metrics/filter/DefaultMetricFilterTest.java    | 215 +++++++++++++++++++++
 .../metrics/groups/FrontMetricGroupTest.java       |  14 +-
 .../runtime/metrics/groups/MetricGroupTest.java    |   2 +-
 14 files changed, 616 insertions(+), 8 deletions(-)

diff --git a/docs/layouts/shortcodes/generated/metric_configuration.html 
b/docs/layouts/shortcodes/generated/metric_configuration.html
index 4fabd04..e8e3dd1 100644
--- a/docs/layouts/shortcodes/generated/metric_configuration.html
+++ b/docs/layouts/shortcodes/generated/metric_configuration.html
@@ -69,6 +69,18 @@
             <td>The reporter factory class to use for the reporter named 
&lt;name&gt;.</td>
         </tr>
         <tr>
+            <td><h5>metrics.reporter.&lt;name&gt;.filter.excludes</h5></td>
+            <td style="word-wrap: break-word;"></td>
+            <td>List&lt;String&gt;</td>
+            <td>The metrics that should be excluded for the reporter named 
&lt;name&gt;. The format is identical to <code 
class="highlighter-rouge">filter.includes</code><br /></td>
+        </tr>
+        <tr>
+            <td><h5>metrics.reporter.&lt;name&gt;.filter.includes</h5></td>
+            <td style="word-wrap: break-word;">"*:*:*"</td>
+            <td>List&lt;String&gt;</td>
+            <td>The metrics that should be included for the reporter named 
&lt;name&gt;. Filters are specified as a list, with each filter following this 
format:<br /><code 
class="highlighter-rouge">&lt;scope&gt;[:&lt;name&gt;[,&lt;name&gt;][:&lt;type&gt;[,&lt;type&gt;]]]</code><br
 />A metric matches a filter if the scope pattern and at least one of the name 
patterns and at least one of the types match.<br /><ul><li>scope: Filters based 
on the logical scope.<br />Specified as a pattern w [...]
+        </tr>
+        <tr>
             <td><h5>metrics.reporter.&lt;name&gt;.interval</h5></td>
             <td style="word-wrap: break-word;">10 s</td>
             <td>Duration</td>
diff --git a/docs/layouts/shortcodes/generated/metric_reporters_section.html 
b/docs/layouts/shortcodes/generated/metric_reporters_section.html
index bc4068c..1cc11c7 100644
--- a/docs/layouts/shortcodes/generated/metric_reporters_section.html
+++ b/docs/layouts/shortcodes/generated/metric_reporters_section.html
@@ -45,6 +45,18 @@
             <td>The set of variables that should be excluded for the reporter 
named &lt;name&gt;. Only applicable to tag-based reporters (e.g., PRometheus, 
InfluxDB).</td>
         </tr>
         <tr>
+            <td><h5>metrics.reporter.&lt;name&gt;.filter.includes</h5></td>
+            <td style="word-wrap: break-word;">"*:*:*"</td>
+            <td>List&lt;String&gt;</td>
+            <td>The metrics that should be included for the reporter named 
&lt;name&gt;. Filters are specified as a list, with each filter following this 
format:<br /><code 
class="highlighter-rouge">&lt;scope&gt;[:&lt;name&gt;[,&lt;name&gt;][:&lt;type&gt;[,&lt;type&gt;]]]</code><br
 />A metric matches a filter if the scope pattern and at least one of the name 
patterns and at least one of the types match.<br /><ul><li>scope: Filters based 
on the logical scope.<br />Specified as a pattern w [...]
+        </tr>
+        <tr>
+            <td><h5>metrics.reporter.&lt;name&gt;.filter.excludes</h5></td>
+            <td style="word-wrap: break-word;"></td>
+            <td>List&lt;String&gt;</td>
+            <td>The metrics that should be excluded for the reporter named 
&lt;name&gt;. The format is identical to <code 
class="highlighter-rouge">filter.includes</code><br /></td>
+        </tr>
+        <tr>
             <td><h5>metrics.reporter.&lt;name&gt;.&lt;parameter&gt;</h5></td>
             <td style="word-wrap: break-word;">(none)</td>
             <td>String</td>
diff --git 
a/flink-core/src/main/java/org/apache/flink/configuration/ConfigurationUtils.java
 
b/flink-core/src/main/java/org/apache/flink/configuration/ConfigurationUtils.java
index a2bce40..7975f34 100755
--- 
a/flink-core/src/main/java/org/apache/flink/configuration/ConfigurationUtils.java
+++ 
b/flink-core/src/main/java/org/apache/flink/configuration/ConfigurationUtils.java
@@ -371,7 +371,7 @@ public class ConfigurationUtils {
     }
 
     @SuppressWarnings("unchecked")
-    static <E extends Enum<?>> E convertToEnum(Object o, Class<E> clazz) {
+    public static <E extends Enum<?>> E convertToEnum(Object o, Class<E> 
clazz) {
         if (o.getClass().equals(clazz)) {
             return (E) o;
         }
diff --git 
a/flink-core/src/main/java/org/apache/flink/configuration/MetricOptions.java 
b/flink-core/src/main/java/org/apache/flink/configuration/MetricOptions.java
index 19e7bb9..12ab390 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/MetricOptions.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/MetricOptions.java
@@ -30,6 +30,8 @@ import java.util.List;
 import java.util.Map;
 
 import static org.apache.flink.configuration.ConfigOptions.key;
+import static 
org.apache.flink.configuration.description.LineBreakElement.linebreak;
+import static org.apache.flink.configuration.description.TextElement.code;
 import static org.apache.flink.configuration.description.TextElement.text;
 
 /** Configuration options for metrics and metric reporters. */
@@ -119,6 +121,95 @@ public class MetricOptions {
 
     @Documentation.SuffixOption(NAMED_REPORTER_CONFIG_PREFIX)
     @Documentation.Section(value = Documentation.Sections.METRIC_REPORTERS, 
position = 4)
+    public static final ConfigOption<List<String>> REPORTER_INCLUDES =
+            key("filter.includes")
+                    .stringType()
+                    .asList()
+                    .defaultValues("*:*:*")
+                    .withDescription(
+                            Description.builder()
+                                    .text(
+                                            "The metrics that should be 
included for the reporter named <name>."
+                                                    + " Filters are specified 
as a list, with each filter following this format:")
+                                    .linebreak()
+                                    .text("%s", 
code("<scope>[:<name>[,<name>][:<type>[,<type>]]]"))
+                                    .linebreak()
+                                    .text(
+                                            "A metric matches a filter if the 
scope pattern and at least one of the name patterns and at least one of the 
types match.")
+                                    .linebreak()
+                                    .list(
+                                            text(
+                                                    "scope: Filters based on 
the logical scope.%s"
+                                                            + "Specified as a 
pattern where %s matches any sequence of characters and %s separates scope 
components.%s%s"
+                                                            + "For example:%s"
+                                                            + " \"%s\" matches 
any job-related metrics on the JobManager,%s"
+                                                            + " \"%s\" matches 
all job-related metrics and%s"
+                                                            + " \"%s\" matches 
all metrics below the job-level (i.e., task/operator metrics etc.).%s%s",
+                                                    linebreak(),
+                                                    code("*"),
+                                                    code("."),
+                                                    linebreak(),
+                                                    linebreak(),
+                                                    linebreak(),
+                                                    code("jobmanager.job"),
+                                                    linebreak(),
+                                                    code("*.job"),
+                                                    linebreak(),
+                                                    code("*.job.*"),
+                                                    linebreak(),
+                                                    linebreak()),
+                                            text(
+                                                    "name: Filters based on 
the metric name.%s"
+                                                            + "Specified as a 
comma-separate list of patterns where %s matches any sequence of 
characters.%s%s"
+                                                            + "For example, 
\"%s\" matches any metrics where the name contains %s.%s%s",
+                                                    linebreak(),
+                                                    code("*"),
+                                                    linebreak(),
+                                                    linebreak(),
+                                                    code("*Records*,*Bytes*"),
+                                                    code("\"Records\" or 
\"Bytes\""),
+                                                    linebreak(),
+                                                    linebreak()),
+                                            text(
+                                                    "type: Filters based on 
the metric type. Specified as a comma-separated list of metric types: %s",
+                                                    code("[counter, meter, 
gauge, histogram]")))
+                                    .text("Examples:")
+                                    .list(
+                                            text(
+                                                    "\"%s\" Matches metrics 
like %s.",
+                                                    code("*:numRecords*"), 
code("numRecordsIn")),
+                                            text(
+                                                    "\"%s\" Matches metrics 
like %s on the operator level.",
+                                                    
code("*.job.task.operator:numRecords*"),
+                                                    code("numRecordsIn")),
+                                            text(
+                                                    "\"%s\" Matches meter 
metrics like %s on the operator level.",
+                                                    
code("*.job.task.operator:numRecords*:meter"),
+                                                    
code("numRecordsInPerSecond")),
+                                            text(
+                                                    "\"%s\" Matches all 
counter/meter metrics like or %s.",
+                                                    
code("*:numRecords*,numBytes*:counter,meter"),
+                                                    
code("numRecordsInPerSecond"),
+                                                    code("numBytesOut")))
+                                    .build());
+
+    @Documentation.SuffixOption(NAMED_REPORTER_CONFIG_PREFIX)
+    @Documentation.Section(value = Documentation.Sections.METRIC_REPORTERS, 
position = 5)
+    public static final ConfigOption<List<String>> REPORTER_EXCLUDES =
+            key("filter.excludes")
+                    .stringType()
+                    .asList()
+                    .defaultValues()
+                    .withDescription(
+                            Description.builder()
+                                    .text(
+                                            "The metrics that should be 
excluded for the reporter named <name>. The format is identical to %s",
+                                            code(REPORTER_INCLUDES.key()))
+                                    .linebreak()
+                                    .build());
+
+    @Documentation.SuffixOption(NAMED_REPORTER_CONFIG_PREFIX)
+    @Documentation.Section(value = Documentation.Sections.METRIC_REPORTERS, 
position = 6)
     public static final ConfigOption<String> REPORTER_CONFIG_PARAMETER =
             key("<parameter>")
                     .stringType()
diff --git 
a/flink-metrics/flink-metrics-prometheus/src/test/java/org/apache/flink/metrics/prometheus/PrometheusReporterTest.java
 
b/flink-metrics/flink-metrics-prometheus/src/test/java/org/apache/flink/metrics/prometheus/PrometheusReporterTest.java
index 45d2b19..d45c14d 100644
--- 
a/flink-metrics/flink-metrics-prometheus/src/test/java/org/apache/flink/metrics/prometheus/PrometheusReporterTest.java
+++ 
b/flink-metrics/flink-metrics-prometheus/src/test/java/org/apache/flink/metrics/prometheus/PrometheusReporterTest.java
@@ -32,6 +32,7 @@ import 
org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.metrics.MetricRegistryImpl;
 import org.apache.flink.runtime.metrics.MetricRegistryTestUtils;
 import org.apache.flink.runtime.metrics.ReporterSetup;
+import org.apache.flink.runtime.metrics.filter.MetricFilter;
 import org.apache.flink.runtime.metrics.groups.FrontMetricGroup;
 import org.apache.flink.runtime.metrics.groups.JobManagerJobMetricGroup;
 import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup;
@@ -340,6 +341,7 @@ class PrometheusReporterTest {
     }
 
     private static ReporterScopedSettings createReporterScopedSettings() {
-        return new ReporterScopedSettings(0, ',', Collections.emptySet(), 
Collections.emptyMap());
+        return new ReporterScopedSettings(
+                0, ',', MetricFilter.NO_OP_FILTER, Collections.emptySet(), 
Collections.emptyMap());
     }
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistryImpl.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistryImpl.java
index e8108b3..8d26119 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistryImpl.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistryImpl.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.metrics;
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.MetricOptions;
+import org.apache.flink.metrics.CharacterFilter;
 import org.apache.flink.metrics.Metric;
 import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.metrics.View;
@@ -172,6 +173,7 @@ public class MetricRegistryImpl implements MetricRegistry {
                                     new ReporterScopedSettings(
                                             reporters.size(),
                                             delimiterForReporter.charAt(0),
+                                            reporterSetup.getFilter(),
                                             
reporterSetup.getExcludedVariables(),
                                             
reporterSetup.getAdditionalVariables())));
                 } catch (Throwable t) {
@@ -380,6 +382,12 @@ public class MetricRegistryImpl implements MetricRegistry {
                 LOG.warn(
                         "Cannot register metric, because the MetricRegistry 
has already been shut down.");
             } else {
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug(
+                            "Registering metric {}.{}.",
+                            
group.getLogicalScope(CharacterFilter.NO_OP_FILTER),
+                            metricName);
+                }
                 if (reporters != null) {
                     forAllReporters(MetricReporter::notifyOfAddedMetric, 
metric, metricName, group);
                 }
@@ -445,6 +453,18 @@ public class MetricRegistryImpl implements MetricRegistry {
             try {
                 ReporterAndSettings reporterAndSettings = reporters.get(i);
                 if (reporterAndSettings != null) {
+                    final String logicalScope = 
group.getLogicalScope(CharacterFilter.NO_OP_FILTER);
+                    if (!reporterAndSettings
+                            .settings
+                            .getFilter()
+                            .filter(metric, metricName, logicalScope)) {
+                        LOG.trace(
+                                "Ignoring metric {}.{} for reporter #{} due to 
filter rules.",
+                                logicalScope,
+                                metricName,
+                                i);
+                        continue;
+                    }
                     FrontMetricGroup front =
                             new FrontMetricGroup<AbstractMetricGroup<?>>(
                                     reporterAndSettings.getSettings(), group);
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/ReporterSetup.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/ReporterSetup.java
index c1e48df..7d6c6a9 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/ReporterSetup.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/ReporterSetup.java
@@ -29,6 +29,8 @@ import 
org.apache.flink.metrics.reporter.InstantiateViaFactory;
 import org.apache.flink.metrics.reporter.InterceptInstantiationViaReflection;
 import org.apache.flink.metrics.reporter.MetricReporter;
 import org.apache.flink.metrics.reporter.MetricReporterFactory;
+import org.apache.flink.runtime.metrics.filter.DefaultMetricFilter;
+import org.apache.flink.runtime.metrics.filter.MetricFilter;
 import org.apache.flink.runtime.metrics.scope.ScopeFormat;
 
 import org.apache.flink.shaded.guava30.com.google.common.collect.Iterators;
@@ -85,16 +87,19 @@ public final class ReporterSetup {
     private final String name;
     private final MetricConfig configuration;
     private final MetricReporter reporter;
+    private final MetricFilter filter;
     private final Map<String, String> additionalVariables;
 
     public ReporterSetup(
             final String name,
             final MetricConfig configuration,
             MetricReporter reporter,
+            final MetricFilter filter,
             final Map<String, String> additionalVariables) {
         this.name = name;
         this.configuration = configuration;
         this.reporter = reporter;
+        this.filter = filter;
         this.additionalVariables = additionalVariables;
     }
 
@@ -122,6 +127,10 @@ public final class ReporterSetup {
         }
     }
 
+    public MetricFilter getFilter() {
+        return filter;
+    }
+
     public Map<String, String> getAdditionalVariables() {
         return additionalVariables;
     }
@@ -142,23 +151,41 @@ public final class ReporterSetup {
     @VisibleForTesting
     public static ReporterSetup forReporter(String reporterName, 
MetricReporter reporter) {
         return createReporterSetup(
-                reporterName, new MetricConfig(), reporter, 
Collections.emptyMap());
+                reporterName,
+                new MetricConfig(),
+                reporter,
+                MetricFilter.NO_OP_FILTER,
+                Collections.emptyMap());
     }
 
     @VisibleForTesting
     public static ReporterSetup forReporter(
             String reporterName, MetricConfig metricConfig, MetricReporter 
reporter) {
-        return createReporterSetup(reporterName, metricConfig, reporter, 
Collections.emptyMap());
+        return createReporterSetup(
+                reporterName,
+                metricConfig,
+                reporter,
+                MetricFilter.NO_OP_FILTER,
+                Collections.emptyMap());
+    }
+
+    @VisibleForTesting
+    public static ReporterSetup forReporter(
+            String reporterName, MetricFilter metricFilter, MetricReporter 
reporter) {
+        return createReporterSetup(
+                reporterName, new MetricConfig(), reporter, metricFilter, 
Collections.emptyMap());
     }
 
     private static ReporterSetup createReporterSetup(
             String reporterName,
             MetricConfig metricConfig,
             MetricReporter reporter,
+            MetricFilter metricFilter,
             Map<String, String> additionalVariables) {
         reporter.open(metricConfig);
 
-        return new ReporterSetup(reporterName, metricConfig, reporter, 
additionalVariables);
+        return new ReporterSetup(
+                reporterName, metricConfig, reporter, metricFilter, 
additionalVariables);
     }
 
     public static List<ReporterSetup> fromConfiguration(
@@ -299,6 +326,9 @@ public final class ReporterSetup {
                 Optional<MetricReporter> metricReporterOptional =
                         loadReporter(reporterName, reporterConfig, 
reporterFactories);
 
+                final MetricFilter metricFilter =
+                        DefaultMetricFilter.fromConfiguration(reporterConfig);
+
                 // massage user variables keys into scope format for parity to 
variable exclusion
                 Map<String, String> additionalVariables =
                         
reporterConfig.get(MetricOptions.REPORTER_ADDITIONAL_VARIABLES).entrySet()
@@ -317,6 +347,7 @@ public final class ReporterSetup {
                                             reporterName,
                                             metricConfig,
                                             reporter,
+                                            metricFilter,
                                             additionalVariables));
                         });
             } catch (Throwable t) {
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/filter/DefaultMetricFilter.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/filter/DefaultMetricFilter.java
new file mode 100644
index 0000000..b31a30e
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/filter/DefaultMetricFilter.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.metrics.filter;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.ConfigurationUtils;
+import org.apache.flink.configuration.MetricOptions;
+import org.apache.flink.metrics.Metric;
+import org.apache.flink.metrics.MetricType;
+
+import java.util.Arrays;
+import java.util.EnumSet;
+import java.util.List;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+
+/**
+ * Default {@link MetricFilter} implementation that filters metrics based on 
{@link
+ * MetricOptions#REPORTER_INCLUDES}/{@link MetricOptions#REPORTER_EXCLUDES}.
+ */
+public class DefaultMetricFilter implements MetricFilter {
+
+    private static final EnumSet<MetricType> ALL_METRIC_TYPES = 
EnumSet.allOf(MetricType.class);
+    @VisibleForTesting static final String LIST_DELIMITER = ",";
+
+    private final List<FilterSpec> includes;
+    private final List<FilterSpec> excludes;
+
+    private DefaultMetricFilter(List<FilterSpec> includes, List<FilterSpec> 
excludes) {
+        this.includes = includes;
+        this.excludes = excludes;
+    }
+
+    @Override
+    public boolean filter(Metric metric, String name, String logicalScope) {
+        for (FilterSpec exclude : excludes) {
+            if (exclude.namePattern.matcher(name).matches()
+                    && exclude.scopePattern.matcher(logicalScope).matches()
+                    && exclude.types.contains(metric.getMetricType())) {
+                return false;
+            }
+        }
+        for (FilterSpec include : includes) {
+            if (include.namePattern.matcher(name).matches()
+                    && include.scopePattern.matcher(logicalScope).matches()
+                    && include.types.contains(metric.getMetricType())) {
+                return true;
+            }
+        }
+        return false;
+    }
+
+    public static MetricFilter fromConfiguration(Configuration configuration) {
+        final List<String> includes = 
configuration.get(MetricOptions.REPORTER_INCLUDES);
+        final List<String> excludes = 
configuration.get(MetricOptions.REPORTER_EXCLUDES);
+
+        final List<FilterSpec> includeFilters =
+                includes.stream().map(i -> 
parse(i)).collect(Collectors.toList());
+        final List<FilterSpec> excludeFilters =
+                excludes.stream().map(e -> 
parse(e)).collect(Collectors.toList());
+
+        return new DefaultMetricFilter(includeFilters, excludeFilters);
+    }
+
+    private static FilterSpec parse(String filter) {
+        final String[] split = filter.split(":");
+        final Pattern scope = convertToPattern(split[0]);
+        final Pattern name = split.length > 1 ? convertToPattern(split[1]) : 
convertToPattern("*");
+        final EnumSet<MetricType> type =
+                split.length > 2 ? parseMetricTypes(split[2]) : 
ALL_METRIC_TYPES;
+
+        return new FilterSpec(scope, name, type);
+    }
+
+    @VisibleForTesting
+    static Pattern convertToPattern(String scopeOrNameComponent) {
+        final String[] split = scopeOrNameComponent.split(LIST_DELIMITER);
+
+        final String rawPattern =
+                Arrays.stream(split)
+                        .map(s -> s.replaceAll("\\.", "\\."))
+                        .map(s -> s.replaceAll("\\*", ".*"))
+                        .collect(Collectors.joining("|", "(", ")"));
+
+        return Pattern.compile(rawPattern);
+    }
+
+    @VisibleForTesting
+    static EnumSet<MetricType> parseMetricTypes(String typeComponent) {
+        final String[] split = typeComponent.split(LIST_DELIMITER);
+
+        if (split.length == 1 && split[0].equals("*")) {
+            return ALL_METRIC_TYPES;
+        }
+
+        return EnumSet.copyOf(
+                Arrays.stream(split)
+                        .map(s -> ConfigurationUtils.convertToEnum(s, 
MetricType.class))
+                        .collect(Collectors.toSet()));
+    }
+
+    private static class FilterSpec {
+        private final Pattern scopePattern;
+        private final Pattern namePattern;
+        private final EnumSet<MetricType> types;
+
+        private FilterSpec(Pattern scopePattern, Pattern namePattern, 
EnumSet<MetricType> types) {
+            this.scopePattern = scopePattern;
+            this.namePattern = namePattern;
+            this.types = types;
+        }
+    }
+}
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/filter/MetricFilter.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/filter/MetricFilter.java
new file mode 100644
index 0000000..47a4781
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/filter/MetricFilter.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.metrics.filter;
+
+import org.apache.flink.metrics.Metric;
+
+/** A filter for metrics. */
+public interface MetricFilter {
+
+    /** Filter that accepts every metric. */
+    MetricFilter NO_OP_FILTER = (metric, name, scope) -> true;
+
+    /**
+     * Filters a given metric.
+     *
+     * @param metric the metric to filter
+     * @param name the name of the metric
+     * @param logicalScope the logical scope of the metric
+     * @return true, if the metric matches, false otherwise
+     */
+    boolean filter(Metric metric, String name, String logicalScope);
+}
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/ReporterScopedSettings.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/ReporterScopedSettings.java
index 5fbea42..1a9c81c 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/ReporterScopedSettings.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/ReporterScopedSettings.java
@@ -17,6 +17,7 @@
 
 package org.apache.flink.runtime.metrics.groups;
 
+import org.apache.flink.runtime.metrics.filter.MetricFilter;
 import org.apache.flink.util.Preconditions;
 
 import java.util.Map;
@@ -31,17 +32,21 @@ public class ReporterScopedSettings {
 
     private final Set<String> excludedVariables;
 
+    private final MetricFilter filter;
+
     private final Map<String, String> additionalVariables;
 
     public ReporterScopedSettings(
             int reporterIndex,
             char delimiter,
+            MetricFilter filter,
             Set<String> excludedVariables,
             Map<String, String> additionalVariables) {
         this.excludedVariables = excludedVariables;
         Preconditions.checkArgument(reporterIndex >= 0);
         this.reporterIndex = reporterIndex;
         this.delimiter = delimiter;
+        this.filter = filter;
         this.additionalVariables = additionalVariables;
     }
 
@@ -53,6 +58,10 @@ public class ReporterScopedSettings {
         return delimiter;
     }
 
+    public MetricFilter getFilter() {
+        return filter;
+    }
+
     public Set<String> getExcludedVariables() {
         return excludedVariables;
     }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/MetricRegistryImplTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/MetricRegistryImplTest.java
index 4105a69..8748c39 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/MetricRegistryImplTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/MetricRegistryImplTest.java
@@ -27,11 +27,14 @@ import org.apache.flink.metrics.MetricConfig;
 import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.metrics.SimpleCounter;
 import org.apache.flink.metrics.reporter.Scheduled;
+import org.apache.flink.metrics.util.TestCounter;
+import org.apache.flink.metrics.util.TestMeter;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import 
org.apache.flink.runtime.concurrent.ManuallyTriggeredScheduledExecutorService;
 import 
org.apache.flink.runtime.metrics.CollectingMetricsReporter.MetricGroupAndName;
 import org.apache.flink.runtime.metrics.dump.MetricDumpSerialization;
 import org.apache.flink.runtime.metrics.dump.MetricQueryService;
+import org.apache.flink.runtime.metrics.filter.DefaultMetricFilter;
 import org.apache.flink.runtime.metrics.groups.MetricGroupTest;
 import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup;
 import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
@@ -462,4 +465,41 @@ class MetricRegistryImplTest {
             throw new RuntimeException();
         }
     }
+
+    @Test
+    void testMetricFiltering() {
+        final String excludedMetricName = "excluded";
+        final NotificationCapturingReporter reporter = new 
NotificationCapturingReporter();
+
+        final Configuration reporterConfig = new Configuration();
+        reporterConfig.set(MetricOptions.REPORTER_INCLUDES, 
Arrays.asList("*:*:counter"));
+        reporterConfig.set(
+                MetricOptions.REPORTER_EXCLUDES, Arrays.asList("*:" + 
excludedMetricName));
+
+        MetricRegistryImpl registry =
+                new MetricRegistryImpl(
+                        
MetricRegistryTestUtils.defaultMetricRegistryConfiguration(),
+                        Arrays.asList(
+                                ReporterSetup.forReporter(
+                                        "test",
+                                        
DefaultMetricFilter.fromConfiguration(reporterConfig),
+                                        reporter)));
+
+        registry.register(
+                new TestMeter(), "", new 
MetricGroupTest.DummyAbstractMetricGroup(registry));
+
+        assertThat(reporter.getLastAddedMetric()).isEmpty();
+
+        registry.register(
+                new TestCounter(),
+                excludedMetricName,
+                new MetricGroupTest.DummyAbstractMetricGroup(registry));
+
+        assertThat(reporter.getLastAddedMetric()).isEmpty();
+
+        registry.register(
+                new TestCounter(), "foo", new 
MetricGroupTest.DummyAbstractMetricGroup(registry));
+
+        assertThat(reporter.getLastAddedMetric()).isNotEmpty();
+    }
 }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/filter/DefaultMetricFilterTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/filter/DefaultMetricFilterTest.java
new file mode 100644
index 0000000..1d32521
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/filter/DefaultMetricFilterTest.java
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.metrics.filter;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.MetricOptions;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.Meter;
+import org.apache.flink.metrics.MetricType;
+import org.apache.flink.metrics.util.TestCounter;
+import org.apache.flink.metrics.util.TestMeter;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.parallel.Execution;
+import org.junit.jupiter.api.parallel.ExecutionMode;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.EnumSet;
+import java.util.regex.Pattern;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+@Execution(ExecutionMode.CONCURRENT)
+class DefaultMetricFilterTest {
+
+    private static final Counter COUNTER = new TestCounter();
+    private static final Meter METER = new TestMeter();
+    private static final Gauge<Integer> GAUGE = () -> 4;
+
+    @Test
+    void testConvertToPatternWithoutWildcards() {
+        final Pattern pattern = 
DefaultMetricFilter.convertToPattern("numRecordsIn");
+        assertThat(pattern.toString()).isEqualTo("(numRecordsIn)");
+        assertThat(pattern.matcher("numRecordsIn").matches()).isEqualTo(true);
+        assertThat(pattern.matcher("numBytesOut").matches()).isEqualTo(false);
+    }
+
+    @Test
+    void testConvertToPatternSingle() {
+        final Pattern pattern = 
DefaultMetricFilter.convertToPattern("numRecords*");
+        assertThat(pattern.toString()).isEqualTo("(numRecords.*)");
+        assertThat(pattern.matcher("numRecordsIn").matches()).isEqualTo(true);
+        assertThat(pattern.matcher("numBytesOut").matches()).isEqualTo(false);
+    }
+
+    @Test
+    void testConvertToPatternMultiple() {
+        final Pattern pattern = 
DefaultMetricFilter.convertToPattern("numRecords*,numBytes*");
+        assertThat(pattern.toString()).isEqualTo("(numRecords.*|numBytes.*)");
+        assertThat(pattern.matcher("numRecordsIn").matches()).isEqualTo(true);
+        assertThat(pattern.matcher("numBytesOut").matches()).isEqualTo(true);
+        assertThat(pattern.matcher("numBytes").matches()).isEqualTo(true);
+        assertThat(pattern.matcher("hello").matches()).isEqualTo(false);
+    }
+
+    @Test
+    void testParseMetricTypesSingle() {
+        final EnumSet<MetricType> types = 
DefaultMetricFilter.parseMetricTypes("meter");
+        assertThat(types).containsExactly(MetricType.METER);
+    }
+
+    @Test
+    void testParseMetricTypesMultiple() {
+        final EnumSet<MetricType> types = 
DefaultMetricFilter.parseMetricTypes("meter,counter");
+        assertThat(types).containsExactlyInAnyOrder(MetricType.METER, 
MetricType.COUNTER);
+    }
+
+    @Test
+    void testParseMetricTypesCaseIgnored() {
+        final EnumSet<MetricType> types = 
DefaultMetricFilter.parseMetricTypes("meter,CoUnTeR");
+        assertThat(types).containsExactlyInAnyOrder(MetricType.METER, 
MetricType.COUNTER);
+    }
+
+    @Test
+    void testFromConfigurationIncludeByScope() {
+        Configuration configuration = new Configuration();
+        configuration.set(
+                MetricOptions.REPORTER_INCLUDES, Arrays.asList("include1:*:*", 
"include2.*:*:*"));
+        configuration.set(MetricOptions.REPORTER_EXCLUDES, 
Collections.emptyList());
+
+        final MetricFilter metricFilter = 
DefaultMetricFilter.fromConfiguration(configuration);
+
+        assertThat(metricFilter.filter(COUNTER, "name", 
"include1")).isEqualTo(true);
+        assertThat(metricFilter.filter(COUNTER, "name", 
"include1.bar")).isEqualTo(false);
+        assertThat(metricFilter.filter(COUNTER, "name", 
"include2")).isEqualTo(false);
+        assertThat(metricFilter.filter(COUNTER, "name", 
"include2.bar")).isEqualTo(true);
+    }
+
+    @Test
+    void testFromConfigurationIncludeByName() {
+        Configuration configuration = new Configuration();
+        configuration.set(MetricOptions.REPORTER_INCLUDES, 
Arrays.asList("*:name:*"));
+        configuration.set(MetricOptions.REPORTER_EXCLUDES, 
Collections.emptyList());
+
+        final MetricFilter metricFilter = 
DefaultMetricFilter.fromConfiguration(configuration);
+
+        assertThat(metricFilter.filter(COUNTER, "name", 
"bar")).isEqualTo(true);
+        assertThat(metricFilter.filter(COUNTER, "foo", 
"bar")).isEqualTo(false);
+    }
+
+    @Test
+    void testFromConfigurationIncludeByType() {
+        Configuration configuration = new Configuration();
+        configuration.set(MetricOptions.REPORTER_INCLUDES, 
Arrays.asList("*:*:counter"));
+        configuration.set(MetricOptions.REPORTER_EXCLUDES, 
Collections.emptyList());
+
+        final MetricFilter metricFilter = 
DefaultMetricFilter.fromConfiguration(configuration);
+
+        assertThat(metricFilter.filter(COUNTER, "foo", "bar")).isEqualTo(true);
+        assertThat(metricFilter.filter(METER, "foo", "bar")).isEqualTo(false);
+    }
+
+    @Test
+    void testFromConfigurationExcludeByScope() {
+        Configuration configuration = new Configuration();
+        configuration.set(MetricOptions.REPORTER_INCLUDES, 
Arrays.asList("*:*:*"));
+        configuration.set(MetricOptions.REPORTER_EXCLUDES, 
Arrays.asList("include1", "include2.*"));
+
+        final MetricFilter metricFilter = 
DefaultMetricFilter.fromConfiguration(configuration);
+
+        assertThat(metricFilter.filter(COUNTER, "name", 
"include1")).isEqualTo(false);
+        assertThat(metricFilter.filter(COUNTER, "name", 
"include1.bar")).isEqualTo(true);
+        assertThat(metricFilter.filter(COUNTER, "name", 
"include2")).isEqualTo(true);
+        assertThat(metricFilter.filter(COUNTER, "name", 
"include2.bar")).isEqualTo(false);
+    }
+
+    @Test
+    void testFromConfigurationExcludeByName() {
+        Configuration configuration = new Configuration();
+        configuration.set(MetricOptions.REPORTER_INCLUDES, 
Arrays.asList("*:*:*"));
+        configuration.set(MetricOptions.REPORTER_EXCLUDES, 
Arrays.asList("*:faa*", "*:foo"));
+
+        final MetricFilter metricFilter = 
DefaultMetricFilter.fromConfiguration(configuration);
+
+        assertThat(metricFilter.filter(COUNTER, "name", 
"bar")).isEqualTo(true);
+        assertThat(metricFilter.filter(COUNTER, "foo", 
"bar")).isEqualTo(false);
+        assertThat(metricFilter.filter(COUNTER, "foob", 
"bar")).isEqualTo(true);
+        assertThat(metricFilter.filter(COUNTER, "faab", 
"bar")).isEqualTo(false);
+    }
+
+    @Test
+    void testFromConfigurationExcludeByType() {
+        Configuration configuration = new Configuration();
+        configuration.set(MetricOptions.REPORTER_INCLUDES, 
Arrays.asList("*:*:*"));
+        configuration.set(MetricOptions.REPORTER_EXCLUDES, 
Arrays.asList("*:*:meter"));
+
+        final MetricFilter metricFilter = 
DefaultMetricFilter.fromConfiguration(configuration);
+
+        assertThat(metricFilter.filter(COUNTER, "foo", "bar")).isEqualTo(true);
+        assertThat(metricFilter.filter(METER, "foo", "bar")).isEqualTo(false);
+    }
+
+    @Test
+    void testFromConfigurationIncludeDefault() {
+        Configuration configuration = new Configuration();
+        configuration.set(MetricOptions.REPORTER_EXCLUDES, 
Arrays.asList("*:*:meter"));
+
+        final MetricFilter metricFilter = 
DefaultMetricFilter.fromConfiguration(configuration);
+
+        assertThat(metricFilter.filter(COUNTER, "foo", 
"hello")).isEqualTo(true);
+        assertThat(metricFilter.filter(METER, "foo", 
"hello")).isEqualTo(false);
+    }
+
+    @Test
+    void testFromConfigurationExcludeDefault() {
+        Configuration configuration = new Configuration();
+        configuration.set(MetricOptions.REPORTER_INCLUDES, 
Arrays.asList("*:*:*"));
+
+        final MetricFilter metricFilter = 
DefaultMetricFilter.fromConfiguration(configuration);
+
+        assertThat(metricFilter.filter(COUNTER, "foo", "bar")).isEqualTo(true);
+    }
+
+    @Test
+    void testFromConfigurationAllDefault() {
+        Configuration configuration = new Configuration();
+
+        final MetricFilter metricFilter = 
DefaultMetricFilter.fromConfiguration(configuration);
+
+        assertThat(metricFilter.filter(COUNTER, "foo", "bar")).isEqualTo(true);
+        assertThat(metricFilter.filter(METER, "foo", "bar")).isEqualTo(true);
+    }
+
+    @Test
+    void testFromConfigurationMultiplePatterns() {
+        Configuration configuration = new Configuration();
+
+        configuration.set(MetricOptions.REPORTER_EXCLUDES, 
Arrays.asList("*:*:*"));
+        configuration.setString(
+                MetricOptions.REPORTER_EXCLUDES.key(), 
"*:foo,bar:meter;*:foo,bar:gauge");
+
+        final MetricFilter metricFilter = 
DefaultMetricFilter.fromConfiguration(configuration);
+
+        assertThat(metricFilter.filter(COUNTER, "foo", "bar")).isEqualTo(true);
+        assertThat(metricFilter.filter(METER, "foo", "bar")).isEqualTo(false);
+        assertThat(metricFilter.filter(GAUGE, "foo", "bar")).isEqualTo(false);
+    }
+}
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/FrontMetricGroupTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/FrontMetricGroupTest.java
index a1208fb..2cd655e 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/FrontMetricGroupTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/FrontMetricGroupTest.java
@@ -19,6 +19,7 @@ package org.apache.flink.runtime.metrics.groups;
 
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.MetricOptions;
+import org.apache.flink.runtime.metrics.filter.MetricFilter;
 import org.apache.flink.runtime.metrics.scope.ScopeFormat;
 import org.apache.flink.runtime.metrics.scope.ScopeFormats;
 import org.apache.flink.runtime.metrics.util.TestingMetricRegistry;
@@ -48,7 +49,11 @@ public class FrontMetricGroupTest {
         final FrontMetricGroup<?> frontMetricGroup =
                 new FrontMetricGroup<>(
                         new ReporterScopedSettings(
-                                0, delimiter, Collections.emptySet(), 
Collections.emptyMap()),
+                                0,
+                                delimiter,
+                                MetricFilter.NO_OP_FILTER,
+                                Collections.emptySet(),
+                                Collections.emptyMap()),
                         new ProcessMetricGroup(
                                 TestingMetricRegistry.builder()
                                         
.setScopeFormats(ScopeFormats.fromConfig(config))
@@ -79,7 +84,11 @@ public class FrontMetricGroupTest {
         final FrontMetricGroup<?> frontMetricGroup =
                 new FrontMetricGroup<>(
                         new ReporterScopedSettings(
-                                0, delimiter, Collections.emptySet(), 
Collections.emptyMap()),
+                                0,
+                                delimiter,
+                                MetricFilter.NO_OP_FILTER,
+                                Collections.emptySet(),
+                                Collections.emptyMap()),
                         new ProcessMetricGroup(
                                 TestingMetricRegistry.builder()
                                         
.setScopeFormats(ScopeFormats.fromConfig(config))
@@ -107,6 +116,7 @@ public class FrontMetricGroupTest {
                         new ReporterScopedSettings(
                                 0,
                                 '.',
+                                MetricFilter.NO_OP_FILTER,
                                 Collections.emptySet(),
                                 ImmutableMap.of(ScopeFormat.asVariable("foo"), 
"bar")),
                         new 
ProcessMetricGroup(TestingMetricRegistry.builder().build(), "host"));
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/MetricGroupTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/MetricGroupTest.java
index 754766d..f205559 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/MetricGroupTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/MetricGroupTest.java
@@ -402,7 +402,7 @@ public class MetricGroupTest extends TestLogger {
 
         @Override
         protected String getGroupName(CharacterFilter filter) {
-            return "";
+            return "foo";
         }
 
         @Override

Reply via email to