zentol commented on a change in pull request #14510:
URL: https://github.com/apache/flink/pull/14510#discussion_r551244274



##########
File path: 
flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/MetricGroup.java
##########
@@ -186,6 +186,15 @@
      */
     Map<String, String> getAllVariables();
 
+    /**
+     * Returns a map of all variables and their associated value, for example 
{@code
+     * {"<host>"="host-7", "<tm_id>"="taskmanager-2"}}.
+     *
+     * @param filter character filter which is applied to the variables and 
values if not null.
+     * @return map of all variables and their associated value
+     */
+    Map<String, String> getAllVariables(CharacterFilter filter);

Review comment:
       It is not required to extend this interface since the FrontMetricGroup 
works against the AbstractMetricGroup, and we currently don't want to extend 
this interface further.
   (This will also render some other changes moot)

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroup.java
##########
@@ -107,6 +108,8 @@
     /** Flag indicating whether this group has been closed. */
     private volatile boolean closed;
 
+    @VisibleForTesting public static final char DEFAULT_REPLACEMENT = '_';

Review comment:
       ```suggestion
       @VisibleForTesting
       public static final char DEFAULT_REPLACEMENT = '_';
   ```

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroup.java
##########
@@ -137,11 +156,18 @@ public AbstractMetricGroup(MetricRegistry registry, 
String[] scope, A parent) {
         // if no variables are excluded (which is the default!) we re-use the 
general variables map
         // to save space
         return internalGetAllVariables(
-                excludedVariables.isEmpty() ? 0 : reporterIndex, 
excludedVariables);
+                filter,
+                excludedVariables.isEmpty() ? 0 : reporterIndex,
+                excludedVariables,
+                delimiter);
     }
 
     private Map<String, String> internalGetAllVariables(
-            int cachingIndex, Set<String> excludedVariables) {
+            CharacterFilter filter,
+            int cachingIndex,
+            Set<String> excludedVariables,
+            char delimiter) {
+        CharacterFilter newFilter = wrapWithDefaultFilter(filter, delimiter);

Review comment:
       This implies that variables returned by `parent.getAllVariables()` 
already had a filter applied to them for a delimiter that may not be matching 
`delimiter` (i.e., `registry.getDelimiter()`).
   Furthermore, this also means that exclusion may not be applied correctly.
   
   I think we need to restrict the filtering to cases where the calls originate 
from a reporter.
   

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroup.java
##########
@@ -119,12 +122,28 @@ public AbstractMetricGroup(MetricRegistry registry, 
String[] scope, A parent) {
         this.variables = new Map[registry.getNumberReporters() + 1];
     }
 
+    CharacterFilter wrapWithDefaultFilter(CharacterFilter filter, char 
delimiter) {

Review comment:
       ```suggestion
       private CharacterFilter wrapWithDefaultFilter(CharacterFilter filter, 
char delimiter) {
   ```

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroup.java
##########
@@ -119,12 +122,28 @@ public AbstractMetricGroup(MetricRegistry registry, 
String[] scope, A parent) {
         this.variables = new Map[registry.getNumberReporters() + 1];
     }
 
+    CharacterFilter wrapWithDefaultFilter(CharacterFilter filter, char 
delimiter) {
+        CharacterFilter defaultFilter = input -> input.replace(delimiter, 
DEFAULT_REPLACEMENT);
+        return filter == null
+                ? defaultFilter
+                : input -> 
defaultFilter.filterCharacters(filter.filterCharacters(input));
+    }
+
     @Override
     public Map<String, String> getAllVariables() {
-        return internalGetAllVariables(0, Collections.emptySet());
+        return internalGetAllVariables(null, 0, Collections.emptySet(), 
registry.getDelimiter());
+    }
+
+    @Override
+    public Map<String, String> getAllVariables(CharacterFilter filter) {
+        return internalGetAllVariables(filter, 0, Collections.emptySet(), 
registry.getDelimiter());
     }
 
-    public Map<String, String> getAllVariables(int reporterIndex, Set<String> 
excludedVariables) {
+    public Map<String, String> getAllVariables(
+            CharacterFilter filter,
+            int reporterIndex,
+            Set<String> excludedVariables,
+            char delimiter) {

Review comment:
       I'd like to keep the reporterIndex as the first argument because all 
other arguments are somewhat dependent on it

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroupTest.java
##########
@@ -415,4 +419,119 @@ public void testGetAllVariablesDoesNotDeadlock() throws 
InterruptedException {
             childRegisteringThread.join();
         }
     }
+
+    @Test
+    public void testReporterDelimiterIsFiltered() throws Exception {
+        MetricConfig metricConfig = new MetricConfig();
+        metricConfig.setProperty(
+                ConfigConstants.METRICS_REPORTER_SCOPE_DELIMITER, 
String.valueOf('*'));
+
+        MetricRegistryImpl metricRegistry =
+                new MetricRegistryImpl(
+                        
MetricRegistryConfiguration.defaultMetricRegistryConfiguration(),
+                        Collections.singletonList(
+                                ReporterSetup.forReporter(
+                                        "test", metricConfig, new 
TestReporter())));
+        try {
+            String metricName = "Test*Counter";
+            TaskManagerMetricGroup metricGroup =
+                    new TaskManagerMetricGroup(metricRegistry, "hos*t", "i*d");
+            Counter counter = metricGroup.counter(metricName);
+            TestReporter reporter = (TestReporter) 
metricRegistry.getReporters().get(0);
+            final String expected =
+                    String.format(
+                            "%s%s%s%s%s%s%s",
+                            "hos*t".replace('*', 
AbstractMetricGroup.DEFAULT_REPLACEMENT),
+                            '*',
+                            TaskExecutor.TASK_MANAGER_NAME.replace(
+                                    '*', 
AbstractMetricGroup.DEFAULT_REPLACEMENT),
+                            '*',
+                            "i*d".replace('*', 
AbstractMetricGroup.DEFAULT_REPLACEMENT),
+                            '*',
+                            metricName.replace('*', 
AbstractMetricGroup.DEFAULT_REPLACEMENT));
+            assertEquals(expected, reporter.getCounters().get(counter));
+        } finally {
+            metricRegistry.shutdown().get();
+        }
+    }
+
+    private static class TestReporter extends AbstractReporter {
+        @Override
+        public void open(MetricConfig config) {}
+
+        @Override
+        public void close() {}
+
+        public Map<Counter, String> getCounters() {
+            return counters;
+        }
+
+        @Override
+        public String filterCharacters(String input) {
+            return input;
+        }
+    }
+
+    @Test
+    public void testLowLevelGetAllVariablesWithFilterAndDelimiter() throws 
Exception {
+        MetricRegistry registry = 
TestingMetricRegistry.builder().setNumberReporters(2).build();
+
+        AbstractMetricGroup<?> group =
+                new GenericMetricGroup(registry, null, "test") {
+                    @Override
+                    protected void putVariables(Map<String, String> variables) 
{
+                        variables.put("k*1", "v*1");
+                        variables.put("k*2", "v*2");
+                    }
+                };
+
+        Map<String, String> allVariables;
+        final CharacterFilter simpleFilter = input -> input.replace('1', 'a');
+
+        // test filter wrapping
+        allVariables = group.getAllVariables(simpleFilter, 0, 
Collections.emptySet(), '*');
+        assertThat(allVariables, IsMapContaining.hasKey("k_a"));
+        assertThat(allVariables, IsMapContaining.hasValue("v_a"));
+
+        // test default filter, add exclusions to avoid cache hit
+        allVariables = group.getAllVariables(null, 1, 
Collections.singleton("k*2"), '*');
+        assertThat(allVariables, IsMapContaining.hasKey("k_1"));
+        assertThat(allVariables, IsMapContaining.hasValue("v_1"));

Review comment:
       There should also be a test that an exclusion that matches the filtered 
variable (e.g., k_1) is not applied. (i.e., that exclusions are always applied 
before they are filtered)
   
   That said, I don't think that currently works correctly if a group retrieves 
variables from a parent, because they will already have the default filter 
applied to them.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to