This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch release-1.6
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.6 by this push:
     new c4d66c5  [FLINK-10857][metrics] Cache logical scopes separately for each reporter
c4d66c5 is described below

commit c4d66c53d8e5d4de4ebcb47f816aa11988c8bb99
Author: Chesnay Schepler <ches...@apache.org>
AuthorDate: Thu Nov 15 13:05:07 2018 +0100

    [FLINK-10857][metrics] Cache logical scopes separately for each reporter
---
 .../metrics/groups/AbstractMetricGroup.java        | 37 ++++++++++----
 .../runtime/metrics/groups/FrontMetricGroup.java   |  2 +-
 .../metrics/groups/AbstractMetricGroupTest.java    | 56 ++++++++++++++++++++++
 3 files changed, 86 insertions(+), 9 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroup.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroup.java
index 909915f..4400b14 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroup.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroup.java
@@ -90,9 +90,9 @@ public abstract class AbstractMetricGroup<A extends 
AbstractMetricGroup<?>> impl
         * For example: "host-7.taskmanager-2.window_word_count.my-mapper" */
        private final String[] scopeStrings;
 
-       /** The logical metrics scope represented by this group, as a 
concatenated string, lazily computed.
+       /** The logical metrics scope represented by this group for each 
reporter, as a concatenated string, lazily computed.
         * For example: "taskmanager.job.task" */
-       private String logicalScopeString;
+       private String[] logicalScopeStrings;
 
        /** The metrics query service scope represented by this group, lazily 
computed. */
        protected QueryScopeInfo queryServiceScopeInfo;
@@ -107,6 +107,7 @@ public abstract class AbstractMetricGroup<A extends 
AbstractMetricGroup<?>> impl
                this.scopeComponents = checkNotNull(scope);
                this.parent = parent;
                this.scopeStrings = new String[registry.getNumberReporters()];
+               this.logicalScopeStrings = new 
String[registry.getNumberReporters()];
        }
 
        public Map<String, String> getAllVariables() {
@@ -152,14 +153,34 @@ public abstract class AbstractMetricGroup<A extends 
AbstractMetricGroup<?>> impl
         * @return logical scope
         */
        public String getLogicalScope(CharacterFilter filter, char delimiter) {
-               if (logicalScopeString == null) {
-                       if (parent == null) {
-                               logicalScopeString = getGroupName(filter);
-                       } else {
-                               logicalScopeString = 
parent.getLogicalScope(filter, delimiter) + delimiter + getGroupName(filter);
+               return getLogicalScope(filter, delimiter, -1);
+       }
+
+       /**
+        * Returns the logical scope of this group, for example
+        * {@code "taskmanager.job.task"}.
+        *
+        * @param filter character filter which is applied to the scope 
components
+        * @param delimiter delimiter to use for concatenating scope components
+        * @param reporterIndex index of the reporter
+        * @return logical scope
+        */
+       String getLogicalScope(CharacterFilter filter, char delimiter, int 
reporterIndex) {
+               if (logicalScopeStrings.length == 0 || (reporterIndex < 0 || 
reporterIndex >= logicalScopeStrings.length)) {
+                       return createLogicalScope(filter, delimiter);
+               } else {
+                       if (logicalScopeStrings[reporterIndex] == null) {
+                               logicalScopeStrings[reporterIndex] = 
createLogicalScope(filter, delimiter);
                        }
+                       return logicalScopeStrings[reporterIndex];
                }
-               return logicalScopeString;
+       }
+
+       private String createLogicalScope(CharacterFilter filter, char 
delimiter) {
+               final String groupName = getGroupName(filter);
+               return parent == null
+                       ? groupName
+                       : parent.getLogicalScope(filter, delimiter) + delimiter 
+ groupName;
        }
 
        /**
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/FrontMetricGroup.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/FrontMetricGroup.java
index 63842fe..64397d3 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/FrontMetricGroup.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/FrontMetricGroup.java
@@ -52,6 +52,6 @@ public class FrontMetricGroup<P extends 
AbstractMetricGroup<?>> extends ProxyMet
        }
 
        public String getLogicalScope(CharacterFilter filter, char delimiter) {
-               return parentMetricGroup.getLogicalScope(filter, delimiter);
+               return parentMetricGroup.getLogicalScope(filter, delimiter, 
this.reporterIndex);
        }
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroupTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroupTest.java
index f8ed3c6..f3f8b42 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroupTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroupTest.java
@@ -105,6 +105,30 @@ public class AbstractMetricGroupTest {
                }
        }
 
+       @Test
+       public void testLogicalScopeCachingForMultipleReporters() throws 
Exception {
+               Configuration config = new Configuration();
+               config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + 
"test1." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, 
LogicalScopeReporter1.class.getName());
+               config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + 
"test2." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, 
LogicalScopeReporter2.class.getName());
+
+               MetricRegistryImpl testRegistry = new 
MetricRegistryImpl(MetricRegistryConfiguration.fromConfiguration(config));
+               try {
+                       MetricGroup tmGroup = new 
TaskManagerMetricGroup(testRegistry, "host", "id")
+                               .addGroup("B")
+                               .addGroup("C");
+                       tmGroup.counter("1");
+                       assertEquals("Reporters were not properly 
instantiated", 2, testRegistry.getReporters().size());
+                       for (MetricReporter reporter : 
testRegistry.getReporters()) {
+                               ScopeCheckingTestReporter typedReporter = 
(ScopeCheckingTestReporter) reporter;
+                               if (typedReporter.failureCause != null) {
+                                       throw typedReporter.failureCause;
+                               }
+                       }
+               } finally {
+                       testRegistry.shutdown().get();
+               }
+       }
+
        private abstract static class ScopeCheckingTestReporter extends 
TestReporter {
                protected Exception failureCause;
 
@@ -175,6 +199,38 @@ public class AbstractMetricGroupTest {
                }
        }
 
+       /**
+        * Reporter that verifies the logical-scope caching behavior.
+        */
+       public static final class LogicalScopeReporter1 extends 
ScopeCheckingTestReporter {
+               @Override
+               public String filterCharacters(String input) {
+                       return FILTER_B.filterCharacters(input);
+               }
+
+               @Override
+               public void checkScopes(Metric metric, String metricName, 
MetricGroup group) {
+                       final String logicalScope = 
((FrontMetricGroup<AbstractMetricGroup<?>>) group).getLogicalScope(this, '-');
+                       assertEquals("taskmanager-X-C", logicalScope);
+               }
+       }
+
+       /**
+        * Reporter that verifies the logical-scope caching behavior.
+        */
+       public static final class LogicalScopeReporter2 extends 
ScopeCheckingTestReporter {
+               @Override
+               public String filterCharacters(String input) {
+                       return FILTER_C.filterCharacters(input);
+               }
+
+               @Override
+               public void checkScopes(Metric metric, String metricName, 
MetricGroup group) {
+                       final String logicalScope = 
((FrontMetricGroup<AbstractMetricGroup<?>>) group).getLogicalScope(this, ',');
+                       assertEquals("taskmanager,B,X", logicalScope);
+               }
+       }
+
        @Test
        public void testScopeGenerationWithoutReporters() throws Exception {
                Configuration config = new Configuration();

Reply via email to