Repository: flink
Updated Branches:
  refs/heads/master 441400855 -> 4eb71927b


[FLINK-4563] [metrics] scope caching not adjusted for multiple reporters

This closes #2650.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/86f784a3
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/86f784a3
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/86f784a3

Branch: refs/heads/master
Commit: 86f784a3613ccd5d78197d94198a64b6f1333578
Parents: fe843e1
Author: Anton Mushin <[email protected]>
Authored: Mon Oct 17 17:23:01 2016 +0400
Committer: zentol <[email protected]>
Committed: Thu Dec 8 12:04:48 2016 +0100

----------------------------------------------------------------------
 .../metrics/groups/AbstractMetricGroup.java     |  53 ++++----
 .../metrics/groups/AbstractMetricGroupTest.java | 135 +++++++++++++++++++
 2 files changed, 165 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/86f784a3/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroup.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroup.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroup.java
index 04b8158..6ff9776 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroup.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroup.java
@@ -33,6 +33,7 @@ import org.apache.flink.runtime.metrics.scope.ScopeFormat;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.Arrays;
 import java.util.HashMap;
 import java.util.Map;
 
@@ -85,9 +86,9 @@ public abstract class AbstractMetricGroup<A extends AbstractMetricGroup<?>> impl
         *  For example ["host-7", "taskmanager-2", "window_word_count", 
"my-mapper" ]. */
        private final String[] scopeComponents;
 
-       /** The metrics scope represented by this group, as a concatenated 
string, lazily computed.
+       /** Array containing the metrics scope represented by this group for 
each reporter, as a concatenated string, lazily computed.
         * For example: "host-7.taskmanager-2.window_word_count.my-mapper" */
-       private String scopeString;
+       private final String[] scopeStrings;
 
        /** The logical metrics scope represented by this group, as a 
concatenated string, lazily computed.
         * For example: "taskmanager.job.task" */
@@ -105,6 +106,7 @@ public abstract class AbstractMetricGroup<A extends AbstractMetricGroup<?>> impl
                this.registry = checkNotNull(registry);
                this.scopeComponents = checkNotNull(scope);
                this.parent = parent;
