[FLINK-456] Basic JM Metric Infrastructure

This closes #2146


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a3a9fd11
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a3a9fd11
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a3a9fd11

Branch: refs/heads/master
Commit: a3a9fd1147aa926987420057f8305ab498519a45
Parents: a11c1c6
Author: zentol <[email protected]>
Authored: Fri Jul 1 14:12:44 2016 +0200
Committer: zentol <[email protected]>
Committed: Fri Jul 1 15:09:16 2016 +0200

----------------------------------------------------------------------
 .../apache/flink/metrics/MetricRegistry.java    |  14 ++-
 .../groups/JobManagerJobMetricGroup.java        |  69 +++++++++++
 .../metrics/groups/JobManagerMetricGroup.java   | 104 ++++++++++++++++
 .../flink/metrics/groups/JobMetricGroup.java    |  99 ++-------------
 .../groups/TaskManagerJobMetricGroup.java       | 122 +++++++++++++++++++
 .../metrics/groups/TaskManagerMetricGroup.java  |  12 +-
 .../flink/metrics/groups/TaskMetricGroup.java   |   8 +-
 .../flink/metrics/groups/scope/ScopeFormat.java |  84 +++++++++++--
 .../metrics/groups/scope/ScopeFormats.java      |  27 +++-
 .../flink/metrics/MetricRegistryTest.java       |   4 +-
 .../flink/metrics/groups/JobGroupTest.java      |  94 --------------
 .../metrics/groups/JobManagerGroupTest.java     | 108 ++++++++++++++++
 .../metrics/groups/JobManagerJobGroupTest.java  |  90 ++++++++++++++
 .../flink/metrics/groups/OperatorGroupTest.java |   2 +-
 .../flink/metrics/groups/TaskGroupTest.java     |   6 +-
 .../metrics/groups/TaskManagerJobGroupTest.java |  94 ++++++++++++++
 .../flink/runtime/jobmanager/JobManager.scala   |  41 ++++++-
 .../testutils/UnregisteredTaskMetricsGroup.java |   4 +-
 .../runtime/testingUtils/TestingCluster.scala   |   6 +-
 .../testingUtils/TestingJobManager.scala        |   7 +-
 .../runtime/testingUtils/TestingUtils.scala     |   6 +-
 .../org/apache/flink/yarn/YarnJobManager.scala  |   7 +-
 22 files changed, 778 insertions(+), 230 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/a3a9fd11/flink-core/src/main/java/org/apache/flink/metrics/MetricRegistry.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/metrics/MetricRegistry.java 
