zentol commented on a change in pull request #14510:
URL: https://github.com/apache/flink/pull/14510#discussion_r551244274
##########
File path:
flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/MetricGroup.java
##########
@@ -186,6 +186,15 @@
*/
Map<String, String> getAllVariables();
+ /**
+ * Returns a map of all variables and their associated value, for example
{@code
+ * {"<host>"="host-7", "<tm_id>"="taskmanager-2"}}.
+ *
+ * @param filter character filter which is applied to the variables and
values if not null.
+ * @return map of all variables and their associated value
+ */
+ Map<String, String> getAllVariables(CharacterFilter filter);
Review comment:
It is not required to extend this interface since the FrontMetricGroup
works against the AbstractMetricGroup, and we currently don't want to extend
this interface further.
(This will also render some other changes moot)
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroup.java
##########
@@ -107,6 +108,8 @@
/** Flag indicating whether this group has been closed. */
private volatile boolean closed;
+ @VisibleForTesting public static final char DEFAULT_REPLACEMENT = '_';
Review comment:
```suggestion
@VisibleForTesting
public static final char DEFAULT_REPLACEMENT = '_';
```
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroup.java
##########
@@ -137,11 +156,18 @@ public AbstractMetricGroup(MetricRegistry registry,
String[] scope, A parent) {
// if no variables are excluded (which is the default!) we re-use the
general variables map
// to save space
return internalGetAllVariables(
- excludedVariables.isEmpty() ? 0 : reporterIndex,
excludedVariables);
+ filter,
+ excludedVariables.isEmpty() ? 0 : reporterIndex,
+ excludedVariables,
+ delimiter);
}
private Map<String, String> internalGetAllVariables(
- int cachingIndex, Set<String> excludedVariables) {
+ CharacterFilter filter,
+ int cachingIndex,
+ Set<String> excludedVariables,
+ char delimiter) {
+ CharacterFilter newFilter = wrapWithDefaultFilter(filter, delimiter);
Review comment:
This implies that variables returned by `parent.getAllVariables()`
already had a filter applied to them for a delimiter that may not be matching
`delimiter` (i.e., `registry.getDelimiter()`).
Furthermore, this also means that exclusion may not be applied correctly.
I think we need to restrict the filtering to cases where the calls originate
from a reporter.
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroup.java
##########
@@ -119,12 +122,28 @@ public AbstractMetricGroup(MetricRegistry registry,
String[] scope, A parent) {
this.variables = new Map[registry.getNumberReporters() + 1];
}
+ CharacterFilter wrapWithDefaultFilter(CharacterFilter filter, char
delimiter) {
Review comment:
```suggestion
private CharacterFilter wrapWithDefaultFilter(CharacterFilter filter,
char delimiter) {
```
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroup.java
##########
@@ -119,12 +122,28 @@ public AbstractMetricGroup(MetricRegistry registry,
String[] scope, A parent) {
this.variables = new Map[registry.getNumberReporters() + 1];
}
+ CharacterFilter wrapWithDefaultFilter(CharacterFilter filter, char
delimiter) {
+ CharacterFilter defaultFilter = input -> input.replace(delimiter,
DEFAULT_REPLACEMENT);
+ return filter == null
+ ? defaultFilter
+ : input ->
defaultFilter.filterCharacters(filter.filterCharacters(input));
+ }
+
@Override
public Map<String, String> getAllVariables() {
- return internalGetAllVariables(0, Collections.emptySet());
+ return internalGetAllVariables(null, 0, Collections.emptySet(),
registry.getDelimiter());
+ }
+
+ @Override
+ public Map<String, String> getAllVariables(CharacterFilter filter) {
+ return internalGetAllVariables(filter, 0, Collections.emptySet(),
registry.getDelimiter());
}
- public Map<String, String> getAllVariables(int reporterIndex, Set<String>
excludedVariables) {
+ public Map<String, String> getAllVariables(
+ CharacterFilter filter,
+ int reporterIndex,
+ Set<String> excludedVariables,
+ char delimiter) {
Review comment:
I'd like to keep the reporterIndex as the first argument because all
other arguments are somewhat dependent on it
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroupTest.java
##########
@@ -415,4 +419,119 @@ public void testGetAllVariablesDoesNotDeadlock() throws
InterruptedException {
childRegisteringThread.join();
}
}
+
+ @Test
+ public void testReporterDelimiterIsFiltered() throws Exception {
+ MetricConfig metricConfig = new MetricConfig();
+ metricConfig.setProperty(
+ ConfigConstants.METRICS_REPORTER_SCOPE_DELIMITER,
String.valueOf('*'));
+
+ MetricRegistryImpl metricRegistry =
+ new MetricRegistryImpl(
+
MetricRegistryConfiguration.defaultMetricRegistryConfiguration(),
+ Collections.singletonList(
+ ReporterSetup.forReporter(
+ "test", metricConfig, new
TestReporter())));
+ try {
+ String metricName = "Test*Counter";
+ TaskManagerMetricGroup metricGroup =
+ new TaskManagerMetricGroup(metricRegistry, "hos*t", "i*d");
+ Counter counter = metricGroup.counter(metricName);
+ TestReporter reporter = (TestReporter)
metricRegistry.getReporters().get(0);
+ final String expected =
+ String.format(
+ "%s%s%s%s%s%s%s",
+ "hos*t".replace('*',
AbstractMetricGroup.DEFAULT_REPLACEMENT),
+ '*',
+ TaskExecutor.TASK_MANAGER_NAME.replace(
+ '*',
AbstractMetricGroup.DEFAULT_REPLACEMENT),
+ '*',
+ "i*d".replace('*',
AbstractMetricGroup.DEFAULT_REPLACEMENT),
+ '*',
+ metricName.replace('*',
AbstractMetricGroup.DEFAULT_REPLACEMENT));
+ assertEquals(expected, reporter.getCounters().get(counter));
+ } finally {
+ metricRegistry.shutdown().get();
+ }
+ }
+
+ private static class TestReporter extends AbstractReporter {
+ @Override
+ public void open(MetricConfig config) {}
+
+ @Override
+ public void close() {}
+
+ public Map<Counter, String> getCounters() {
+ return counters;
+ }
+
+ @Override
+ public String filterCharacters(String input) {
+ return input;
+ }
+ }
+
+ @Test
+ public void testLowLevelGetAllVariablesWithFilterAndDelimiter() throws
Exception {
+ MetricRegistry registry =
TestingMetricRegistry.builder().setNumberReporters(2).build();
+
+ AbstractMetricGroup<?> group =
+ new GenericMetricGroup(registry, null, "test") {
+ @Override
+ protected void putVariables(Map<String, String> variables)
{
+ variables.put("k*1", "v*1");
+ variables.put("k*2", "v*2");
+ }
+ };
+
+ Map<String, String> allVariables;
+ final CharacterFilter simpleFilter = input -> input.replace('1', 'a');
+
+ // test filter wrapping
+ allVariables = group.getAllVariables(simpleFilter, 0,
Collections.emptySet(), '*');
+ assertThat(allVariables, IsMapContaining.hasKey("k_a"));
+ assertThat(allVariables, IsMapContaining.hasValue("v_a"));
+
+ // test default filter, add exclusions to avoid cache hit
+ allVariables = group.getAllVariables(null, 1,
Collections.singleton("k*2"), '*');
+ assertThat(allVariables, IsMapContaining.hasKey("k_1"));
+ assertThat(allVariables, IsMapContaining.hasValue("v_1"));
Review comment:
There should also be a test that an exclusion that matches the filtered
variable (e.g., k_1) is not applied. (i.e., that exclusions are always applied
before they are filtered)
That said, I don't think that currently works correctly if a group retrieves
variables from a parent, because they will already have the default filter
applied to them.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]