Repository: flink Updated Branches: refs/heads/master 441400855 -> 4eb71927b
[FLINK-4563] [metrics] scope caching not adjusted for multiple reporters This closes #2650. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/86f784a3 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/86f784a3 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/86f784a3 Branch: refs/heads/master Commit: 86f784a3613ccd5d78197d94198a64b6f1333578 Parents: fe843e1 Author: Anton Mushin <[email protected]> Authored: Mon Oct 17 17:23:01 2016 +0400 Committer: zentol <[email protected]> Committed: Thu Dec 8 12:04:48 2016 +0100 ---------------------------------------------------------------------- .../metrics/groups/AbstractMetricGroup.java | 53 ++++---- .../metrics/groups/AbstractMetricGroupTest.java | 135 +++++++++++++++++++ 2 files changed, 165 insertions(+), 23 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/86f784a3/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroup.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroup.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroup.java index 04b8158..6ff9776 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroup.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroup.java @@ -33,6 +33,7 @@ import org.apache.flink.runtime.metrics.scope.ScopeFormat; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.Arrays; import java.util.HashMap; import java.util.Map; @@ -85,9 +86,9 @@ public abstract class AbstractMetricGroup<A extends AbstractMetricGroup<?>> impl * For example ["host-7", "taskmanager-2", "window_word_count", "my-mapper" ]. */ private final String[] scopeComponents; - /** The metrics scope represented by this group, as a concatenated string, lazily computed. + /** Array containing the metrics scope represented by this group for each reporter, as a concatenated string, lazily computed. * For example: "host-7.taskmanager-2.window_word_count.my-mapper" */ - private String scopeString; + private final String[] scopeStrings; /** The logical metrics scope represented by this group, as a concatenated string, lazily computed. * For example: "taskmanager.job.task" */ @@ -105,6 +106,7 @@ public abstract class AbstractMetricGroup<A extends AbstractMetricGroup<?>> impl this.registry = checkNotNull(registry); this.scopeComponents = checkNotNull(scope); this.parent = parent; + this.scopeStrings = new String[registry.getReporters() == null ? 0 : registry.getReporters().size()]; } public Map<String, String> getAllVariables() { @@ -210,19 +212,7 @@ public abstract class AbstractMetricGroup<A extends AbstractMetricGroup<?>> impl * @return fully qualified metric name */ public String getMetricIdentifier(String metricName, CharacterFilter filter) { - if (scopeString == null) { - if (filter != null) { - scopeString = ScopeFormat.concat(filter, registry.getDelimiter(), scopeComponents); - } else { - scopeString = ScopeFormat.concat(registry.getDelimiter(), scopeComponents); - } - } - - if (filter != null) { - return scopeString + registry.getDelimiter() + filter.filterCharacters(metricName); - } else { - return scopeString + registry.getDelimiter() + metricName; - } + return getMetricIdentifier(metricName, filter, -1); } /** @@ -235,12 +225,29 @@ public abstract class AbstractMetricGroup<A extends AbstractMetricGroup<?>> impl * @return fully qualified metric name */ public String getMetricIdentifier(String metricName, CharacterFilter filter, int reporterIndex) { - if (filter != null) { - scopeString = ScopeFormat.concat(filter, registry.getDelimiter(reporterIndex), scopeComponents); - return scopeString + registry.getDelimiter(reporterIndex) + filter.filterCharacters(metricName); + if (scopeStrings.length == 0 || (reporterIndex < 0 || reporterIndex >= scopeStrings.length)) { + char delimiter = registry.getDelimiter(); + String newScopeString; + if (filter != null) { + newScopeString = ScopeFormat.concat(filter, delimiter, scopeComponents); + metricName = filter.filterCharacters(metricName); + } else { + newScopeString = ScopeFormat.concat(delimiter, scopeComponents); + } + return newScopeString + delimiter + metricName; } else { - scopeString = ScopeFormat.concat(registry.getDelimiter(reporterIndex), scopeComponents); - return scopeString + registry.getDelimiter(reporterIndex) + metricName; + char delimiter = registry.getDelimiter(reporterIndex); + if (scopeStrings[reporterIndex] == null) { + if (filter != null) { + scopeStrings[reporterIndex] = ScopeFormat.concat(filter, delimiter, scopeComponents); + } else { + scopeStrings[reporterIndex] = ScopeFormat.concat(delimiter, scopeComponents); + } + } + if (filter != null) { + metricName = filter.filterCharacters(metricName); + } + return scopeStrings[reporterIndex] + delimiter + metricName; } } @@ -353,7 +360,7 @@ public abstract class AbstractMetricGroup<A extends AbstractMetricGroup<?>> impl // we warn here, rather than failing, because metrics are tools that should not fail the // program when used incorrectly LOG.warn("Name collision: Adding a metric with the same name as a metric subgroup: '" + - name + "'. Metric might not get properly reported. (" + scopeString + ')'); + name + "'. Metric might not get properly reported. " + Arrays.toString(scopeComponents)); } registry.register(metric, name, this); @@ -365,7 +372,7 @@ public abstract class AbstractMetricGroup<A extends AbstractMetricGroup<?>> impl // we warn here, rather than failing, because metrics are tools that should not fail the // program when used incorrectly LOG.warn("Name collision: Group already contains a Metric with the name '" + - name + "'. Metric will not be reported. (" + scopeString + ')'); + name + "'. Metric will not be reported." + Arrays.toString(scopeComponents)); } } } @@ -389,7 +396,7 @@ public abstract class AbstractMetricGroup<A extends AbstractMetricGroup<?>> impl // program when used incorrectly if (metrics.containsKey(name)) { LOG.warn("Name collision: Adding a metric subgroup with the same name as an existing metric: '" + - name + "'. Metric might not get properly reported. (" + scopeString + ')'); + name + "'. Metric might not get properly reported. " + Arrays.toString(scopeComponents)); } AbstractMetricGroup newGroup = new GenericMetricGroup(registry, this, name); http://git-wip-us.apache.org/repos/asf/flink/blob/86f784a3/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroupTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroupTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroupTest.java index c7b392f..da14bfd 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroupTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroupTest.java @@ -17,12 +17,19 @@ */ package org.apache.flink.runtime.metrics.groups; +import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.configuration.Configuration; import org.apache.flink.metrics.CharacterFilter; +import org.apache.flink.metrics.Metric; +import org.apache.flink.metrics.MetricGroup; +import org.apache.flink.metrics.reporter.MetricReporter; import org.apache.flink.runtime.metrics.MetricRegistry; import org.apache.flink.runtime.metrics.MetricRegistryConfiguration; import org.apache.flink.runtime.metrics.dump.QueryScopeInfo; +import org.apache.flink.runtime.metrics.util.TestReporter; import org.junit.Test; +import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; public class AbstractMetricGroupTest { @@ -49,4 +56,132 @@ public class AbstractMetricGroupTest { registry.shutdown(); } + + // ======================================================================== + // Scope Caching + // ======================================================================== + + private static final CharacterFilter FILTER_C = new CharacterFilter() { + @Override + public String filterCharacters(String input) { + return input.replace("C", "X"); + } + }; + private static final CharacterFilter FILTER_B = new CharacterFilter() { + @Override + public String filterCharacters(String input) { + return input.replace("B", "X"); + } + }; + + @Test + public void testScopeCachingForMultipleReporters() throws Exception { + Configuration config = new Configuration(); + config.setString(ConfigConstants.METRICS_SCOPE_NAMING_TM, "A.B.C.D"); + config.setString(ConfigConstants.METRICS_REPORTERS_LIST, "test1,test2"); + config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test1." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, TestReporter1.class.getName()); + config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test1." + ConfigConstants.METRICS_REPORTER_SCOPE_DELIMITER, "-"); + config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test2." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, TestReporter2.class.getName()); + config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test2." + ConfigConstants.METRICS_REPORTER_SCOPE_DELIMITER, "!"); + + MetricRegistry testRegistry = new MetricRegistry(MetricRegistryConfiguration.fromConfiguration(config)); + try { + MetricGroup tmGroup = new TaskManagerMetricGroup(testRegistry, "host", "id"); + tmGroup.counter("1"); + assertEquals("Reporters were not properly instantiated", 2, testRegistry.getReporters().size()); + for (MetricReporter reporter : testRegistry.getReporters()) { + ScopeCheckingTestReporter typedReporter = (ScopeCheckingTestReporter) reporter; + if (typedReporter.failureCause != null) { + throw typedReporter.failureCause; + } + } + } finally { + testRegistry.shutdown(); + } + } + + private abstract static class ScopeCheckingTestReporter extends TestReporter { + protected Exception failureCause; + + @Override + public void notifyOfAddedMetric(Metric metric, String metricName, MetricGroup group) { + try { + checkScopes(metric, metricName, group); + } catch (Exception e) { + if (failureCause == null) { + failureCause = e; + } + } + } + + public abstract void checkScopes(Metric metric, String metricName, MetricGroup group); + + } + + public static class TestReporter1 extends ScopeCheckingTestReporter { + @Override + public String filterCharacters(String input) { + return FILTER_B.filterCharacters(input); + } + @Override + public void checkScopes(Metric metric, String metricName, MetricGroup group) { + // the first call determines which filter is applied to all future calls; in this case no filter is used at all + assertEquals("A-B-C-D-1", group.getMetricIdentifier(metricName)); + // from now on the scope string is cached and should not be reliant on the given filter + assertEquals("A-B-C-D-1", group.getMetricIdentifier(metricName, FILTER_C)); + assertEquals("A-B-C-D-1", group.getMetricIdentifier(metricName, this)); + // the metric name however is still affected by the filter as it is not cached + assertEquals("A-B-C-D-4", group.getMetricIdentifier(metricName, new CharacterFilter() { + @Override + public String filterCharacters(String input) { + return input.replace("B", "X").replace("1", "4"); + } + })); + } + } + + public static class TestReporter2 extends ScopeCheckingTestReporter { + @Override + public String filterCharacters(String input) { + return FILTER_C.filterCharacters(input); + } + + @Override + public void checkScopes(Metric metric, String metricName, MetricGroup group) { + // the first call determines which filter is applied to all future calls + assertEquals("A!B!X!D!1", group.getMetricIdentifier(metricName, this)); + // from now on the scope string is cached and should not be reliant on the given filter + assertEquals("A!B!X!D!1", group.getMetricIdentifier(metricName)); + assertEquals("A!B!X!D!1", group.getMetricIdentifier(metricName, FILTER_C)); + // the metric name however is still affected by the filter as it is not cached + assertEquals("A!B!X!D!3", group.getMetricIdentifier(metricName, new CharacterFilter() { + @Override + public String filterCharacters(String input) { + return input.replace("A", "X").replace("1", "3"); + } + })); + } + } + + @Test + public void testScopeGenerationWithoutReporters() { + Configuration config = new Configuration(); + config.setString(ConfigConstants.METRICS_SCOPE_NAMING_TM, "A.B.C.D"); + MetricRegistry testRegistry = new MetricRegistry(MetricRegistryConfiguration.fromConfiguration(config)); + + try { + TaskManagerMetricGroup group = new TaskManagerMetricGroup(testRegistry, "host", "id"); + assertEquals("MetricReporters list should be empty", 0, testRegistry.getReporters().size()); + + // default delimiter should be used + assertEquals("A.B.X.D.1", group.getMetricIdentifier("1", FILTER_C)); + // no caching should occur + assertEquals("A.X.C.D.1", group.getMetricIdentifier("1", FILTER_B)); + // invalid reporter indices do not throw errors + assertEquals("A.X.C.D.1", group.getMetricIdentifier("1", FILTER_B, -1)); + assertEquals("A.X.C.D.1", group.getMetricIdentifier("1", FILTER_B, 2)); + } finally { + testRegistry.shutdown(); + } + } }