b/flink-core/src/main/java/org/apache/flink/metrics/MetricRegistry.java
index f283ce3..09beef6 100644
--- a/flink-core/src/main/java/org/apache/flink/metrics/MetricRegistry.java
+++ b/flink-core/src/main/java/org/apache/flink/metrics/MetricRegistry.java
@@ -52,8 +52,10 @@ public class MetricRegistry {
        public static final String KEY_METRICS_REPORTER_ARGUMENTS = 
"metrics.reporter.arguments";
        public static final String KEY_METRICS_REPORTER_INTERVAL = 
"metrics.reporter.interval";
 
+       public static final String KEY_METRICS_SCOPE_NAMING_JM = 
"metrics.scope.jm";
        public static final String KEY_METRICS_SCOPE_NAMING_TM = 
"metrics.scope.tm";
-       public static final String KEY_METRICS_SCOPE_NAMING_JOB = 
"metrics.scope.job";
+       public static final String KEY_METRICS_SCOPE_NAMING_JM_JOB = 
"metrics.scope.jm.job";
+       public static final String KEY_METRICS_SCOPE_NAMING_TM_JOB = 
"metrics.scope.tm.job";
        public static final String KEY_METRICS_SCOPE_NAMING_TASK = 
"metrics.scope.task";
        public static final String KEY_METRICS_SCOPE_NAMING_OPERATOR = 
"metrics.scope.operator";
 
@@ -243,16 +245,20 @@ public class MetricRegistry {
        }
 
        static ScopeFormats createScopeConfig(Configuration config) {
+               String jmFormat = config.getString(
+                               KEY_METRICS_SCOPE_NAMING_JM, 
ScopeFormat.DEFAULT_SCOPE_JOBMANAGER_GROUP);
+               String jmJobFormat = config.getString(
+                       KEY_METRICS_SCOPE_NAMING_JM_JOB, 
ScopeFormat.DEFAULT_SCOPE_JOBMANAGER_JOB_GROUP);
                String tmFormat = config.getString(
                                KEY_METRICS_SCOPE_NAMING_TM, 
ScopeFormat.DEFAULT_SCOPE_TASKMANAGER_GROUP);
-               String jobFormat = config.getString(
-                               KEY_METRICS_SCOPE_NAMING_JOB, 
ScopeFormat.DEFAULT_SCOPE_TASKMANAGER_JOB_GROUP);
+               String tmJobFormat = config.getString(
+                               KEY_METRICS_SCOPE_NAMING_TM_JOB, 
ScopeFormat.DEFAULT_SCOPE_TASKMANAGER_JOB_GROUP);
                String taskFormat = config.getString(
                                KEY_METRICS_SCOPE_NAMING_TASK, 
ScopeFormat.DEFAULT_SCOPE_TASK_GROUP);
                String operatorFormat = config.getString(
                                KEY_METRICS_SCOPE_NAMING_OPERATOR, 
ScopeFormat.DEFAULT_SCOPE_OPERATOR_GROUP);
                
-               return new ScopeFormats(tmFormat, jobFormat, taskFormat, 
operatorFormat);
+               return new ScopeFormats(jmFormat, jmJobFormat, tmFormat, 
tmJobFormat, taskFormat, operatorFormat);
        }
 
        // 
------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/a3a9fd11/flink-core/src/main/java/org/apache/flink/metrics/groups/JobManagerJobMetricGroup.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/metrics/groups/JobManagerJobMetricGroup.java
 
b/flink-core/src/main/java/org/apache/flink/metrics/groups/JobManagerJobMetricGroup.java
new file mode 100644
index 0000000..1dd0439
--- /dev/null
+++ 
b/flink-core/src/main/java/org/apache/flink/metrics/groups/JobManagerJobMetricGroup.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.metrics.groups;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.metrics.MetricRegistry;
+import 
org.apache.flink.metrics.groups.scope.ScopeFormat.JobManagerJobScopeFormat;
+
+import javax.annotation.Nullable;
+import java.util.Collections;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Special {@link org.apache.flink.metrics.MetricGroup} representing 
everything belonging to
+ * a specific job, running on the JobManager.
+ */
+@Internal
+public class JobManagerJobMetricGroup extends JobMetricGroup {
+
+       /** The metrics group that contains this group */
+       private final JobManagerMetricGroup parent;
+
+       public JobManagerJobMetricGroup(
+               MetricRegistry registry,
+               JobManagerMetricGroup parent,
+               JobID jobId,
+               @Nullable String jobName) {
+
+               this(registry, checkNotNull(parent), 
registry.getScopeFormats().getJobManagerJobFormat(), jobId, jobName);
+       }
+
+       public JobManagerJobMetricGroup(
+               MetricRegistry registry,
+               JobManagerMetricGroup parent,
+               JobManagerJobScopeFormat scopeFormat,
+               JobID jobId,
+               @Nullable String jobName) {
+
+               super(registry, jobId, jobName, scopeFormat.formatScope(parent, 
jobId, jobName));
+
+               this.parent = checkNotNull(parent);
+       }
+
+       public final JobManagerMetricGroup parent() {
+               return parent;
+       }
+
+       @Override
+       protected Iterable<? extends ComponentMetricGroup> subComponents() {
+               return Collections.emptyList();
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/a3a9fd11/flink-core/src/main/java/org/apache/flink/metrics/groups/JobManagerMetricGroup.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/metrics/groups/JobManagerMetricGroup.java
 
b/flink-core/src/main/java/org/apache/flink/metrics/groups/JobManagerMetricGroup.java
new file mode 100644
index 0000000..67e1117
--- /dev/null
+++ 
b/flink-core/src/main/java/org/apache/flink/metrics/groups/JobManagerMetricGroup.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.metrics.groups;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.metrics.MetricRegistry;
+import org.apache.flink.metrics.groups.scope.ScopeFormat.JobManagerScopeFormat;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Special {@link org.apache.flink.metrics.MetricGroup} representing a 
JobManager.
+ *
+ * <p>Contains extra logic for adding jobs with tasks, and removing jobs when 
they do
+ * not contain tasks any more
+ */
+@Internal
+public class JobManagerMetricGroup extends ComponentMetricGroup {
+
+       private final Map<JobID, JobManagerJobMetricGroup> jobs = new 
HashMap<>();
+
+       private final String hostname;
+
+       public JobManagerMetricGroup(MetricRegistry registry, String hostname) {
+               this(registry, 
registry.getScopeFormats().getJobManagerFormat(), hostname);
+       }
+
+       public JobManagerMetricGroup(
+               MetricRegistry registry,
+               JobManagerScopeFormat scopeFormat,
+               String hostname) {
+
+               super(registry, scopeFormat.formatScope(hostname));
+               this.hostname = hostname;
+       }
+
+       public String hostname() {
+               return hostname;
+       }
+
+       // 
------------------------------------------------------------------------
+       //  job groups
+       // 
------------------------------------------------------------------------
+
+       public JobManagerJobMetricGroup addJob(
+               JobID jobId,
+               String jobName) {
+               // get or create a jobs metric group
+               JobManagerJobMetricGroup currentJobGroup;
+               synchronized (this) {
+                       if (!isClosed()) {
+                               currentJobGroup = jobs.get(jobId);
+
+                               if (currentJobGroup == null || 
currentJobGroup.isClosed()) {
+                                       currentJobGroup = new 
JobManagerJobMetricGroup(registry, this, jobId, jobName);
+                                       jobs.put(jobId, currentJobGroup);
+                               }
+                               return currentJobGroup;
+                       } else {
+                               return null;
+                       }
+               }
+       }
+
+       public void removeJob(JobID jobId) {
+               if (jobId == null) {
+                       return;
+               }
+
+               synchronized (this) {
+                       JobManagerJobMetricGroup containedGroup = 
jobs.remove(jobId);
+                       if (containedGroup != null) {
+                               containedGroup.close();
+                       }
+               }
+       }
+
+       public int numRegisteredJobMetricGroups() {
+               return jobs.size();
+       }
+
+       @Override
+       protected Iterable<? extends ComponentMetricGroup> subComponents() {
+               return jobs.values();
+       }
+}
+

http://git-wip-us.apache.org/repos/asf/flink/blob/a3a9fd11/flink-core/src/main/java/org/apache/flink/metrics/groups/JobMetricGroup.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/metrics/groups/JobMetricGroup.java 
b/flink-core/src/main/java/org/apache/flink/metrics/groups/JobMetricGroup.java
index f816278..f7dfc78 100644
--- 
a/flink-core/src/main/java/org/apache/flink/metrics/groups/JobMetricGroup.java
+++ 
b/flink-core/src/main/java/org/apache/flink/metrics/groups/JobMetricGroup.java
@@ -21,66 +21,36 @@ package org.apache.flink.metrics.groups;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.metrics.MetricRegistry;
-import 
org.apache.flink.metrics.groups.scope.ScopeFormat.TaskManagerJobScopeFormat;
-import org.apache.flink.util.AbstractID;
 
 import javax.annotation.Nullable;
-import java.util.HashMap;
-import java.util.Map;
-
-import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
- * Special {@link org.apache.flink.metrics.MetricGroup} representing 
everything belonging to
- * a specific job, running on the TaskManager.
- * 
- * <p>Contains extra logic for adding Tasks ({@link TaskMetricGroup}).
+ * Special abstract {@link org.apache.flink.metrics.MetricGroup} representing 
everything belonging to
+ * a specific job.
  */
 @Internal
-public class JobMetricGroup extends ComponentMetricGroup {
-
-       /** The metrics group that contains this group */
-       private final TaskManagerMetricGroup parent;
-
-       /** Map from execution attempt ID (task identifier) to task metrics */
-       private final Map<AbstractID, TaskMetricGroup> tasks = new HashMap<>();
+public abstract class JobMetricGroup extends ComponentMetricGroup {
 
        /** The ID of the job represented by this metrics group */
-       private final JobID jobId;
+       protected final JobID jobId;
 
        /** The name of the job represented by this metrics group */
        @Nullable
-       private final String jobName;
+       protected final String jobName;
 
        // 
------------------------------------------------------------------------
 
-       public JobMetricGroup(
+       protected JobMetricGroup(
                        MetricRegistry registry,
-                       TaskManagerMetricGroup parent,
                        JobID jobId,
-                       @Nullable String jobName) {
+                       @Nullable String jobName,
+                       String[] scope) {
+               super(registry, scope);
                
-               this(registry, checkNotNull(parent), 
registry.getScopeFormats().getJobFormat(), jobId, jobName);
-       }
-
-       public JobMetricGroup(
-                       MetricRegistry registry,
-                       TaskManagerMetricGroup parent,
-                       TaskManagerJobScopeFormat scopeFormat, 
-                       JobID jobId,
-                       @Nullable String jobName) {
-
-               super(registry, scopeFormat.formatScope(parent, jobId, 
jobName));
-
-               this.parent = checkNotNull(parent);
-               this.jobId = checkNotNull(jobId);
+               this.jobId = jobId;
                this.jobName = jobName;
        }
 
-       public final TaskManagerMetricGroup parent() {
-               return parent;
-       }
-
        public JobID jobId() {
                return jobId;
        }
@@ -89,53 +59,4 @@ public class JobMetricGroup extends ComponentMetricGroup {
        public String jobName() {
                return jobName;
        }
-
-       // 
------------------------------------------------------------------------
-       //  adding / removing tasks
-       // 
------------------------------------------------------------------------
-
-       public TaskMetricGroup addTask(
-                       AbstractID vertexId,
-                       AbstractID executionId,
-                       String taskName,
-                       int subtaskIndex,
-                       int attemptNumber) {
-               
-               checkNotNull(executionId);
-
-               synchronized (this) {
-                       if (!isClosed()) {
-                               TaskMetricGroup task = new 
TaskMetricGroup(registry, this, 
-                                               vertexId, executionId, 
taskName, subtaskIndex, attemptNumber);
-                               tasks.put(executionId, task);
-                               return task;
-                       } else {
-                               return null;
-                       }
-               }
-       }
-
-       public void removeTaskMetricGroup(AbstractID executionId) {
-               checkNotNull(executionId);
-
-               boolean removeFromParent = false;
-               synchronized (this) {
-                       if (!isClosed() && tasks.remove(executionId) != null && 
tasks.isEmpty()) {
-                               // this call removed the last task. close this 
group.
-                               removeFromParent = true;
-                               close();
-                       }
-               }
-
-               // IMPORTANT: removing from the parent must happen while 
holding the this group's lock,
-               //      because it would violate the "first parent then 
subgroup" lock acquisition order
-               if (removeFromParent) {
-                       parent.removeJobMetricsGroup(jobId, this);
-               }
-       }
-
-       @Override
-       protected Iterable<? extends ComponentMetricGroup> subComponents() {
-               return tasks.values();
-       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/a3a9fd11/flink-core/src/main/java/org/apache/flink/metrics/groups/TaskManagerJobMetricGroup.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/metrics/groups/TaskManagerJobMetricGroup.java
 
b/flink-core/src/main/java/org/apache/flink/metrics/groups/TaskManagerJobMetricGroup.java
new file mode 100644
index 0000000..fdaf1de
--- /dev/null
+++ 
b/flink-core/src/main/java/org/apache/flink/metrics/groups/TaskManagerJobMetricGroup.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.metrics.groups;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.metrics.MetricRegistry;
+import 
org.apache.flink.metrics.groups.scope.ScopeFormat.TaskManagerJobScopeFormat;
+import org.apache.flink.util.AbstractID;
+
+import javax.annotation.Nullable;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Special {@link org.apache.flink.metrics.MetricGroup} representing 
everything belonging to
+ * a specific job, running on the TaskManager.
+ *
+ * <p>Contains extra logic for adding Tasks ({@link TaskMetricGroup}).
+ */
+@Internal
+public class TaskManagerJobMetricGroup extends JobMetricGroup {
+
+       /** The metrics group that contains this group */
+       private final TaskManagerMetricGroup parent;
+
+       /** Map from execution attempt ID (task identifier) to task metrics */
+       private final Map<AbstractID, TaskMetricGroup> tasks = new HashMap<>();
+
+       // 
------------------------------------------------------------------------
+
+       public TaskManagerJobMetricGroup(
+               MetricRegistry registry,
+               TaskManagerMetricGroup parent,
+               JobID jobId,
+               @Nullable String jobName) {
+
+               this(registry, checkNotNull(parent), 
registry.getScopeFormats().getTaskManagerJobFormat(), jobId, jobName);
+       }
+
+       public TaskManagerJobMetricGroup(
+               MetricRegistry registry,
+               TaskManagerMetricGroup parent,
+               TaskManagerJobScopeFormat scopeFormat,
+               JobID jobId,
+               @Nullable String jobName) {
+
+               super(registry, jobId, jobName, scopeFormat.formatScope(parent, 
jobId, jobName));
+
+               this.parent = checkNotNull(parent);
+       }
+
+       public final TaskManagerMetricGroup parent() {
+               return parent;
+       }
+
+       // 
------------------------------------------------------------------------
+       //  adding / removing tasks
+       // 
------------------------------------------------------------------------
+
+       public TaskMetricGroup addTask(
+               AbstractID vertexId,
+               AbstractID executionId,
+               String taskName,
+               int subtaskIndex,
+               int attemptNumber) {
+
+               checkNotNull(executionId);
+
+               synchronized (this) {
+                       if (!isClosed()) {
+                               TaskMetricGroup task = new 
TaskMetricGroup(registry, this,
+                                       vertexId, executionId, taskName, 
subtaskIndex, attemptNumber);
+                               tasks.put(executionId, task);
+                               return task;
+                       } else {
+                               return null;
+                       }
+               }
+       }
+
+       public void removeTaskMetricGroup(AbstractID executionId) {
+               checkNotNull(executionId);
+
+               boolean removeFromParent = false;
+               synchronized (this) {
+                       if (!isClosed() && tasks.remove(executionId) != null && 
tasks.isEmpty()) {
+                               // this call removed the last task. close this 
group.
+                               removeFromParent = true;
+                               close();
+                       }
+               }
+
+               // IMPORTANT: removing from the parent must not happen while 
holding the this group's lock,
+               //      because it would violate the "first parent then 
subgroup" lock acquisition order
+               if (removeFromParent) {
+                       parent.removeJobMetricsGroup(jobId, this);
+               }
+       }
+
+       @Override
+       protected Iterable<? extends ComponentMetricGroup> subComponents() {
+               return tasks.values();
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/a3a9fd11/flink-core/src/main/java/org/apache/flink/metrics/groups/TaskManagerMetricGroup.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/metrics/groups/TaskManagerMetricGroup.java
 
b/flink-core/src/main/java/org/apache/flink/metrics/groups/TaskManagerMetricGroup.java
index 3cb3936..2b2b201 100644
--- 
a/flink-core/src/main/java/org/apache/flink/metrics/groups/TaskManagerMetricGroup.java
+++ 
b/flink-core/src/main/java/org/apache/flink/metrics/groups/TaskManagerMetricGroup.java
@@ -36,7 +36,7 @@ import java.util.Map;
 @Internal
 public class TaskManagerMetricGroup extends ComponentMetricGroup {
 
-       private final Map<JobID, JobMetricGroup> jobs = new HashMap<>();
+       private final Map<JobID, TaskManagerJobMetricGroup> jobs = new 
HashMap<>();
 
        private final String hostname;
 
@@ -82,12 +82,12 @@ public class TaskManagerMetricGroup extends 
ComponentMetricGroup {
                // because it might lead to a deadlock
                while (true) {
                        // get or create a jobs metric group
-                       JobMetricGroup currentJobGroup;
+                       TaskManagerJobMetricGroup currentJobGroup;
                        synchronized (this) {
                                currentJobGroup = jobs.get(jobId);
-                               
+
                                if (currentJobGroup == null || 
currentJobGroup.isClosed()) {
-                                       currentJobGroup = new 
JobMetricGroup(registry, this, jobId, jobName);
+                                       currentJobGroup = new 
TaskManagerJobMetricGroup(registry, this, jobId, jobName);
                                        jobs.put(jobId, currentJobGroup);
                                }
                        }
@@ -106,14 +106,14 @@ public class TaskManagerMetricGroup extends 
ComponentMetricGroup {
                }
        }
 
-       public void removeJobMetricsGroup(JobID jobId, JobMetricGroup group) {
+       public void removeJobMetricsGroup(JobID jobId, 
TaskManagerJobMetricGroup group) {
                if (jobId == null || group == null || !group.isClosed()) {
                        return;
                }
 
                synchronized (this) {
                        // optimistically remove the currently contained group, 
and check later if it was correct
-                       JobMetricGroup containedGroup = jobs.remove(jobId);
+                       TaskManagerJobMetricGroup containedGroup = 
jobs.remove(jobId);
 
                        // check if another group was actually contained, and 
restore that one
                        if (containedGroup != null && containedGroup != group) {

http://git-wip-us.apache.org/repos/asf/flink/blob/a3a9fd11/flink-core/src/main/java/org/apache/flink/metrics/groups/TaskMetricGroup.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/metrics/groups/TaskMetricGroup.java 
b/flink-core/src/main/java/org/apache/flink/metrics/groups/TaskMetricGroup.java
index 784578b..c0428ac 100644
--- 
a/flink-core/src/main/java/org/apache/flink/metrics/groups/TaskMetricGroup.java
+++ 
b/flink-core/src/main/java/org/apache/flink/metrics/groups/TaskMetricGroup.java
@@ -38,7 +38,7 @@ import static 
org.apache.flink.util.Preconditions.checkNotNull;
 public class TaskMetricGroup extends ComponentMetricGroup {
 
        /** The job metrics group containing this task metrics group */
-       private final JobMetricGroup parent;
+       private final TaskManagerJobMetricGroup parent;
 
        private final Map<String, OperatorMetricGroup> operators = new 
HashMap<>();
 
@@ -61,7 +61,7 @@ public class TaskMetricGroup extends ComponentMetricGroup {
 
        public TaskMetricGroup(
                        MetricRegistry registry,
-                       JobMetricGroup parent,
+                       TaskManagerJobMetricGroup parent,
                        @Nullable AbstractID vertexId,
                        AbstractID executionId,
                        @Nullable String taskName,
@@ -74,7 +74,7 @@ public class TaskMetricGroup extends ComponentMetricGroup {
 
        public TaskMetricGroup(
                        MetricRegistry registry,
-                       JobMetricGroup parent,
+                       TaskManagerJobMetricGroup parent,
                        TaskScopeFormat scopeFormat, 
                        @Nullable AbstractID vertexId,
                        AbstractID executionId,
@@ -99,7 +99,7 @@ public class TaskMetricGroup extends ComponentMetricGroup {
        //  properties
        // 
------------------------------------------------------------------------
 
-       public final JobMetricGroup parent() {
+       public final TaskManagerJobMetricGroup parent() {
                return parent;
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/a3a9fd11/flink-core/src/main/java/org/apache/flink/metrics/groups/scope/ScopeFormat.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/metrics/groups/scope/ScopeFormat.java
 
b/flink-core/src/main/java/org/apache/flink/metrics/groups/scope/ScopeFormat.java
index 9637f65..b73cf51 100644
--- 
a/flink-core/src/main/java/org/apache/flink/metrics/groups/scope/ScopeFormat.java
+++ 
b/flink-core/src/main/java/org/apache/flink/metrics/groups/scope/ScopeFormat.java
@@ -19,7 +19,9 @@
 package org.apache.flink.metrics.groups.scope;
 
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.metrics.groups.JobManagerMetricGroup;
 import org.apache.flink.metrics.groups.JobMetricGroup;
+import org.apache.flink.metrics.groups.TaskManagerJobMetricGroup;
 import org.apache.flink.metrics.groups.TaskManagerMetricGroup;
 import org.apache.flink.metrics.groups.TaskMetricGroup;
 import org.apache.flink.util.AbstractID;
@@ -66,29 +68,47 @@ public abstract class ScopeFormat {
        //  Scope Variables
        // 
------------------------------------------------------------------------
 
+       public static final String SCOPE_ACTOR_HOST = asVariable("host");
+
+       // ----- Job Manager ----
+
+       /** The default scope format of the JobManager component: {@code 
"<host>.jobmanager"} */
+       public static final String DEFAULT_SCOPE_JOBMANAGER_COMPONENT =
+               concat(SCOPE_ACTOR_HOST, "jobmanager");
+
+       /** The default scope format of JobManager metrics: {@code 
"<host>.jobmanager"} */
+       public static final String DEFAULT_SCOPE_JOBMANAGER_GROUP = 
DEFAULT_SCOPE_JOBMANAGER_COMPONENT;
+
        // ----- Task Manager ----
 
-       public static final String SCOPE_TASKMANAGER_HOST = asVariable("host");
        public static final String SCOPE_TASKMANAGER_ID = asVariable("tm_id");
 
        /** The default scope format of the TaskManager component: {@code 
"<host>.taskmanager.<tm_id>"} */
        public static final String DEFAULT_SCOPE_TASKMANAGER_COMPONENT =
-                       concat(SCOPE_TASKMANAGER_HOST, "taskmanager", 
SCOPE_TASKMANAGER_ID);
+                       concat(SCOPE_ACTOR_HOST, "taskmanager", 
SCOPE_TASKMANAGER_ID);
 
        /** The default scope format of TaskManager metrics: {@code 
"<host>.taskmanager.<tm_id>"} */
        public static final String DEFAULT_SCOPE_TASKMANAGER_GROUP = 
DEFAULT_SCOPE_TASKMANAGER_COMPONENT;
 
-       // ----- Job on Task Manager ----
+       // ----- Job -----
 
        public static final String SCOPE_JOB_ID = asVariable("job_id");
        public static final String SCOPE_JOB_NAME = asVariable("job_name");
 
        /** The default scope format for the job component: {@code 
"<job_name>"} */
-       public static final String DEFAULT_SCOPE_TASKMANAGER_JOB_COMPONENT = 
SCOPE_JOB_NAME;
+       public static final String DEFAULT_SCOPE_JOB_COMPONENT = SCOPE_JOB_NAME;
+
+       // ----- Job on Job Manager ----
+
+       /** The default scope format for all job metrics on a jobmanager: 
{@code "<host>.jobmanager.<job_name>"} */
+       public static final String DEFAULT_SCOPE_JOBMANAGER_JOB_GROUP =
+               concat(DEFAULT_SCOPE_JOBMANAGER_COMPONENT, 
DEFAULT_SCOPE_JOB_COMPONENT);
+
+       // ----- Job on Task Manager ----
 
-       /** The default scope format for all job metrics: {@code 
"<host>.taskmanager.<tm_id>.<job_name>"} */
+       /** The default scope format for all job metrics on a taskmanager: 
{@code "<host>.taskmanager.<tm_id>.<job_name>"} */
        public static final String DEFAULT_SCOPE_TASKMANAGER_JOB_GROUP =
-                       concat(DEFAULT_SCOPE_TASKMANAGER_COMPONENT, 
DEFAULT_SCOPE_TASKMANAGER_JOB_COMPONENT);
+                       concat(DEFAULT_SCOPE_TASKMANAGER_COMPONENT, 
DEFAULT_SCOPE_JOB_COMPONENT);
 
        // ----- Task ----
 
@@ -125,13 +145,31 @@ public abstract class ScopeFormat {
        // 
------------------------------------------------------------------------
 
        /**
+        * The scope format for the {@link JobManagerMetricGroup}.
+        */
+       public static class JobManagerScopeFormat extends ScopeFormat {
+
+               public JobManagerScopeFormat(String format) {
+                       super(format, null, new String[] {
+                               SCOPE_ACTOR_HOST
+                       });
+               }
+
+               public String[] formatScope(String hostname) {
+                       final String[] template = copyTemplate();
+                       final String[] values = { hostname };
+                       return bindVariables(template, values);
+               }
+       }
+
+       /**
         * The scope format for the {@link TaskManagerMetricGroup}.
         */
        public static class TaskManagerScopeFormat extends ScopeFormat {
 
                public TaskManagerScopeFormat(String format) {
                        super(format, null, new String[] {
-                                       SCOPE_TASKMANAGER_HOST,
+                                       SCOPE_ACTOR_HOST,
                                        SCOPE_TASKMANAGER_ID
                        });
                }
@@ -148,11 +186,35 @@ public abstract class ScopeFormat {
        /**
         * The scope format for the {@link JobMetricGroup}.
         */
+       public static class JobManagerJobScopeFormat extends ScopeFormat {
+
+               public JobManagerJobScopeFormat(String format, 
JobManagerScopeFormat parentFormat) {
+                       super(format, parentFormat, new String[] {
+                               SCOPE_ACTOR_HOST,
+                               SCOPE_JOB_ID,
+                               SCOPE_JOB_NAME
+                       });
+               }
+
+               public String[] formatScope(JobManagerMetricGroup parent, JobID 
jid, String jobName) {
+                       final String[] template = copyTemplate();
+                       final String[] values = {
+                               parent.hostname(),
+                               valueOrNull(jid),
+                               valueOrNull(jobName)
+                       };
+                       return bindVariables(template, values);
+               }
+       }
+
+       /**
+        * The scope format for the {@link JobMetricGroup}.
+        */
        public static class TaskManagerJobScopeFormat extends ScopeFormat {
 
                public TaskManagerJobScopeFormat(String format, 
TaskManagerScopeFormat parentFormat) {
                        super(format, parentFormat, new String[] {
-                                       SCOPE_TASKMANAGER_HOST,
+                                       SCOPE_ACTOR_HOST,
                                        SCOPE_TASKMANAGER_ID,
                                        SCOPE_JOB_ID,
                                        SCOPE_JOB_NAME
@@ -180,7 +242,7 @@ public abstract class ScopeFormat {
 
                public TaskScopeFormat(String format, TaskManagerJobScopeFormat 
parentFormat) {
                        super(format, parentFormat, new String[] {
-                                       SCOPE_TASKMANAGER_HOST,
+                                       SCOPE_ACTOR_HOST,
                                        SCOPE_TASKMANAGER_ID,
                                        SCOPE_JOB_ID,
                                        SCOPE_JOB_NAME,
@@ -193,7 +255,7 @@ public abstract class ScopeFormat {
                }
 
                public String[] formatScope(
-                               JobMetricGroup parent,
+                               TaskManagerJobMetricGroup parent,
                                AbstractID vertexId, AbstractID attemptId,
                                String taskName, int subtask, int 
attemptNumber) {
 
@@ -222,7 +284,7 @@ public abstract class ScopeFormat {
 
                public OperatorScopeFormat(String format, TaskScopeFormat 
parentFormat) {
                        super(format, parentFormat, new String[] {
-                                       SCOPE_TASKMANAGER_HOST,
+                                       SCOPE_ACTOR_HOST,
                                        SCOPE_TASKMANAGER_ID,
                                        SCOPE_JOB_ID,
                                        SCOPE_JOB_NAME,

http://git-wip-us.apache.org/repos/asf/flink/blob/a3a9fd11/flink-core/src/main/java/org/apache/flink/metrics/groups/scope/ScopeFormats.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/metrics/groups/scope/ScopeFormats.java
 
b/flink-core/src/main/java/org/apache/flink/metrics/groups/scope/ScopeFormats.java
index 1451637..978e761 100644
--- 
a/flink-core/src/main/java/org/apache/flink/metrics/groups/scope/ScopeFormats.java
+++ 
b/flink-core/src/main/java/org/apache/flink/metrics/groups/scope/ScopeFormats.java
@@ -19,6 +19,8 @@
 package org.apache.flink.metrics.groups.scope;
 
 import org.apache.flink.annotation.Internal;
+import 
org.apache.flink.metrics.groups.scope.ScopeFormat.JobManagerJobScopeFormat;
+import org.apache.flink.metrics.groups.scope.ScopeFormat.JobManagerScopeFormat;
 import org.apache.flink.metrics.groups.scope.ScopeFormat.OperatorScopeFormat;
 import 
org.apache.flink.metrics.groups.scope.ScopeFormat.TaskManagerJobScopeFormat;
 import 
org.apache.flink.metrics.groups.scope.ScopeFormat.TaskManagerScopeFormat;
@@ -32,6 +34,8 @@ import static 
org.apache.flink.util.Preconditions.checkNotNull;
 @Internal
 public class ScopeFormats {
 
+       private final JobManagerScopeFormat jobManagerFormat;
+       private final JobManagerJobScopeFormat jobManagerJobFormat;
        private final TaskManagerScopeFormat taskManagerFormat;
        private final TaskManagerJobScopeFormat taskManagerJobFormat;
        private final TaskScopeFormat taskFormat;
@@ -43,6 +47,11 @@ public class ScopeFormats {
         * Creates all default scope formats.
         */
        public ScopeFormats() {
+               this.jobManagerFormat = new 
JobManagerScopeFormat(ScopeFormat.DEFAULT_SCOPE_JOBMANAGER_COMPONENT);
+
+               this.jobManagerJobFormat = new JobManagerJobScopeFormat(
+                       ScopeFormat.DEFAULT_SCOPE_JOBMANAGER_JOB_GROUP, 
this.jobManagerFormat);
+
                this.taskManagerFormat = new 
TaskManagerScopeFormat(ScopeFormat.DEFAULT_SCOPE_TASKMANAGER_COMPONENT);
 
                this.taskManagerJobFormat = new TaskManagerJobScopeFormat(
@@ -59,11 +68,15 @@ public class ScopeFormats {
         * Creates all scope formats, based on the given scope format strings.
         */
        public ScopeFormats(
+                       String jobManagerFormat,
+                       String jobManagerJobFormat,
                        String taskManagerFormat,
                        String taskManagerJobFormat,
                        String taskFormat,
                        String operatorFormat)
        {
+               this.jobManagerFormat = new 
JobManagerScopeFormat(jobManagerFormat);
+               this.jobManagerJobFormat = new 
JobManagerJobScopeFormat(jobManagerJobFormat, this.jobManagerFormat);
                this.taskManagerFormat = new 
TaskManagerScopeFormat(taskManagerFormat);
                this.taskManagerJobFormat = new 
TaskManagerJobScopeFormat(taskManagerJobFormat, this.taskManagerFormat);
                this.taskFormat = new TaskScopeFormat(taskFormat, 
this.taskManagerJobFormat);
@@ -74,11 +87,15 @@ public class ScopeFormats {
         * Creates a {@code ScopeFormats} with the given scope formats.
         */
        public ScopeFormats(
+                       JobManagerScopeFormat jobManagerFormat,
+                       JobManagerJobScopeFormat jobManagerJobFormat,
                        TaskManagerScopeFormat taskManagerFormat,
                        TaskManagerJobScopeFormat taskManagerJobFormat,
                        TaskScopeFormat taskFormat,
                        OperatorScopeFormat operatorFormat)
        {
+               this.jobManagerFormat = checkNotNull(jobManagerFormat);
+               this.jobManagerJobFormat = checkNotNull(jobManagerJobFormat);
                this.taskManagerFormat = checkNotNull(taskManagerFormat);
                this.taskManagerJobFormat = checkNotNull(taskManagerJobFormat);
                this.taskFormat = checkNotNull(taskFormat);
@@ -87,14 +104,22 @@ public class ScopeFormats {
 
        // 
------------------------------------------------------------------------
 
+       public JobManagerScopeFormat getJobManagerFormat() {
+               return this.jobManagerFormat;
+       }
+
        public TaskManagerScopeFormat getTaskManagerFormat() {
                return this.taskManagerFormat;
        }
 
-       public TaskManagerJobScopeFormat getJobFormat() {
+       public TaskManagerJobScopeFormat getTaskManagerJobFormat() {
                return this.taskManagerJobFormat;
        }
 
+       public JobManagerJobScopeFormat getJobManagerJobFormat() {
+               return this.jobManagerJobFormat;
+       }
+
        public TaskScopeFormat getTaskFormat() {
                return this.taskFormat;
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/a3a9fd11/flink-core/src/test/java/org/apache/flink/metrics/MetricRegistryTest.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/test/java/org/apache/flink/metrics/MetricRegistryTest.java 
b/flink-core/src/test/java/org/apache/flink/metrics/MetricRegistryTest.java
index 8b71816..77acd3c 100644
--- a/flink-core/src/test/java/org/apache/flink/metrics/MetricRegistryTest.java
+++ b/flink-core/src/test/java/org/apache/flink/metrics/MetricRegistryTest.java
@@ -164,14 +164,14 @@ public class MetricRegistryTest extends TestLogger {
                Configuration config = new Configuration();
 
                config.setString(MetricRegistry.KEY_METRICS_SCOPE_NAMING_TM, 
"A");
-               config.setString(MetricRegistry.KEY_METRICS_SCOPE_NAMING_JOB, 
"B");
+               
config.setString(MetricRegistry.KEY_METRICS_SCOPE_NAMING_TM_JOB, "B");
                config.setString(MetricRegistry.KEY_METRICS_SCOPE_NAMING_TASK, 
"C");
                
config.setString(MetricRegistry.KEY_METRICS_SCOPE_NAMING_OPERATOR, "D");
 
                ScopeFormats scopeConfig = 
MetricRegistry.createScopeConfig(config);
 
                assertEquals("A", scopeConfig.getTaskManagerFormat().format());
-               assertEquals("B", scopeConfig.getJobFormat().format());
+               assertEquals("B", 
scopeConfig.getTaskManagerJobFormat().format());
                assertEquals("C", scopeConfig.getTaskFormat().format());
                assertEquals("D", scopeConfig.getOperatorFormat().format());
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/a3a9fd11/flink-core/src/test/java/org/apache/flink/metrics/groups/JobGroupTest.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/test/java/org/apache/flink/metrics/groups/JobGroupTest.java 
b/flink-core/src/test/java/org/apache/flink/metrics/groups/JobGroupTest.java
deleted file mode 100644
index 4bcb1ee..0000000
--- a/flink-core/src/test/java/org/apache/flink/metrics/groups/JobGroupTest.java
+++ /dev/null
@@ -1,94 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.metrics.groups;
-
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.metrics.MetricRegistry;
-import 
org.apache.flink.metrics.groups.scope.ScopeFormat.TaskManagerJobScopeFormat;
-import 
org.apache.flink.metrics.groups.scope.ScopeFormat.TaskManagerScopeFormat;
-
-import org.junit.Test;
-
-import static org.junit.Assert.assertArrayEquals;
-import static org.junit.Assert.assertEquals;
-
-public class JobGroupTest {
-
-       @Test
-       public void testGenerateScopeDefault() {
-               MetricRegistry registry = new MetricRegistry(new 
Configuration());
-
-               TaskManagerMetricGroup tmGroup = new 
TaskManagerMetricGroup(registry, "theHostName", "test-tm-id");
-               JobMetricGroup jmGroup = new JobMetricGroup(registry, tmGroup, 
new JobID(), "myJobName");
-
-               assertArrayEquals(
-                               new String[] { "theHostName", "taskmanager", 
"test-tm-id", "myJobName"},
-                               jmGroup.getScopeComponents());
-
-               assertEquals(
-                               "theHostName.taskmanager.test-tm-id.myJobName",
-                               jmGroup.getScopeString());
-               registry.shutdown();
-       }
-
-       @Test
-       public void testGenerateScopeCustom() {
-               MetricRegistry registry = new MetricRegistry(new 
Configuration());
-
-               TaskManagerScopeFormat tmFormat = new 
TaskManagerScopeFormat("abc");
-               TaskManagerJobScopeFormat jmFormat = new 
TaskManagerJobScopeFormat("some-constant.<job_name>", tmFormat);
-
-               JobID jid = new JobID();
-
-               TaskManagerMetricGroup tmGroup = new 
TaskManagerMetricGroup(registry, "theHostName", "test-tm-id");
-               JobMetricGroup jmGroup = new JobMetricGroup(registry, tmGroup, 
jmFormat, jid, "myJobName");
-
-               assertArrayEquals(
-                               new String[] { "some-constant", "myJobName" },
-                               jmGroup.getScopeComponents());
-
-               assertEquals(
-                               "some-constant.myJobName",
-                               jmGroup.getScopeString());
-               registry.shutdown();
-       }
-
-       @Test
-       public void testGenerateScopeCustomWildcard() {
-               MetricRegistry registry = new MetricRegistry(new 
Configuration());
-
-               TaskManagerScopeFormat tmFormat = new 
TaskManagerScopeFormat("peter.<tm_id>");
-               TaskManagerJobScopeFormat jmFormat = new 
TaskManagerJobScopeFormat("*.some-constant.<job_id>", tmFormat);
-
-               JobID jid = new JobID();
-
-               TaskManagerMetricGroup tmGroup = new 
TaskManagerMetricGroup(registry, tmFormat, "theHostName", "test-tm-id");
-               JobMetricGroup jmGroup = new JobMetricGroup(registry, tmGroup, 
jmFormat, jid, "myJobName");
-
-               assertArrayEquals(
-                               new String[] { "peter", "test-tm-id", 
"some-constant", jid.toString() },
-                               jmGroup.getScopeComponents());
-
-               assertEquals(
-                               "peter.test-tm-id.some-constant." + jid,
-                               jmGroup.getScopeString());
-               registry.shutdown();
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/a3a9fd11/flink-core/src/test/java/org/apache/flink/metrics/groups/JobManagerGroupTest.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/test/java/org/apache/flink/metrics/groups/JobManagerGroupTest.java
 
b/flink-core/src/test/java/org/apache/flink/metrics/groups/JobManagerGroupTest.java
new file mode 100644
index 0000000..8853f20
--- /dev/null
+++ 
b/flink-core/src/test/java/org/apache/flink/metrics/groups/JobManagerGroupTest.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.metrics.groups;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.metrics.MetricRegistry;
+import org.apache.flink.metrics.groups.scope.ScopeFormat.JobManagerScopeFormat;
+import org.junit.Test;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class JobManagerGroupTest {
+
+       // 
------------------------------------------------------------------------
+       //  adding and removing jobs
+       // 
------------------------------------------------------------------------
+
+       @Test
+       public void addAndRemoveJobs() {
+               final JobManagerMetricGroup group = new JobManagerMetricGroup(
+                       new MetricRegistry(new Configuration()), "localhost");
+
+               final JobID jid1 = new JobID();
+               final JobID jid2 = new JobID();
+
+               final String jobName1 = "testjob";
+               final String jobName2 = "anotherJob";
+
+               JobManagerJobMetricGroup jmJobGroup11 = group.addJob(jid1, 
jobName1);
+               JobManagerJobMetricGroup jmJobGroup12 = group.addJob(jid1, 
jobName1);
+               JobManagerJobMetricGroup jmJobGroup21 = group.addJob(jid2, 
jobName2);
+
+               assertEquals(jmJobGroup11, jmJobGroup12);
+
+               assertEquals(2, group.numRegisteredJobMetricGroups());
+
+               group.removeJob(jid1);
+
+               assertTrue(jmJobGroup11.isClosed());
+               assertEquals(1, group.numRegisteredJobMetricGroups());
+
+               group.removeJob(jid2);
+
+               assertTrue(jmJobGroup21.isClosed());
+               assertEquals(0, group.numRegisteredJobMetricGroups());
+       }
+
+       @Test
+       public void testCloseClosesAll() {
+               final JobManagerMetricGroup group = new JobManagerMetricGroup(
+                       new MetricRegistry(new Configuration()), "localhost");
+
+               final JobID jid1 = new JobID();
+               final JobID jid2 = new JobID();
+
+               final String jobName1 = "testjob";
+               final String jobName2 = "anotherJob";
+
+               JobManagerJobMetricGroup jmJobGroup11 = group.addJob(jid1, 
jobName1);
+               JobManagerJobMetricGroup jmJobGroup21 = group.addJob(jid2, 
jobName2);
+
+               group.close();
+
+               assertTrue(jmJobGroup11.isClosed());
+               assertTrue(jmJobGroup21.isClosed());
+       }
+
+       // 
------------------------------------------------------------------------
+       //  scope name tests
+       // 
------------------------------------------------------------------------
+
+       @Test
+       public void testGenerateScopeDefault() {
+               MetricRegistry registry = new MetricRegistry(new 
Configuration());
+               JobManagerMetricGroup group = new 
JobManagerMetricGroup(registry, "localhost");
+
+               assertArrayEquals(new String[]{"localhost", "jobmanager"}, 
group.getScopeComponents());
+               assertEquals("localhost.jobmanager", group.getScopeString());
+       }
+
+       @Test
+       public void testGenerateScopeCustom() {
+               MetricRegistry registry = new MetricRegistry(new 
Configuration());
+               JobManagerScopeFormat format = new 
JobManagerScopeFormat("constant.<host>.foo.<host>");
+               JobManagerMetricGroup group = new 
JobManagerMetricGroup(registry, format, "host");
+
+               assertArrayEquals(new String[]{"constant", "host", "foo", 
"host"}, group.getScopeComponents());
+               assertEquals("constant.host.foo.host", group.getScopeString());
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/a3a9fd11/flink-core/src/test/java/org/apache/flink/metrics/groups/JobManagerJobGroupTest.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/test/java/org/apache/flink/metrics/groups/JobManagerJobGroupTest.java
 
b/flink-core/src/test/java/org/apache/flink/metrics/groups/JobManagerJobGroupTest.java
new file mode 100644
index 0000000..3833cb8
--- /dev/null
+++ 
b/flink-core/src/test/java/org/apache/flink/metrics/groups/JobManagerJobGroupTest.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.metrics.groups;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.metrics.MetricRegistry;
+import 
org.apache.flink.metrics.groups.scope.ScopeFormat.JobManagerJobScopeFormat;
+import org.apache.flink.metrics.groups.scope.ScopeFormat.JobManagerScopeFormat;
+import org.junit.Test;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+
+public class JobManagerJobGroupTest {
+
+       @Test
+       public void testGenerateScopeDefault() {
+               MetricRegistry registry = new MetricRegistry(new 
Configuration());
+
+               JobManagerMetricGroup tmGroup = new 
JobManagerMetricGroup(registry, "theHostName");
+               JobMetricGroup jmGroup = new JobManagerJobMetricGroup(registry, 
tmGroup, new JobID(), "myJobName");
+
+               assertArrayEquals(
+                               new String[] { "theHostName", "jobmanager", 
"myJobName"},
+                               jmGroup.getScopeComponents());
+
+               assertEquals(
+                               "theHostName.jobmanager.myJobName",
+                               jmGroup.getScopeString());
+       }
+
+       @Test
+       public void testGenerateScopeCustom() {
+               MetricRegistry registry = new MetricRegistry(new 
Configuration());
+
+               JobManagerScopeFormat tmFormat = new 
JobManagerScopeFormat("abc");
+               JobManagerJobScopeFormat jmFormat = new 
JobManagerJobScopeFormat("some-constant.<job_name>", tmFormat);
+
+               JobID jid = new JobID();
+
+               JobManagerMetricGroup tmGroup = new 
JobManagerMetricGroup(registry, "theHostName");
+               JobMetricGroup jmGroup = new JobManagerJobMetricGroup(registry, 
tmGroup, jmFormat, jid, "myJobName");
+
+               assertArrayEquals(
+                               new String[] { "some-constant", "myJobName" },
+                               jmGroup.getScopeComponents());
+
+               assertEquals(
+                               "some-constant.myJobName",
+                               jmGroup.getScopeString());
+       }
+
+       @Test
+       public void testGenerateScopeCustomWildcard() {
+               MetricRegistry registry = new MetricRegistry(new 
Configuration());
+
+               JobManagerScopeFormat tmFormat = new 
JobManagerScopeFormat("peter");
+               JobManagerJobScopeFormat jmFormat = new 
JobManagerJobScopeFormat("*.some-constant.<job_id>", tmFormat);
+
+               JobID jid = new JobID();
+
+               JobManagerMetricGroup tmGroup = new 
JobManagerMetricGroup(registry, tmFormat, "theHostName");
+               JobMetricGroup jmGroup = new JobManagerJobMetricGroup(registry, 
tmGroup, jmFormat, jid, "myJobName");
+
+               assertArrayEquals(
+                               new String[] { "peter", "some-constant", 
jid.toString() },
+                               jmGroup.getScopeComponents());
+
+               assertEquals(
+                               "peter.some-constant." + jid,
+                               jmGroup.getScopeString());
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/a3a9fd11/flink-core/src/test/java/org/apache/flink/metrics/groups/OperatorGroupTest.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/test/java/org/apache/flink/metrics/groups/OperatorGroupTest.java
 
b/flink-core/src/test/java/org/apache/flink/metrics/groups/OperatorGroupTest.java
index c0c8842..9641632 100644
--- 
a/flink-core/src/test/java/org/apache/flink/metrics/groups/OperatorGroupTest.java
+++ 
b/flink-core/src/test/java/org/apache/flink/metrics/groups/OperatorGroupTest.java
@@ -35,7 +35,7 @@ public class OperatorGroupTest {
                MetricRegistry registry = new MetricRegistry(new 
Configuration());
 
                TaskManagerMetricGroup tmGroup = new 
TaskManagerMetricGroup(registry, "theHostName", "test-tm-id");
-               JobMetricGroup jmGroup = new JobMetricGroup(registry, tmGroup, 
new JobID(), "myJobName");
+               TaskManagerJobMetricGroup jmGroup = new 
TaskManagerJobMetricGroup(registry, tmGroup, new JobID(), "myJobName");
                TaskMetricGroup taskGroup = new TaskMetricGroup(
                                registry, jmGroup,  new AbstractID(),  new 
AbstractID(), "aTaskName", 11, 0);
                OperatorMetricGroup opGroup = new OperatorMetricGroup(registry, 
taskGroup, "myOpName");

http://git-wip-us.apache.org/repos/asf/flink/blob/a3a9fd11/flink-core/src/test/java/org/apache/flink/metrics/groups/TaskGroupTest.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/test/java/org/apache/flink/metrics/groups/TaskGroupTest.java 
b/flink-core/src/test/java/org/apache/flink/metrics/groups/TaskGroupTest.java
index 88f425b..357852a 100644
--- 
a/flink-core/src/test/java/org/apache/flink/metrics/groups/TaskGroupTest.java
+++ 
b/flink-core/src/test/java/org/apache/flink/metrics/groups/TaskGroupTest.java
@@ -60,7 +60,7 @@ public class TaskGroupTest {
                AbstractID executionId = new AbstractID();
 
                TaskManagerMetricGroup tmGroup = new 
TaskManagerMetricGroup(registry, "theHostName", "test-tm-id");
-               JobMetricGroup jmGroup = new JobMetricGroup(registry, tmGroup, 
new JobID(), "myJobName");
+               TaskManagerJobMetricGroup jmGroup = new 
TaskManagerJobMetricGroup(registry, tmGroup, new JobID(), "myJobName");
                TaskMetricGroup taskGroup = new TaskMetricGroup(registry, 
jmGroup, vertexId, executionId, "aTaskName", 13, 2);
 
                assertArrayEquals(
@@ -86,7 +86,7 @@ public class TaskGroupTest {
                AbstractID executionId = new AbstractID();
 
                TaskManagerMetricGroup tmGroup = new 
TaskManagerMetricGroup(registry, "theHostName", "test-tm-id");
-               JobMetricGroup jmGroup = new JobMetricGroup(registry, tmGroup, 
jid, "myJobName");
+               TaskManagerJobMetricGroup jmGroup = new 
TaskManagerJobMetricGroup(registry, tmGroup, jid, "myJobName");
                TaskMetricGroup taskGroup = new TaskMetricGroup(
                                registry, jmGroup, taskFormat, vertexId, 
executionId, "aTaskName", 13, 2);
 
@@ -114,7 +114,7 @@ public class TaskGroupTest {
                AbstractID executionId = new AbstractID();
 
                TaskManagerMetricGroup tmGroup = new 
TaskManagerMetricGroup(registry, "theHostName", "test-tm-id");
-               JobMetricGroup jmGroup = new JobMetricGroup(registry, tmGroup, 
new JobID(), "myJobName");
+               TaskManagerJobMetricGroup jmGroup = new 
TaskManagerJobMetricGroup(registry, tmGroup, new JobID(), "myJobName");
 
                TaskMetricGroup taskGroup = new TaskMetricGroup(
                                registry, jmGroup, format, new AbstractID(), 
executionId, "aTaskName", 13, 1);

http://git-wip-us.apache.org/repos/asf/flink/blob/a3a9fd11/flink-core/src/test/java/org/apache/flink/metrics/groups/TaskManagerJobGroupTest.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/test/java/org/apache/flink/metrics/groups/TaskManagerJobGroupTest.java
 
b/flink-core/src/test/java/org/apache/flink/metrics/groups/TaskManagerJobGroupTest.java
new file mode 100644
index 0000000..5cec70b
--- /dev/null
+++ 
b/flink-core/src/test/java/org/apache/flink/metrics/groups/TaskManagerJobGroupTest.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.metrics.groups;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.metrics.MetricRegistry;
+import 
org.apache.flink.metrics.groups.scope.ScopeFormat.TaskManagerJobScopeFormat;
+import 
org.apache.flink.metrics.groups.scope.ScopeFormat.TaskManagerScopeFormat;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+
+public class TaskManagerJobGroupTest {
+
+       @Test
+       public void testGenerateScopeDefault() {
+               MetricRegistry registry = new MetricRegistry(new 
Configuration());
+
+               TaskManagerMetricGroup tmGroup = new 
TaskManagerMetricGroup(registry, "theHostName", "test-tm-id");
+               JobMetricGroup jmGroup = new 
TaskManagerJobMetricGroup(registry, tmGroup, new JobID(), "myJobName");
+
+               assertArrayEquals(
+                               new String[] { "theHostName", "taskmanager", 
"test-tm-id", "myJobName"},
+                               jmGroup.getScopeComponents());
+
+               assertEquals(
+                               "theHostName.taskmanager.test-tm-id.myJobName",
+                               jmGroup.getScopeString());
+               registry.shutdown();
+       }
+
+       @Test
+       public void testGenerateScopeCustom() {
+               MetricRegistry registry = new MetricRegistry(new 
Configuration());
+
+               TaskManagerScopeFormat tmFormat = new 
TaskManagerScopeFormat("abc");
+               TaskManagerJobScopeFormat jmFormat = new 
TaskManagerJobScopeFormat("some-constant.<job_name>", tmFormat);
+
+               JobID jid = new JobID();
+
+               TaskManagerMetricGroup tmGroup = new 
TaskManagerMetricGroup(registry, "theHostName", "test-tm-id");
+               JobMetricGroup jmGroup = new 
TaskManagerJobMetricGroup(registry, tmGroup, jmFormat, jid, "myJobName");
+
+               assertArrayEquals(
+                               new String[] { "some-constant", "myJobName" },
+                               jmGroup.getScopeComponents());
+
+               assertEquals(
+                               "some-constant.myJobName",
+                               jmGroup.getScopeString());
+               registry.shutdown();
+       }
+
+       @Test
+       public void testGenerateScopeCustomWildcard() {
+               MetricRegistry registry = new MetricRegistry(new 
Configuration());
+
+               TaskManagerScopeFormat tmFormat = new 
TaskManagerScopeFormat("peter.<tm_id>");
+               TaskManagerJobScopeFormat jmFormat = new 
TaskManagerJobScopeFormat("*.some-constant.<job_id>", tmFormat);
+
+               JobID jid = new JobID();
+
+               TaskManagerMetricGroup tmGroup = new 
TaskManagerMetricGroup(registry, tmFormat, "theHostName", "test-tm-id");
+               JobMetricGroup jmGroup = new 
TaskManagerJobMetricGroup(registry, tmGroup, jmFormat, jid, "myJobName");
+
+               assertArrayEquals(
+                               new String[] { "peter", "test-tm-id", 
"some-constant", jid.toString() },
+                               jmGroup.getScopeComponents());
+
+               assertEquals(
+                               "peter.test-tm-id.some-constant." + jid,
+                               jmGroup.getScopeString());
+               registry.shutdown();
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/a3a9fd11/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index 60f0a97..be1caa5 100644
--- 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -33,6 +33,8 @@ import org.apache.flink.api.common.{ExecutionConfig, JobID}
 import org.apache.flink.configuration.{ConfigConstants, Configuration, 
GlobalConfiguration}
 import org.apache.flink.core.fs.FileSystem
 import org.apache.flink.core.io.InputSplitAssigner
+import org.apache.flink.metrics.{MetricRegistry => FlinkMetricRegistry}
+import org.apache.flink.metrics.groups.JobManagerMetricGroup
 import org.apache.flink.runtime.accumulators.AccumulatorSnapshot
 import org.apache.flink.runtime.akka.{AkkaUtils, ListeningBehaviour}
 import org.apache.flink.runtime.blob.BlobServer
@@ -124,7 +126,8 @@ class JobManager(
     protected val submittedJobGraphs : SubmittedJobGraphStore,
     protected val checkpointRecoveryFactory : CheckpointRecoveryFactory,
     protected val savepointStore: SavepointStore,
-    protected val jobRecoveryTimeout: FiniteDuration)
+    protected val jobRecoveryTimeout: FiniteDuration,
+    protected val metricsRegistry: Option[FlinkMetricRegistry])
   extends FlinkActor
   with LeaderSessionMessageFilter // mixin oder is important, we want 
filtering after logging
   with LogMessages // mixin order is important, we want first logging
@@ -149,6 +152,16 @@ class JobManager(
 
   var leaderSessionID: Option[UUID] = None
 
+  protected val jobManagerMetricGroup : Option[JobManagerMetricGroup] = 
metricsRegistry match {
+    case Some(registry) =>
+      val host = 
flinkConfiguration.getString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, null)
+      Option(new JobManagerMetricGroup(
+        registry, NetUtils.ipAddressToUrlString(InetAddress.getByName(host))))
+    case None =>
+      log.warn("Could not instantiate JobManager metrics.")
+      None
+  }
+
   /** Futures which have to be completed before terminating the job manager */
   var futuresToComplete: Option[Seq[Future[Unit]]] = None
 
@@ -269,6 +282,13 @@ class JobManager(
     // shut down the extra thread pool for futures
     executorService.shutdown()
 
+    // failsafe shutdown of the metrics registry
+    try {
+      metricsRegistry.map(_.shutdown())
+    } catch {
+      case t: Exception => log.error("MetricRegistry did not shutdown 
properly.", t)
+    }
+
     log.debug(s"Job manager ${self.path} is completely stopped.")
   }
 
@@ -2266,7 +2286,8 @@ object JobManager {
     SubmittedJobGraphStore,
     CheckpointRecoveryFactory,
     SavepointStore,
-    FiniteDuration // timeout for job recovery
+    FiniteDuration, // timeout for job recovery
+    Option[FlinkMetricRegistry]
    ) = {
 
     val timeout: FiniteDuration = AkkaUtils.getTimeout(configuration)
@@ -2358,6 +2379,13 @@ object JobManager {
       }
     }
 
+    val metricRegistry = try {
+      Option(new FlinkMetricRegistry(configuration))
+    } catch {
+      case _: Exception =>
+        None
+    }
+
     (executorService,
       instanceManager,
       scheduler,
@@ -2369,7 +2397,8 @@ object JobManager {
       submittedJobGraphs,
       checkpointRecoveryFactory,
       savepointStore,
-      jobRecoveryTimeout)
+      jobRecoveryTimeout,
+      metricRegistry)
   }
 
   /**
@@ -2432,7 +2461,8 @@ object JobManager {
     submittedJobGraphs,
     checkpointRecoveryFactory,
     savepointStore,
-    jobRecoveryTimeout) = createJobManagerComponents(
+    jobRecoveryTimeout, 
+    metricsRegistry) = createJobManagerComponents(
       configuration,
       None)
 
@@ -2458,7 +2488,8 @@ object JobManager {
       submittedJobGraphs,
       checkpointRecoveryFactory,
       savepointStore,
-      jobRecoveryTimeout)
+      jobRecoveryTimeout,
+      metricsRegistry)
 
     val jobManager: ActorRef = jobManagerActorName match {
       case Some(actorName) => actorSystem.actorOf(jobManagerProps, actorName)

http://git-wip-us.apache.org/repos/asf/flink/blob/a3a9fd11/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UnregisteredTaskMetricsGroup.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UnregisteredTaskMetricsGroup.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UnregisteredTaskMetricsGroup.java
index 6d3f768..3d8f298 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UnregisteredTaskMetricsGroup.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UnregisteredTaskMetricsGroup.java
@@ -24,7 +24,7 @@ import org.apache.flink.metrics.Metric;
 import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.metrics.MetricRegistry;
 import org.apache.flink.metrics.groups.IOMetricGroup;
-import org.apache.flink.metrics.groups.JobMetricGroup;
+import org.apache.flink.metrics.groups.TaskManagerJobMetricGroup;
 import org.apache.flink.metrics.groups.TaskManagerMetricGroup;
 import org.apache.flink.metrics.groups.TaskMetricGroup;
 import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
@@ -60,7 +60,7 @@ public class UnregisteredTaskMetricsGroup extends 
TaskMetricGroup {
                }
        }
 
-       private static class DummyJobMetricGroup extends JobMetricGroup {
+       private static class DummyJobMetricGroup extends 
TaskManagerJobMetricGroup {
                
                public DummyJobMetricGroup() {
                        super(EMPTY_REGISTRY, new 
DummyTaskManagerMetricsGroup(), new JobID(), "testjob");

http://git-wip-us.apache.org/repos/asf/flink/blob/a3a9fd11/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
 
b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
index 763bd36..b4ba40b 100644
--- 
a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
+++ 
b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
@@ -107,7 +107,8 @@ class TestingCluster(
     submittedJobsGraphs,
     checkpointRecoveryFactory,
     savepointStore,
-    jobRecoveryTimeout) = JobManager.createJobManagerComponents(
+    jobRecoveryTimeout,
+    metricRegistry) = JobManager.createJobManagerComponents(
       config,
       createLeaderElectionService())
 
@@ -128,7 +129,8 @@ class TestingCluster(
         submittedJobsGraphs,
         checkpointRecoveryFactory,
         savepointStore,
-        jobRecoveryTimeout))
+        jobRecoveryTimeout,
+        metricRegistry))
 
     val dispatcherJobManagerProps = if (synchronousDispatcher) {
       // disable asynchronous futures (e.g. accumulator update in Heartbeat)

http://git-wip-us.apache.org/repos/asf/flink/blob/a3a9fd11/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala
 
b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala
index e854b13..04689c6 100644
--- 
a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala
+++ 
b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.testingUtils
 import akka.actor.ActorRef
 
 import org.apache.flink.configuration.Configuration
+import org.apache.flink.metrics.MetricRegistry
 import org.apache.flink.runtime.checkpoint.{SavepointStore, 
CheckpointRecoveryFactory}
 import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager
 import org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory
@@ -50,7 +51,8 @@ class TestingJobManager(
     submittedJobGraphs : SubmittedJobGraphStore,
     checkpointRecoveryFactory : CheckpointRecoveryFactory,
     savepointStore : SavepointStore,
-    jobRecoveryTimeout: FiniteDuration)
+    jobRecoveryTimeout : FiniteDuration,
+    metricRegistry : Option[MetricRegistry])
   extends JobManager(
     flinkConfiguration,
       executorService,
@@ -64,5 +66,6 @@ class TestingJobManager(
     submittedJobGraphs,
     checkpointRecoveryFactory,
     savepointStore,
-    jobRecoveryTimeout)
+    jobRecoveryTimeout,
+    metricRegistry)
   with TestingJobManagerLike {}

http://git-wip-us.apache.org/repos/asf/flink/blob/a3a9fd11/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala
 
b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala
index b67e319..2f43d38 100644
--- 
a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala
+++ 
b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala
@@ -358,7 +358,8 @@ object TestingUtils {
     submittedJobGraphs,
     checkpointRecoveryFactory,
     savepointStore,
-    jobRecoveryTimeout) = JobManager.createJobManagerComponents(
+    jobRecoveryTimeout,
+    metricsRegistry) = JobManager.createJobManagerComponents(
       configuration,
       None
     )
@@ -380,7 +381,8 @@ object TestingUtils {
       leaderElectionService,
       submittedJobGraphs,
       checkpointRecoveryFactory,
-      jobRecoveryTimeout)
+      jobRecoveryTimeout,
+      metricsRegistry)
 
     val jobManager: ActorRef = actorSystem.actorOf(jobManagerProps, 
JobManager.JOB_MANAGER_NAME)
 

http://git-wip-us.apache.org/repos/asf/flink/blob/a3a9fd11/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala
----------------------------------------------------------------------
diff --git 
a/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala 
b/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala
index 23b3adc..3df1adc 100644
--- a/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala
+++ b/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala
@@ -24,6 +24,7 @@ import akka.actor.ActorRef
 
 import org.apache.flink.api.common.JobID
 import org.apache.flink.configuration.{Configuration => FlinkConfiguration, 
ConfigConstants}
+import org.apache.flink.metrics.MetricRegistry
 import org.apache.flink.runtime.checkpoint.{SavepointStore, 
CheckpointRecoveryFactory}
 import org.apache.flink.runtime.clusterframework.ApplicationStatus
 import org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory
@@ -70,7 +71,8 @@ class YarnJobManager(
     submittedJobGraphs : SubmittedJobGraphStore,
     checkpointRecoveryFactory : CheckpointRecoveryFactory,
     savepointStore: SavepointStore,
-    jobRecoveryTimeout: FiniteDuration)
+    jobRecoveryTimeout: FiniteDuration,
+    metricsRegistry: Option[MetricRegistry])
   extends JobManager(
     flinkConfiguration,
     executorService,
@@ -84,7 +86,8 @@ class YarnJobManager(
     submittedJobGraphs,
     checkpointRecoveryFactory,
     savepointStore,
-    jobRecoveryTimeout) {
+    jobRecoveryTimeout,
+    metricsRegistry) {
 
   val DEFAULT_YARN_HEARTBEAT_DELAY: FiniteDuration = 5 seconds
   val YARN_HEARTBEAT_DELAY: FiniteDuration =

Reply via email to