Repository: flink
Updated Branches:
  refs/heads/master 344fe94db -> 9f4be44c2


[FLINK-4245] JMXReporter exposes all defined variables

This closes #2418.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/94d1b63c
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/94d1b63c
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/94d1b63c

Branch: refs/heads/master
Commit: 94d1b63c05ded616120e8283f584a7affe7fe744
Parents: 344fe94
Author: zentol <[email protected]>
Authored: Wed Jul 27 12:29:10 2016 +0200
Committer: zentol <[email protected]>
Committed: Mon Oct 31 13:15:04 2016 +0100

----------------------------------------------------------------------
 flink-metrics/flink-metrics-jmx/pom.xml         | 10 +--
 .../apache/flink/metrics/jmx/JMXReporter.java   | 62 +++++++++---------
 .../flink/metrics/jmx/JMXReporterTest.java      | 66 +++++++++++++-------
 .../jobmanager/JMXJobManagerMetricTest.java     |  7 ++-
 .../metrics/groups/AbstractMetricGroup.java     | 41 ++++++++++++
 .../metrics/groups/FrontMetricGroup.java        |  8 +++
 .../metrics/groups/GenericMetricGroup.java      |  5 ++
 .../metrics/groups/JobManagerMetricGroup.java   |  5 ++
 .../runtime/metrics/groups/JobMetricGroup.java  |  5 ++
 .../metrics/groups/OperatorMetricGroup.java     |  5 ++
 .../metrics/groups/TaskManagerMetricGroup.java  |  5 ++
 .../runtime/metrics/groups/TaskMetricGroup.java |  5 ++
 .../metrics/groups/AbstractMetricGroupTest.java |  5 ++
 .../runtime/metrics/groups/MetricGroupTest.java |  5 ++
 .../connectors/kafka/KafkaConsumerTestBase.java |  6 +-
 15 files changed, 181 insertions(+), 59 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/94d1b63c/flink-metrics/flink-metrics-jmx/pom.xml
----------------------------------------------------------------------
diff --git a/flink-metrics/flink-metrics-jmx/pom.xml 
b/flink-metrics/flink-metrics-jmx/pom.xml
index 626f063..8ee983b 100644
--- a/flink-metrics/flink-metrics-jmx/pom.xml
+++ b/flink-metrics/flink-metrics-jmx/pom.xml
@@ -49,20 +49,20 @@ under the License.
 
                <dependency>
                        <groupId>org.apache.flink</groupId>
-                       <artifactId>flink-metrics-core</artifactId>
+                       <artifactId>flink-runtime_2.10</artifactId>
                        <version>${project.version}</version>
                        <scope>provided</scope>
                </dependency>
 
-               <!-- test dependencies -->
-
                <dependency>
                        <groupId>org.apache.flink</groupId>
-                       <artifactId>flink-runtime_2.10</artifactId>
+                       <artifactId>flink-metrics-core</artifactId>
                        <version>${project.version}</version>
-                       <scope>test</scope>
+                       <scope>provided</scope>
                </dependency>
 
+               <!-- test dependencies -->
+
                <dependency>
                        <groupId>org.apache.flink</groupId>
                        <artifactId>flink-runtime_2.10</artifactId>

http://git-wip-us.apache.org/repos/asf/flink/blob/94d1b63c/flink-metrics/flink-metrics-jmx/src/main/java/org/apache/flink/metrics/jmx/JMXReporter.java
----------------------------------------------------------------------
diff --git 
a/flink-metrics/flink-metrics-jmx/src/main/java/org/apache/flink/metrics/jmx/JMXReporter.java
 
b/flink-metrics/flink-metrics-jmx/src/main/java/org/apache/flink/metrics/jmx/JMXReporter.java
index 39a5aa2..9c1fabb 100644
--- 
a/flink-metrics/flink-metrics-jmx/src/main/java/org/apache/flink/metrics/jmx/JMXReporter.java
+++ 
b/flink-metrics/flink-metrics-jmx/src/main/java/org/apache/flink/metrics/jmx/JMXReporter.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.metrics.jmx;
 
+import org.apache.flink.metrics.CharacterFilter;
 import org.apache.flink.metrics.Counter;
 import org.apache.flink.metrics.Gauge;
 import org.apache.flink.metrics.Histogram;
