[FLINK-4192] [metrics] Move metrics classes out of 'flink-core'

- moved user-facing API to 'flink-metrics/flink-metrics-core'
- moved JMXReporter to 'flink-metrics/flink-metrics-jmx'
- moved remaining metric classes to 'flink-runtime'

This closes #2226


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e3fec1f9
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e3fec1f9
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e3fec1f9

Branch: refs/heads/master
Commit: e3fec1f9ad737c01fc5c22bd0c327bfce91938a9
Parents: e4fe89d
Author: zentol <[email protected]>
Authored: Fri Jul 22 12:33:16 2016 +0200
Committer: zentol <[email protected]>
Committed: Tue Jul 26 12:30:15 2016 +0200

----------------------------------------------------------------------
 docs/apis/metrics.md                            |   4 +-
 flink-core/pom.xml                              |   6 +
 .../apache/flink/metrics/CharacterFilter.java   |  36 --
 .../java/org/apache/flink/metrics/Counter.java  |  59 ---
 .../java/org/apache/flink/metrics/Gauge.java    |  34 --
 .../org/apache/flink/metrics/Histogram.java     |  52 --
 .../flink/metrics/HistogramStatistics.java      |  81 ---
 .../java/org/apache/flink/metrics/Metric.java   |  28 --
 .../org/apache/flink/metrics/MetricGroup.java   | 133 -----
 .../apache/flink/metrics/MetricRegistry.java    | 273 -----------
 .../org/apache/flink/metrics/SimpleCounter.java |  71 ---
 .../metrics/groups/AbstractMetricGroup.java     | 292 -----------
 .../metrics/groups/ComponentMetricGroup.java    |  78 ---
 .../metrics/groups/GenericMetricGroup.java      |  49 --
 .../flink/metrics/groups/IOMetricGroup.java     |  52 --
 .../groups/JobManagerJobMetricGroup.java        |  69 ---
 .../metrics/groups/JobManagerMetricGroup.java   | 104 ----
 .../flink/metrics/groups/JobMetricGroup.java    |  62 ---
 .../metrics/groups/OperatorMetricGroup.java     |  62 ---
 .../flink/metrics/groups/ProxyMetricGroup.java  |  90 ----
 .../groups/TaskManagerJobMetricGroup.java       | 122 -----
 .../metrics/groups/TaskManagerMetricGroup.java  | 134 -----
 .../flink/metrics/groups/TaskMetricGroup.java   | 169 -------
 .../groups/UnregisteredMetricsGroup.java        |  84 ----
 .../flink/metrics/groups/scope/ScopeFormat.java | 490 ------------------
 .../metrics/groups/scope/ScopeFormats.java      | 130 -----
 .../metrics/reporter/AbstractReporter.java      |  78 ---
 .../flink/metrics/reporter/JMXReporter.java     | 491 -------------------
 .../flink/metrics/reporter/MetricReporter.java  |  75 ---
 .../flink/metrics/reporter/Scheduled.java       |  34 --
 .../flink/metrics/MetricRegistryTest.java       | 194 --------
 .../metrics/groups/JobManagerGroupTest.java     | 108 ----
 .../metrics/groups/JobManagerJobGroupTest.java  |  90 ----
 .../groups/MetricGroupRegistrationTest.java     | 114 -----
 .../flink/metrics/groups/MetricGroupTest.java   | 149 ------
 .../flink/metrics/groups/OperatorGroupTest.java |  53 --
 .../metrics/groups/TaskManagerGroupTest.java    | 154 ------
 .../metrics/groups/TaskManagerJobGroupTest.java |  94 ----
 .../metrics/groups/TaskMetricGroupTest.java     | 167 -------
 .../flink/metrics/reporter/JMXReporterTest.java | 291 -----------
 .../apache/flink/metrics/util/TestReporter.java |  44 --
 flink-dist/pom.xml                              |   7 +
 flink-metrics/flink-metrics-core/pom.xml        |  35 ++
 .../apache/flink/metrics/CharacterFilter.java   |  36 ++
 .../java/org/apache/flink/metrics/Counter.java  |  56 +++
 .../java/org/apache/flink/metrics/Gauge.java    |  32 ++
 .../org/apache/flink/metrics/Histogram.java     |  49 ++
 .../flink/metrics/HistogramStatistics.java      |  78 +++
 .../java/org/apache/flink/metrics/Metric.java   |  25 +
 .../org/apache/flink/metrics/MetricConfig.java  |  62 +++
 .../org/apache/flink/metrics/MetricGroup.java   | 162 ++++++
 .../org/apache/flink/metrics/SimpleCounter.java |  73 +++
 .../groups/UnregisteredMetricsGroup.java        |  98 ++++
 .../metrics/reporter/AbstractReporter.java      |  76 +++
 .../flink/metrics/reporter/MetricReporter.java  |  73 +++
 .../flink/metrics/reporter/Scheduled.java       |  31 ++
 flink-metrics/flink-metrics-dropwizard/pom.xml  |  16 +-
 .../dropwizard/ScheduledDropwizardReporter.java |  12 +-
 .../ScheduledDropwizardReporterTest.java        |  17 +-
 .../DropwizardFlinkHistogramWrapperTest.java    |   7 +-
 flink-metrics/flink-metrics-ganglia/pom.xml     |   2 +-
 .../flink/metrics/ganglia/GangliaReporter.java  |   4 +-
 flink-metrics/flink-metrics-graphite/pom.xml    |   2 +-
 .../metrics/graphite/GraphiteReporter.java      |   4 +-
 flink-metrics/flink-metrics-jmx/pom.xml         |  81 +++
 .../apache/flink/metrics/jmx/JMXReporter.java   | 491 +++++++++++++++++++
 .../flink/metrics/jmx/JMXReporterTest.java      | 289 +++++++++++
 .../jobmanager/JMXJobManagerMetricTest.java     | 119 +++++
 .../src/test/resources/log4j-test.properties    |  27 +
 .../src/test/resources/logback-test.xml         |  34 ++
 flink-metrics/flink-metrics-statsd/pom.xml      |   9 +-
 .../flink/metrics/statsd/StatsDReporter.java    |   4 +-
 .../metrics/statsd/StatsDReporterTest.java      |  11 +-
 flink-metrics/pom.xml                           |   2 +
 .../flink/runtime/execution/Environment.java    |   2 +-
 .../api/serialization/RecordSerializer.java     |   2 +-
 .../serialization/SpanningRecordSerializer.java |   2 +-
 .../io/network/api/writer/RecordWriter.java     |   2 +-
 .../partition/consumer/LocalInputChannel.java   |   2 +-
 .../partition/consumer/RemoteInputChannel.java  |   2 +-
 .../partition/consumer/SingleInputGate.java     |   2 +-
 .../partition/consumer/UnknownInputChannel.java |   2 +-
 .../flink/runtime/metrics/MetricRegistry.java   | 269 ++++++++++
 .../metrics/groups/AbstractMetricGroup.java     | 292 +++++++++++
 .../metrics/groups/ComponentMetricGroup.java    |  78 +++
 .../metrics/groups/GenericMetricGroup.java      |  47 ++
 .../runtime/metrics/groups/IOMetricGroup.java   |  52 ++
 .../groups/JobManagerJobMetricGroup.java        |  71 +++
 .../metrics/groups/JobManagerMetricGroup.java   | 103 ++++
 .../runtime/metrics/groups/JobMetricGroup.java  |  62 +++
 .../metrics/groups/OperatorMetricGroup.java     |  64 +++
 .../metrics/groups/ProxyMetricGroup.java        | 106 ++++
 .../groups/TaskManagerJobMetricGroup.java       | 125 +++++
 .../metrics/groups/TaskManagerMetricGroup.java  | 132 +++++
 .../runtime/metrics/groups/TaskMetricGroup.java | 170 +++++++
 .../metrics/scope/JobManagerJobScopeFormat.java |  46 ++
 .../metrics/scope/JobManagerScopeFormat.java    |  37 ++
 .../metrics/scope/OperatorScopeFormat.java      |  60 +++
 .../runtime/metrics/scope/ScopeFormat.java      | 307 ++++++++++++
 .../runtime/metrics/scope/ScopeFormats.java     | 153 ++++++
 .../scope/TaskManagerJobScopeFormat.java        |  48 ++
 .../metrics/scope/TaskManagerScopeFormat.java   |  38 ++
 .../runtime/metrics/scope/TaskScopeFormat.java  |  62 +++
 .../runtime/taskmanager/RuntimeEnvironment.java |   2 +-
 .../apache/flink/runtime/taskmanager/Task.java  |   2 +-
 .../flink/runtime/jobmanager/JobManager.scala   |   8 +-
 .../flink/runtime/taskmanager/TaskManager.scala |  11 +-
 .../testingUtils/TestingJobManager.scala        |   2 +-
 .../ExecutionGraphMetricsTest.java              |  12 +-
 .../jobmanager/JobManagerMetricTest.java        | 118 -----
 .../runtime/metrics/MetricRegistryTest.java     | 197 ++++++++
 .../metrics/groups/JobManagerGroupTest.java     | 109 ++++
 .../metrics/groups/JobManagerJobGroupTest.java  |  90 ++++
 .../groups/MetricGroupRegistrationTest.java     | 114 +++++
 .../runtime/metrics/groups/MetricGroupTest.java | 149 ++++++
 .../metrics/groups/OperatorGroupTest.java       |  53 ++
 .../metrics/groups/TaskManagerGroupTest.java    | 271 ++++++++++
 .../metrics/groups/TaskManagerJobGroupTest.java |  94 ++++
 .../metrics/groups/TaskMetricGroupTest.java     | 168 +++++++
 .../runtime/metrics/util/TestReporter.java      |  44 ++
 .../operators/testutils/DummyEnvironment.java   |   2 +-
 .../operators/testutils/MockEnvironment.java    |   4 +-
 .../testutils/UnregisteredTaskMetricsGroup.java |  10 +-
 .../runtime/taskmanager/TaskAsyncCallTest.java  |   2 +-
 .../flink/runtime/taskmanager/TaskStopTest.java |   3 +-
 .../flink/runtime/taskmanager/TaskTest.java     |   2 +-
 .../flink-connector-kafka-0.8/pom.xml           |   7 +
 .../flink-connector-kafka-0.9/pom.xml           |   7 +
 .../flink-connector-kafka-base/pom.xml          |   7 +
 .../connectors/kafka/KafkaTestBase.java         |   3 +-
 .../runtime/io/StreamInputProcessor.java        |   2 +-
 .../runtime/io/StreamTwoInputProcessor.java     |   2 +-
 .../runtime/tasks/StreamMockEnvironment.java    |   2 +-
 .../streaming/runtime/tasks/StreamTaskTest.java |   2 +-
 .../flink/yarn/TestingYarnJobManager.scala      |   2 +-
 .../org/apache/flink/yarn/YarnJobManager.scala  |   2 +-
 136 files changed, 5677 insertions(+), 5088 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/e3fec1f9/docs/apis/metrics.md