+               this.scopeStrings = new String[registry.getReporters() == null 
? 0 : registry.getReporters().size()];
        }
 
        public Map<String, String> getAllVariables() {
@@ -210,19 +212,7 @@ public abstract class AbstractMetricGroup<A extends AbstractMetricGroup<?>> impl
         * @return fully qualified metric name
         */
        public String getMetricIdentifier(String metricName, CharacterFilter 
filter) {
-               if (scopeString == null) {
-                       if (filter != null) {
-                               scopeString = ScopeFormat.concat(filter, 
registry.getDelimiter(), scopeComponents);
-                       } else {
-                               scopeString = 
ScopeFormat.concat(registry.getDelimiter(), scopeComponents);
-                       }
-               }
-
-               if (filter != null) {
-                       return scopeString + registry.getDelimiter() + 
filter.filterCharacters(metricName);
-               } else {
-                       return scopeString + registry.getDelimiter() + 
metricName;
-               }
+               return getMetricIdentifier(metricName, filter, -1);
        }
 
        /**
@@ -235,12 +225,29 @@ public abstract class AbstractMetricGroup<A extends AbstractMetricGroup<?>> impl
         * @return fully qualified metric name
         */
        public String getMetricIdentifier(String metricName, CharacterFilter 
filter, int reporterIndex) {
-               if (filter != null) {
-                       scopeString = ScopeFormat.concat(filter, 
registry.getDelimiter(reporterIndex), scopeComponents);
-                       return scopeString + 
registry.getDelimiter(reporterIndex) + filter.filterCharacters(metricName);
+               if (scopeStrings.length == 0 || (reporterIndex < 0 || 
reporterIndex >= scopeStrings.length)) {
+                       char delimiter = registry.getDelimiter();
+                       String newScopeString;
+                       if (filter != null) {
+                               newScopeString = ScopeFormat.concat(filter, 
delimiter, scopeComponents);
+                               metricName = 
filter.filterCharacters(metricName);
+                       } else {
+                               newScopeString = ScopeFormat.concat(delimiter, 
scopeComponents);
+                       }
+                       return newScopeString + delimiter + metricName;
                } else {
-                       scopeString = 
ScopeFormat.concat(registry.getDelimiter(reporterIndex), scopeComponents);
-                       return scopeString + 
registry.getDelimiter(reporterIndex) + metricName;
+                       char delimiter = registry.getDelimiter(reporterIndex);
+                       if (scopeStrings[reporterIndex] == null) {
+                               if (filter != null) {
+                                       scopeStrings[reporterIndex] = 
ScopeFormat.concat(filter, delimiter, scopeComponents);
+                               } else {
+                                       scopeStrings[reporterIndex] = 
ScopeFormat.concat(delimiter, scopeComponents);
+                               }
+                       }
+                       if (filter != null) {
+                               metricName = 
filter.filterCharacters(metricName);
+                       }
+                       return scopeStrings[reporterIndex] + delimiter + 
metricName;
                }
        }
        
@@ -353,7 +360,7 @@ public abstract class AbstractMetricGroup<A extends AbstractMetricGroup<?>> impl
                                                // we warn here, rather than 
failing, because metrics are tools that should not fail the
                                                // program when used incorrectly
                                                LOG.warn("Name collision: 
Adding a metric with the same name as a metric subgroup: '" +
-                                                               name + "'. 
Metric might not get properly reported. (" + scopeString + ')');
+                                                               name + "'. 
Metric might not get properly reported. " + Arrays.toString(scopeComponents));
                                        }
 
                                        registry.register(metric, name, this);
@@ -365,7 +372,7 @@ public abstract class AbstractMetricGroup<A extends AbstractMetricGroup<?>> impl
                                        // we warn here, rather than failing, 
because metrics are tools that should not fail the
                                        // program when used incorrectly
                                        LOG.warn("Name collision: Group already 
contains a Metric with the name '" +
-                                                       name + "'. Metric will 
not be reported. (" + scopeString + ')');
+                                                       name + "'. Metric will 
not be reported." + Arrays.toString(scopeComponents));
                                }
                        }
                }
@@ -389,7 +396,7 @@ public abstract class AbstractMetricGroup<A extends AbstractMetricGroup<?>> impl
                                // program when used incorrectly
                                if (metrics.containsKey(name)) {
                                        LOG.warn("Name collision: Adding a 
metric subgroup with the same name as an existing metric: '" +
-                                                       name + "'. Metric might 
not get properly reported. (" + scopeString + ')');
+                                                       name + "'. Metric might 
not get properly reported. " + Arrays.toString(scopeComponents));
                                }
 
                                AbstractMetricGroup newGroup = new 
GenericMetricGroup(registry, this, name);

http://git-wip-us.apache.org/repos/asf/flink/blob/86f784a3/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroupTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroupTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroupTest.java
index c7b392f..da14bfd 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroupTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroupTest.java
@@ -17,12 +17,19 @@
  */
 package org.apache.flink.runtime.metrics.groups;
 
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.metrics.CharacterFilter;
+import org.apache.flink.metrics.Metric;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.metrics.reporter.MetricReporter;
 import org.apache.flink.runtime.metrics.MetricRegistry;
 import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
 import org.apache.flink.runtime.metrics.dump.QueryScopeInfo;
+import org.apache.flink.runtime.metrics.util.TestReporter;
 import org.junit.Test;
 
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
 public class AbstractMetricGroupTest {
@@ -49,4 +56,132 @@ public class AbstractMetricGroupTest {
                
                registry.shutdown();
        }
+
+       // 
========================================================================
+       // Scope Caching
+       // 
========================================================================
+
+       private static final CharacterFilter FILTER_C = new CharacterFilter() {
+               @Override
+               public String filterCharacters(String input) {
+                       return input.replace("C", "X");
+               }
+       };
+       private static final CharacterFilter FILTER_B = new CharacterFilter() {
+               @Override
+               public String filterCharacters(String input) {
+                       return input.replace("B", "X");
+               }
+       };
+
+       @Test
+       public void testScopeCachingForMultipleReporters() throws Exception {
+               Configuration config = new Configuration();
+               config.setString(ConfigConstants.METRICS_SCOPE_NAMING_TM, 
"A.B.C.D");
+               config.setString(ConfigConstants.METRICS_REPORTERS_LIST, 
"test1,test2");
+               config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + 
"test1." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, 
TestReporter1.class.getName());
+               config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + 
"test1." + ConfigConstants.METRICS_REPORTER_SCOPE_DELIMITER, "-");
+               config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + 
"test2." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, 
TestReporter2.class.getName());
+               config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + 
"test2." + ConfigConstants.METRICS_REPORTER_SCOPE_DELIMITER, "!");
+
+               MetricRegistry testRegistry = new 
MetricRegistry(MetricRegistryConfiguration.fromConfiguration(config));
+               try {
+                       MetricGroup tmGroup = new 
TaskManagerMetricGroup(testRegistry, "host", "id");
+                       tmGroup.counter("1");
+                       assertEquals("Reporters were not properly 
instantiated", 2, testRegistry.getReporters().size());
+                       for (MetricReporter reporter : 
testRegistry.getReporters()) {
+                               ScopeCheckingTestReporter typedReporter = 
(ScopeCheckingTestReporter) reporter;
+                               if (typedReporter.failureCause != null) {
+                                       throw typedReporter.failureCause;
+                               }
+                       }
+               } finally {
+                       testRegistry.shutdown();
+               }
+       }
+
+       private abstract static class ScopeCheckingTestReporter extends 
TestReporter {
+               protected Exception failureCause;
+
+               @Override
+               public void notifyOfAddedMetric(Metric metric, String 
metricName, MetricGroup group) {
+                       try {
+                               checkScopes(metric, metricName, group);
+                       } catch (Exception e) {
+                               if (failureCause == null) {
+                                       failureCause = e;
+                               }
+                       }
+               }
+
+               public abstract void checkScopes(Metric metric, String 
metricName, MetricGroup group);
+
+       }
+
+       public static class TestReporter1 extends ScopeCheckingTestReporter {
+               @Override
+               public String filterCharacters(String input) {
+                       return FILTER_B.filterCharacters(input);
+               }
+               @Override
+               public void checkScopes(Metric metric, String metricName, 
MetricGroup group) {
+                       // the first call determines which filter is applied to 
all future calls; in this case no filter is used at all
+                       assertEquals("A-B-C-D-1", 
group.getMetricIdentifier(metricName));
+                       // from now on the scope string is cached and should 
not be reliant on the given filter
+                       assertEquals("A-B-C-D-1", 
group.getMetricIdentifier(metricName, FILTER_C));
+                       assertEquals("A-B-C-D-1", 
group.getMetricIdentifier(metricName, this));
+                       // the metric name however is still affected by the 
filter as it is not cached
+                       assertEquals("A-B-C-D-4", 
group.getMetricIdentifier(metricName, new CharacterFilter() {
+                               @Override
+                               public String filterCharacters(String input) {
+                                       return input.replace("B", 
"X").replace("1", "4");
+                               }
+                       }));
+               }
+       }
+
+       public static class TestReporter2 extends ScopeCheckingTestReporter {
+               @Override
+               public String filterCharacters(String input) {
+                       return FILTER_C.filterCharacters(input);
+               }
+
+               @Override
+               public void checkScopes(Metric metric, String metricName, 
MetricGroup group) {
+                       // the first call determines which filter is applied to 
all future calls
+                       assertEquals("A!B!X!D!1", 
group.getMetricIdentifier(metricName, this));
+                       // from now on the scope string is cached and should 
not be reliant on the given filter
+                       assertEquals("A!B!X!D!1", 
group.getMetricIdentifier(metricName));
+                       assertEquals("A!B!X!D!1", 
group.getMetricIdentifier(metricName, FILTER_C));
+                       // the metric name however is still affected by the 
filter as it is not cached
+                       assertEquals("A!B!X!D!3", 
group.getMetricIdentifier(metricName, new CharacterFilter() {
+                               @Override
+                               public String filterCharacters(String input) {
+                                       return input.replace("A", 
"X").replace("1", "3");
+                               }
+                       }));
+               }
+       }
+
+       @Test
+       public void testScopeGenerationWithoutReporters() {
+               Configuration config = new Configuration();
+               config.setString(ConfigConstants.METRICS_SCOPE_NAMING_TM, 
"A.B.C.D");
+               MetricRegistry testRegistry = new 
MetricRegistry(MetricRegistryConfiguration.fromConfiguration(config));
+
+               try {
+                       TaskManagerMetricGroup group = new 
TaskManagerMetricGroup(testRegistry, "host", "id");
+                       assertEquals("MetricReporters list should be empty", 0, 
testRegistry.getReporters().size());
+                       
+                       // default delimiter should be used
+                       assertEquals("A.B.X.D.1", 
group.getMetricIdentifier("1", FILTER_C));
+                       // no caching should occur
+                       assertEquals("A.X.C.D.1", 
group.getMetricIdentifier("1", FILTER_B));
+                       // invalid reporter indices do not throw errors
+                       assertEquals("A.X.C.D.1", 
group.getMetricIdentifier("1", FILTER_B, -1));
+                       assertEquals("A.X.C.D.1", 
group.getMetricIdentifier("1", FILTER_B, 2));
+               } finally {
+                       testRegistry.shutdown();
+               }
+       }
 }

Reply via email to