@@ -25,8 +26,9 @@ import org.apache.flink.metrics.Meter;
 import org.apache.flink.metrics.Metric;
 import org.apache.flink.metrics.MetricConfig;
 import org.apache.flink.metrics.MetricGroup;
-
 import org.apache.flink.metrics.reporter.MetricReporter;
+import org.apache.flink.runtime.metrics.groups.AbstractMetricGroup;
+import org.apache.flink.runtime.metrics.groups.FrontMetricGroup;
 import org.apache.flink.util.NetUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -48,6 +50,7 @@ import java.rmi.registry.LocateRegistry;
 import java.rmi.registry.Registry;
 import java.rmi.server.UnicastRemoteObject;
 import java.util.HashMap;
+import java.util.Hashtable;
 import java.util.Iterator;
 import java.util.Map;
 
@@ -59,13 +62,19 @@ import java.util.Map;
  */
 public class JMXReporter implements MetricReporter {
 
-       private static final String PREFIX = "org.apache.flink.metrics:";
-       private static final String KEY_PREFIX = "key";
+       static final String JMX_DOMAIN_PREFIX = "org.apache.flink.";
 
        public static final String ARG_PORT = "port";
 
        private static final Logger LOG = 
LoggerFactory.getLogger(JMXReporter.class);
 
+       private static final CharacterFilter CHARACTER_FILTER = new 
CharacterFilter() {
+               @Override
+               public String filterCharacters(String input) {
+                       return replaceInvalidChars(input);
+               }
+       };
+
        // 
------------------------------------------------------------------------
 
        /** The server where the management beans are registered and 
deregistered */
@@ -144,14 +153,19 @@ public class JMXReporter implements MetricReporter {
 
        @Override
        public void notifyOfAddedMetric(Metric metric, String metricName, 
MetricGroup group) {
-               final String name = generateJmxName(metricName, 
group.getScopeComponents());
+               final String domain = generateJmxDomain(metricName, group);
+               final Hashtable<String, String> table = 
generateJmxTable(group.getAllVariables());
 
                AbstractBean jmxMetric;
                ObjectName jmxName;
                try {
-                       jmxName = new ObjectName(name);
+                       jmxName = new ObjectName(domain, table);
                } catch (MalformedObjectNameException e) {
-                       LOG.error("Metric name did not conform to JMX 
ObjectName rules: " + name, e);
+                       /**
+                        * There is an implementation error on our side if this 
occurs. Either the domain was modified and no longer
+                        * conforms to the JMX domain rules or the table wasn't 
properly generated.
+                        */
+                       LOG.debug("Implementation error. The domain or table 
does not conform to JMX rules." , e);
                        return;
                }
 
@@ -176,12 +190,11 @@ public class JMXReporter implements MetricReporter {
                        }
                } catch (NotCompliantMBeanException e) {
                        // implementation error on our side
-                       LOG.error("Metric did not comply with JMX MBean naming 
rules.", e);
+                       LOG.debug("Metric did not comply with JMX MBean 
rules.", e);
                } catch (InstanceAlreadyExistsException e) {
-                       LOG.debug("A metric with the name " + jmxName + " was 
already registered.", e);
-                       LOG.error("A metric with the name " + jmxName + " was 
already registered.");
+                       LOG.warn("A metric with the name " + jmxName + " was 
already registered.", e);
                } catch (Throwable t) {
-                       LOG.error("Failed to register metric", t);
+                       LOG.warn("Failed to register metric", t);
                }
        }
 
@@ -209,27 +222,18 @@ public class JMXReporter implements MetricReporter {
        //  Utilities 
        // 
------------------------------------------------------------------------
 
-       static String generateJmxName(String metricName, String[] 
scopeComponents) {
-               final StringBuilder nameBuilder = new StringBuilder(128);
-               nameBuilder.append(PREFIX);
-
-               for (int x = 0; x < scopeComponents.length; x++) {
-                       // write keyX=
-                       nameBuilder.append(KEY_PREFIX);
-                       nameBuilder.append(x);
-                       nameBuilder.append("=");
-
-                       // write scope component
-                       
nameBuilder.append(replaceInvalidChars(scopeComponents[x]));
-                       nameBuilder.append(",");
+       static Hashtable<String, String> generateJmxTable(Map<String, String> 
variables) {
+               Hashtable<String, String> ht = new 
Hashtable<>(variables.size());
+               for (Map.Entry<String, String> variable : variables.entrySet()) 
{
+                       ht.put(replaceInvalidChars(variable.getKey()), 
replaceInvalidChars(variable.getValue()));
                }
+               return ht;
+       }
 
-               // write the name
-               
nameBuilder.append("name=").append(replaceInvalidChars(metricName));
-
-               return nameBuilder.toString();
+       static String generateJmxDomain(String metricName, MetricGroup group) {
+               return JMX_DOMAIN_PREFIX + 
((FrontMetricGroup<AbstractMetricGroup<?>>) 
group).getLogicalScope(CHARACTER_FILTER, '.') + '.' + metricName;
        }
-       
+
        /**
         * Lightweight method to replace unsupported characters.
         * If the string does not contain any unsupported characters, this 
method creates no
@@ -251,6 +255,8 @@ public class JMXReporter implements MetricReporter {
                for (int i = 0; i < strLen; i++) {
                        final char c = str.charAt(i);
                        switch (c) {
+                               case '>':
+                               case '<':
                                case '"':
                                        // remove character by not moving cursor
                                        if (chars == null) {

http://git-wip-us.apache.org/repos/asf/flink/blob/94d1b63c/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/metrics/jmx/JMXReporterTest.java
----------------------------------------------------------------------
diff --git 
a/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/metrics/jmx/JMXReporterTest.java
 
b/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/metrics/jmx/JMXReporterTest.java
index 8968587..1a96287 100644
--- 
a/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/metrics/jmx/JMXReporterTest.java
+++ 
b/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/metrics/jmx/JMXReporterTest.java
@@ -25,6 +25,7 @@ import org.apache.flink.metrics.util.TestMeter;
 import org.apache.flink.metrics.reporter.MetricReporter;
 import org.apache.flink.runtime.metrics.MetricRegistry;
 import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
+import org.apache.flink.runtime.metrics.groups.FrontMetricGroup;
 import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup;
 import org.apache.flink.runtime.metrics.util.TestReporter;
 import org.apache.flink.runtime.metrics.util.TestingHistogram;
@@ -40,8 +41,12 @@ import javax.management.remote.JMXConnector;
 import javax.management.remote.JMXConnectorFactory;
 import javax.management.remote.JMXServiceURL;
 import java.lang.management.ManagementFactory;
+import java.util.HashMap;
+import java.util.Hashtable;
 import java.util.List;
+import java.util.Map;
 
+import static org.apache.flink.metrics.jmx.JMXReporter.JMX_DOMAIN_PREFIX;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
@@ -65,14 +70,20 @@ public class JMXReporterTest extends TestLogger {
        }
 
        /**
-        * Verifies that the JMXReporter properly generates the JMX name.
+        * Verifies that the JMXReporter properly generates the JMX table.
         */
        @Test
-       public void testGenerateName() {
-               String[] scope = {"value0", "value1", "\"value2 (test),=;:?'"};
-               String jmxName = JMXReporter.generateJmxName("TestMetric", 
scope);
+       public void testGenerateTable() {
+               Map<String, String> vars = new HashMap<>();
+               vars.put("key0", "value0");
+               vars.put("key1", "value1");
+               vars.put("\"key2,=;:?'", "\"value2 (test),=;:?'");
 
-               
assertEquals("org.apache.flink.metrics:key0=value0,key1=value1,key2=value2_(test)------,name=TestMetric",
 jmxName);
+               Hashtable<String, String> jmxTable = 
JMXReporter.generateJmxTable(vars);
+
+               assertEquals("value0", jmxTable.get("key0"));
+               assertEquals("value1", jmxTable.get("key1"));
+               assertEquals("value2_(test)------", jmxTable.get("key2------"));
        }
 
        /**
@@ -102,28 +113,34 @@ public class JMXReporterTest extends TestLogger {
                MetricReporter rep1 = reporters.get(0);
                MetricReporter rep2 = reporters.get(1);
 
-               rep1.notifyOfAddedMetric(new Gauge<Integer>() {
+               Gauge<Integer> g1 = new Gauge<Integer>() {
                        @Override
                        public Integer getValue() {
                                return 1;
                        }
-               }, "rep1", new TaskManagerMetricGroup(reg, "host", "tm"));
-
-               rep2.notifyOfAddedMetric(new Gauge<Integer>() {
+               };
+               Gauge<Integer> g2 = new Gauge<Integer>() {
                        @Override
                        public Integer getValue() {
                                return 2;
                        }
-               }, "rep2", new TaskManagerMetricGroup(reg, "host", "tm"));
+               };
+
+               rep1.notifyOfAddedMetric(g1, "rep1", new FrontMetricGroup<>(0, 
new TaskManagerMetricGroup(reg, "host", "tm")));
+               rep2.notifyOfAddedMetric(g2, "rep2", new FrontMetricGroup<>(0, 
new TaskManagerMetricGroup(reg, "host", "tm")));
 
                MBeanServer mBeanServer = 
ManagementFactory.getPlatformMBeanServer();
 
-               ObjectName objectName1 = new 
ObjectName(JMXReporter.generateJmxName("rep1", mg.getScopeComponents()));
-               ObjectName objectName2 = new 
ObjectName(JMXReporter.generateJmxName("rep2", mg.getScopeComponents()));
+               ObjectName objectName1 = new ObjectName(JMX_DOMAIN_PREFIX + 
"taskmanager.rep1", JMXReporter.generateJmxTable(mg.getAllVariables()));
+               ObjectName objectName2 = new ObjectName(JMX_DOMAIN_PREFIX + 
"taskmanager.rep2", JMXReporter.generateJmxTable(mg.getAllVariables()));
 
                assertEquals(1, mBeanServer.getAttribute(objectName1, "Value"));
                assertEquals(2, mBeanServer.getAttribute(objectName2, "Value"));
 
+               rep1.notifyOfRemovedMetric(g1, "rep1", null);
+               rep1.notifyOfRemovedMetric(g2, "rep2", null);
+               
+               mg.close();
                reg.shutdown();
        }
 
@@ -156,22 +173,25 @@ public class JMXReporterTest extends TestLogger {
                MetricReporter rep1 = reporters.get(0);
                MetricReporter rep2 = reporters.get(1);
 
-               rep1.notifyOfAddedMetric(new Gauge<Integer>() {
+               Gauge<Integer> g1 = new Gauge<Integer>() {
                        @Override
                        public Integer getValue() {
                                return 1;
                        }
-               }, "rep1", new TaskManagerMetricGroup(reg, "host", "tm"));
-
-               rep2.notifyOfAddedMetric(new Gauge<Integer>() {
+               };
+               Gauge<Integer> g2 = new Gauge<Integer>() {
                        @Override
                        public Integer getValue() {
                                return 2;
                        }
-               }, "rep2", new TaskManagerMetricGroup(reg, "host", "tm"));
+               };
 
-               ObjectName objectName1 = new 
ObjectName(JMXReporter.generateJmxName("rep1", mg.getScopeComponents()));
-               ObjectName objectName2 = new 
ObjectName(JMXReporter.generateJmxName("rep2", mg.getScopeComponents()));
+               rep1.notifyOfAddedMetric(g1, "rep1", new FrontMetricGroup<>(0, 
new TaskManagerMetricGroup(reg, "host", "tm")));
+
+               rep2.notifyOfAddedMetric(g2, "rep2", new FrontMetricGroup<>(1, 
new TaskManagerMetricGroup(reg, "host", "tm")));
+
+               ObjectName objectName1 = new ObjectName(JMX_DOMAIN_PREFIX + 
"taskmanager.rep1", JMXReporter.generateJmxTable(mg.getAllVariables()));
+               ObjectName objectName2 = new ObjectName(JMX_DOMAIN_PREFIX + 
"taskmanager.rep2", JMXReporter.generateJmxTable(mg.getAllVariables()));
 
                JMXServiceURL url1 = new 
JMXServiceURL("service:jmx:rmi://localhost:" + ((JMXReporter)rep1).getPort() + 
"/jndi/rmi://localhost:" + ((JMXReporter)rep1).getPort() + "/jmxrmi");
                JMXConnector jmxCon1 = JMXConnectorFactory.connect(url1);
@@ -189,10 +209,14 @@ public class JMXReporterTest extends TestLogger {
                assertEquals(1, mCon2.getAttribute(objectName1, "Value"));
                assertEquals(2, mCon2.getAttribute(objectName2, "Value"));
 
+               rep1.notifyOfRemovedMetric(g1, "rep1", null);
+               rep1.notifyOfRemovedMetric(g2, "rep2", null);
+
                jmxCon2.close();
 
                rep1.close();
                rep2.close();
+               mg.close();
                reg.shutdown();
        }
 
@@ -219,7 +243,7 @@ public class JMXReporterTest extends TestLogger {
 
                        MBeanServer mBeanServer = 
ManagementFactory.getPlatformMBeanServer();
 
-                       ObjectName objectName = new 
ObjectName(JMXReporter.generateJmxName(histogramName, 
metricGroup.getScopeComponents()));
+                       ObjectName objectName = new 
ObjectName(JMX_DOMAIN_PREFIX + "taskmanager." + histogramName, 
JMXReporter.generateJmxTable(metricGroup.getAllVariables()));
 
                        MBeanInfo info = mBeanServer.getMBeanInfo(objectName);
 
@@ -269,7 +293,7 @@ public class JMXReporterTest extends TestLogger {
 
                        MBeanServer mBeanServer = 
ManagementFactory.getPlatformMBeanServer();
 
-                       ObjectName objectName = new 
ObjectName(JMXReporter.generateJmxName(meterName, 
metricGroup.getScopeComponents()));
+                       ObjectName objectName = new 
ObjectName(JMX_DOMAIN_PREFIX + "taskmanager." + meterName, 
JMXReporter.generateJmxTable(metricGroup.getAllVariables()));
 
                        MBeanInfo info = mBeanServer.getMBeanInfo(objectName);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/94d1b63c/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/runtime/jobmanager/JMXJobManagerMetricTest.java
----------------------------------------------------------------------
diff --git 
a/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/runtime/jobmanager/JMXJobManagerMetricTest.java
 
b/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/runtime/jobmanager/JMXJobManagerMetricTest.java
index 0954241..3ae224f 100644
--- 
a/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/runtime/jobmanager/JMXJobManagerMetricTest.java
+++ 
b/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/runtime/jobmanager/JMXJobManagerMetricTest.java
@@ -28,6 +28,7 @@ import 
org.apache.flink.runtime.jobgraph.tasks.ExternalizedCheckpointSettings;
 import org.apache.flink.runtime.jobgraph.tasks.JobSnapshottingSettings;
 import org.apache.flink.runtime.testingUtils.TestingCluster;
 import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages;
+import org.junit.Assert;
 import org.junit.Test;
 import scala.concurrent.Await;
 import scala.concurrent.Future;
@@ -38,6 +39,7 @@ import javax.management.MBeanServer;
 import javax.management.ObjectName;
 import java.lang.management.ManagementFactory;
 import java.util.Collections;
+import java.util.Set;
 import java.util.concurrent.TimeUnit;
 
 import static org.junit.Assert.assertEquals;
@@ -83,8 +85,9 @@ public class JMXJobManagerMetricTest {
                        Await.ready(jobRunning, deadline.timeLeft());
 
                        MBeanServer mBeanServer = 
ManagementFactory.getPlatformMBeanServer();
-                       ObjectName objectName1 = new 
ObjectName("org.apache.flink.metrics:key0=jobmanager,key1=TestingJob,name=lastCheckpointSize");
-                       assertEquals(-1L, mBeanServer.getAttribute(objectName1, 
"Value"));
+                       Set<ObjectName> nameSet = mBeanServer.queryNames(new 
ObjectName("org.apache.flink.jobmanager.job.lastCheckpointSize:job_name=TestingJob,*"),
 null);
+                       Assert.assertEquals(1, nameSet.size());
+                       assertEquals(-1L, 
mBeanServer.getAttribute(nameSet.iterator().next(), "Value"));
 
                        Future<Object> jobFinished = 
flink.getLeaderGateway(deadline.timeLeft())
                                .ask(new 
TestingJobManagerMessages.NotifyWhenJobRemoved(jobGraph.getJobID()), 
deadline.timeLeft());

http://git-wip-us.apache.org/repos/asf/flink/blob/94d1b63c/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroup.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroup.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroup.java
index 907c655..04b8158 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroup.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroup.java
@@ -89,6 +89,10 @@ public abstract class AbstractMetricGroup<A extends 
AbstractMetricGroup<?>> impl
         * For example: "host-7.taskmanager-2.window_word_count.my-mapper" */
        private String scopeString;
 
+       /** The logical metrics scope represented by this group, as a 
concatenated string, lazily computed.
+        * For example: "taskmanager.job.task" */
+       private String logicalScopeString;
+
        /** The metrics query service scope represented by this group, lazily 
computed. */
        protected QueryScopeInfo queryServiceScopeInfo;
 
@@ -119,6 +123,43 @@ public abstract class AbstractMetricGroup<A extends 
AbstractMetricGroup<?>> impl
        }
 
        /**
+        * Returns the logical scope of this group, for example
+        * {@code "taskmanager.job.task"}
+        *
+        * @param filter character filter which is applied to the scope 
components
+        * @return logical scope
+        */
+       public String getLogicalScope(CharacterFilter filter) {
+               return getLogicalScope(filter, registry.getDelimiter());
+       }
+
+       /**
+        * Returns the logical scope of this group, for example
+        * {@code "taskmanager.job.task"}
+        *
+        * @param filter character filter which is applied to the scope 
components
+        * @return logical scope
+        */
+       public String getLogicalScope(CharacterFilter filter, char delimiter) {
+               if (logicalScopeString == null) {
+                       if (parent == null) {
+                               logicalScopeString = getGroupName(filter);
+                       } else {
+                               logicalScopeString = 
parent.getLogicalScope(filter, delimiter) + delimiter + getGroupName(filter);
+                       }
+               }
+               return logicalScopeString;
+       }
+
+       /**
+        * Returns the name for this group, meaning what kind of entity it 
represents, for example "taskmanager".
+        * 
+        * @param filter character filter which is applied to the name
+        * @return logical name for this group
+        */
+       protected abstract String getGroupName(CharacterFilter filter);
+
+       /**
         * Gets the scope as an array of the scope components, for example
         * {@code ["host-7", "taskmanager-2", "window_word_count", "my-mapper"]}
         * 

http://git-wip-us.apache.org/repos/asf/flink/blob/94d1b63c/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/FrontMetricGroup.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/FrontMetricGroup.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/FrontMetricGroup.java
index 885e6d6..63842fe 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/FrontMetricGroup.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/FrontMetricGroup.java
@@ -46,4 +46,12 @@ public class FrontMetricGroup<P extends 
AbstractMetricGroup<?>> extends ProxyMet
        public String getMetricIdentifier(String metricName, CharacterFilter 
filter) {
                return parentMetricGroup.getMetricIdentifier(metricName, 
filter, this.reporterIndex);
        }
+
+       public String getLogicalScope(CharacterFilter filter) {
+               return parentMetricGroup.getLogicalScope(filter);
+       }
+
+       public String getLogicalScope(CharacterFilter filter, char delimiter) {
+               return parentMetricGroup.getLogicalScope(filter, delimiter);
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/94d1b63c/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/GenericMetricGroup.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/GenericMetricGroup.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/GenericMetricGroup.java
index 54fbed4..5978f2d 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/GenericMetricGroup.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/GenericMetricGroup.java
@@ -56,4 +56,9 @@ public class GenericMetricGroup extends 
AbstractMetricGroup<AbstractMetricGroup<
                }
                return new String[] { name };
        }
+
+       @Override
+       protected String getGroupName(CharacterFilter filter) {
+               return filter.filterCharacters(name);
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/94d1b63c/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/JobManagerMetricGroup.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/JobManagerMetricGroup.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/JobManagerMetricGroup.java
index 2f6b07a..5a35110 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/JobManagerMetricGroup.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/JobManagerMetricGroup.java
@@ -107,5 +107,10 @@ public class JobManagerMetricGroup extends 
ComponentMetricGroup<JobManagerMetric
        protected Iterable<? extends ComponentMetricGroup> subComponents() {
                return jobs.values();
        }
+
+       @Override
+       protected String getGroupName(CharacterFilter filter) {
+               return "jobmanager";
+       }
 }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/94d1b63c/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/JobMetricGroup.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/JobMetricGroup.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/JobMetricGroup.java
index 091807f..17f6189 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/JobMetricGroup.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/JobMetricGroup.java
@@ -81,4 +81,9 @@ public abstract class JobMetricGroup<C extends 
ComponentMetricGroup<C>> extends
                variables.put(ScopeFormat.SCOPE_JOB_ID, jobId.toString());
                variables.put(ScopeFormat.SCOPE_JOB_NAME, jobName);
        }
+
+       @Override
+       protected String getGroupName(CharacterFilter filter) {
+               return "job";
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/94d1b63c/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/OperatorMetricGroup.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/OperatorMetricGroup.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/OperatorMetricGroup.java
index 0c823ea..37c9dd8 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/OperatorMetricGroup.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/OperatorMetricGroup.java
@@ -83,4 +83,9 @@ public class OperatorMetricGroup extends 
ComponentMetricGroup<TaskMetricGroup> {
        protected Iterable<? extends ComponentMetricGroup> subComponents() {
                return Collections.emptyList();
        }
+
+       @Override
+       protected String getGroupName(CharacterFilter filter) {
+               return "operator";
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/94d1b63c/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskManagerMetricGroup.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskManagerMetricGroup.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskManagerMetricGroup.java
index a8fa828..c3ca5fe 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskManagerMetricGroup.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskManagerMetricGroup.java
@@ -135,5 +135,10 @@ public class TaskManagerMetricGroup extends 
ComponentMetricGroup<TaskManagerMetr
        protected Iterable<? extends ComponentMetricGroup> subComponents() {
                return jobs.values();
        }
+
+       @Override
+       protected String getGroupName(CharacterFilter filter) {
+               return "taskmanager";
+       }
 }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/94d1b63c/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskMetricGroup.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskMetricGroup.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskMetricGroup.java
index 75b8bd8..43e8e1b 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskMetricGroup.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskMetricGroup.java
@@ -169,4 +169,9 @@ public class TaskMetricGroup extends 
ComponentMetricGroup<TaskManagerJobMetricGr
        protected Iterable<? extends ComponentMetricGroup> subComponents() {
                return operators.values();
        }
+
+       @Override
+       protected String getGroupName(CharacterFilter filter) {
+               return "task";
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/94d1b63c/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroupTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroupTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroupTest.java
index ca3810a..c7b392f 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroupTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroupTest.java
@@ -39,6 +39,11 @@ public class AbstractMetricGroupTest {
                        protected QueryScopeInfo 
createQueryServiceMetricInfo(CharacterFilter filter) {
                                return null;
                        }
+
+                       @Override
+                       protected String getGroupName(CharacterFilter filter) {
+                               return "";
+                       }
                };
                assertTrue(group.getAllVariables().isEmpty());
                

http://git-wip-us.apache.org/repos/asf/flink/blob/94d1b63c/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/MetricGroupTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/MetricGroupTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/MetricGroupTest.java
index 8a1a006..665abb1 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/MetricGroupTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/MetricGroupTest.java
@@ -177,6 +177,11 @@ public class MetricGroupTest extends TestLogger {
                }
 
                @Override
+               protected String getGroupName(CharacterFilter filter) {
+                       return "";
+               }
+
+               @Override
                protected void addMetric(String name, Metric metric) {}
 
                @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/94d1b63c/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
 
b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
index 7b06cfd..aa7ea49 100644
--- 
a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
+++ 
b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
@@ -1504,13 +1504,13 @@ public abstract class KafkaConsumerTestBase extends 
KafkaTestBase {
                        // connect to JMX
                        MBeanServer mBeanServer = 
ManagementFactory.getPlatformMBeanServer();
                        // wait until we've found all 5 offset metrics
-                       Set<ObjectName> offsetMetrics = 
mBeanServer.queryNames(new ObjectName("*:key7=current-offsets,*"), null);
+                       Set<ObjectName> offsetMetrics = 
mBeanServer.queryNames(new ObjectName("*current-offsets*:*"), null);
                        while (offsetMetrics.size() < 5) { // test will time 
out if metrics are not properly working
                                if (error.f0 != null) {
                                        // fail test early
                                        throw error.f0;
                                }
-                               offsetMetrics = mBeanServer.queryNames(new 
ObjectName("*:key7=current-offsets,*"), null);
+                               offsetMetrics = mBeanServer.queryNames(new 
ObjectName("*current-offsets*:*"), null);
                                Thread.sleep(50);
                        }
                        Assert.assertEquals(5, offsetMetrics.size());
@@ -1534,7 +1534,7 @@ public abstract class KafkaConsumerTestBase extends 
KafkaTestBase {
                        }
 
                        // check if producer metrics are also available.
-                       Set<ObjectName> producerMetrics = 
mBeanServer.queryNames(new ObjectName("*:key6=KafkaProducer,*"), null);
+                       Set<ObjectName> producerMetrics = 
mBeanServer.queryNames(new ObjectName("*KafkaProducer*:*"), null);
                        Assert.assertTrue("No producer metrics found", 
producerMetrics.size() > 30);
 
 

Reply via email to