http://git-wip-us.apache.org/repos/asf/flink/blob/e3fec1f9/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskMetricGroup.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskMetricGroup.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskMetricGroup.java
new file mode 100644
index 0000000..8e23d2f
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskMetricGroup.java
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.metrics.groups;
+
+import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.metrics.scope.TaskScopeFormat;
+import org.apache.flink.util.AbstractID;
+
+import javax.annotation.Nullable;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Special {@link org.apache.flink.metrics.MetricGroup} representing a Flink 
runtime Task.
+ * 
+ * <p>Contains extra logic for adding operators.
+ */
+public class TaskMetricGroup extends ComponentMetricGroup {
+
+       /** The job metrics group containing this task metrics group */
+       private final TaskManagerJobMetricGroup parent;
+
+       private final Map<String, OperatorMetricGroup> operators = new 
HashMap<>();
+
+       private final IOMetricGroup ioMetrics;
+       
+       /** The execution Id uniquely identifying the executed task represented 
by this metrics group */
+       private final AbstractID executionId;
+
+       @Nullable
+       private final AbstractID vertexId;
+       
+       @Nullable
+       private final String taskName;
+
+       private final int subtaskIndex;
+
+       private final int attemptNumber;
+
+       // 
------------------------------------------------------------------------
+
+       public TaskMetricGroup(
+                       MetricRegistry registry,
+                       TaskManagerJobMetricGroup parent,
+                       @Nullable AbstractID vertexId,
+                       AbstractID executionId,
+                       @Nullable String taskName,
+                       int subtaskIndex,
+                       int attemptNumber) {
+               
+               this(registry, parent, 
registry.getScopeFormats().getTaskFormat(),
+                               vertexId, executionId, taskName, subtaskIndex, 
attemptNumber);
+       }
+
+       public TaskMetricGroup(
+                       MetricRegistry registry,
+                       TaskManagerJobMetricGroup parent,
+                       TaskScopeFormat scopeFormat, 
+                       @Nullable AbstractID vertexId,
+                       AbstractID executionId,
+                       @Nullable String taskName,
+                       int subtaskIndex,
+                       int attemptNumber) {
+
+               super(registry, scopeFormat.formatScope(
+                               parent, vertexId, executionId, taskName, 
subtaskIndex, attemptNumber));
+
+               this.parent = checkNotNull(parent);
+               this.executionId = checkNotNull(executionId);
+               this.vertexId = vertexId;
+               this.taskName = taskName;
+               this.subtaskIndex = subtaskIndex;
+               this.attemptNumber = attemptNumber;
+
+               this.ioMetrics = new IOMetricGroup(this);
+       }
+
+       // 
------------------------------------------------------------------------
+       //  properties
+       // 
------------------------------------------------------------------------
+
+       public final TaskManagerJobMetricGroup parent() {
+               return parent;
+       }
+
+       public AbstractID executionId() {
+               return executionId;
+       }
+
+       @Nullable
+       public AbstractID vertexId() {
+               return vertexId;
+       }
+
+       @Nullable
+       public String taskName() {
+               return taskName;
+       }
+
+       public int subtaskIndex() {
+               return subtaskIndex;
+       }
+
+       public int attemptNumber() {
+               return attemptNumber;
+       }
+
+       /**
+        * Returns the IOMetricGroup for this task.
+        *
+        * @return IOMetricGroup for this task.
+        */
+       public IOMetricGroup getIOMetricGroup() {
+               return ioMetrics;
+       }
+
+       // 
------------------------------------------------------------------------
+       //  operators and cleanup
+       // 
------------------------------------------------------------------------
+
+       public OperatorMetricGroup addOperator(String name) {
+               OperatorMetricGroup operator = new 
OperatorMetricGroup(this.registry, this, name);
+
+               synchronized (this) {
+                       OperatorMetricGroup previous = operators.put(name, 
operator);
+                       if (previous == null) {
+                               // no operator group so far
+                               return operator;
+                       } else {
+                               // already had an operator group. restore that 
one.
+                               operators.put(name, previous);
+                               return previous;
+                       }
+               }
+       }
+
+       @Override
+       public void close() {
+               super.close();
+
+               parent.removeTaskMetricGroup(executionId);
+       }
+
+       // 
------------------------------------------------------------------------
+       //  Component Metric Group Specifics
+       // 
------------------------------------------------------------------------
+
+       @Override
+       protected Iterable<? extends ComponentMetricGroup> subComponents() {
+               return operators.values();
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/e3fec1f9/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/scope/JobManagerJobScopeFormat.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/scope/JobManagerJobScopeFormat.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/scope/JobManagerJobScopeFormat.java
new file mode 100644
index 0000000..2d5356a
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/scope/JobManagerJobScopeFormat.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.metrics.scope;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup;
+
+/**
+ * The scope format for the {@link 
org.apache.flink.runtime.metrics.groups.JobMetricGroup}.
+ */
+public class JobManagerJobScopeFormat extends ScopeFormat {
+
+       public JobManagerJobScopeFormat(String format, JobManagerScopeFormat 
parentFormat) {
+               super(format, parentFormat, new String[] {
+                               SCOPE_ACTOR_HOST,
+                               SCOPE_JOB_ID,
+                               SCOPE_JOB_NAME
+               });
+       }
+
+       public String[] formatScope(JobManagerMetricGroup parent, JobID jid, 
String jobName) {
+               final String[] template = copyTemplate();
+               final String[] values = {
+                               parent.hostname(),
+                               valueOrNull(jid),
+                               valueOrNull(jobName)
+               };
+               return bindVariables(template, values);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/e3fec1f9/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/scope/JobManagerScopeFormat.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/scope/JobManagerScopeFormat.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/scope/JobManagerScopeFormat.java
new file mode 100644
index 0000000..14f1b72
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/scope/JobManagerScopeFormat.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.metrics.scope;
+
+/**
+ * The scope format for the {@link 
org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup}.
+ */
+public class JobManagerScopeFormat extends ScopeFormat {
+
+       public JobManagerScopeFormat(String format) {
+               super(format, null, new String[] {
+                       SCOPE_ACTOR_HOST
+               });
+       }
+
+       public String[] formatScope(String hostname) {
+               final String[] template = copyTemplate();
+               final String[] values = { hostname };
+               return bindVariables(template, values);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/e3fec1f9/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/scope/OperatorScopeFormat.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/scope/OperatorScopeFormat.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/scope/OperatorScopeFormat.java
new file mode 100644
index 0000000..fcebe37
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/scope/OperatorScopeFormat.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.metrics.scope;
+
+import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
+
+/**
+ * The scope format for the {@link 
org.apache.flink.runtime.metrics.groups.OperatorMetricGroup}.
+ */
+public class OperatorScopeFormat extends ScopeFormat {
+
+       public OperatorScopeFormat(String format, TaskScopeFormat parentFormat) 
{
+               super(format, parentFormat, new String[] {
+                               SCOPE_ACTOR_HOST,
+                               SCOPE_TASKMANAGER_ID,
+                               SCOPE_JOB_ID,
+                               SCOPE_JOB_NAME,
+                               SCOPE_TASK_VERTEX_ID,
+                               SCOPE_TASK_ATTEMPT_ID,
+                               SCOPE_TASK_NAME,
+                               SCOPE_TASK_SUBTASK_INDEX,
+                               SCOPE_TASK_ATTEMPT_NUM,
+                               SCOPE_OPERATOR_NAME
+               });
+       }
+
+       public String[] formatScope(TaskMetricGroup parent, String 
operatorName) {
+
+               final String[] template = copyTemplate();
+               final String[] values = {
+                               parent.parent().parent().hostname(),
+                               parent.parent().parent().taskManagerId(),
+                               valueOrNull(parent.parent().jobId()),
+                               valueOrNull(parent.parent().jobName()),
+                               valueOrNull(parent.vertexId()),
+                               valueOrNull(parent.executionId()),
+                               valueOrNull(parent.taskName()),
+                               String.valueOf(parent.subtaskIndex()),
+                               String.valueOf(parent.attemptNumber()),
+                               valueOrNull(operatorName)
+               };
+               return bindVariables(template, values);
+       }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/e3fec1f9/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/scope/ScopeFormat.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/scope/ScopeFormat.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/scope/ScopeFormat.java
new file mode 100644
index 0000000..a45df41
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/scope/ScopeFormat.java
@@ -0,0 +1,307 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.metrics.scope;
+
+import org.apache.flink.metrics.CharacterFilter;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * This class represents the format after which the "scope" (or namespace) of 
the various
+ * component metric groups is built. Component metric groups are for example
+ * "TaskManager", "Task", or "Operator".
+ *
+ * <p>User defined scope formats allow users to include or exclude
+ * certain identifiers from the scope. The scope for metrics belonging to the 
"Task"
+ * group could for example include the task attempt number (more fine grained 
identification), or
+ * exclude it (continuity of the namespace across failure and recovery).
+ */
+public abstract class ScopeFormat {
+
+       private static CharacterFilter defaultFilter = new CharacterFilter() {
+               @Override
+               public String filterCharacters(String input) {
+                       return input;
+               }
+       };
+
+       // 
------------------------------------------------------------------------
+       //  Scope Format Special Characters
+       // 
------------------------------------------------------------------------
+
+       /**
+        * If the scope format starts with this character, then the parent 
components scope
+        * format will be used as a prefix.
+        * 
+        * <p>For example, if the TaskManager's job format is {@code 
"*.<job_name>"}, and the
+        * TaskManager format is {@code "<host>"}, then the job's metrics
+        * will have {@code "<host>.<job_name>"} as their scope.
+        */
+       public static final String SCOPE_INHERIT_PARENT = "*";
+
+       public static final String SCOPE_SEPARATOR = ".";
+
+       private static final String SCOPE_VARIABLE_PREFIX = "<";
+       private static final String SCOPE_VARIABLE_SUFFIX = ">";
+
+       // 
------------------------------------------------------------------------
+       //  Scope Variables
+       // 
------------------------------------------------------------------------
+
+       public static final String SCOPE_ACTOR_HOST = asVariable("host");
+
+       // ----- Job Manager ----
+
+       /** The default scope format of the JobManager component: {@code 
"<host>.jobmanager"} */
+       public static final String DEFAULT_SCOPE_JOBMANAGER_COMPONENT =
+               concat(SCOPE_ACTOR_HOST, "jobmanager");
+
+       /** The default scope format of JobManager metrics: {@code 
"<host>.jobmanager"} */
+       public static final String DEFAULT_SCOPE_JOBMANAGER_GROUP = 
DEFAULT_SCOPE_JOBMANAGER_COMPONENT;
+
+       // ----- Task Manager ----
+
+       public static final String SCOPE_TASKMANAGER_ID = asVariable("tm_id");
+
+       /** The default scope format of the TaskManager component: {@code 
"<host>.taskmanager.<tm_id>"} */
+       public static final String DEFAULT_SCOPE_TASKMANAGER_COMPONENT =
+                       concat(SCOPE_ACTOR_HOST, "taskmanager", 
SCOPE_TASKMANAGER_ID);
+
+       /** The default scope format of TaskManager metrics: {@code 
"<host>.taskmanager.<tm_id>"} */
+       public static final String DEFAULT_SCOPE_TASKMANAGER_GROUP = 
DEFAULT_SCOPE_TASKMANAGER_COMPONENT;
+
+       // ----- Job -----
+
+       public static final String SCOPE_JOB_ID = asVariable("job_id");
+       public static final String SCOPE_JOB_NAME = asVariable("job_name");
+
+       /** The default scope format for the job component: {@code 
"<job_name>"} */
+       public static final String DEFAULT_SCOPE_JOB_COMPONENT = SCOPE_JOB_NAME;
+
+       // ----- Job on Job Manager ----
+
+       /** The default scope format for all job metrics on a jobmanager: 
{@code "<host>.jobmanager.<job_name>"} */
+       public static final String DEFAULT_SCOPE_JOBMANAGER_JOB_GROUP =
+               concat(DEFAULT_SCOPE_JOBMANAGER_COMPONENT, 
DEFAULT_SCOPE_JOB_COMPONENT);
+
+       // ----- Job on Task Manager ----
+
+       /** The default scope format for all job metrics on a taskmanager: 
{@code "<host>.taskmanager.<tm_id>.<job_name>"} */
+       public static final String DEFAULT_SCOPE_TASKMANAGER_JOB_GROUP =
+                       concat(DEFAULT_SCOPE_TASKMANAGER_COMPONENT, 
DEFAULT_SCOPE_JOB_COMPONENT);
+
+       // ----- Task ----
+
+       public static final String SCOPE_TASK_VERTEX_ID = asVariable("task_id");
+       public static final String SCOPE_TASK_NAME = asVariable("task_name");
+       public static final String SCOPE_TASK_ATTEMPT_ID = 
asVariable("task_attempt_id");
+       public static final String SCOPE_TASK_ATTEMPT_NUM = 
asVariable("task_attempt_num");
+       public static final String SCOPE_TASK_SUBTASK_INDEX = 
asVariable("subtask_index");
+
+       /** Default scope of the task component: {@code 
"<task_name>.<subtask_index>"} */
+       public static final String DEFAULT_SCOPE_TASK_COMPONENT =
+                       concat(SCOPE_TASK_NAME, SCOPE_TASK_SUBTASK_INDEX);
+
+       /** The default scope format for all task metrics:
+        * {@code 
"<host>.taskmanager.<tm_id>.<job_name>.<task_name>.<subtask_index>"} */
+       public static final String DEFAULT_SCOPE_TASK_GROUP =
+                       concat(DEFAULT_SCOPE_TASKMANAGER_JOB_GROUP, 
DEFAULT_SCOPE_TASK_COMPONENT);
+
+       // ----- Operator ----
+
+       public static final String SCOPE_OPERATOR_NAME = 
asVariable("operator_name");
+
+       /** The default scope added by the operator component: 
"<operator_name>.<subtask_index>" */
+       public static final String DEFAULT_SCOPE_OPERATOR_COMPONENT =
+                       concat(SCOPE_OPERATOR_NAME, SCOPE_TASK_SUBTASK_INDEX);
+
+       /** The default scope format for all operator metrics:
+        * {@code 
"<host>.taskmanager.<tm_id>.<job_name>.<operator_name>.<subtask_index>"} */
+       public static final String DEFAULT_SCOPE_OPERATOR_GROUP =
+                       concat(DEFAULT_SCOPE_TASKMANAGER_JOB_GROUP, 
DEFAULT_SCOPE_OPERATOR_COMPONENT);
+       
+
+       // 
------------------------------------------------------------------------
+       //  Scope Format Base
+       // 
------------------------------------------------------------------------
+
+       /** The scope format */
+       private final String format;
+
+       /** The format, split into components */
+       private final String[] template;
+
+       private final int[] templatePos;
+
+       private final int[] valuePos;
+
+       // 
------------------------------------------------------------------------
+
+       protected ScopeFormat(String format, ScopeFormat parent, String[] 
variables) {
+               checkNotNull(format, "format is null");
+
+               final String[] rawComponents = format.split("\\" + 
SCOPE_SEPARATOR);
+
+               // compute the template array
+               final boolean parentAsPrefix = rawComponents.length > 0 && 
rawComponents[0].equals(SCOPE_INHERIT_PARENT);
+               if (parentAsPrefix) {
+                       if (parent == null) {
+                               throw new IllegalArgumentException("Component 
scope format requires parent prefix (starts with '"
+                                       + SCOPE_INHERIT_PARENT + "'), but this 
component has no parent (is root component).");
+                       }
+
+                       this.format = format.length() > 2 ? format.substring(2) 
: "<empty>";
+
+                       String[] parentTemplate = parent.template;
+                       int parentLen = parentTemplate.length;
+                       
+                       this.template = new String[parentLen + 
rawComponents.length - 1];
+                       System.arraycopy(parentTemplate, 0, this.template, 0, 
parentLen);
+                       System.arraycopy(rawComponents, 1, this.template, 
parentLen, rawComponents.length - 1);
+               }
+               else {
+                       this.format = format.isEmpty() ? "<empty>" : format;
+                       this.template = rawComponents;
+               }
+
+               // --- compute the replacement matrix ---
+               // a bit of clumsy Java collections code ;-)
+               
+               HashMap<String, Integer> varToValuePos = arrayToMap(variables);
+               List<Integer> templatePos = new ArrayList<>();
+               List<Integer> valuePos = new ArrayList<>();
+
+               for (int i = 0; i < template.length; i++) {
+                       final String component = template[i];
+                       
+                       // check if that is a variable
+                       if (component != null && component.length() >= 3 &&
+                                       component.charAt(0) == '<' && 
component.charAt(component.length() - 1) == '>') {
+
+                               // this is a variable
+                               Integer replacementPos = 
varToValuePos.get(component);
+                               if (replacementPos != null) {
+                                       templatePos.add(i);
+                                       valuePos.add(replacementPos);
+                               }
+                       }
+               }
+
+               this.templatePos = integerListToArray(templatePos);
+               this.valuePos = integerListToArray(valuePos);
+       }
+
+       // 
------------------------------------------------------------------------
+
+       public String format() {
+               return format;
+       }
+
+       protected final String[] copyTemplate() {
+               String[] copy = new String[template.length];
+               System.arraycopy(template, 0, copy, 0, template.length);
+               return copy;
+       }
+
+       protected final String[] bindVariables(String[] template, String[] 
values) {
+               final int len = templatePos.length;
+               for (int i = 0; i < len; i++) {
+                       template[templatePos[i]] = values[valuePos[i]];
+               }
+               return template;
+       }
+
+       // 
------------------------------------------------------------------------
+
+       @Override
+       public String toString() {
+               return "ScopeFormat '" + format + '\'';
+       }
+       
+       // 
------------------------------------------------------------------------
+       //  Utilities
+       // 
------------------------------------------------------------------------
+
+       /**
+        * Formats the given string to resemble a scope variable.
+        *
+        * @param scope The string to format
+        * @return The formatted string
+        */
+       public static String asVariable(String scope) {
+               return SCOPE_VARIABLE_PREFIX + scope + SCOPE_VARIABLE_SUFFIX;
+       }
+
+       public static String concat(String... components) {
+               return concat(defaultFilter, '.', components);
+       }
+
+       public static String concat(CharacterFilter filter, String... 
components) {
+               return concat(filter, '.', components);
+       }
+
+       public static String concat(Character delimiter, String... components) {
+               return concat(defaultFilter, delimiter, components);
+       }
+
+       /**
+        * Concatenates the given component names separated by the delimiter 
character. Additionally
+        * the character filter is applied to all component names.
+        *
+        * @param filter Character filter to be applied to the component names
+        * @param delimiter Delimiter to separate component names
+        * @param components Array of component names
+        * @return The concatenated component name
+        */
+       public static String concat(CharacterFilter filter, Character 
delimiter, String... components) {
+               StringBuilder sb = new StringBuilder();
+               sb.append(filter.filterCharacters(components[0]));
+               for (int x = 1; x < components.length; x++) {
+                       sb.append(delimiter);
+                       sb.append(filter.filterCharacters(components[x]));
+               }
+               return sb.toString();
+       }
+       
+       protected static String valueOrNull(Object value) {
+               return (value == null || (value instanceof String && ((String) 
value).isEmpty())) ?
+                               "null" : value.toString();
+       }
+
+       protected static HashMap<String, Integer> arrayToMap(String[] array) {
+               HashMap<String, Integer> map = new HashMap<>(array.length);
+               for (int i = 0; i < array.length; i++) {
+                       map.put(array[i], i);
+               }
+               return map;
+       }
+
+       private static int[] integerListToArray(List<Integer> list) {
+               int[] array = new int[list.size()];
+               int pos = 0;
+               for (Integer i : list) {
+                       array[pos++] = i;
+               }
+               return array;
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/e3fec1f9/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/scope/ScopeFormats.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/scope/ScopeFormats.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/scope/ScopeFormats.java
new file mode 100644
index 0000000..bbbe6ba
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/scope/ScopeFormats.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.metrics.scope;
+
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A container for component scope formats.
+ */
+public class ScopeFormats {
+
+       private final JobManagerScopeFormat jobManagerFormat;
+       private final JobManagerJobScopeFormat jobManagerJobFormat;
+       private final TaskManagerScopeFormat taskManagerFormat;
+       private final TaskManagerJobScopeFormat taskManagerJobFormat;
+       private final TaskScopeFormat taskFormat;
+       private final OperatorScopeFormat operatorFormat;
+
+       // 
------------------------------------------------------------------------
+
+       /**
+        * Creates all default scope formats.
+        */
+       public ScopeFormats() {
+               this.jobManagerFormat = new 
JobManagerScopeFormat(ScopeFormat.DEFAULT_SCOPE_JOBMANAGER_COMPONENT);
+
+               this.jobManagerJobFormat = new JobManagerJobScopeFormat(
+                               ScopeFormat.DEFAULT_SCOPE_JOBMANAGER_JOB_GROUP, 
this.jobManagerFormat);
+
+               this.taskManagerFormat = new 
TaskManagerScopeFormat(ScopeFormat.DEFAULT_SCOPE_TASKMANAGER_COMPONENT);
+
+               this.taskManagerJobFormat = new TaskManagerJobScopeFormat(
+                               
ScopeFormat.DEFAULT_SCOPE_TASKMANAGER_JOB_GROUP, this.taskManagerFormat);
+
+               this.taskFormat = new TaskScopeFormat(
+                               ScopeFormat.DEFAULT_SCOPE_TASK_GROUP, 
this.taskManagerJobFormat);
+
+               this.operatorFormat = new OperatorScopeFormat(
+                               ScopeFormat.DEFAULT_SCOPE_OPERATOR_GROUP, 
this.taskFormat);
+       }
+
+       /**
+        * Creates all scope formats, based on the given scope format strings.
+        */
+       public ScopeFormats(
+                       String jobManagerFormat,
+                       String jobManagerJobFormat,
+                       String taskManagerFormat,
+                       String taskManagerJobFormat,
+                       String taskFormat,
+                       String operatorFormat)
+       {
+               this.jobManagerFormat = new 
JobManagerScopeFormat(jobManagerFormat);
+               this.jobManagerJobFormat = new 
JobManagerJobScopeFormat(jobManagerJobFormat, this.jobManagerFormat);
+               this.taskManagerFormat = new 
TaskManagerScopeFormat(taskManagerFormat);
+               this.taskManagerJobFormat = new 
TaskManagerJobScopeFormat(taskManagerJobFormat, this.taskManagerFormat);
+               this.taskFormat = new TaskScopeFormat(taskFormat, 
this.taskManagerJobFormat);
+               this.operatorFormat = new OperatorScopeFormat(operatorFormat, 
this.taskFormat);
+       }
+
+       /**
+        * Creates a {@code ScopeFormats} with the given scope formats.
+        */
+       public ScopeFormats(
+                       JobManagerScopeFormat jobManagerFormat,
+                       JobManagerJobScopeFormat jobManagerJobFormat,
+                       TaskManagerScopeFormat taskManagerFormat,
+                       TaskManagerJobScopeFormat taskManagerJobFormat,
+                       TaskScopeFormat taskFormat,
+                       OperatorScopeFormat operatorFormat)
+       {
+               this.jobManagerFormat = checkNotNull(jobManagerFormat);
+               this.jobManagerJobFormat = checkNotNull(jobManagerJobFormat);
+               this.taskManagerFormat = checkNotNull(taskManagerFormat);
+               this.taskManagerJobFormat = checkNotNull(taskManagerJobFormat);
+               this.taskFormat = checkNotNull(taskFormat);
+               this.operatorFormat = checkNotNull(operatorFormat);
+       }
+
+       // 
------------------------------------------------------------------------
+       //  Accessors
+       // 
------------------------------------------------------------------------
+
+       public JobManagerScopeFormat getJobManagerFormat() {
+               return this.jobManagerFormat;
+       }
+
+       public TaskManagerScopeFormat getTaskManagerFormat() {
+               return this.taskManagerFormat;
+       }
+
+       public TaskManagerJobScopeFormat getTaskManagerJobFormat() {
+               return this.taskManagerJobFormat;
+       }
+
+       public JobManagerJobScopeFormat getJobManagerJobFormat() {
+               return this.jobManagerJobFormat;
+       }
+
+       public TaskScopeFormat getTaskFormat() {
+               return this.taskFormat;
+       }
+
+       public OperatorScopeFormat getOperatorFormat() {
+               return this.operatorFormat;
+       }
+
+       // 
------------------------------------------------------------------------
+       //  Parsing from Config
+       // 
------------------------------------------------------------------------
+
+       /**
+        * Creates the scope formats as defined in the given configuration
+        * 
+        * @param config The configuration that defines the formats
+        * @return The ScopeFormats parsed from the configuration
+        */
+       public static ScopeFormats fromConfig(Configuration config) {
+               String jmFormat = config.getString(
+                               ConfigConstants.METRICS_SCOPE_NAMING_JM, 
ScopeFormat.DEFAULT_SCOPE_JOBMANAGER_GROUP);
+               String jmJobFormat = config.getString(
+                               ConfigConstants.METRICS_SCOPE_NAMING_JM_JOB, 
ScopeFormat.DEFAULT_SCOPE_JOBMANAGER_JOB_GROUP);
+               String tmFormat = config.getString(
+                               ConfigConstants.METRICS_SCOPE_NAMING_TM, 
ScopeFormat.DEFAULT_SCOPE_TASKMANAGER_GROUP);
+               String tmJobFormat = config.getString(
+                               ConfigConstants.METRICS_SCOPE_NAMING_TM_JOB, 
ScopeFormat.DEFAULT_SCOPE_TASKMANAGER_JOB_GROUP);
+               String taskFormat = config.getString(
+                               ConfigConstants.METRICS_SCOPE_NAMING_TASK, 
ScopeFormat.DEFAULT_SCOPE_TASK_GROUP);
+               String operatorFormat = config.getString(
+                               ConfigConstants.METRICS_SCOPE_NAMING_OPERATOR, 
ScopeFormat.DEFAULT_SCOPE_OPERATOR_GROUP);
+
+               return new ScopeFormats(jmFormat, jmJobFormat, tmFormat, 
tmJobFormat, taskFormat, operatorFormat);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/e3fec1f9/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/scope/TaskManagerJobScopeFormat.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/scope/TaskManagerJobScopeFormat.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/scope/TaskManagerJobScopeFormat.java
new file mode 100644
index 0000000..558e710
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/scope/TaskManagerJobScopeFormat.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.metrics.scope;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup;
+
+/**
+ * The scope format for the {@link 
org.apache.flink.runtime.metrics.groups.JobMetricGroup}.
+ */
+public class TaskManagerJobScopeFormat extends ScopeFormat {
+
+       public TaskManagerJobScopeFormat(String format, TaskManagerScopeFormat 
parentFormat) {
+               super(format, parentFormat, new String[] {
+                               SCOPE_ACTOR_HOST,
+                               SCOPE_TASKMANAGER_ID,
+                               SCOPE_JOB_ID,
+                               SCOPE_JOB_NAME
+               });
+       }
+
+       public String[] formatScope(TaskManagerMetricGroup parent, JobID jid, 
String jobName) {
+               final String[] template = copyTemplate();
+               final String[] values = {
+                               parent.hostname(),
+                               parent.taskManagerId(),
+                               valueOrNull(jid),
+                               valueOrNull(jobName)
+               };
+               return bindVariables(template, values);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/e3fec1f9/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/scope/TaskManagerScopeFormat.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/scope/TaskManagerScopeFormat.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/scope/TaskManagerScopeFormat.java
new file mode 100644
index 0000000..6e7c39d
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/scope/TaskManagerScopeFormat.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.metrics.scope;
+
+/**
+ * The scope format for the {@link 
org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup}.
+ */
+public class TaskManagerScopeFormat extends ScopeFormat {
+
+       public TaskManagerScopeFormat(String format) {
+               super(format, null, new String[] {
+                               SCOPE_ACTOR_HOST,
+                               SCOPE_TASKMANAGER_ID
+               });
+       }
+
+       public String[] formatScope(String hostname, String taskManagerId) {
+               final String[] template = copyTemplate();
+               final String[] values = { hostname, taskManagerId };
+               return bindVariables(template, values);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/e3fec1f9/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/scope/TaskScopeFormat.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/scope/TaskScopeFormat.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/scope/TaskScopeFormat.java
new file mode 100644
index 0000000..a781bf1
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/scope/TaskScopeFormat.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.metrics.scope;
+
+import org.apache.flink.runtime.metrics.groups.TaskManagerJobMetricGroup;
+import org.apache.flink.util.AbstractID;
+
+/**
+ * The scope format for the {@link 
org.apache.flink.runtime.metrics.groups.TaskMetricGroup}.
+ */
+public class TaskScopeFormat extends ScopeFormat {
+
+       public TaskScopeFormat(String format, TaskManagerJobScopeFormat 
parentFormat) {
+               super(format, parentFormat, new String[] {
+                               SCOPE_ACTOR_HOST,
+                               SCOPE_TASKMANAGER_ID,
+                               SCOPE_JOB_ID,
+                               SCOPE_JOB_NAME,
+                               SCOPE_TASK_VERTEX_ID,
+                               SCOPE_TASK_ATTEMPT_ID,
+                               SCOPE_TASK_NAME,
+                               SCOPE_TASK_SUBTASK_INDEX,
+                               SCOPE_TASK_ATTEMPT_NUM
+               });
+       }
+
+       public String[] formatScope(
+                       TaskManagerJobMetricGroup parent,
+                       AbstractID vertexId, AbstractID attemptId,
+                       String taskName, int subtask, int attemptNumber) {
+
+               final String[] template = copyTemplate();
+               final String[] values = {
+                               parent.parent().hostname(),
+                               parent.parent().taskManagerId(),
+                               valueOrNull(parent.jobId()),
+                               valueOrNull(parent.jobName()),
+                               valueOrNull(vertexId),
+                               valueOrNull(attemptId),
+                               valueOrNull(taskName),
+                               String.valueOf(subtask),
+                               String.valueOf(attemptNumber)
+               };
+               return bindVariables(template, values);
+       }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/e3fec1f9/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/RuntimeEnvironment.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/RuntimeEnvironment.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/RuntimeEnvironment.java
index 3b72730..6fdf6f9 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/RuntimeEnvironment.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/RuntimeEnvironment.java
@@ -23,7 +23,7 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.TaskInfo;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.Path;
-import org.apache.flink.metrics.groups.TaskMetricGroup;
+import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
 import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
 import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
 import org.apache.flink.runtime.execution.Environment;

http://git-wip-us.apache.org/repos/asf/flink/blob/e3fec1f9/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
index 25e4b43..dbc0b62 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
@@ -25,7 +25,7 @@ import org.apache.flink.api.common.cache.DistributedCache;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.Path;
-import org.apache.flink.metrics.groups.TaskMetricGroup;
+import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
 import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
 import org.apache.flink.runtime.blob.BlobKey;
 import org.apache.flink.runtime.broadcast.BroadcastVariableManager;

http://git-wip-us.apache.org/repos/asf/flink/blob/e3fec1f9/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index ef52381..0026bef 100644
--- 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -35,8 +35,8 @@ import org.apache.flink.api.common.{ExecutionConfig, JobID}
 import org.apache.flink.configuration.{ConfigConstants, Configuration, 
GlobalConfiguration}
 import org.apache.flink.core.fs.FileSystem
 import org.apache.flink.core.io.InputSplitAssigner
-import org.apache.flink.metrics.{Gauge, MetricGroup, MetricRegistry => 
FlinkMetricRegistry}
-import org.apache.flink.metrics.groups.{JobManagerMetricGroup, 
UnregisteredMetricsGroup}
+import org.apache.flink.metrics.{Gauge, MetricGroup}
+import org.apache.flink.metrics.groups.UnregisteredMetricsGroup
 import org.apache.flink.runtime.accumulators.AccumulatorSnapshot
 import org.apache.flink.runtime.akka.{AkkaUtils, ListeningBehaviour}
 import org.apache.flink.runtime.blob.BlobServer
@@ -71,6 +71,8 @@ import 
org.apache.flink.runtime.messages.checkpoint.{DeclineCheckpoint, Abstract
 
 import org.apache.flink.runtime.messages.webmonitor.InfoMessage
 import org.apache.flink.runtime.messages.webmonitor._
+import org.apache.flink.runtime.metrics.{MetricRegistry => FlinkMetricRegistry}
+import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup
 import org.apache.flink.runtime.process.ProcessReaper
 import org.apache.flink.runtime.security.SecurityUtils
 import org.apache.flink.runtime.security.SecurityUtils.FlinkSecuredRunner
@@ -1114,7 +1116,7 @@ class JobManager(
 
         val jobMetrics = jobManagerMetricGroup match {
           case Some(group) =>
-            group.addJob(jobGraph.getJobID, jobGraph.getName) match {
+            group.addJob(jobGraph) match {
               case (jobGroup:Any) => jobGroup
               case null => new UnregisteredMetricsGroup()
             }

http://git-wip-us.apache.org/repos/asf/flink/blob/e3fec1f9/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
index 0ff929c..dcf1e38 100644
--- 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
+++ 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
@@ -38,8 +38,7 @@ import grizzled.slf4j.Logger
 import org.apache.flink.configuration._
 import org.apache.flink.core.fs.FileSystem
 import org.apache.flink.core.memory.{HeapMemorySegment, HybridMemorySegment, 
MemorySegmentFactory, MemoryType}
-import org.apache.flink.metrics.groups.TaskManagerMetricGroup
-import org.apache.flink.metrics.{MetricGroup, Gauge => FlinkGauge, 
MetricRegistry => FlinkMetricRegistry}
+import org.apache.flink.metrics.{MetricGroup, Gauge => FlinkGauge}
 import org.apache.flink.runtime.accumulators.AccumulatorSnapshot
 import org.apache.flink.runtime.clusterframework.messages.StopCluster
 import org.apache.flink.runtime.clusterframework.types.ResourceID
@@ -65,6 +64,8 @@ import 
org.apache.flink.runtime.messages.StackTraceSampleMessages.{ResponseStack
 import org.apache.flink.runtime.messages.TaskManagerMessages._
 import org.apache.flink.runtime.messages.TaskMessages._
 import 
org.apache.flink.runtime.messages.checkpoint.{AbstractCheckpointMessage, 
NotifyCheckpointComplete, TriggerCheckpoint}
+import org.apache.flink.runtime.metrics.{MetricRegistry => FlinkMetricRegistry}
+import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup
 import org.apache.flink.runtime.process.ProcessReaper
 import org.apache.flink.runtime.security.SecurityUtils
 import org.apache.flink.runtime.security.SecurityUtils.FlinkSecuredRunner
@@ -1092,11 +1093,7 @@ class TaskManager(
         jobName = tdd.getJobName
       }
       
-      val taskMetricGroup = taskManagerMetricGroup
-          .addTaskForJob(
-            tdd.getJobID, jobName,
-            tdd.getVertexID, tdd.getExecutionId, tdd.getTaskName,
-            tdd.getIndexInSubtaskGroup, tdd.getAttemptNumber)
+      val taskMetricGroup = taskManagerMetricGroup.addTaskForJob(tdd)
 
       val task = new Task(
         tdd,

http://git-wip-us.apache.org/repos/asf/flink/blob/e3fec1f9/flink-runtime/src/main/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala
 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala
index bebcf7b..16331ac 100644
--- 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala
+++ 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala
@@ -21,7 +21,6 @@ package org.apache.flink.runtime.testingUtils
 import akka.actor.ActorRef
 
 import org.apache.flink.configuration.Configuration
-import org.apache.flink.metrics.MetricRegistry
 import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory
 import org.apache.flink.runtime.checkpoint.savepoint.SavepointStore
 import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager
@@ -30,6 +29,7 @@ import org.apache.flink.runtime.instance.InstanceManager
 import org.apache.flink.runtime.jobmanager.scheduler.Scheduler
 import org.apache.flink.runtime.jobmanager.{JobManager, SubmittedJobGraphStore}
 import org.apache.flink.runtime.leaderelection.LeaderElectionService
+import org.apache.flink.runtime.metrics.MetricRegistry
 
 import scala.concurrent.duration._
 import scala.language.postfixOps

http://git-wip-us.apache.org/repos/asf/flink/blob/e3fec1f9/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java
index 219e440..630335b 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java
@@ -23,10 +23,8 @@ import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.metrics.Gauge;
 import org.apache.flink.metrics.Metric;
+import org.apache.flink.metrics.MetricConfig;
 import org.apache.flink.metrics.MetricGroup;
-import org.apache.flink.metrics.MetricRegistry;
-import org.apache.flink.metrics.groups.AbstractMetricGroup;
-import org.apache.flink.metrics.groups.JobManagerMetricGroup;
 import org.apache.flink.metrics.reporter.MetricReporter;
 import org.apache.flink.runtime.JobException;
 import org.apache.flink.runtime.blob.BlobKey;
@@ -44,6 +42,8 @@ import org.apache.flink.runtime.jobmanager.Tasks;
 import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit;
 import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
 import org.apache.flink.runtime.messages.Messages;
+import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup;
 import org.apache.flink.runtime.taskmanager.TaskExecutionState;
 import org.apache.flink.util.SerializedValue;
 import org.apache.flink.util.TestLogger;
@@ -274,18 +274,18 @@ public class ExecutionGraphMetricsTest extends TestLogger 
{
                private final Map<String, Metric> metrics = new HashMap<>();
 
                @Override
-               public void open(Configuration config) {}
+               public void open(MetricConfig config) {}
 
                @Override
                public void close() {}
 
                @Override
-               public void notifyOfAddedMetric(Metric metric, String 
metricName, AbstractMetricGroup group) {
+               public void notifyOfAddedMetric(Metric metric, String 
metricName, MetricGroup group) {
                        metrics.put(metricName, metric);
                }
 
                @Override
-               public void notifyOfRemovedMetric(Metric metric, String 
metricName, AbstractMetricGroup group) {
+               public void notifyOfRemovedMetric(Metric metric, String 
metricName, MetricGroup group) {
                        metrics.remove(metricName);
                }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/e3fec1f9/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerMetricTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerMetricTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerMetricTest.java
deleted file mode 100644
index 1e6f019..0000000
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerMetricTest.java
+++ /dev/null
@@ -1,118 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.runtime.jobmanager;
-
-import org.apache.flink.configuration.ConfigConstants;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.jobgraph.JobGraph;
-import org.apache.flink.runtime.jobgraph.JobVertex;
-import org.apache.flink.runtime.jobgraph.JobVertexID;
-import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
-import org.apache.flink.runtime.jobgraph.tasks.JobSnapshottingSettings;
-import org.apache.flink.runtime.testingUtils.TestingCluster;
-import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages;
-import org.junit.Test;
-import scala.concurrent.Await;
-import scala.concurrent.Future;
-import scala.concurrent.duration.Deadline;
-import scala.concurrent.duration.FiniteDuration;
-
-import javax.management.MBeanServer;
-import javax.management.ObjectName;
-import java.lang.management.ManagementFactory;
-import java.util.Collections;
-import java.util.concurrent.TimeUnit;
-
-import static org.junit.Assert.assertEquals;
-
-public class JobManagerMetricTest {
-       /**
-        * Tests that metrics registered on the JobManager are actually 
accessible.
-        *
-        * @throws Exception
-        */
-       @Test
-       public void testJobManagerMetricAccess() throws Exception {
-               Deadline deadline = new FiniteDuration(2, 
TimeUnit.MINUTES).fromNow();
-               Configuration flinkConfiguration = new Configuration();
-
-               
flinkConfiguration.setString(ConfigConstants.METRICS_REPORTER_CLASS, 
"org.apache.flink.metrics.reporter.JMXReporter");
-               
flinkConfiguration.setString(ConfigConstants.METRICS_SCOPE_NAMING_JM_JOB, 
"jobmanager.<job_name>");
-               
flinkConfiguration.setString(ConfigConstants.METRICS_REPORTER_ARGUMENTS, 
"--port 9060-9075");
-
-               TestingCluster flink = new TestingCluster(flinkConfiguration);
-
-               try {
-                       flink.start();
-
-                       JobVertex sourceJobVertex = new JobVertex("Source");
-                       
sourceJobVertex.setInvokableClass(BlockingInvokable.class);
-
-                       JobGraph jobGraph = new JobGraph("TestingJob", 
sourceJobVertex);
-                       jobGraph.setSnapshotSettings(new 
JobSnapshottingSettings(
-                               Collections.<JobVertexID>emptyList(),
-                               Collections.<JobVertexID>emptyList(),
-                               Collections.<JobVertexID>emptyList(),
-                               500, 500, 50, 5));
-
-                       flink.waitForActorsToBeAlive();
-
-                       flink.submitJobDetached(jobGraph);
-
-                       Future<Object> jobRunning = 
flink.getLeaderGateway(deadline.timeLeft())
-                               .ask(new 
TestingJobManagerMessages.WaitForAllVerticesToBeRunning(jobGraph.getJobID()), 
deadline.timeLeft());
-                       Await.ready(jobRunning, deadline.timeLeft());
-
-                       MBeanServer mBeanServer = 
ManagementFactory.getPlatformMBeanServer();
-                       ObjectName objectName1 = new 
ObjectName("org.apache.flink.metrics:key0=jobmanager,key1=TestingJob,name=lastCheckpointSize");
-                       assertEquals(-1L, mBeanServer.getAttribute(objectName1, 
"Value"));
-
-                       Future<Object> jobFinished = 
flink.getLeaderGateway(deadline.timeLeft())
-                               .ask(new 
TestingJobManagerMessages.NotifyWhenJobRemoved(jobGraph.getJobID()), 
deadline.timeLeft());
-
-                       BlockingInvokable.unblock();
-
-                       // wait til the job has finished
-                       Await.ready(jobFinished, deadline.timeLeft());
-               } finally {
-                       flink.stop();
-               }
-       }
-
-       public static class BlockingInvokable extends AbstractInvokable {
-               private static boolean blocking = true;
-               private static final Object lock = new Object();
-
-               @Override
-               public void invoke() throws Exception {
-                       while (blocking) {
-                               synchronized (lock) {
-                                       lock.wait();
-                               }
-                       }
-               }
-
-               public static void unblock() {
-                       blocking = false;
-
-                       synchronized (lock) {
-                               lock.notifyAll();
-                       }
-               }
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/e3fec1f9/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/MetricRegistryTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/MetricRegistryTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/MetricRegistryTest.java
new file mode 100644
index 0000000..3252e3d
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/MetricRegistryTest.java
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.metrics;
+
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.Metric;
+import org.apache.flink.metrics.MetricConfig;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.metrics.reporter.Scheduled;
+import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup;
+import org.apache.flink.runtime.metrics.scope.ScopeFormats;
+import org.apache.flink.runtime.metrics.util.TestReporter;
+
+import org.apache.flink.util.TestLogger;
+import org.junit.Assert;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class MetricRegistryTest extends TestLogger {
+       
+       /**
+        * Verifies that the reporter class argument is correctly used to 
instantiate and open the reporter.
+        */
+       @Test
+       public void testReporterInstantiation() {
+               Configuration config = new Configuration();
+
+               config.setString(ConfigConstants.METRICS_REPORTER_CLASS, 
TestReporter1.class.getName());
+
+               new MetricRegistry(config);
+
+               Assert.assertTrue(TestReporter1.wasOpened);
+       }
+
+       protected static class TestReporter1 extends TestReporter {
+               public static boolean wasOpened = false;
+
+               @Override
+               public void open(MetricConfig config) {
+                       wasOpened = true;
+               }
+       }
+
+       /**
+        * Verifies that configured arguments are properly forwarded to the 
reporter.
+        */
+       @Test
+       public void testReporterArgumentForwarding() {
+               Configuration config = new Configuration();
+
+               config.setString(ConfigConstants.METRICS_REPORTER_CLASS, 
TestReporter2.class.getName());
+               config.setString(ConfigConstants.METRICS_REPORTER_ARGUMENTS, 
"--arg1 hello --arg2 world");
+
+               new MetricRegistry(config);
+       }
+
+       protected static class TestReporter2 extends TestReporter {
+               @Override
+               public void open(MetricConfig config) {
+                       Assert.assertEquals("hello", config.getString("arg1", 
null));
+                       Assert.assertEquals("world", config.getString("arg2", 
null));
+               }
+       }
+
+       /**
+        * Verifies that reporters implementing the Scheduled interface are 
regularly called to report the metrics.
+        *
+        * @throws InterruptedException
+        */
+       @Test
+       public void testReporterScheduling() throws InterruptedException {
+               Configuration config = new Configuration();
+
+               config.setString(ConfigConstants.METRICS_REPORTER_CLASS, 
TestReporter3.class.getName());
+               config.setString(ConfigConstants.METRICS_REPORTER_INTERVAL, "50 
MILLISECONDS");
+
+               new MetricRegistry(config);
+
+               long start = System.currentTimeMillis();
+               for (int x = 0; x < 10; x++) {
+                       Thread.sleep(100);
+                       int reportCount = TestReporter3.reportCount;
+                       long curT = System.currentTimeMillis();
+                       /**
+                        * Within a given time-frame T only T/500 reports may 
be triggered due to the interval between reports. 
+                        * This value however does not not take the first 
triggered report into account (=> +1). 
+                        * Furthermore we have to account for the mis-alignment 
between reports being triggered and our time 
+                        * measurement (=> +1); for T=200 a total of 4-6 
reports may have been
+                        * triggered depending on whether the end of the 
interval for the first reports ends before
+                        * or after T=50.
+                        */
+                       long maxAllowedReports = (curT - start) / 50 + 2;
+                       Assert.assertTrue("Too many report were triggered.", 
maxAllowedReports >= reportCount);
+               }
+               Assert.assertTrue("No report was triggered.", 
TestReporter3.reportCount > 0);
+       }
+
+       protected static class TestReporter3 extends TestReporter implements 
Scheduled {
+               public static int reportCount = 0;
+
+               @Override
+               public void report() {
+                       reportCount++;
+               }
+       }
+
+       /**
+        * Verifies that reporters implementing the Listener interface are 
notified when Metrics are added or removed.
+        */
+       @Test
+       public void testListener() {
+               Configuration config = new Configuration();
+               config.setString(ConfigConstants.METRICS_REPORTER_CLASS, 
TestReporter6.class.getName());
+
+               MetricRegistry registry = new MetricRegistry(config);
+
+               TaskManagerMetricGroup root = new 
TaskManagerMetricGroup(registry, "host", "id");
+               root.counter("rootCounter");
+               root.close();
+
+               assertTrue(TestReporter6.addCalled);
+               assertTrue(TestReporter6.removeCalled);
+       }
+
+       protected static class TestReporter6 extends TestReporter {
+               public static boolean addCalled = false;
+               public static boolean removeCalled = false;
+
+               @Override
+               public void notifyOfAddedMetric(Metric metric, String 
metricName, MetricGroup group) {
+                       addCalled = true;
+                       assertTrue(metric instanceof Counter);
+                       assertEquals("rootCounter", metricName);
+               }
+
+               @Override
+               public void notifyOfRemovedMetric(Metric metric, String 
metricName, MetricGroup group) {
+                       removeCalled = true;
+                       Assert.assertTrue(metric instanceof Counter);
+                       Assert.assertEquals("rootCounter", metricName);
+               }
+       }
+
+       /**
+        * Verifies that the scope configuration is properly extracted.
+        */
+       @Test
+       public void testScopeConfig() {
+               Configuration config = new Configuration();
+
+               config.setString(ConfigConstants.METRICS_SCOPE_NAMING_TM, "A");
+               config.setString(ConfigConstants.METRICS_SCOPE_NAMING_TM_JOB, 
"B");
+               config.setString(ConfigConstants.METRICS_SCOPE_NAMING_TASK, 
"C");
+               config.setString(ConfigConstants.METRICS_SCOPE_NAMING_OPERATOR, 
"D");
+
+               ScopeFormats scopeConfig = 
MetricRegistry.createScopeConfig(config);
+
+               assertEquals("A", scopeConfig.getTaskManagerFormat().format());
+               assertEquals("B", 
scopeConfig.getTaskManagerJobFormat().format());
+               assertEquals("C", scopeConfig.getTaskFormat().format());
+               assertEquals("D", scopeConfig.getOperatorFormat().format());
+       }
+
+       @Test
+       public void testConfigurableDelimiter() {
+               Configuration config = new Configuration();
+               config.setString(ConfigConstants.METRICS_SCOPE_DELIMITER, "_");
+               config.setString(ConfigConstants.METRICS_SCOPE_NAMING_TM, 
"A.B.C.D.E");
+
+               MetricRegistry registry = new MetricRegistry(config);
+
+               TaskManagerMetricGroup tmGroup = new 
TaskManagerMetricGroup(registry, "host", "id");
+               assertEquals("A_B_C_D_E_name", 
tmGroup.getMetricIdentifier("name"));
+
+               registry.shutdown();
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/e3fec1f9/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/JobManagerGroupTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/JobManagerGroupTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/JobManagerGroupTest.java
new file mode 100644
index 0000000..ddb5dfc
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/JobManagerGroupTest.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.metrics.groups;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.metrics.scope.JobManagerScopeFormat;
+import org.junit.Test;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class JobManagerGroupTest {
+
+       // 
------------------------------------------------------------------------
+       //  adding and removing jobs
+       // 
------------------------------------------------------------------------
+
+       @Test
+       public void addAndRemoveJobs() {
+               final JobManagerMetricGroup group = new JobManagerMetricGroup(
+                       new MetricRegistry(new Configuration()), "localhost");
+
+               final JobID jid1 = new JobID();
+               final JobID jid2 = new JobID();
+
+               final String jobName1 = "testjob";
+               final String jobName2 = "anotherJob";
+
+               JobManagerJobMetricGroup jmJobGroup11 = group.addJob(new 
JobGraph(jid1, jobName1));
+               JobManagerJobMetricGroup jmJobGroup12 = group.addJob(new 
JobGraph(jid1, jobName1));
+               JobManagerJobMetricGroup jmJobGroup21 = group.addJob(new 
JobGraph(jid2, jobName2));
+
+               assertEquals(jmJobGroup11, jmJobGroup12);
+
+               assertEquals(2, group.numRegisteredJobMetricGroups());
+
+               group.removeJob(jid1);
+
+               assertTrue(jmJobGroup11.isClosed());
+               assertEquals(1, group.numRegisteredJobMetricGroups());
+
+               group.removeJob(jid2);
+
+               assertTrue(jmJobGroup21.isClosed());
+               assertEquals(0, group.numRegisteredJobMetricGroups());
+       }
+
+       @Test
+       public void testCloseClosesAll() {
+               final JobManagerMetricGroup group = new JobManagerMetricGroup(
+                       new MetricRegistry(new Configuration()), "localhost");
+
+               final JobID jid1 = new JobID();
+               final JobID jid2 = new JobID();
+
+               final String jobName1 = "testjob";
+               final String jobName2 = "anotherJob";
+
+               JobManagerJobMetricGroup jmJobGroup11 = group.addJob(new 
JobGraph(jid1, jobName1));
+               JobManagerJobMetricGroup jmJobGroup21 = group.addJob(new 
JobGraph(jid2, jobName2));
+
+               group.close();
+
+               assertTrue(jmJobGroup11.isClosed());
+               assertTrue(jmJobGroup21.isClosed());
+       }
+
+       // 
------------------------------------------------------------------------
+       //  scope name tests
+       // 
------------------------------------------------------------------------
+
+       @Test
+       public void testGenerateScopeDefault() {
+               MetricRegistry registry = new MetricRegistry(new 
Configuration());
+               JobManagerMetricGroup group = new 
JobManagerMetricGroup(registry, "localhost");
+
+               assertArrayEquals(new String[]{"localhost", "jobmanager"}, 
group.getScopeComponents());
+               assertEquals("localhost.jobmanager.name", 
group.getMetricIdentifier("name"));
+       }
+
+       @Test
+       public void testGenerateScopeCustom() {
+               MetricRegistry registry = new MetricRegistry(new 
Configuration());
+               JobManagerScopeFormat format = new 
JobManagerScopeFormat("constant.<host>.foo.<host>");
+               JobManagerMetricGroup group = new 
JobManagerMetricGroup(registry, format, "host");
+
+               assertArrayEquals(new String[]{"constant", "host", "foo", 
"host"}, group.getScopeComponents());
+               assertEquals("constant.host.foo.host.name", 
group.getMetricIdentifier("name"));
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/e3fec1f9/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/JobManagerJobGroupTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/JobManagerJobGroupTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/JobManagerJobGroupTest.java
new file mode 100644
index 0000000..a13916b
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/JobManagerJobGroupTest.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.metrics.groups;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.metrics.scope.JobManagerJobScopeFormat;
+import org.apache.flink.runtime.metrics.scope.JobManagerScopeFormat;
+import org.junit.Test;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+
+public class JobManagerJobGroupTest {
+
+       @Test
+       public void testGenerateScopeDefault() {
+               MetricRegistry registry = new MetricRegistry(new 
Configuration());
+
+               JobManagerMetricGroup tmGroup = new 
JobManagerMetricGroup(registry, "theHostName");
+               JobMetricGroup jmGroup = new JobManagerJobMetricGroup(registry, 
tmGroup, new JobID(), "myJobName");
+
+               assertArrayEquals(
+                               new String[] { "theHostName", "jobmanager", 
"myJobName"},
+                               jmGroup.getScopeComponents());
+
+               assertEquals(
+                               "theHostName.jobmanager.myJobName.name",
+                               jmGroup.getMetricIdentifier("name"));
+       }
+
+       @Test
+       public void testGenerateScopeCustom() {
+               MetricRegistry registry = new MetricRegistry(new 
Configuration());
+
+               JobManagerScopeFormat tmFormat = new 
JobManagerScopeFormat("abc");
+               JobManagerJobScopeFormat jmFormat = new 
JobManagerJobScopeFormat("some-constant.<job_name>", tmFormat);
+
+               JobID jid = new JobID();
+
+               JobManagerMetricGroup tmGroup = new 
JobManagerMetricGroup(registry, "theHostName");
+               JobMetricGroup jmGroup = new JobManagerJobMetricGroup(registry, 
tmGroup, jmFormat, jid, "myJobName");
+
+               assertArrayEquals(
+                               new String[] { "some-constant", "myJobName" },
+                               jmGroup.getScopeComponents());
+
+               assertEquals(
+                               "some-constant.myJobName.name",
+                               jmGroup.getMetricIdentifier("name"));
+       }
+
+       @Test
+       public void testGenerateScopeCustomWildcard() {
+               MetricRegistry registry = new MetricRegistry(new 
Configuration());
+
+               JobManagerScopeFormat tmFormat = new 
JobManagerScopeFormat("peter");
+               JobManagerJobScopeFormat jmFormat = new 
JobManagerJobScopeFormat("*.some-constant.<job_id>", tmFormat);
+
+               JobID jid = new JobID();
+
+               JobManagerMetricGroup tmGroup = new 
JobManagerMetricGroup(registry, tmFormat, "theHostName");
+               JobMetricGroup jmGroup = new JobManagerJobMetricGroup(registry, 
tmGroup, jmFormat, jid, "myJobName");
+
+               assertArrayEquals(
+                               new String[] { "peter", "some-constant", 
jid.toString() },
+                               jmGroup.getScopeComponents());
+
+               assertEquals(
+                               "peter.some-constant." + jid + ".name",
+                               jmGroup.getMetricIdentifier("name"));
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/e3fec1f9/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/MetricGroupRegistrationTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/MetricGroupRegistrationTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/MetricGroupRegistrationTest.java
new file mode 100644
index 0000000..39485dc
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/MetricGroupRegistrationTest.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.metrics.groups;
+
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.Histogram;
+import org.apache.flink.metrics.HistogramStatistics;
+import org.apache.flink.metrics.Metric;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.metrics.util.TestReporter;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import static org.junit.Assert.*;
+
+public class MetricGroupRegistrationTest {
+       /**
+        * Verifies that group methods instantiate the correct metric with the 
given name.
+        */
+       @Test
+       public void testMetricInstantiation() {
+               Configuration config = new Configuration();
+               config.setString(ConfigConstants.METRICS_REPORTER_CLASS, 
TestReporter1.class.getName());
+
+               MetricRegistry registry = new MetricRegistry(config);
+
+               MetricGroup root = new TaskManagerMetricGroup(registry, "host", 
"id");
+
+               Counter counter = root.counter("counter");
+               assertEquals(counter, TestReporter1.lastPassedMetric);
+               assertEquals("counter", TestReporter1.lastPassedName);
+
+               Gauge<Object> gauge = root.gauge("gauge", new Gauge<Object>() {
+                       @Override
+                       public Object getValue() {
+                               return null;
+                       }
+               });
+               
+               Assert.assertEquals(gauge, TestReporter1.lastPassedMetric);
+               assertEquals("gauge", TestReporter1.lastPassedName);
+
+               Histogram histogram = root.histogram("histogram", new 
Histogram() {
+                       @Override
+                       public void update(long value) {
+
+                       }
+
+                       @Override
+                       public long getCount() {
+                               return 0;
+                       }
+
+                       @Override
+                       public HistogramStatistics getStatistics() {
+                               return null;
+                       }
+               });
+
+               Assert.assertEquals(histogram, TestReporter1.lastPassedMetric);
+               assertEquals("histogram", TestReporter1.lastPassedName);
+               registry.shutdown();
+       }
+
+       public static class TestReporter1 extends TestReporter {
+               
+               public static Metric lastPassedMetric;
+               public static String lastPassedName;
+
+               @Override
+               public void notifyOfAddedMetric(Metric metric, String 
metricName, MetricGroup group) {
+                       lastPassedMetric = metric;
+                       lastPassedName = metricName;
+               }
+       }
+
+       /**
+        * Verifies that when attempting to create a group with the name of an 
existing one the existing one will be returned instead.
+        */
+       @Test
+       public void testDuplicateGroupName() {
+               Configuration config = new Configuration();
+
+               MetricRegistry registry = new MetricRegistry(config);
+
+               MetricGroup root = new TaskManagerMetricGroup(registry, "host", 
"id");
+
+               MetricGroup group1 = root.addGroup("group");
+               MetricGroup group2 = root.addGroup("group");
+               MetricGroup group3 = root.addGroup("group");
+               Assert.assertTrue(group1 == group2 && group2 == group3);
+
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/e3fec1f9/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/MetricGroupTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/MetricGroupTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/MetricGroupTest.java
new file mode 100644
index 0000000..16fb518
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/MetricGroupTest.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.metrics.groups;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.Metric;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.runtime.metrics.MetricRegistry;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.junit.Assert.*;
+
+public class MetricGroupTest {
+       
+       private MetricRegistry registry;
+
+       private final MetricRegistry exceptionOnRegister = new 
ExceptionOnRegisterRegistry();
+
+       @Before
+       public void createRegistry() {
+               this.registry = new MetricRegistry(new Configuration());
+       }
+
+       @After
+       public void shutdownRegistry() {
+               this.registry.shutdown();
+               this.registry = null;
+       }
+
+       @Test
+       public void sameGroupOnNameCollision() {
+               GenericMetricGroup group = new GenericMetricGroup(
+               registry, new DummyAbstractMetricGroup(registry), "somegroup");
+
+               String groupName = "sometestname";
+               MetricGroup subgroup1 = group.addGroup(groupName);
+               MetricGroup subgroup2 = group.addGroup(groupName);
+               
+               assertNotNull(subgroup1);
+               assertNotNull(subgroup2);
+               assertTrue(subgroup1 == subgroup2);
+       }
+
+       @Test
+       public void closedGroupDoesNotRegisterMetrics() {
+               GenericMetricGroup group = new GenericMetricGroup(
+                               exceptionOnRegister, new 
DummyAbstractMetricGroup(exceptionOnRegister), "testgroup");
+               assertFalse(group.isClosed());
+
+               group.close();
+               assertTrue(group.isClosed());
+
+               // these will fail is the registration is propagated
+               group.counter("testcounter");
+               group.gauge("testgauge", new Gauge<Object>() {
+                       @Override
+                       public Object getValue() { return null; }
+               });
+       }
+       
+       @Test
+       public void closedGroupCreatesClosedGroups() {
+               GenericMetricGroup group = new 
GenericMetricGroup(exceptionOnRegister,
+                               new 
DummyAbstractMetricGroup(exceptionOnRegister), "testgroup");
+               assertFalse(group.isClosed());
+
+               group.close();
+               assertTrue(group.isClosed());
+               
+               AbstractMetricGroup subgroup = (AbstractMetricGroup) 
group.addGroup("test subgroup");
+               assertTrue(subgroup.isClosed());
+       }
+
+       @Test
+       public void tolerateMetricNameCollisions() {
+               final String name = "abctestname";
+               GenericMetricGroup group = new GenericMetricGroup(
+                               registry, new 
DummyAbstractMetricGroup(registry), "testgroup");
+               
+               assertNotNull(group.counter(name));
+               assertNotNull(group.counter(name));
+       }
+
+       @Test
+       public void tolerateMetricAndGroupNameCollisions() {
+               final String name = "abctestname";
+               GenericMetricGroup group = new GenericMetricGroup(
+                               registry, new 
DummyAbstractMetricGroup(registry), "testgroup");
+               
+               assertNotNull(group.addGroup(name));
+               assertNotNull(group.counter(name));
+       }
+       
+       // 
------------------------------------------------------------------------
+       
+       private static class ExceptionOnRegisterRegistry extends MetricRegistry 
{
+               
+               public ExceptionOnRegisterRegistry() {
+                       super(new Configuration());
+               }
+
+               @Override
+               public void register(Metric metric, String name, MetricGroup 
parent) {
+                       fail("Metric should never be registered");
+               }
+
+               @Override
+               public void unregister(Metric metric, String name, MetricGroup 
parent) {
+                       fail("Metric should never be un-registered");
+               }
+       }
+
+       // 
------------------------------------------------------------------------
+       
+       private static class DummyAbstractMetricGroup extends 
AbstractMetricGroup {
+
+               public DummyAbstractMetricGroup(MetricRegistry registry) {
+                       super(registry, new String[0]);
+               }
+
+               @Override
+               protected void addMetric(String name, Metric metric) {}
+
+               @Override
+               public MetricGroup addGroup(String name) {
+                       return new DummyAbstractMetricGroup(registry);
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/e3fec1f9/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/OperatorGroupTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/OperatorGroupTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/OperatorGroupTest.java
new file mode 100644
index 0000000..4da713f
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/OperatorGroupTest.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.metrics.groups;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.util.AbstractID;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+
+public class OperatorGroupTest {
+
+       @Test
+       public void testGenerateScopeDefault() {
+               MetricRegistry registry = new MetricRegistry(new 
Configuration());
+
+               TaskManagerMetricGroup tmGroup = new 
TaskManagerMetricGroup(registry, "theHostName", "test-tm-id");
+               TaskManagerJobMetricGroup jmGroup = new 
TaskManagerJobMetricGroup(registry, tmGroup, new JobID(), "myJobName");
+               TaskMetricGroup taskGroup = new TaskMetricGroup(
+                               registry, jmGroup,  new AbstractID(),  new 
AbstractID(), "aTaskName", 11, 0);
+               OperatorMetricGroup opGroup = new OperatorMetricGroup(registry, 
taskGroup, "myOpName");
+
+               assertArrayEquals(
+                               new String[] { "theHostName", "taskmanager", 
"test-tm-id", "myJobName", "myOpName", "11" },
+                               opGroup.getScopeComponents());
+
+               assertEquals(
+                               
"theHostName.taskmanager.test-tm-id.myJobName.myOpName.11.name",
+                               opGroup.getMetricIdentifier("name"));
+
+               registry.shutdown();
+       }
+}

Reply via email to