zentol closed pull request #7095: [FLINK-10857][metrics] Cache logical scopes 
separately for each reporter
URL: https://github.com/apache/flink/pull/7095
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

Because this pull request originates from a fork, GitHub does not display
its diff once it has been merged, so the diff is reproduced below:

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroup.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroup.java
index 909915f216e..4400b14a10d 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroup.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroup.java
@@ -90,9 +90,9 @@
         * For example: "host-7.taskmanager-2.window_word_count.my-mapper" */
        private final String[] scopeStrings;
 
-       /** The logical metrics scope represented by this group, as a 
concatenated string, lazily computed.
+       /** The logical metrics scope represented by this group for each 
reporter, as a concatenated string, lazily computed.
         * For example: "taskmanager.job.task" */
-       private String logicalScopeString;
+       private String[] logicalScopeStrings;
 
        /** The metrics query service scope represented by this group, lazily 
computed. */
        protected QueryScopeInfo queryServiceScopeInfo;
@@ -107,6 +107,7 @@ public AbstractMetricGroup(MetricRegistry registry, 
String[] scope, A parent) {
                this.scopeComponents = checkNotNull(scope);
                this.parent = parent;
                this.scopeStrings = new String[registry.getNumberReporters()];
+               this.logicalScopeStrings = new 
String[registry.getNumberReporters()];
        }
 
        public Map<String, String> getAllVariables() {
@@ -152,14 +153,34 @@ public String getLogicalScope(CharacterFilter filter) {
         * @return logical scope
         */
        public String getLogicalScope(CharacterFilter filter, char delimiter) {
-               if (logicalScopeString == null) {
-                       if (parent == null) {
-                               logicalScopeString = getGroupName(filter);
-                       } else {
-                               logicalScopeString = 
parent.getLogicalScope(filter, delimiter) + delimiter + getGroupName(filter);
+               return getLogicalScope(filter, delimiter, -1);
+       }
+
+       /**
+        * Returns the logical scope of this group, for example
+        * {@code "taskmanager.job.task"}.
+        *
+        * @param filter character filter which is applied to the scope 
components
+        * @param delimiter delimiter to use for concatenating scope components
+        * @param reporterIndex index of the reporter
+        * @return logical scope
+        */
+       String getLogicalScope(CharacterFilter filter, char delimiter, int 
reporterIndex) {
+               if (logicalScopeStrings.length == 0 || (reporterIndex < 0 || 
reporterIndex >= logicalScopeStrings.length)) {
+                       return createLogicalScope(filter, delimiter);
+               } else {
+                       if (logicalScopeStrings[reporterIndex] == null) {
+                               logicalScopeStrings[reporterIndex] = 
createLogicalScope(filter, delimiter);
                        }
+                       return logicalScopeStrings[reporterIndex];
                }
-               return logicalScopeString;
+       }
+
+       private String createLogicalScope(CharacterFilter filter, char 
delimiter) {
+               final String groupName = getGroupName(filter);
+               return parent == null
+                       ? groupName
+                       : parent.getLogicalScope(filter, delimiter) + delimiter 
+ groupName;
        }
 
        /**
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/FrontMetricGroup.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/FrontMetricGroup.java
index 63842fef9d6..64397d33db6 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/FrontMetricGroup.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/FrontMetricGroup.java
@@ -52,6 +52,6 @@ public String getLogicalScope(CharacterFilter filter) {
        }
 
        public String getLogicalScope(CharacterFilter filter, char delimiter) {
-               return parentMetricGroup.getLogicalScope(filter, delimiter);
+               return parentMetricGroup.getLogicalScope(filter, delimiter, 
this.reporterIndex);
        }
 }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroupTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroupTest.java
index f8ed3c6a6d8..f3f8b42b851 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroupTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroupTest.java
@@ -105,6 +105,30 @@ public void testScopeCachingForMultipleReporters() throws 
Exception {
                }
        }
 
+       @Test
+       public void testLogicalScopeCachingForMultipleReporters() throws 
Exception {
+               Configuration config = new Configuration();
+               config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + 
"test1." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, 
LogicalScopeReporter1.class.getName());
+               config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + 
"test2." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, 
LogicalScopeReporter2.class.getName());
+
+               MetricRegistryImpl testRegistry = new 
MetricRegistryImpl(MetricRegistryConfiguration.fromConfiguration(config));
+               try {
+                       MetricGroup tmGroup = new 
TaskManagerMetricGroup(testRegistry, "host", "id")
+                               .addGroup("B")
+                               .addGroup("C");
+                       tmGroup.counter("1");
+                       assertEquals("Reporters were not properly 
instantiated", 2, testRegistry.getReporters().size());
+                       for (MetricReporter reporter : 
testRegistry.getReporters()) {
+                               ScopeCheckingTestReporter typedReporter = 
(ScopeCheckingTestReporter) reporter;
+                               if (typedReporter.failureCause != null) {
+                                       throw typedReporter.failureCause;
+                               }
+                       }
+               } finally {
+                       testRegistry.shutdown().get();
+               }
+       }
+
        private abstract static class ScopeCheckingTestReporter extends 
TestReporter {
                protected Exception failureCause;
 
@@ -175,6 +199,38 @@ public String filterCharacters(String input) {
                }
        }
 
+       /**
+        * Reporter that verifies the logical-scope caching behavior.
+        */
+       public static final class LogicalScopeReporter1 extends 
ScopeCheckingTestReporter {
+               @Override
+               public String filterCharacters(String input) {
+                       return FILTER_B.filterCharacters(input);
+               }
+
+               @Override
+               public void checkScopes(Metric metric, String metricName, 
MetricGroup group) {
+                       final String logicalScope = 
((FrontMetricGroup<AbstractMetricGroup<?>>) group).getLogicalScope(this, '-');
+                       assertEquals("taskmanager-X-C", logicalScope);
+               }
+       }
+
+       /**
+        * Reporter that verifies the logical-scope caching behavior.
+        */
+       public static final class LogicalScopeReporter2 extends 
ScopeCheckingTestReporter {
+               @Override
+               public String filterCharacters(String input) {
+                       return FILTER_C.filterCharacters(input);
+               }
+
+               @Override
+               public void checkScopes(Metric metric, String metricName, 
MetricGroup group) {
+                       final String logicalScope = 
((FrontMetricGroup<AbstractMetricGroup<?>>) group).getLogicalScope(this, ',');
+                       assertEquals("taskmanager,B,X", logicalScope);
+               }
+       }
+
        @Test
        public void testScopeGenerationWithoutReporters() throws Exception {
                Configuration config = new Configuration();


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to