----------------------------------------------------------------------
diff --git a/docs/apis/metrics.md b/docs/apis/metrics.md
index 75e7a52..e8c2772 100644
--- a/docs/apis/metrics.md
+++ b/docs/apis/metrics.md
@@ -230,7 +230,7 @@ or by assigning unique names to jobs and operators.
 Metrics can be exposed to an external system by configuring a reporter in `conf/flink-conf.yaml`.
 
 - `metrics.reporter.class`: The class of the reporter to use.
-  - Example: org.apache.flink.metrics.reporter.JMXReporter
+  - Example: org.apache.flink.metrics.jmx.JMXReporter
 - `metrics.reporter.arguments`: A list of named parameters that are passed to the reporter.
   - Example: --host localhost --port 9010
 - `metrics.reporter.interval`: The interval between reports.
@@ -241,7 +241,7 @@ If the Reporter should send out reports regularly you have to implement the `Sch
 
 The following sections list the supported reporters.
 
-### JMX (org.apache.flink.metrics.reporter.JMXReporter)
+### JMX (org.apache.flink.metrics.jmx.JMXReporter)
 
 You don't have to include an additional dependency since the JMX reporter is available by default
 but not activated.

http://git-wip-us.apache.org/repos/asf/flink/blob/e3fec1f9/flink-core/pom.xml
----------------------------------------------------------------------
diff --git a/flink-core/pom.xml b/flink-core/pom.xml
index 21d00b3..5496f8c 100644
--- a/flink-core/pom.xml
+++ b/flink-core/pom.xml
@@ -42,6 +42,12 @@ under the License.
                </dependency>
 
                <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-metrics-core</artifactId>
+                       <version>${project.version}</version>
+               </dependency>
+
+               <dependency>
                        <groupId>com.esotericsoftware.kryo</groupId>
                        <artifactId>kryo</artifactId>
                        <!-- managed version -->

http://git-wip-us.apache.org/repos/asf/flink/blob/e3fec1f9/flink-core/src/main/java/org/apache/flink/metrics/CharacterFilter.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/metrics/CharacterFilter.java b/flink-core/src/main/java/org/apache/flink/metrics/CharacterFilter.java
deleted file mode 100644
index 1e9fbc4..0000000
--- a/flink-core/src/main/java/org/apache/flink/metrics/CharacterFilter.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.metrics;
-
-/**
- * Interface for a character filter function. The filter function is given a string which the filter
- * can transform. The returned string is the transformation result.
- */
-public interface CharacterFilter {
-
-       /**
-        * Filter the given string and generate a resulting string from it.
-        *
-        * For example, one implementation could filter out invalid characters from the input string.
-        *
-        * @param input Input string
-        * @return Filtered result string
-        */
-       String filterCharacters(String input);
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/e3fec1f9/flink-core/src/main/java/org/apache/flink/metrics/Counter.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/metrics/Counter.java b/flink-core/src/main/java/org/apache/flink/metrics/Counter.java
deleted file mode 100644
index ffb1cc7..0000000
--- a/flink-core/src/main/java/org/apache/flink/metrics/Counter.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.metrics;
-
-import org.apache.flink.annotation.PublicEvolving;
-
-/**
- * A Counter is a {@link Metric} that measures a count.
- */
-@PublicEvolving
-public interface Counter extends Metric {
-
-       /**
-        * Increment the current count by 1.
-        */
-       void inc();
-
-       /**
-        * Increment the current count by the given value.
-        *
-        * @param n value to increment the current count by
-        */
-       void inc(long n);
-
-       /**
-        * Decrement the current count by 1.
-        */
-       void dec();
-
-       /**
-        * Decrement the current count by the given value.
-        *
-        * @param n value to decrement the current count by
-        */
-       void dec(long n);
-
-       /**
-        * Returns the current count.
-        *
-        * @return current count
-        */
-       long getCount();
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/e3fec1f9/flink-core/src/main/java/org/apache/flink/metrics/Gauge.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/metrics/Gauge.java b/flink-core/src/main/java/org/apache/flink/metrics/Gauge.java
deleted file mode 100644
index 740645d..0000000
--- a/flink-core/src/main/java/org/apache/flink/metrics/Gauge.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.metrics;
-
-import org.apache.flink.annotation.PublicEvolving;
-
-/**
- * A Gauge is a {@link Metric} that calculates a specific value at a point in time.
- */
-@PublicEvolving
-public interface Gauge<T> extends Metric {
-       /**
-        * Calculates and returns the measured value.
-        *
-        * @return calculated value
-        */
-       T getValue();
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/e3fec1f9/flink-core/src/main/java/org/apache/flink/metrics/Histogram.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/metrics/Histogram.java b/flink-core/src/main/java/org/apache/flink/metrics/Histogram.java
deleted file mode 100644
index 3fd1253..0000000
--- a/flink-core/src/main/java/org/apache/flink/metrics/Histogram.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.metrics;
-
-import org.apache.flink.annotation.PublicEvolving;
-
-/**
- * Histogram interface to be used with Flink's metrics system.
- *
- * The histogram allows to record values, get the current count of recorded values and create
- * histogram statistics for the currently seen elements.
- */
-@PublicEvolving
-public interface Histogram extends Metric {
-
-       /**
-        * Update the histogram with the given value.
-        *
-        * @param value Value to update the histogram with
-        */
-       void update(long value);
-
-       /**
-        * Get the count of seen elements.
-        *
-        * @return Count of seen elements
-        */
-       long getCount();
-
-       /**
-        * Create statistics for the currently recorded elements.
-        *
-        * @return Statistics about the currently recorded elements
-        */
-       HistogramStatistics getStatistics();
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/e3fec1f9/flink-core/src/main/java/org/apache/flink/metrics/HistogramStatistics.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/metrics/HistogramStatistics.java b/flink-core/src/main/java/org/apache/flink/metrics/HistogramStatistics.java
deleted file mode 100644
index 476580c..0000000
--- a/flink-core/src/main/java/org/apache/flink/metrics/HistogramStatistics.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.metrics;
-
-import org.apache.flink.annotation.PublicEvolving;
-
-/**
- * Histogram statistics represent the current snapshot of elements recorded in the histogram.
- *
- * The histogram statistics allow to calculate values for quantiles, the mean, the standard
- * deviation, the minimum and the maximum.
- */
-@PublicEvolving
-public abstract class HistogramStatistics {
-
-       /**
-        * Returns the value for the given quantile based on the represented histogram statistics.
-        *
-        * @param quantile Quantile to calculate the value for
-        * @return Value for the given quantile
-        */
-       public abstract double getQuantile(double quantile);
-
-       /**
-        * Returns the elements of the statistics' sample
-        *
-        * @return Elements of the statistics' sample
-        */
-       public abstract long[] getValues();
-
-       /**
-        * Returns the size of the statistics' sample
-        *
-        * @return Size of the statistics' sample
-        */
-       public abstract int size();
-
-       /**
-        * Returns the mean of the histogram values.
-        *
-        * @return Mean of the histogram values
-        */
-       public abstract double getMean();
-
-       /**
-        * Returns the standard deviation of the distribution reflected by the histogram statistics.
-        *
-        * @return Standard deviation of histogram distribution
-        */
-       public abstract double getStdDev();
-
-       /**
-        * Returns the maximum value of the histogram.
-        *
-        * @return Maximum value of the histogram
-        */
-       public abstract long getMax();
-
-       /**
-        * Returns the minimum value of the histogram.
-        *
-        * @return Minimum value of the histogram
-        */
-       public abstract long getMin();
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/e3fec1f9/flink-core/src/main/java/org/apache/flink/metrics/Metric.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/metrics/Metric.java b/flink-core/src/main/java/org/apache/flink/metrics/Metric.java
deleted file mode 100644
index 8054de0..0000000
--- a/flink-core/src/main/java/org/apache/flink/metrics/Metric.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.metrics;
-
-import org.apache.flink.annotation.PublicEvolving;
-
-/**
- * Common super interface for all metrics.
- */
-@PublicEvolving
-public interface Metric {
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/e3fec1f9/flink-core/src/main/java/org/apache/flink/metrics/MetricGroup.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/metrics/MetricGroup.java b/flink-core/src/main/java/org/apache/flink/metrics/MetricGroup.java
deleted file mode 100644
index b578cb3..0000000
--- a/flink-core/src/main/java/org/apache/flink/metrics/MetricGroup.java
+++ /dev/null
@@ -1,133 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.metrics;
-
-import org.apache.flink.annotation.PublicEvolving;
-
-/**
- * A MetricGroup is a named container for {@link Metric Metrics} and further metric subgroups.
- * 
- * <p>Instances of this class can be used to register new metrics with Flink and to create a nested
- * hierarchy based on the group names.
- * 
- * <p>A MetricGroup is uniquely identified by it's place in the hierarchy and name.
- */
-@PublicEvolving
-public interface MetricGroup {
-
-       // ------------------------------------------------------------------------
-       //  Metrics
-       // ------------------------------------------------------------------------
-
-       /**
-        * Creates and registers a new {@link org.apache.flink.metrics.Counter} with Flink.
-        *
-        * @param name name of the counter
-        * @return the created counter
-        */
-       Counter counter(int name);
-
-       /**
-        * Creates and registers a new {@link org.apache.flink.metrics.Counter} with Flink.
-        *
-        * @param name name of the counter
-        * @return the created counter
-        */
-       Counter counter(String name);
-
-       /**
-        * Registers a {@link org.apache.flink.metrics.Counter} with Flink.
-        *
-        * @param name    name of the counter
-        * @param counter counter to register
-        * @param <C>     counter type
-        * @return the given counter
-        */
-       <C extends Counter> C counter(int name, C counter);
-
-       /**
-        * Registers a {@link org.apache.flink.metrics.Counter} with Flink.
-        *
-        * @param name    name of the counter
-        * @param counter counter to register
-        * @param <C>     counter type
-        * @return the given counter
-        */
-       <C extends Counter> C counter(String name, C counter);
-       
-       /**
-        * Registers a new {@link org.apache.flink.metrics.Gauge} with Flink.
-        *
-        * @param name  name of the gauge
-        * @param gauge gauge to register
-        * @param <T>   return type of the gauge
-        * @return the given gauge
-        */
-       <T, G extends Gauge<T>> G gauge(int name, G gauge);
-
-       /**
-        * Registers a new {@link org.apache.flink.metrics.Gauge} with Flink.
-        *
-        * @param name  name of the gauge
-        * @param gauge gauge to register
-        * @param <T>   return type of the gauge
-        * @return the given gauge
-        */
-       <T, G extends Gauge<T>> G gauge(String name, G gauge);
-
-       /**
-        * Registers a new {@link Histogram} with Flink.
-        *
-        * @param name name of the histogram
-        * @param histogram histogram to register
-        * @param <H> histogram type   
-        * @return the registered histogram
-        */
-       <H extends Histogram> H histogram(String name, H histogram);
-
-       /**
-        * Registers a new {@link Histogram} with Flink.
-        *
-        * @param name name of the histogram
-        * @param histogram histogram to register
-        * @param <H> histogram type   
-        * @return the registered histogram
-        */
-       <H extends Histogram> H histogram(int name, H histogram);
-
-       // ------------------------------------------------------------------------
-       // Groups
-       // ------------------------------------------------------------------------
-
-       /**
-        * Creates a new MetricGroup and adds it to this groups sub-groups.
-        *
-        * @param name name of the group
-        * @return the created group
-        */
-       MetricGroup addGroup(int name);
-
-       /**
-        * Creates a new MetricGroup and adds it to this groups sub-groups.
-        *
-        * @param name name of the group
-        * @return the created group
-        */
-       MetricGroup addGroup(String name);
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/e3fec1f9/flink-core/src/main/java/org/apache/flink/metrics/MetricRegistry.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/metrics/MetricRegistry.java b/flink-core/src/main/java/org/apache/flink/metrics/MetricRegistry.java
deleted file mode 100644
index 274821e..0000000
--- a/flink-core/src/main/java/org/apache/flink/metrics/MetricRegistry.java
+++ /dev/null
@@ -1,273 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.metrics;
-
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.configuration.ConfigConstants;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.metrics.groups.AbstractMetricGroup;
-import org.apache.flink.metrics.groups.scope.ScopeFormat;
-import org.apache.flink.metrics.groups.scope.ScopeFormats;
-import org.apache.flink.metrics.reporter.MetricReporter;
-import org.apache.flink.metrics.reporter.Scheduled;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.TimerTask;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-
-/**
- * A MetricRegistry keeps track of all registered {@link Metric Metrics}. It 
serves as the
- * connection between {@link MetricGroup MetricGroups} and {@link 
MetricReporter MetricReporters}.
- */
-@Internal
-public class MetricRegistry {
-       static final Logger LOG = LoggerFactory.getLogger(MetricRegistry.class);
-       
-       private final MetricReporter reporter;
-       private final ScheduledExecutorService executor;
-
-       private final ScopeFormats scopeFormats;
-
-       private final char delimiter;
-
-       /**
-        * Creates a new MetricRegistry and starts the configured reporter.
-        */
-       public MetricRegistry(Configuration config) {
-               // first parse the scope formats, these are needed for all 
reporters
-               ScopeFormats scopeFormats;
-               try {
-                       scopeFormats = createScopeConfig(config);
-               }
-               catch (Exception e) {
-                       LOG.warn("Failed to parse scope format, using default 
scope formats", e);
-                       scopeFormats = new ScopeFormats();
-               }
-               this.scopeFormats = scopeFormats;
-
-               char delim;
-               try {
-                       delim = 
config.getString(ConfigConstants.METRICS_SCOPE_DELIMITER, ".").charAt(0);
-               } catch (Exception e) {
-                       LOG.warn("Failed to parse delimiter, using default 
delimiter.", e);
-                       delim = '.';
-               }
-               this.delimiter = delim;
-
-               // second, instantiate any custom configured reporters
-               
-               final String className = 
config.getString(ConfigConstants.METRICS_REPORTER_CLASS, null);
-               if (className == null) {
-                       // by default, don't report anything
-                       LOG.info("No metrics reporter configured, no metrics 
will be exposed/reported.");
-                       this.reporter = null;
-                       this.executor = null;
-               }
-               else {
-                       MetricReporter reporter;
-                       ScheduledExecutorService executor = null;
-                       try {
-                               String configuredPeriod = 
config.getString(ConfigConstants.METRICS_REPORTER_INTERVAL, null);
-                               TimeUnit timeunit = TimeUnit.SECONDS;
-                               long period = 10;
-                               
-                               if (configuredPeriod != null) {
-                                       try {
-                                               String[] interval = 
configuredPeriod.split(" ");
-                                               period = 
Long.parseLong(interval[0]);
-                                               timeunit = 
TimeUnit.valueOf(interval[1]);
-                                       }
-                                       catch (Exception e) {
-                                               LOG.error("Cannot parse report 
interval from config: " + configuredPeriod +
-                                                               " - please use 
values like '10 SECONDS' or '500 MILLISECONDS'. " +
-                                                               "Using default 
reporting interval.");
-                                       }
-                               }
-                               
-                               Configuration reporterConfig = 
createReporterConfig(config, timeunit, period);
-                       
-                               Class<?> reporterClass = 
Class.forName(className);
-                               reporter = (MetricReporter) 
reporterClass.newInstance();
-                               reporter.open(reporterConfig);
-
-                               if (reporter instanceof Scheduled) {
-                                       executor = 
Executors.newSingleThreadScheduledExecutor();
-                                       LOG.info("Periodically reporting 
metrics in intervals of {} {}", period, timeunit.name());
-                                       
-                                       executor.scheduleWithFixedDelay(new 
ReporterTask((Scheduled) reporter), period, period, timeunit);
-                               }
-                       }
-                       catch (Throwable t) {
-                               shutdownExecutor();
-                               LOG.error("Could not instantiate metrics 
reporter. No metrics will be exposed/reported.", t);
-                               reporter = null;
-                       }
-
-                       this.reporter = reporter;
-                       this.executor = executor;
-               }
-       }
-
-       public char getDelimiter() {
-               return this.delimiter;
-       }
-
-       public MetricReporter getReporter() {
-               return reporter;
-       }
-
-       /**
-        * Shuts down this registry and the associated {@link 
org.apache.flink.metrics.reporter.MetricReporter}.
-        */
-       public void shutdown() {
-               if (reporter != null) {
-                       try {
-                               reporter.close();
-                       } catch (Throwable t) {
-                               LOG.warn("Metrics reporter did not shut down 
cleanly", t);
-                       }
-               }
-               shutdownExecutor();
-       }
-       
-       private void shutdownExecutor() {
-               if (executor != null) {
-                       executor.shutdown();
-
-                       try {
-                               if (!executor.awaitTermination(1, 
TimeUnit.SECONDS)) {
-                                       executor.shutdownNow();
-                               }
-                       } catch (InterruptedException e) {
-                               executor.shutdownNow();
-                       }
-               }
-       }
-
-       public ScopeFormats getScopeFormats() {
-               return scopeFormats;
-       }
-
-       // 
------------------------------------------------------------------------
-       //  Metrics (de)registration
-       // 
------------------------------------------------------------------------
-
-       /**
-        * Registers a new {@link Metric} with this registry.
-        *
-        * @param metric      the metric that was added
-        * @param metricName  the name of the metric
-        * @param group       the group that contains the metric
-        */
-       public void register(Metric metric, String metricName, 
AbstractMetricGroup group) {
-               try {
-                       if (reporter != null) {
-                               reporter.notifyOfAddedMetric(metric, 
metricName, group);
-                       }
-               } catch (Exception e) {
-                       LOG.error("Error while registering metric.", e);
-               }
-       }
-
-       /**
-        * Un-registers the given {@link org.apache.flink.metrics.Metric} with 
this registry.
-        *
-        * @param metric      the metric that should be removed
-        * @param metricName  the name of the metric
-        * @param group       the group that contains the metric
-        */
-       public void unregister(Metric metric, String metricName, 
AbstractMetricGroup group) {
-               try {
-                       if (reporter != null) {
-                               reporter.notifyOfRemovedMetric(metric, 
metricName, group);
-                       }
-               } catch (Exception e) {
-                       LOG.error("Error while registering metric.", e);
-               }
-       }
-
-       // 
------------------------------------------------------------------------
-       //  Utilities
-       // 
------------------------------------------------------------------------
-       
-       static Configuration createReporterConfig(Configuration config, 
TimeUnit timeunit, long period) {
-               Configuration reporterConfig = new Configuration();
-               reporterConfig.setLong("period", period);
-               reporterConfig.setString("timeunit", timeunit.name());
-
-               String[] arguments = 
config.getString(ConfigConstants.METRICS_REPORTER_ARGUMENTS, "").split(" ");
-               if (arguments.length > 1) {
-                       for (int x = 0; x < arguments.length; x += 2) {
-                               
reporterConfig.setString(arguments[x].replace("--", ""), arguments[x + 1]);
-                       }
-               }
-               return reporterConfig;
-       }
-
-       static ScopeFormats createScopeConfig(Configuration config) {
-               String jmFormat = config.getString(
-                               ConfigConstants.METRICS_SCOPE_NAMING_JM, 
ScopeFormat.DEFAULT_SCOPE_JOBMANAGER_GROUP);
-               String jmJobFormat = config.getString(
-                       ConfigConstants.METRICS_SCOPE_NAMING_JM_JOB, 
ScopeFormat.DEFAULT_SCOPE_JOBMANAGER_JOB_GROUP);
-               String tmFormat = config.getString(
-                               ConfigConstants.METRICS_SCOPE_NAMING_TM, 
ScopeFormat.DEFAULT_SCOPE_TASKMANAGER_GROUP);
-               String tmJobFormat = config.getString(
-                               ConfigConstants.METRICS_SCOPE_NAMING_TM_JOB, 
ScopeFormat.DEFAULT_SCOPE_TASKMANAGER_JOB_GROUP);
-               String taskFormat = config.getString(
-                               ConfigConstants.METRICS_SCOPE_NAMING_TASK, 
ScopeFormat.DEFAULT_SCOPE_TASK_GROUP);
-               String operatorFormat = config.getString(
-                               ConfigConstants.METRICS_SCOPE_NAMING_OPERATOR, 
ScopeFormat.DEFAULT_SCOPE_OPERATOR_GROUP);
-               
-               return new ScopeFormats(jmFormat, jmJobFormat, tmFormat, 
tmJobFormat, taskFormat, operatorFormat);
-       }
-
-       // 
------------------------------------------------------------------------
-
-       /**
-        * This task is explicitly a static class, so that it does not hold any 
references to the enclosing
-        * MetricRegistry instance.
-        * 
-        * This is a subtle difference, but very important: With this static 
class, the enclosing class instance
-        * may become garbage-collectible, whereas with an anonymous inner 
class, the timer thread
-        * (which is a GC root) will hold a reference via the timer task and 
its enclosing instance pointer.
-        * Making the MetricRegistry garbage collectible makes the 
java.util.Timer garbage collectible,
-        * which acts as a fail-safe to stop the timer thread and prevents 
resource leaks.
-        */
-       private static final class ReporterTask extends TimerTask {
-
-               private final Scheduled reporter;
-
-               private ReporterTask(Scheduled reporter) {
-                       this.reporter = reporter;
-               }
-
-               @Override
-               public void run() {
-                       try {
-                               reporter.report();
-                       } catch (Throwable t) {
-                               LOG.warn("Error while reporting metrics", t);
-                       }
-               }
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/e3fec1f9/flink-core/src/main/java/org/apache/flink/metrics/SimpleCounter.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/metrics/SimpleCounter.java 
b/flink-core/src/main/java/org/apache/flink/metrics/SimpleCounter.java
deleted file mode 100644
index 9720b08..0000000
--- a/flink-core/src/main/java/org/apache/flink/metrics/SimpleCounter.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.metrics;
-
-/**
- * A simple low-overhead {@link org.apache.flink.metrics.Counter} that is not 
thread-safe.
- */
-public class SimpleCounter implements Counter {
-       private long count;
-
-       /**
-        * Increment the current count by 1.
-        */
-       @Override
-       public void inc() {
-               count++;
-       }
-
-       /**
-        * Increment the current count by the given value.
-        *
-        * @param n value to increment the current count by
-        */
-       @Override
-       public void inc(long n) {
-               count += n;
-       }
-
-       /**
-        * Decrement the current count by 1.
-        */
-       @Override
-       public void dec() {
-               count--;
-       }
-
-       /**
-        * Decrement the current count by the given value.
-        *
-        * @param n value to decrement the current count by
-        */
-       @Override
-       public void dec(long n) {
-               count -= n;
-       }
-
-       /**
-        * Returns the current count.
-        *
-        * @return current count
-        */
-       @Override
-       public long getCount() {
-               return count;
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/e3fec1f9/flink-core/src/main/java/org/apache/flink/metrics/groups/AbstractMetricGroup.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/metrics/groups/AbstractMetricGroup.java
 
b/flink-core/src/main/java/org/apache/flink/metrics/groups/AbstractMetricGroup.java
deleted file mode 100644
index dda6e4d..0000000
--- 
a/flink-core/src/main/java/org/apache/flink/metrics/groups/AbstractMetricGroup.java
+++ /dev/null
@@ -1,292 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.metrics.groups;
-
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.metrics.CharacterFilter;
-import org.apache.flink.metrics.Counter;
-import org.apache.flink.metrics.Gauge;
-import org.apache.flink.metrics.Histogram;
-import org.apache.flink.metrics.Metric;
-import org.apache.flink.metrics.MetricGroup;
-import org.apache.flink.metrics.MetricRegistry;
-import org.apache.flink.metrics.SimpleCounter;
-
-import org.apache.flink.metrics.groups.scope.ScopeFormat;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.HashMap;
-import java.util.Map;
-
-import static org.apache.flink.util.Preconditions.checkNotNull;
-
-/**
- * Abstract {@link MetricGroup} that contains key functionality for adding 
metrics and groups.
- * 
- * <p><b>IMPORTANT IMPLEMENTATION NOTE</b>
- * 
- * <p>This class uses locks for adding and removing metrics objects. This is 
done to
- * prevent resource leaks in the presence of concurrently closing a group and 
adding
- * metrics and subgroups.
- * Since closing groups recursively closes the subgroups, the lock acquisition 
order must
- * be strictly from parent group to subgroup. If at any point, a subgroup 
holds its group
- * lock and calls a parent method that also acquires the lock, it will create 
a deadlock
- * condition.
- *
- * <p>An AbstractMetricGroup can be {@link #close() closed}. Upon closing, the 
group de-registers all metrics
- * from any metrics reporter and any internal maps. Note that even closed 
metrics groups
- * return Counters, Gauges, etc to the code, to prevent exceptions in the 
monitored code.
- * These metrics simply do not get reported any more, when created on a closed 
group.
- */
-@Internal
-public abstract class AbstractMetricGroup implements MetricGroup {
-
-       /** shared logger */
-       private static final Logger LOG = 
LoggerFactory.getLogger(MetricGroup.class);
-
-       // 
------------------------------------------------------------------------
-
-       /** The registry that this metrics group belongs to */
-       protected final MetricRegistry registry;
-
-       /** All metrics that are directly contained in this group */
-       private final Map<String, Metric> metrics = new HashMap<>();
-
-       /** All metric subgroups of this group */
-       private final Map<String, AbstractMetricGroup> groups = new HashMap<>();
-
-       /** The metrics scope represented by this group.
-        *  For example ["host-7", "taskmanager-2", "window_word_count", 
"my-mapper" ]. */
-       private final String[] scopeComponents;
-
-       /** The metrics scope represented by this group, as a concatenated 
string, lazily computed.
-        * For example: "host-7.taskmanager-2.window_word_count.my-mapper" */
-       private String scopeString;
-
-       /** Flag indicating whether this group has been closed */
-       private volatile boolean closed;
-
-       // 
------------------------------------------------------------------------
-
-       public AbstractMetricGroup(MetricRegistry registry, String[] scope) {
-               this.registry = checkNotNull(registry);
-               this.scopeComponents = checkNotNull(scope);
-       }
-
-       /**
-        * Gets the scope as an array of the scope components, for example
-        * {@code ["host-7", "taskmanager-2", "window_word_count", "my-mapper"]}
-        */
-       public String[] getScopeComponents() {
-               return scopeComponents;
-       }
-
-       /**
-        * Returns the fully qualified metric name, for example
-        * {@code "host-7.taskmanager-2.window_word_count.my-mapper.metricName"}
-        * 
-        * @param metricName metric name
-        * @return fully qualified metric name
-     */
-       public String getMetricIdentifier(String metricName) {
-               return getMetricIdentifier(metricName, null);
-       }
-
-       /**
-        * Returns the fully qualified metric name, for example
-        * {@code "host-7.taskmanager-2.window_word_count.my-mapper.metricName"}
-        *
-        * @param metricName metric name
-        * @param filter character filter which is applied to the scope 
components if not null.
-        * @return fully qualified metric name
-        */
-       public String getMetricIdentifier(String metricName, CharacterFilter 
filter) {
-               if (scopeString == null) {
-                       if (filter != null) {
-                               scopeString = ScopeFormat.concat(filter, 
registry.getDelimiter(), scopeComponents);
-                       } else {
-                               scopeString = 
ScopeFormat.concat(registry.getDelimiter(), scopeComponents);
-                       }
-               }
-
-               if (filter != null) {
-                       return scopeString + registry.getDelimiter() + 
filter.filterCharacters(metricName);
-               } else {
-                       return scopeString + registry.getDelimiter() + 
metricName;
-               }
-       }
-       
-       // 
------------------------------------------------------------------------
-       //  Closing
-       // 
------------------------------------------------------------------------
-
-       public void close() {
-               synchronized (this) {
-                       if (!closed) {
-                               closed = true;
-
-                               // close all subgroups
-                               for (AbstractMetricGroup group : 
groups.values()) {
-                                       group.close();
-                               }
-                               groups.clear();
-
-                               // un-register all directly contained metrics
-                               for (Map.Entry<String, Metric> metric : 
metrics.entrySet()) {
-                                       registry.unregister(metric.getValue(), 
metric.getKey(), this);
-                               }
-                               metrics.clear();
-                       }
-               }
-       }
-
-       public final boolean isClosed() {
-               return closed;
-       }
-
-       // 
-----------------------------------------------------------------------------------------------------------------
-       //  Metrics
-       // 
-----------------------------------------------------------------------------------------------------------------
-
-       @Override
-       public Counter counter(int name) {
-               return counter(String.valueOf(name));
-       }
-
-       @Override
-       public Counter counter(String name) {
-               return counter(name, new SimpleCounter());
-       }
-       
-       @Override
-       public <C extends Counter> C counter(int name, C counter) {
-               return counter(String.valueOf(name), counter);
-       }
-
-       @Override
-       public <C extends Counter> C counter(String name, C counter) {
-               addMetric(name, counter);
-               return counter;
-       }
-
-       @Override
-       public <T, G extends Gauge<T>> G gauge(int name, G gauge) {
-               return gauge(String.valueOf(name), gauge);
-       }
-
-       @Override
-       public <T, G extends Gauge<T>> G gauge(String name, G gauge) {
-               addMetric(name, gauge);
-               return gauge;
-       }
-
-       @Override
-       public <H extends Histogram> H histogram(int name, H histogram) {
-               return histogram(String.valueOf(name), histogram);
-       }
-
-       @Override
-       public <H extends Histogram> H histogram(String name, H histogram) {
-               addMetric(name, histogram);
-               return histogram;
-       }
-
-       /**
-        * Adds the given metric to the group and registers it at the registry, 
if the group
-        * is not yet closed, and if no metric with the same name has been 
registered before.
-        * 
-        * @param name the name to register the metric under
-        * @param metric the metric to register
-        */
-       protected void addMetric(String name, Metric metric) {
-               // add the metric only if the group is still open
-               synchronized (this) {
-                       if (!closed) {
-                               // immediately put without a 'contains' check 
to optimize the common case (no collision)
-                               // collisions are resolved later
-                               Metric prior = metrics.put(name, metric);
-
-                               // check for collisions with other metric names
-                               if (prior == null) {
-                                       // no other metric with this name yet
-
-                                       if (groups.containsKey(name)) {
-                                               // we warn here, rather than 
failing, because metrics are tools that should not fail the
-                                               // program when used incorrectly
-                                               LOG.warn("Name collision: 
Adding a metric with the same name as a metric subgroup: '" +
-                                                               name + "'. 
Metric might not get properly reported. (" + scopeString + ')');
-                                       }
-
-                                       registry.register(metric, name, this);
-                               }
-                               else {
-                                       // we had a collision. put back the 
original value
-                                       metrics.put(name, prior);
-                                       
-                                       // we warn here, rather than failing, 
because metrics are tools that should not fail the
-                                       // program when used incorrectly
-                                       LOG.warn("Name collision: Group already 
contains a Metric with the name '" +
-                                                       name + "'. Metric will 
not be reported. (" + scopeString + ')');
-                               }
-                       }
-               }
-       }
-
-       // 
------------------------------------------------------------------------
-       //  Groups
-       // 
------------------------------------------------------------------------
-
-       @Override
-       public MetricGroup addGroup(int name) {
-               return addGroup(String.valueOf(name));
-       }
-
-       @Override
-       public MetricGroup addGroup(String name) {
-               synchronized (this) {
-                       if (!closed) {
-                               // adding a group with the same name as a 
metric creates problems in many reporters/dashboards
-                               // we warn here, rather than failing, because 
metrics are tools that should not fail the
-                               // program when used incorrectly
-                               if (metrics.containsKey(name)) {
-                                       LOG.warn("Name collision: Adding a 
metric subgroup with the same name as an existing metric: '" +
-                                                       name + "'. Metric might 
not get properly reported. (" + scopeString + ')');
-                               }
-
-                               AbstractMetricGroup newGroup = new 
GenericMetricGroup(registry, this, name);
-                               AbstractMetricGroup prior = groups.put(name, 
newGroup);
-                               if (prior == null) {
-                                       // no prior group with that name
-                                       return newGroup;
-                               } else {
-                                       // had a prior group with that name, 
add the prior group back
-                                       groups.put(name, prior);
-                                       return prior;
-                               }
-                       }
-                       else {
-                               // return a non-registered group that is 
immediately closed already
-                               GenericMetricGroup closedGroup = new 
GenericMetricGroup(registry, this, name);
-                               closedGroup.close();
-                               return closedGroup;
-                       }
-               }
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/e3fec1f9/flink-core/src/main/java/org/apache/flink/metrics/groups/ComponentMetricGroup.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/metrics/groups/ComponentMetricGroup.java
 
b/flink-core/src/main/java/org/apache/flink/metrics/groups/ComponentMetricGroup.java
deleted file mode 100644
index 518d940..0000000
--- 
a/flink-core/src/main/java/org/apache/flink/metrics/groups/ComponentMetricGroup.java
+++ /dev/null
@@ -1,78 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.metrics.groups;
-
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.metrics.MetricRegistry;
-
-/**
- * Abstract {@link org.apache.flink.metrics.MetricGroup} for system components 
(e.g., 
- * TaskManager, Job, Task, Operator).
- * 
- * <p>Usually, the scope of metrics is simply the hierarchy of the containing 
groups. For example
- * the Metric {@code "MyMetric"} in group {@code "B"} nested in group {@code 
"A"} would have a
- * fully scoped name of {@code "A.B.MyMetric"}, with {@code "A.B"} being the 
Metric's scope.
- *
- * <p>Component groups, however, have configurable scopes. This allow users to 
include or exclude
- * certain identifiers from the scope. The scope for metrics belonging to the 
"Task"
- * group could for example include the task attempt number (more fine grained 
identification), or
- * exclude it (for continuity of the namespace across failure and recovery).
- */
-@Internal
-public abstract class ComponentMetricGroup extends AbstractMetricGroup {
-
-       /**
-        * Creates a new ComponentMetricGroup.
-        *
-        * @param registry     registry to register new metrics with
-        * @param scope        the scope of the group
-        */
-       public ComponentMetricGroup(
-                       MetricRegistry registry,
-                       String[] scope) {
-
-               super(registry, scope);
-       }
-
-       /**
-        * Closes the component group by removing and closing all metrics and 
subgroups
-        * (inherited from {@link AbstractMetricGroup}), plus closing and 
removing all dedicated
-        * component subgroups.
-        */
-       @Override
-       public void close() {
-               synchronized (this) {
-                       if (!isClosed()) {
-                               // remove all metrics and generic subgroups
-                               super.close();
-
-                               // remove and close all subcomponent metrics
-                               for (ComponentMetricGroup group : 
subComponents()) {
-                                       group.close();
-                               }
-                       }
-               }
-       }
-
-       // 
------------------------------------------------------------------------
-       //  sub components
-       // 
------------------------------------------------------------------------
-
-       protected abstract Iterable<? extends ComponentMetricGroup> 
subComponents();
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/e3fec1f9/flink-core/src/main/java/org/apache/flink/metrics/groups/GenericMetricGroup.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/metrics/groups/GenericMetricGroup.java
 
b/flink-core/src/main/java/org/apache/flink/metrics/groups/GenericMetricGroup.java
deleted file mode 100644
index ddcd73b..0000000
--- 
a/flink-core/src/main/java/org/apache/flink/metrics/groups/GenericMetricGroup.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.metrics.groups;
-
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.metrics.MetricRegistry;
-
-/**
- * A simple named {@link org.apache.flink.metrics.MetricGroup} that is used to 
hold
- * subgroups of metrics.
- */
-@Internal
-public class GenericMetricGroup extends AbstractMetricGroup {
-
-       public GenericMetricGroup(MetricRegistry registry, AbstractMetricGroup 
parent, String name) {
-               super(registry, makeScopeComponents(parent, name));
-       }
-
-       // 
------------------------------------------------------------------------
-
-       private static String[] makeScopeComponents(AbstractMetricGroup parent, 
String name) {
-               if (parent != null) {
-                       String[] parentComponents = parent.getScopeComponents();
-                       if (parentComponents != null && parentComponents.length 
> 0) {
-                               String[] parts = new 
String[parentComponents.length + 1];
-                               System.arraycopy(parentComponents, 0, parts, 0, 
parentComponents.length);
-                               parts[parts.length - 1] = name;
-                               return parts;
-                       }
-               }
-               return new String[] { name };
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/e3fec1f9/flink-core/src/main/java/org/apache/flink/metrics/groups/IOMetricGroup.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/metrics/groups/IOMetricGroup.java 
b/flink-core/src/main/java/org/apache/flink/metrics/groups/IOMetricGroup.java
deleted file mode 100644
index 90bc2a8..0000000
--- 
a/flink-core/src/main/java/org/apache/flink/metrics/groups/IOMetricGroup.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.metrics.groups;
-
-import org.apache.flink.metrics.Counter;
-
-/**
- * Metric group that contains shareable pre-defined IO-related metrics. The 
metrics registration is
- * forwarded to the parent task metric group.
- */
-public class IOMetricGroup extends ProxyMetricGroup<TaskMetricGroup> {
-
-       private final Counter numBytesOut;
-       private final Counter numBytesInLocal;
-       private final Counter numBytesInRemote;
-
-       public IOMetricGroup(TaskMetricGroup parent) {
-               super(parent);
-
-               this.numBytesOut = counter("numBytesOut");
-               this.numBytesInLocal = counter("numBytesInLocal");
-               this.numBytesInRemote = counter("numBytesInRemote");
-       }
-
-       public Counter getBytesOutCounter() {
-               return numBytesOut;
-       }
-
-       public Counter getNumBytesInLocalCounter() {
-               return numBytesInLocal;
-       }
-
-       public Counter getNumBytesInRemoteCounter() {
-               return numBytesInRemote;
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/e3fec1f9/flink-core/src/main/java/org/apache/flink/metrics/groups/JobManagerJobMetricGroup.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/metrics/groups/JobManagerJobMetricGroup.java
 
b/flink-core/src/main/java/org/apache/flink/metrics/groups/JobManagerJobMetricGroup.java
deleted file mode 100644
index 1dd0439..0000000
--- 
a/flink-core/src/main/java/org/apache/flink/metrics/groups/JobManagerJobMetricGroup.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.metrics.groups;
-
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.metrics.MetricRegistry;
-import 
org.apache.flink.metrics.groups.scope.ScopeFormat.JobManagerJobScopeFormat;
-
-import javax.annotation.Nullable;
-import java.util.Collections;
-
-import static org.apache.flink.util.Preconditions.checkNotNull;
-
-/**
- * Special {@link org.apache.flink.metrics.MetricGroup} representing 
everything belonging to
- * a specific job, running on the JobManager.
- */
-@Internal
-public class JobManagerJobMetricGroup extends JobMetricGroup {
-
-       /** The metrics group that contains this group */
-       private final JobManagerMetricGroup parent;
-
-       public JobManagerJobMetricGroup(
-               MetricRegistry registry,
-               JobManagerMetricGroup parent,
-               JobID jobId,
-               @Nullable String jobName) {
-
-               this(registry, checkNotNull(parent), 
registry.getScopeFormats().getJobManagerJobFormat(), jobId, jobName);
-       }
-
-       public JobManagerJobMetricGroup(
-               MetricRegistry registry,
-               JobManagerMetricGroup parent,
-               JobManagerJobScopeFormat scopeFormat,
-               JobID jobId,
-               @Nullable String jobName) {
-
-               super(registry, jobId, jobName, scopeFormat.formatScope(parent, 
jobId, jobName));
-
-               this.parent = checkNotNull(parent);
-       }
-
-       public final JobManagerMetricGroup parent() {
-               return parent;
-       }
-
-       @Override
-       protected Iterable<? extends ComponentMetricGroup> subComponents() {
-               return Collections.emptyList();
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/e3fec1f9/flink-core/src/main/java/org/apache/flink/metrics/groups/JobManagerMetricGroup.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/metrics/groups/JobManagerMetricGroup.java
 
b/flink-core/src/main/java/org/apache/flink/metrics/groups/JobManagerMetricGroup.java
deleted file mode 100644
index 67e1117..0000000
--- 
a/flink-core/src/main/java/org/apache/flink/metrics/groups/JobManagerMetricGroup.java
+++ /dev/null
@@ -1,104 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.metrics.groups;
-
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.metrics.MetricRegistry;
-import org.apache.flink.metrics.groups.scope.ScopeFormat.JobManagerScopeFormat;
-
-import java.util.HashMap;
-import java.util.Map;
-
-/**
- * Special {@link org.apache.flink.metrics.MetricGroup} representing a 
JobManager.
- *
- * <p>Contains extra logic for adding jobs with tasks, and removing jobs when 
they do
- * not contain tasks any more
- */
-@Internal
-public class JobManagerMetricGroup extends ComponentMetricGroup {
-
-       private final Map<JobID, JobManagerJobMetricGroup> jobs = new 
HashMap<>();
-
-       private final String hostname;
-
-       public JobManagerMetricGroup(MetricRegistry registry, String hostname) {
-               this(registry, 
registry.getScopeFormats().getJobManagerFormat(), hostname);
-       }
-
-       public JobManagerMetricGroup(
-               MetricRegistry registry,
-               JobManagerScopeFormat scopeFormat,
-               String hostname) {
-
-               super(registry, scopeFormat.formatScope(hostname));
-               this.hostname = hostname;
-       }
-
-       public String hostname() {
-               return hostname;
-       }
-
-       // 
------------------------------------------------------------------------
-       //  job groups
-       // 
------------------------------------------------------------------------
-
-       public JobManagerJobMetricGroup addJob(
-               JobID jobId,
-               String jobName) {
-               // get or create a jobs metric group
-               JobManagerJobMetricGroup currentJobGroup;
-               synchronized (this) {
-                       if (!isClosed()) {
-                               currentJobGroup = jobs.get(jobId);
-
-                               if (currentJobGroup == null || 
currentJobGroup.isClosed()) {
-                                       currentJobGroup = new 
JobManagerJobMetricGroup(registry, this, jobId, jobName);
-                                       jobs.put(jobId, currentJobGroup);
-                               }
-                               return currentJobGroup;
-                       } else {
-                               return null;
-                       }
-               }
-       }
-
-       public void removeJob(JobID jobId) {
-               if (jobId == null) {
-                       return;
-               }
-
-               synchronized (this) {
-                       JobManagerJobMetricGroup containedGroup = 
jobs.remove(jobId);
-                       if (containedGroup != null) {
-                               containedGroup.close();
-                       }
-               }
-       }
-
-       public int numRegisteredJobMetricGroups() {
-               return jobs.size();
-       }
-
-       @Override
-       protected Iterable<? extends ComponentMetricGroup> subComponents() {
-               return jobs.values();
-       }
-}
-

http://git-wip-us.apache.org/repos/asf/flink/blob/e3fec1f9/flink-core/src/main/java/org/apache/flink/metrics/groups/JobMetricGroup.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/metrics/groups/JobMetricGroup.java 
b/flink-core/src/main/java/org/apache/flink/metrics/groups/JobMetricGroup.java
deleted file mode 100644
index f7dfc78..0000000
--- 
a/flink-core/src/main/java/org/apache/flink/metrics/groups/JobMetricGroup.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.metrics.groups;
-
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.metrics.MetricRegistry;
-
-import javax.annotation.Nullable;
-
-/**
- * Special abstract {@link org.apache.flink.metrics.MetricGroup} representing 
everything belonging to
- * a specific job.
- */
-@Internal
-public abstract class JobMetricGroup extends ComponentMetricGroup {
-
-       /** The ID of the job represented by this metrics group */
-       protected final JobID jobId;
-
-       /** The name of the job represented by this metrics group */
-       @Nullable
-       protected final String jobName;
-
-       // 
------------------------------------------------------------------------
-
-       protected JobMetricGroup(
-                       MetricRegistry registry,
-                       JobID jobId,
-                       @Nullable String jobName,
-                       String[] scope) {
-               super(registry, scope);
-               
-               this.jobId = jobId;
-               this.jobName = jobName;
-       }
-
-       public JobID jobId() {
-               return jobId;
-       }
-
-       @Nullable
-       public String jobName() {
-               return jobName;
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/e3fec1f9/flink-core/src/main/java/org/apache/flink/metrics/groups/OperatorMetricGroup.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/metrics/groups/OperatorMetricGroup.java
 
b/flink-core/src/main/java/org/apache/flink/metrics/groups/OperatorMetricGroup.java
deleted file mode 100644
index 6db79ab..0000000
--- 
a/flink-core/src/main/java/org/apache/flink/metrics/groups/OperatorMetricGroup.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.metrics.groups;
-
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.metrics.MetricRegistry;
-import org.apache.flink.metrics.groups.scope.ScopeFormat.OperatorScopeFormat;
-
-import java.util.Collections;
-
-import static org.apache.flink.util.Preconditions.checkNotNull;
-
-/**
- * Special {@link org.apache.flink.metrics.MetricGroup} representing an 
Operator.
- */
-@Internal
-public class OperatorMetricGroup extends ComponentMetricGroup {
-
-       /** The task metric group that contains this operator metric groups */
-       private final TaskMetricGroup parent;
-
-       public OperatorMetricGroup(MetricRegistry registry, TaskMetricGroup 
parent, String operatorName) {
-               this(registry, parent, 
registry.getScopeFormats().getOperatorFormat(), operatorName);
-       }
-
-       public OperatorMetricGroup(
-                       MetricRegistry registry,
-                       TaskMetricGroup parent,
-                       OperatorScopeFormat scopeFormat,
-                       String operatorName) {
-
-               super(registry, scopeFormat.formatScope(parent, operatorName));
-               this.parent = checkNotNull(parent);
-       }
-
-       // 
------------------------------------------------------------------------
-       
-       public final TaskMetricGroup parent() {
-               return parent;
-       }
-       
-       @Override
-       protected Iterable<? extends ComponentMetricGroup> subComponents() {
-               return Collections.emptyList();
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/e3fec1f9/flink-core/src/main/java/org/apache/flink/metrics/groups/ProxyMetricGroup.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/metrics/groups/ProxyMetricGroup.java
 
b/flink-core/src/main/java/org/apache/flink/metrics/groups/ProxyMetricGroup.java
deleted file mode 100644
index 14ff367..0000000
--- 
a/flink-core/src/main/java/org/apache/flink/metrics/groups/ProxyMetricGroup.java
+++ /dev/null
@@ -1,90 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.metrics.groups;
-
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.metrics.Counter;
-import org.apache.flink.metrics.Gauge;
-import org.apache.flink.metrics.Histogram;
-import org.apache.flink.metrics.MetricGroup;
-import org.apache.flink.util.Preconditions;
-
-/**
- * Metric group which forwards all registration calls to its parent metric 
group.
- *
- * @param <P> Type of the parent metric group
- */
-@Internal
-public class ProxyMetricGroup<P extends MetricGroup> implements MetricGroup {
-       private final P parentMetricGroup;
-
-       public ProxyMetricGroup(P parentMetricGroup) {
-               this.parentMetricGroup = 
Preconditions.checkNotNull(parentMetricGroup);
-       }
-
-       @Override
-       public final Counter counter(int name) {
-               return parentMetricGroup.counter(name);
-       }
-
-       @Override
-       public final Counter counter(String name) {
-               return parentMetricGroup.counter(name);
-       }
-
-       @Override
-       public final <C extends Counter> C counter(int name, C counter) {
-               return parentMetricGroup.counter(name, counter);
-       }
-
-       @Override
-       public final <C extends Counter> C counter(String name, C counter) {
-               return parentMetricGroup.counter(name, counter);
-       }
-
-       @Override
-       public final <T, G extends Gauge<T>> G gauge(int name, G gauge) {
-               return parentMetricGroup.gauge(name, gauge);
-       }
-
-       @Override
-       public final <T, G extends Gauge<T>> G gauge(String name, G gauge) {
-               return parentMetricGroup.gauge(name, gauge);
-       }
-
-       @Override
-       public final <H extends Histogram> H histogram(String name, H 
histogram) {
-               return parentMetricGroup.histogram(name, histogram);
-       }
-
-       @Override
-       public final <H extends Histogram> H histogram(int name, H histogram) {
-               return parentMetricGroup.histogram(name, histogram);
-       }
-
-       @Override
-       public final MetricGroup addGroup(int name) {
-               return parentMetricGroup.addGroup(name);
-       }
-
-       @Override
-       public final MetricGroup addGroup(String name) {
-               return parentMetricGroup.addGroup(name);
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/e3fec1f9/flink-core/src/main/java/org/apache/flink/metrics/groups/TaskManagerJobMetricGroup.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/metrics/groups/TaskManagerJobMetricGroup.java
 
b/flink-core/src/main/java/org/apache/flink/metrics/groups/TaskManagerJobMetricGroup.java
deleted file mode 100644
index fdaf1de..0000000
--- 
a/flink-core/src/main/java/org/apache/flink/metrics/groups/TaskManagerJobMetricGroup.java
+++ /dev/null
@@ -1,122 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.metrics.groups;
-
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.metrics.MetricRegistry;
-import 
org.apache.flink.metrics.groups.scope.ScopeFormat.TaskManagerJobScopeFormat;
-import org.apache.flink.util.AbstractID;
-
-import javax.annotation.Nullable;
-import java.util.HashMap;
-import java.util.Map;
-
-import static org.apache.flink.util.Preconditions.checkNotNull;
-
-/**
- * Special {@link org.apache.flink.metrics.MetricGroup} representing 
everything belonging to
- * a specific job, running on the TaskManager.
- *
- * <p>Contains extra logic for adding Tasks ({@link TaskMetricGroup}).
- */
-@Internal
-public class TaskManagerJobMetricGroup extends JobMetricGroup {
-
-       /** The metrics group that contains this group */
-       private final TaskManagerMetricGroup parent;
-
-       /** Map from execution attempt ID (task identifier) to task metrics */
-       private final Map<AbstractID, TaskMetricGroup> tasks = new HashMap<>();
-
-       // 
------------------------------------------------------------------------
-
-       public TaskManagerJobMetricGroup(
-               MetricRegistry registry,
-               TaskManagerMetricGroup parent,
-               JobID jobId,
-               @Nullable String jobName) {
-
-               this(registry, checkNotNull(parent), 
registry.getScopeFormats().getTaskManagerJobFormat(), jobId, jobName);
-       }
-
-       public TaskManagerJobMetricGroup(
-               MetricRegistry registry,
-               TaskManagerMetricGroup parent,
-               TaskManagerJobScopeFormat scopeFormat,
-               JobID jobId,
-               @Nullable String jobName) {
-
-               super(registry, jobId, jobName, scopeFormat.formatScope(parent, 
jobId, jobName));
-
-               this.parent = checkNotNull(parent);
-       }
-
-       public final TaskManagerMetricGroup parent() {
-               return parent;
-       }
-
-       // 
------------------------------------------------------------------------
-       //  adding / removing tasks
-       // 
------------------------------------------------------------------------
-
-       public TaskMetricGroup addTask(
-               AbstractID vertexId,
-               AbstractID executionId,
-               String taskName,
-               int subtaskIndex,
-               int attemptNumber) {
-
-               checkNotNull(executionId);
-
-               synchronized (this) {
-                       if (!isClosed()) {
-                               TaskMetricGroup task = new 
TaskMetricGroup(registry, this,
-                                       vertexId, executionId, taskName, 
subtaskIndex, attemptNumber);
-                               tasks.put(executionId, task);
-                               return task;
-                       } else {
-                               return null;
-                       }
-               }
-       }
-
-       public void removeTaskMetricGroup(AbstractID executionId) {
-               checkNotNull(executionId);
-
-               boolean removeFromParent = false;
-               synchronized (this) {
-                       if (!isClosed() && tasks.remove(executionId) != null && 
tasks.isEmpty()) {
-                               // this call removed the last task. close this 
group.
-                               removeFromParent = true;
-                               close();
-                       }
-               }
-
-               // IMPORTANT: removing from the parent must not happen while 
holding the this group's lock,
-               //      because it would violate the "first parent then 
subgroup" lock acquisition order
-               if (removeFromParent) {
-                       parent.removeJobMetricsGroup(jobId, this);
-               }
-       }
-
-       @Override
-       protected Iterable<? extends ComponentMetricGroup> subComponents() {
-               return tasks.values();
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/e3fec1f9/flink-core/src/main/java/org/apache/flink/metrics/groups/TaskManagerMetricGroup.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/metrics/groups/TaskManagerMetricGroup.java
 
b/flink-core/src/main/java/org/apache/flink/metrics/groups/TaskManagerMetricGroup.java
deleted file mode 100644
index 2b2b201..0000000
--- 
a/flink-core/src/main/java/org/apache/flink/metrics/groups/TaskManagerMetricGroup.java
+++ /dev/null
@@ -1,134 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.metrics.groups;
-
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.metrics.MetricRegistry;
-import org.apache.flink.metrics.groups.scope.ScopeFormat.TaskManagerScopeFormat;
-import org.apache.flink.util.AbstractID;
-
-import java.util.HashMap;
-import java.util.Map;
-
-/**
- * Special {@link org.apache.flink.metrics.MetricGroup} representing a TaskManager.
- *
- * <p>Contains extra logic for adding jobs with tasks, and removing jobs when they do
- * not contain tasks any more
- */
-@Internal
-public class TaskManagerMetricGroup extends ComponentMetricGroup {
-
-       private final Map<JobID, TaskManagerJobMetricGroup> jobs = new HashMap<>();
-
-       private final String hostname;
-
-       private final String taskManagerId;
-
-
-       public TaskManagerMetricGroup(MetricRegistry registry, String hostname, String taskManagerId) {
-               this(registry, registry.getScopeFormats().getTaskManagerFormat(), hostname, taskManagerId);
-       }
-
-       public TaskManagerMetricGroup(
-                       MetricRegistry registry,
-                       TaskManagerScopeFormat scopeFormat,
-                       String hostname, String taskManagerId) {
-
-               super(registry, scopeFormat.formatScope(hostname, taskManagerId));
-               this.hostname = hostname;
-               this.taskManagerId = taskManagerId;
-       }
-
-       public String hostname() {
-               return hostname;
-       }
-
-       public String taskManagerId() {
-               return taskManagerId;
-       }
-
-       // ------------------------------------------------------------------------
-       //  job groups
-       // ------------------------------------------------------------------------
-
-       public TaskMetricGroup addTaskForJob(
-                       JobID jobId,
-                       String jobName,
-                       AbstractID vertexID,
-                       AbstractID executionId,
-                       String taskName,
-                       int subtaskIndex,
-                       int attemptNumber) {
-
-               // we cannot strictly lock both our map modification and the job group modification
-               // because it might lead to a deadlock
-               while (true) {
-                       // get or create a jobs metric group
-                       TaskManagerJobMetricGroup currentJobGroup;
-                       synchronized (this) {
-                               currentJobGroup = jobs.get(jobId);
-
-                               if (currentJobGroup == null || currentJobGroup.isClosed()) {
-                                       currentJobGroup = new TaskManagerJobMetricGroup(registry, this, jobId, jobName);
-                                       jobs.put(jobId, currentJobGroup);
-                               }
-                       }
-
-                       // try to add another task. this may fail if we found a pre-existing job metrics
-                       // group and it is closed concurrently
-                       TaskMetricGroup taskGroup = currentJobGroup.addTask(
-                                       vertexID, executionId, taskName, subtaskIndex, attemptNumber);
-
-                       if (taskGroup != null) {
-                               // successfully added the next task
-                               return taskGroup;
-                       }
-
-                       // else fall through the loop
-               }
-       }
-
-       public void removeJobMetricsGroup(JobID jobId, TaskManagerJobMetricGroup group) {
-               if (jobId == null || group == null || !group.isClosed()) {
-                       return;
-               }
-
-               synchronized (this) {
-                       // optimistically remove the currently contained group, and check later if it was correct
-                       TaskManagerJobMetricGroup containedGroup = jobs.remove(jobId);
-
-                       // check if another group was actually contained, and restore that one
-                       if (containedGroup != null && containedGroup != group) {
-                               jobs.put(jobId, containedGroup);
-                       }
-               }
-       }
-
-       public int numRegisteredJobMetricGroups() {
-               return jobs.size();
-       }
-
-       @Override
-       protected Iterable<? extends ComponentMetricGroup> subComponents() {
-               return jobs.values();
-       }
-}
-

http://git-wip-us.apache.org/repos/asf/flink/blob/e3fec1f9/flink-core/src/main/java/org/apache/flink/metrics/groups/TaskMetricGroup.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/metrics/groups/TaskMetricGroup.java b/flink-core/src/main/java/org/apache/flink/metrics/groups/TaskMetricGroup.java
deleted file mode 100644
index 5849b4b..0000000
--- a/flink-core/src/main/java/org/apache/flink/metrics/groups/TaskMetricGroup.java
+++ /dev/null
@@ -1,169 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.metrics.groups;
-
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.metrics.MetricRegistry;
-import org.apache.flink.metrics.groups.scope.ScopeFormat.TaskScopeFormat;
-import org.apache.flink.util.AbstractID;
-
-import javax.annotation.Nullable;
-import java.util.HashMap;
-import java.util.Map;
-
-import static org.apache.flink.util.Preconditions.checkNotNull;
-
-/**
- * Special {@link org.apache.flink.metrics.MetricGroup} representing a Flink runtime Task.
- * 
- * <p>Contains extra logic for adding operators.
- */
-@Internal
-public class TaskMetricGroup extends ComponentMetricGroup {
-
-       /** The job metrics group containing this task metrics group */
-       private final TaskManagerJobMetricGroup parent;
-
-       private final Map<String, OperatorMetricGroup> operators = new HashMap<>();
-
-       private final IOMetricGroup ioMetrics;
-       
-       /** The execution Id uniquely identifying the executed task represented by this metrics group */
-       private final AbstractID executionId;
-
-       @Nullable
-       private final AbstractID vertexId;
-       
-       @Nullable
-       private final String taskName;
-
-       private final int subtaskIndex;
-
-       private final int attemptNumber;
-
-       // ------------------------------------------------------------------------
-
-       public TaskMetricGroup(
-                       MetricRegistry registry,
-                       TaskManagerJobMetricGroup parent,
-                       @Nullable AbstractID vertexId,
-                       AbstractID executionId,
-                       @Nullable String taskName,
-                       int subtaskIndex,
-                       int attemptNumber) {
-               
-               this(registry, parent, registry.getScopeFormats().getTaskFormat(),
-                               vertexId, executionId, taskName, subtaskIndex, attemptNumber);
-       }
-
-       public TaskMetricGroup(
-                       MetricRegistry registry,
-                       TaskManagerJobMetricGroup parent,
-                       TaskScopeFormat scopeFormat, 
-                       @Nullable AbstractID vertexId,
-                       AbstractID executionId,
-                       @Nullable String taskName,
-                       int subtaskIndex,
-                       int attemptNumber) {
-
-               super(registry, scopeFormat.formatScope(
-                               parent, vertexId, executionId, taskName, subtaskIndex, attemptNumber));
-
-               this.parent = checkNotNull(parent);
-               this.executionId = checkNotNull(executionId);
-               this.vertexId = vertexId;
-               this.taskName = taskName;
-               this.subtaskIndex = subtaskIndex;
-               this.attemptNumber = attemptNumber;
-
-               this.ioMetrics = new IOMetricGroup(this);
-       }
-
-       // ------------------------------------------------------------------------
-       //  properties
-       // ------------------------------------------------------------------------
-
-       public final TaskManagerJobMetricGroup parent() {
-               return parent;
-       }
-
-       public AbstractID executionId() {
-               return executionId;
-       }
-
-       @Nullable
-       public AbstractID vertexId() {
-               return vertexId;
-       }
-
-       @Nullable
-       public String taskName() {
-               return taskName;
-       }
-
-       public int subtaskIndex() {
-               return subtaskIndex;
-       }
-
-       public int attemptNumber() {
-               return attemptNumber;
-       }
-
-       /**
-        * Returns the IOMetricGroup for this task.
-        *
-        * @return IOMetricGroup for this task.
-        */
-       public IOMetricGroup getIOMetricGroup() {
-               return ioMetrics;
-       }
-
-       // ------------------------------------------------------------------------
-       //  operators and cleanup
-       // ------------------------------------------------------------------------
-       public OperatorMetricGroup addOperator(String name) {
-               OperatorMetricGroup operator = new OperatorMetricGroup(this.registry, this, name);
-
-               synchronized (this) {
-                       OperatorMetricGroup previous = operators.put(name, operator);
-                       if (previous == null) {
-                               // no operator group so far
-                               return operator;
-                       } else {
-                               // already had an operator group. restore that one.
-                               operators.put(name, previous);
-                               return previous;
-                       }
-               }
-       }
-
-       @Override
-       public void close() {
-               super.close();
-
-               parent.removeTaskMetricGroup(executionId);
-       }
-
-       // ------------------------------------------------------------------------
-
-       @Override
-       protected Iterable<? extends ComponentMetricGroup> subComponents() {
-               return operators.values();
-       }
-}

Reply via email to