This is an automated email from the ASF dual-hosted git repository. chesnay pushed a commit to branch release-1.6 in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.6 by this push: new c4d66c5 [FLINK-10857][metrics] Cache logical scopes separately for each reporter c4d66c5 is described below commit c4d66c53d8e5d4de4ebcb47f816aa11988c8bb99 Author: Chesnay Schepler <ches...@apache.org> AuthorDate: Thu Nov 15 13:05:07 2018 +0100 [FLINK-10857][metrics] Cache logical scopes separately for each reporter --- .../metrics/groups/AbstractMetricGroup.java | 37 ++++++++++---- .../runtime/metrics/groups/FrontMetricGroup.java | 2 +- .../metrics/groups/AbstractMetricGroupTest.java | 56 ++++++++++++++++++++++ 3 files changed, 86 insertions(+), 9 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroup.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroup.java index 909915f..4400b14 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroup.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroup.java @@ -90,9 +90,9 @@ public abstract class AbstractMetricGroup<A extends AbstractMetricGroup<?>> impl * For example: "host-7.taskmanager-2.window_word_count.my-mapper" */ private final String[] scopeStrings; - /** The logical metrics scope represented by this group, as a concatenated string, lazily computed. + /** The logical metrics scope represented by this group for each reporter, as a concatenated string, lazily computed. * For example: "taskmanager.job.task" */ - private String logicalScopeString; + private String[] logicalScopeStrings; /** The metrics query service scope represented by this group, lazily computed. */ protected QueryScopeInfo queryServiceScopeInfo; @@ -107,6 +107,7 @@ public abstract class AbstractMetricGroup<A extends AbstractMetricGroup<?>> impl this.scopeComponents = checkNotNull(scope); this.parent = parent; this.scopeStrings = new String[registry.getNumberReporters()]; + this.logicalScopeStrings = new String[registry.getNumberReporters()]; } public Map<String, String> getAllVariables() { @@ -152,14 +153,34 @@ public abstract class AbstractMetricGroup<A extends AbstractMetricGroup<?>> impl * @return logical scope */ public String getLogicalScope(CharacterFilter filter, char delimiter) { - if (logicalScopeString == null) { - if (parent == null) { - logicalScopeString = getGroupName(filter); - } else { - logicalScopeString = parent.getLogicalScope(filter, delimiter) + delimiter + getGroupName(filter); + return getLogicalScope(filter, delimiter, -1); + } + + /** + * Returns the logical scope of this group, for example + * {@code "taskmanager.job.task"}. + * + * @param filter character filter which is applied to the scope components + * @param delimiter delimiter to use for concatenating scope components + * @param reporterIndex index of the reporter + * @return logical scope + */ + String getLogicalScope(CharacterFilter filter, char delimiter, int reporterIndex) { + if (logicalScopeStrings.length == 0 || (reporterIndex < 0 || reporterIndex >= logicalScopeStrings.length)) { + return createLogicalScope(filter, delimiter); + } else { + if (logicalScopeStrings[reporterIndex] == null) { + logicalScopeStrings[reporterIndex] = createLogicalScope(filter, delimiter); } + return logicalScopeStrings[reporterIndex]; } - return logicalScopeString; + } + + private String createLogicalScope(CharacterFilter filter, char delimiter) { + final String groupName = getGroupName(filter); + return parent == null + ? groupName + : parent.getLogicalScope(filter, delimiter) + delimiter + groupName; } /** diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/FrontMetricGroup.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/FrontMetricGroup.java index 63842fe..64397d3 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/FrontMetricGroup.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/FrontMetricGroup.java @@ -52,6 +52,6 @@ public class FrontMetricGroup<P extends AbstractMetricGroup<?>> extends ProxyMet } public String getLogicalScope(CharacterFilter filter, char delimiter) { - return parentMetricGroup.getLogicalScope(filter, delimiter); + return parentMetricGroup.getLogicalScope(filter, delimiter, this.reporterIndex); } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroupTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroupTest.java index f8ed3c6..f3f8b42 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroupTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroupTest.java @@ -105,6 +105,30 @@ public class AbstractMetricGroupTest { } } + @Test + public void testLogicalScopeCachingForMultipleReporters() throws Exception { + Configuration config = new Configuration(); + config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test1." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, LogicalScopeReporter1.class.getName()); + config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test2." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, LogicalScopeReporter2.class.getName()); + + MetricRegistryImpl testRegistry = new MetricRegistryImpl(MetricRegistryConfiguration.fromConfiguration(config)); + try { + MetricGroup tmGroup = new TaskManagerMetricGroup(testRegistry, "host", "id") + .addGroup("B") + .addGroup("C"); + tmGroup.counter("1"); + assertEquals("Reporters were not properly instantiated", 2, testRegistry.getReporters().size()); + for (MetricReporter reporter : testRegistry.getReporters()) { + ScopeCheckingTestReporter typedReporter = (ScopeCheckingTestReporter) reporter; + if (typedReporter.failureCause != null) { + throw typedReporter.failureCause; + } + } + } finally { + testRegistry.shutdown().get(); + } + } + private abstract static class ScopeCheckingTestReporter extends TestReporter { protected Exception failureCause; @@ -175,6 +199,38 @@ public class AbstractMetricGroupTest { } } + /** + * Reporter that verifies the logical-scope caching behavior. + */ + public static final class LogicalScopeReporter1 extends ScopeCheckingTestReporter { + @Override + public String filterCharacters(String input) { + return FILTER_B.filterCharacters(input); + } + + @Override + public void checkScopes(Metric metric, String metricName, MetricGroup group) { + final String logicalScope = ((FrontMetricGroup<AbstractMetricGroup<?>>) group).getLogicalScope(this, '-'); + assertEquals("taskmanager-X-C", logicalScope); + } + } + + /** + * Reporter that verifies the logical-scope caching behavior. + */ + public static final class LogicalScopeReporter2 extends ScopeCheckingTestReporter { + @Override + public String filterCharacters(String input) { + return FILTER_C.filterCharacters(input); + } + + @Override + public void checkScopes(Metric metric, String metricName, MetricGroup group) { + final String logicalScope = ((FrontMetricGroup<AbstractMetricGroup<?>>) group).getLogicalScope(this, ','); + assertEquals("taskmanager,B,X", logicalScope); + } + } + @Test public void testScopeGenerationWithoutReporters() throws Exception { Configuration config = new Configuration();