[FLINK-6793] Activate checkstyle for runtime/metrics

This closes #4037.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/2b3d284b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/2b3d284b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/2b3d284b

Branch: refs/heads/master
Commit: 2b3d284bf09a30edafa4dadf50492156bb47027b
Parents: cd5b4a6
Author: zentol <[email protected]>
Authored: Wed May 31 16:39:52 2017 +0200
Committer: zentol <[email protected]>
Committed: Fri Jun 2 15:13:55 2017 +0200

----------------------------------------------------------------------
 flink-runtime/pom.xml                           |  1 -
 .../flink/runtime/metrics/MetricNames.java      |  4 ++
 .../flink/runtime/metrics/MetricRegistry.java   | 22 ++++---
 .../metrics/MetricRegistryConfiguration.java    | 11 ++--
 .../flink/runtime/metrics/ViewUpdater.java      |  1 +
 .../flink/runtime/metrics/dump/MetricDump.java  |  4 +-
 .../metrics/dump/MetricDumpSerialization.java   | 31 ++++++----
 .../metrics/dump/MetricQueryService.java        | 16 ++---
 .../runtime/metrics/dump/QueryScopeInfo.java    |  5 +-
 .../metrics/groups/AbstractMetricGroup.java     | 52 ++++++++--------
 .../metrics/groups/ComponentMetricGroup.java    |  8 +--
 .../metrics/groups/GenericMetricGroup.java      |  2 +-
 .../groups/JobManagerJobMetricGroup.java        |  2 +
 .../metrics/groups/JobManagerMetricGroup.java   |  1 +
 .../runtime/metrics/groups/JobMetricGroup.java  |  9 +--
 .../metrics/groups/OperatorIOMetricGroup.java   |  3 +-
 .../metrics/groups/OperatorMetricGroup.java     |  4 +-
 .../metrics/groups/TaskIOMetricGroup.java       |  4 +-
 .../groups/TaskManagerJobMetricGroup.java       |  4 +-
 .../metrics/groups/TaskManagerMetricGroup.java  |  1 -
 .../runtime/metrics/groups/TaskMetricGroup.java |  9 +--
 .../runtime/metrics/scope/ScopeFormat.java      | 18 +++---
 .../runtime/metrics/scope/ScopeFormats.java     | 10 ++-
 .../flink/runtime/metrics/util/MetricUtils.java |  7 ++-
 .../runtime/metrics/MetricRegistryTest.java     | 65 +++++++++++++++-----
 .../runtime/metrics/TaskManagerMetricsTest.java | 23 ++++---
 .../metrics/dump/MetricDumpSerializerTest.java  | 22 ++++---
 .../runtime/metrics/dump/MetricDumpTest.java    |  4 ++
 .../metrics/dump/MetricQueryServiceTest.java    | 15 +++--
 .../metrics/dump/QueryScopeInfoTest.java        |  4 ++
 .../metrics/groups/AbstractMetricGroupTest.java | 16 ++++-
 .../metrics/groups/JobManagerGroupTest.java     |  5 ++
 .../metrics/groups/JobManagerJobGroupTest.java  |  4 ++
 .../groups/MetricGroupRegistrationTest.java     | 13 +++-
 .../runtime/metrics/groups/MetricGroupTest.java | 38 +++++++-----
 .../metrics/groups/OperatorGroupTest.java       |  6 +-
 .../metrics/groups/QueryScopeInfoTest.java      |  9 ++-
 .../metrics/groups/TaskIOMetricGroupTest.java   |  7 ++-
 .../metrics/groups/TaskManagerGroupTest.java    | 24 ++++----
 .../metrics/groups/TaskManagerJobGroupTest.java |  5 +-
 .../metrics/groups/TaskMetricGroupTest.java     | 10 ++-
 .../metrics/util/DummyCharacterFilter.java      |  4 ++
 .../runtime/metrics/util/TestReporter.java      |  3 +
 .../runtime/metrics/util/TestingHistogram.java  |  4 ++
 44 files changed, 331 insertions(+), 179 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/2b3d284b/flink-runtime/pom.xml
----------------------------------------------------------------------
diff --git a/flink-runtime/pom.xml b/flink-runtime/pom.xml
index 2272bc7..d83d671 100644
--- a/flink-runtime/pom.xml
+++ b/flink-runtime/pom.xml
@@ -451,7 +451,6 @@ under the License.
                                                **/runtime/leaderretrieval/**,
                                                **/runtime/memory/**,
                                                **/runtime/messages/**,
-                                               **/runtime/metrics/**,
                                                **/runtime/minicluster/**,
                                                **/runtime/net/**,
                                                **/runtime/operators/**,

http://git-wip-us.apache.org/repos/asf/flink/blob/2b3d284b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricNames.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricNames.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricNames.java
index 9202ca1..300b4b0 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricNames.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricNames.java
@@ -15,8 +15,12 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.runtime.metrics;
 
+/**
+ * Collection of metric names.
+ */
 public class MetricNames {
        private MetricNames() {
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/2b3d284b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistry.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistry.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistry.java
index 9f46d47..5018cbb 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistry.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistry.java
@@ -18,10 +18,6 @@
 
 package org.apache.flink.runtime.metrics;
 
-import akka.actor.ActorRef;
-import akka.actor.ActorSystem;
-import akka.actor.Kill;
-import akka.pattern.Patterns;
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.ConfigConstants;
@@ -38,11 +34,13 @@ import 
org.apache.flink.runtime.metrics.groups.AbstractMetricGroup;
 import org.apache.flink.runtime.metrics.groups.FrontMetricGroup;
 import org.apache.flink.runtime.metrics.scope.ScopeFormats;
 import org.apache.flink.util.Preconditions;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.actor.Kill;
+import akka.pattern.Patterns;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import scala.concurrent.Await;
-import scala.concurrent.Future;
-import scala.concurrent.duration.FiniteDuration;
 
 import java.util.ArrayList;
 import java.util.List;
@@ -53,6 +51,10 @@ import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import scala.concurrent.Await;
+import scala.concurrent.Future;
+import scala.concurrent.duration.FiniteDuration;
+
 /**
  * A MetricRegistry keeps track of all registered {@link Metric Metrics}. It 
serves as the
  * connection between {@link MetricGroup MetricGroups} and {@link 
MetricReporter MetricReporters}.
@@ -154,7 +156,7 @@ public class MetricRegistry {
 
        /**
         * Initializes the MetricQueryService.
-        * 
+        *
         * @param actorSystem ActorSystem to create the MetricQueryService on
         * @param resourceID resource ID used to disambiguate the actor name
      */
@@ -250,7 +252,7 @@ public class MetricRegistry {
                        }
                }
        }
-       
+
        private void shutdownExecutor() {
                if (executor != null) {
                        executor.shutdown();
@@ -361,7 +363,7 @@ public class MetricRegistry {
         * This task is explicitly a static class, so that it does not hold any 
references to the enclosing
         * MetricsRegistry instance.
         *
-        * This is a subtle difference, but very important: With this static 
class, the enclosing class instance
+        * <p>This is a subtle difference, but very important: With this static 
class, the enclosing class instance
         * may become garbage-collectible, whereas with an anonymous inner 
class, the timer thread
         * (which is a GC root) will hold a reference via the timer task and 
its enclosing instance pointer.
         * Making the MetricsRegistry garbage collectible makes the 
java.util.Timer garbage collectible,

http://git-wip-us.apache.org/repos/asf/flink/blob/2b3d284b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistryConfiguration.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistryConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistryConfiguration.java
index 3475f04..6f6db86 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistryConfiguration.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistryConfiguration.java
@@ -25,6 +25,7 @@ import org.apache.flink.configuration.DelegatingConfiguration;
 import org.apache.flink.configuration.MetricOptions;
 import org.apache.flink.runtime.metrics.scope.ScopeFormats;
 import org.apache.flink.util.Preconditions;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -40,7 +41,7 @@ public class MetricRegistryConfiguration {
 
        private static final Logger LOG = 
LoggerFactory.getLogger(MetricRegistryConfiguration.class);
 
-       private static volatile MetricRegistryConfiguration 
DEFAULT_CONFIGURATION;
+       private static volatile MetricRegistryConfiguration 
defaultConfiguration;
 
        // regex pattern to split the defined reporters
        private static final Pattern splitPattern = 
Pattern.compile("\\s*,\\s*");
@@ -148,15 +149,15 @@ public class MetricRegistryConfiguration {
 
        public static MetricRegistryConfiguration 
defaultMetricRegistryConfiguration() {
                // create the default metric registry configuration only once
-               if (DEFAULT_CONFIGURATION == null) {
+               if (defaultConfiguration == null) {
                        synchronized (MetricRegistryConfiguration.class) {
-                               if (DEFAULT_CONFIGURATION == null) {
-                                       DEFAULT_CONFIGURATION = 
fromConfiguration(new Configuration());
+                               if (defaultConfiguration == null) {
+                                       defaultConfiguration = 
fromConfiguration(new Configuration());
                                }
                        }
                }
 
-               return DEFAULT_CONFIGURATION;
+               return defaultConfiguration;
        }
 
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/2b3d284b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/ViewUpdater.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/ViewUpdater.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/ViewUpdater.java
index e4d0596..77bd0d5 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/ViewUpdater.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/ViewUpdater.java
@@ -15,6 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.runtime.metrics;
 
 import org.apache.flink.metrics.View;

http://git-wip-us.apache.org/repos/asf/flink/blob/2b3d284b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricDump.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricDump.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricDump.java
index 2239b50..202e453 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricDump.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricDump.java
@@ -15,6 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.runtime.metrics.dump;
 
 import org.apache.flink.util.Preconditions;
@@ -121,7 +122,7 @@ public abstract class MetricDump {
        }
 
        /**
-        * Container for the rate of a {@link org.apache.flink.metrics.Meter}. 
+        * Container for the rate of a {@link org.apache.flink.metrics.Meter}.
         */
        public static class MeterDump extends MetricDump {
                public final double rate;
@@ -130,6 +131,7 @@ public abstract class MetricDump {
                        super(scopeInfo, name);
                        this.rate = rate;
                }
+
                @Override
                public byte getCategory() {
                        return METRIC_CATEGORY_METER;

http://git-wip-us.apache.org/repos/asf/flink/blob/2b3d284b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricDumpSerialization.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricDumpSerialization.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricDumpSerialization.java
index e57a0d8..e173522 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricDumpSerialization.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricDumpSerialization.java
@@ -15,6 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.runtime.metrics.dump;
 
 import org.apache.flink.api.java.tuple.Tuple2;
@@ -27,6 +28,7 @@ import org.apache.flink.metrics.Meter;
 import org.apache.flink.runtime.util.DataInputDeserializer;
 import org.apache.flink.runtime.util.DataOutputSerializer;
 import org.apache.flink.util.Preconditions;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -56,14 +58,14 @@ public class MetricDumpSerialization {
 
        /**
         * This class encapsulates all serialized metrics and a count for each 
metric type.
-        * 
-        * The counts are stored separately from the metrics since the final 
count for any given type can only be
+        *
+        * <p>The counts are stored separately from the metrics since the final 
count for any given type can only be
         * determined after all metrics of that type were serialized. Storing 
them together in a single byte[] would
         * require an additional copy of all serialized metrics, as you would 
first have to serialize the metrics into a
         * temporary buffer to calculate the counts, write the counts to the 
final output and copy all metrics from the
         * temporary buffer.
-        * 
-        * Note that while one could implement the serialization in such a way 
so that at least 1 byte (a validity flag)
+        *
+        * <p>Note that while one could implement the serialization in such a 
way so that at least 1 byte (a validity flag)
         * is written for each metric, this would require more bandwidth due to 
the sheer number of metrics.
         */
        public static class MetricSerializationResult implements Serializable {
@@ -75,12 +77,12 @@ public class MetricDumpSerialization {
                public final int numGauges;
                public final int numMeters;
                public final int numHistograms;
-               
+
                public MetricSerializationResult(byte[] serializedMetrics, int 
numCounters, int numGauges, int numMeters, int numHistograms) {
                        Preconditions.checkNotNull(serializedMetrics);
                        Preconditions.checkArgument(numCounters >= 0);
                        Preconditions.checkArgument(numGauges >= 0);
-                       Preconditions.checkArgument(numMeters >= 0); 
+                       Preconditions.checkArgument(numMeters >= 0);
                        Preconditions.checkArgument(numHistograms >= 0);
                        this.serializedMetrics = serializedMetrics;
                        this.numCounters = numCounters;
@@ -94,18 +96,21 @@ public class MetricDumpSerialization {
        // Serialization
        
//-------------------------------------------------------------------------
 
+       /**
+        * Serializes a set of metrics into a {@link MetricSerializationResult}.
+        */
        public static class MetricDumpSerializer {
 
                private DataOutputSerializer buffer = new 
DataOutputSerializer(1024 * 32);
 
                /**
                 * Serializes the given metrics and returns the resulting byte 
array.
-                * 
-                * Should a {@link Metric} accessed in this method throw an 
exception it will be omitted from the returned
+                *
+                * <p>Should a {@link Metric} accessed in this method throw an 
exception it will be omitted from the returned
                 * {@link MetricSerializationResult}.
-                * 
-                * If the serialization of any primitive or String fails then 
the returned {@link MetricSerializationResult}
-                * is partially corrupted. Such a result can be deserialized 
safely by 
+                *
+                * <p>If the serialization of any primitive or String fails 
then the returned {@link MetricSerializationResult}
+                * is partially corrupted. Such a result can be deserialized 
safely by
                 * {@link 
MetricDumpDeserializer#deserialize(MetricSerializationResult)}; however only 
metrics that were
                 * fully serialized before the failure will be returned.
                 *
@@ -263,6 +268,9 @@ public class MetricDumpSerialization {
        // Deserialization
        
//-------------------------------------------------------------------------
 
+       /**
+        * Deserializer for reading a list of {@link MetricDump MetricDumps} 
from a {@link MetricSerializationResult}.
+        */
        public static class MetricDumpDeserializer {
                /**
                 * De-serializes metrics from the given byte array and returns 
them as a list of {@link MetricDump}.
@@ -311,7 +319,6 @@ public class MetricDumpSerialization {
                }
        }
 
-
        private static MetricDump.CounterDump deserializeCounter(DataInputView 
dis) throws IOException {
                QueryScopeInfo scope = deserializeMetricInfo(dis);
                String name = dis.readUTF();

http://git-wip-us.apache.org/repos/asf/flink/blob/2b3d284b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricQueryService.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricQueryService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricQueryService.java
index 2229139..8821e0d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricQueryService.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricQueryService.java
@@ -15,13 +15,9 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.runtime.metrics.dump;
 
-import akka.actor.ActorRef;
-import akka.actor.ActorSystem;
-import akka.actor.Props;
-import akka.actor.Status;
-import akka.actor.UntypedActor;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.metrics.CharacterFilter;
 import org.apache.flink.metrics.Counter;
@@ -31,6 +27,12 @@ import org.apache.flink.metrics.Meter;
 import org.apache.flink.metrics.Metric;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.metrics.groups.AbstractMetricGroup;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.actor.Props;
+import akka.actor.Status;
+import akka.actor.UntypedActor;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -44,7 +46,7 @@ import static 
org.apache.flink.runtime.metrics.dump.MetricDumpSerialization.Metr
 /**
  * The MetricQueryService creates a key-value representation of all metrics 
currently registered with Flink when queried.
  *
- * It is realized as an actor and can be notified of
+ * <p>It is realized as an actor and can be notified of
  * - an added metric by calling {@link 
MetricQueryService#notifyOfAddedMetric(ActorRef, Metric, String, 
AbstractMetricGroup)}
  * - a removed metric by calling {@link 
MetricQueryService#notifyOfRemovedMetric(ActorRef, Metric)}
  * - a metric dump request by sending the return value of {@link 
MetricQueryService#getCreateDump()}
@@ -217,6 +219,6 @@ public class MetricQueryService extends UntypedActor {
        }
 
        private static class CreateDump implements Serializable {
-               private static CreateDump INSTANCE = new CreateDump();
+               private static final CreateDump INSTANCE = new CreateDump();
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/2b3d284b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/QueryScopeInfo.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/QueryScopeInfo.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/QueryScopeInfo.java
index 6572ca0..9af9d78 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/QueryScopeInfo.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/QueryScopeInfo.java
@@ -15,6 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.runtime.metrics.dump;
 
 /**
@@ -28,7 +29,7 @@ public abstract class QueryScopeInfo {
        public static final byte INFO_CATEGORY_TASK = 3;
        public static final byte INFO_CATEGORY_OPERATOR = 4;
 
-       /** The remaining scope not covered by specific fields */
+       /** The remaining scope not covered by specific fields. */
        public final String scope;
 
        private QueryScopeInfo(String scope) {
@@ -45,7 +46,7 @@ public abstract class QueryScopeInfo {
 
        /**
         * Returns the category for this QueryScopeInfo.
-        * 
+        *
         * @return category
      */
        public abstract byte getCategory();

http://git-wip-us.apache.org/repos/asf/flink/blob/2b3d284b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroup.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroup.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroup.java
index a19970d..c67c5ea 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroup.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroup.java
@@ -30,6 +30,7 @@ import org.apache.flink.metrics.SimpleCounter;
 import org.apache.flink.runtime.metrics.MetricRegistry;
 import org.apache.flink.runtime.metrics.dump.QueryScopeInfo;
 import org.apache.flink.runtime.metrics.scope.ScopeFormat;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -41,9 +42,9 @@ import static 
org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
  * Abstract {@link MetricGroup} that contains key functionality for adding 
metrics and groups.
- * 
+ *
  * <p><b>IMPORTANT IMPLEMENTATION NOTE</b>
- * 
+ *
  * <p>This class uses locks for adding and removing metrics objects. This is 
done to
  * prevent resource leaks in the presence of concurrently closing a group and 
adding
  * metrics and subgroups.
@@ -56,30 +57,29 @@ import static 
org.apache.flink.util.Preconditions.checkNotNull;
  * from any metrics reporter and any internal maps. Note that even closed 
metrics groups
  * return Counters, Gauges, etc to the code, to prevent exceptions in the 
monitored code.
  * These metrics simply do not get reported any more, when created on a closed 
group.
- * 
+ *
  * @param <A> The type of the parent MetricGroup
  */
 @Internal
 public abstract class AbstractMetricGroup<A extends AbstractMetricGroup<?>> 
implements MetricGroup {
 
-       /** shared logger */
        private static final Logger LOG = 
LoggerFactory.getLogger(MetricGroup.class);
 
        // 
------------------------------------------------------------------------
 
-       /** The parent group containing this group */
+       /** The parent group containing this group. */
        protected final A parent;
 
        /** The map containing all variables and their associated values, 
lazily computed. */
        protected volatile Map<String, String> variables;
-       
-       /** The registry that this metrics group belongs to */
+
+       /** The registry that this metrics group belongs to. */
        protected final MetricRegistry registry;
 
-       /** All metrics that are directly contained in this group */
+       /** All metrics that are directly contained in this group. */
        private final Map<String, Metric> metrics = new HashMap<>();
 
-       /** All metric subgroups of this group */
+       /** All metric subgroups of this group. */
        private final Map<String, AbstractMetricGroup> groups = new HashMap<>();
 
        /** The metrics scope represented by this group.
@@ -97,7 +97,7 @@ public abstract class AbstractMetricGroup<A extends 
AbstractMetricGroup<?>> impl
        /** The metrics query service scope represented by this group, lazily 
computed. */
        protected QueryScopeInfo queryServiceScopeInfo;
 
-       /** Flag indicating whether this group has been closed */
+       /** Flag indicating whether this group has been closed. */
        private volatile boolean closed;
 
        // 
------------------------------------------------------------------------
@@ -111,7 +111,7 @@ public abstract class AbstractMetricGroup<A extends 
AbstractMetricGroup<?>> impl
 
        public Map<String, String> getAllVariables() {
                if (variables == null) { // avoid synchronization for common 
case
-                       synchronized(this) {
+                       synchronized (this) {
                                if (variables == null) {
                                        if (parent != null) {
                                                variables = 
parent.getAllVariables();
@@ -126,7 +126,7 @@ public abstract class AbstractMetricGroup<A extends 
AbstractMetricGroup<?>> impl
 
        /**
         * Returns the logical scope of this group, for example
-        * {@code "taskmanager.job.task"}
+        * {@code "taskmanager.job.task"}.
         *
         * @param filter character filter which is applied to the scope 
components
         * @return logical scope
@@ -137,7 +137,7 @@ public abstract class AbstractMetricGroup<A extends 
AbstractMetricGroup<?>> impl
 
        /**
         * Returns the logical scope of this group, for example
-        * {@code "taskmanager.job.task"}
+        * {@code "taskmanager.job.task"}.
         *
         * @param filter character filter which is applied to the scope 
components
         * @return logical scope
@@ -155,7 +155,7 @@ public abstract class AbstractMetricGroup<A extends 
AbstractMetricGroup<?>> impl
 
        /**
         * Returns the name for this group, meaning what kind of entity it 
represents, for example "taskmanager".
-        * 
+        *
         * @param filter character filter which is applied to the name
         * @return logical name for this group
         */
@@ -163,9 +163,9 @@ public abstract class AbstractMetricGroup<A extends 
AbstractMetricGroup<?>> impl
 
        /**
         * Gets the scope as an array of the scope components, for example
-        * {@code ["host-7", "taskmanager-2", "window_word_count", "my-mapper"]}
-        * 
-        * @see #getMetricIdentifier(String) 
+        * {@code ["host-7", "taskmanager-2", "window_word_count", 
"my-mapper"]}.
+        *
+        * @see #getMetricIdentifier(String)
         */
        public String[] getScopeComponents() {
                return scopeComponents;
@@ -173,7 +173,7 @@ public abstract class AbstractMetricGroup<A extends 
AbstractMetricGroup<?>> impl
 
        /**
         * Returns the metric query service scope for this group.
-        * 
+        *
         * @param filter character filter
         * @return query service scope
      */
@@ -194,8 +194,8 @@ public abstract class AbstractMetricGroup<A extends 
AbstractMetricGroup<?>> impl
 
        /**
         * Returns the fully qualified metric name, for example
-        * {@code "host-7.taskmanager-2.window_word_count.my-mapper.metricName"}
-        * 
+        * {@code 
"host-7.taskmanager-2.window_word_count.my-mapper.metricName"}.
+        *
         * @param metricName metric name
         * @return fully qualified metric name
         */
@@ -205,7 +205,7 @@ public abstract class AbstractMetricGroup<A extends 
AbstractMetricGroup<?>> impl
 
        /**
         * Returns the fully qualified metric name, for example
-        * {@code "host-7.taskmanager-2.window_word_count.my-mapper.metricName"}
+        * {@code 
"host-7.taskmanager-2.window_word_count.my-mapper.metricName"}.
         *
         * @param metricName metric name
         * @param filter character filter which is applied to the scope 
components if not null.
@@ -217,7 +217,7 @@ public abstract class AbstractMetricGroup<A extends 
AbstractMetricGroup<?>> impl
 
        /**
         * Returns the fully qualified metric name using the configured 
delimiter for the reporter with the given index, for example
-        * {@code "host-7.taskmanager-2.window_word_count.my-mapper.metricName"}
+        * {@code 
"host-7.taskmanager-2.window_word_count.my-mapper.metricName"}.
         *
         * @param metricName metric name
         * @param filter character filter which is applied to the scope 
components if not null.
@@ -250,7 +250,7 @@ public abstract class AbstractMetricGroup<A extends 
AbstractMetricGroup<?>> impl
                        return scopeStrings[reporterIndex] + delimiter + 
metricName;
                }
        }
-       
+
        // 
------------------------------------------------------------------------
        //  Closing
        // 
------------------------------------------------------------------------
@@ -292,7 +292,7 @@ public abstract class AbstractMetricGroup<A extends 
AbstractMetricGroup<?>> impl
        public Counter counter(String name) {
                return counter(name, new SimpleCounter());
        }
-       
+
        @Override
        public <C extends Counter> C counter(int name, C counter) {
                return counter(String.valueOf(name), counter);
@@ -340,7 +340,7 @@ public abstract class AbstractMetricGroup<A extends 
AbstractMetricGroup<?>> impl
        /**
         * Adds the given metric to the group and registers it at the registry, 
if the group
         * is not yet closed, and if no metric with the same name has been 
registered before.
-        * 
+        *
         * @param name the name to register the metric under
         * @param metric the metric to register
         */
@@ -372,7 +372,7 @@ public abstract class AbstractMetricGroup<A extends 
AbstractMetricGroup<?>> impl
                                else {
                                        // we had a collision. put back the 
original value
                                        metrics.put(name, prior);
-                                       
+
                                        // we warn here, rather than failing, 
because metrics are tools that should not fail the
                                        // program when used incorrectly
                                        LOG.warn("Name collision: Group already 
contains a Metric with the name '" +

http://git-wip-us.apache.org/repos/asf/flink/blob/2b3d284b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/ComponentMetricGroup.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/ComponentMetricGroup.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/ComponentMetricGroup.java
index 9f0f483..0cd9942 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/ComponentMetricGroup.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/ComponentMetricGroup.java
@@ -25,9 +25,9 @@ import java.util.HashMap;
 import java.util.Map;
 
 /**
- * Abstract {@link org.apache.flink.metrics.MetricGroup} for system components 
(e.g., 
+ * Abstract {@link org.apache.flink.metrics.MetricGroup} for system components 
(e.g.,
  * TaskManager, Job, Task, Operator).
- * 
+ *
  * <p>Usually, the scope of metrics is simply the hierarchy of the containing 
groups. For example
  * the Metric {@code "MyMetric"} in group {@code "B"} nested in group {@code 
"A"} would have a
  * fully scoped name of {@code "A.B.MyMetric"}, with {@code "A.B"} being the 
Metric's scope.
@@ -36,7 +36,7 @@ import java.util.Map;
  * certain identifiers from the scope. The scope for metrics belonging to the 
"Task"
  * group could for example include the task attempt number (more fine grained 
identification), or
  * exclude it (for continuity of the namespace across failure and recovery).
- * 
+ *
  * @param <P> The type of the parent MetricGroup.
  */
 @Internal
@@ -101,7 +101,7 @@ public abstract class ComponentMetricGroup<P extends 
AbstractMetricGroup<?>> ext
 
        /**
         * Gets all component metric groups that are contained in this 
component metric group.
-        * 
+        *
         * @return All component metric groups that are contained in this 
component metric group.
         */
        protected abstract Iterable<? extends ComponentMetricGroup> 
subComponents();

http://git-wip-us.apache.org/repos/asf/flink/blob/2b3d284b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/GenericMetricGroup.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/GenericMetricGroup.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/GenericMetricGroup.java
index 5978f2d..ee7d1ca 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/GenericMetricGroup.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/GenericMetricGroup.java
@@ -29,7 +29,7 @@ import org.apache.flink.runtime.metrics.dump.QueryScopeInfo;
  */
 @Internal
 public class GenericMetricGroup extends 
AbstractMetricGroup<AbstractMetricGroup<?>> {
-       /** The name of this group */
+       /** The name of this group. */
        private String name;
 
        public GenericMetricGroup(MetricRegistry registry, AbstractMetricGroup 
parent, String name) {

http://git-wip-us.apache.org/repos/asf/flink/blob/2b3d284b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/JobManagerJobMetricGroup.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/JobManagerJobMetricGroup.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/JobManagerJobMetricGroup.java
index c4902e8..b62c7b6 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/JobManagerJobMetricGroup.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/JobManagerJobMetricGroup.java
@@ -15,6 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.runtime.metrics.groups;
 
 import org.apache.flink.annotation.Internal;
@@ -22,6 +23,7 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.metrics.MetricRegistry;
 
 import javax.annotation.Nullable;
+
 import java.util.Collections;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;

http://git-wip-us.apache.org/repos/asf/flink/blob/2b3d284b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/JobManagerMetricGroup.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/JobManagerMetricGroup.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/JobManagerMetricGroup.java
index 5a35110..e09051d 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/JobManagerMetricGroup.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/JobManagerMetricGroup.java
@@ -15,6 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.runtime.metrics.groups;
 
 import org.apache.flink.api.common.JobID;

http://git-wip-us.apache.org/repos/asf/flink/blob/2b3d284b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/JobMetricGroup.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/JobMetricGroup.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/JobMetricGroup.java
index 17f6189..876e794 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/JobMetricGroup.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/JobMetricGroup.java
@@ -26,21 +26,22 @@ import org.apache.flink.runtime.metrics.dump.QueryScopeInfo;
 import org.apache.flink.runtime.metrics.scope.ScopeFormat;
 
 import javax.annotation.Nullable;
+
 import java.util.Map;
 
 /**
  * Special abstract {@link org.apache.flink.metrics.MetricGroup} representing 
everything belonging to
  * a specific job.
- * 
+ *
  * @param <C> The type of the parent ComponentMetricGroup.
  */
 @Internal
 public abstract class JobMetricGroup<C extends ComponentMetricGroup<C>> 
extends ComponentMetricGroup<C> {
 
-       /** The ID of the job represented by this metrics group */
+       /** The ID of the job represented by this metrics group. */
        protected final JobID jobId;
 
-       /** The name of the job represented by this metrics group */
+       /** The name of the job represented by this metrics group. */
        @Nullable
        protected final String jobName;
 
@@ -53,7 +54,7 @@ public abstract class JobMetricGroup<C extends 
ComponentMetricGroup<C>> extends
                        @Nullable String jobName,
                        String[] scope) {
                super(registry, scope, parent);
-               
+
                this.jobId = jobId;
                this.jobName = jobName;
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/2b3d284b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/OperatorIOMetricGroup.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/OperatorIOMetricGroup.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/OperatorIOMetricGroup.java
index 5bf7d1f..6bf3c6c 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/OperatorIOMetricGroup.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/OperatorIOMetricGroup.java
@@ -15,6 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.runtime.metrics.groups;
 
 import org.apache.flink.metrics.Counter;
@@ -64,7 +65,7 @@ public class OperatorIOMetricGroup extends 
ProxyMetricGroup<OperatorMetricGroup>
        public void reuseInputMetricsForTask() {
                TaskIOMetricGroup taskIO = 
parentMetricGroup.parent().getIOMetricGroup();
                taskIO.reuseRecordsInputCounter(this.numRecordsIn);
-               
+
        }
 
        /**

http://git-wip-us.apache.org/repos/asf/flink/blob/2b3d284b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/OperatorMetricGroup.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/OperatorMetricGroup.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/OperatorMetricGroup.java
index 37c9dd8..2313873 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/OperatorMetricGroup.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/OperatorMetricGroup.java
@@ -46,7 +46,7 @@ public class OperatorMetricGroup extends 
ComponentMetricGroup<TaskMetricGroup> {
        }
 
        // 
------------------------------------------------------------------------
-       
+
        public final TaskMetricGroup parent() {
                return parent;
        }
@@ -68,7 +68,7 @@ public class OperatorMetricGroup extends 
ComponentMetricGroup<TaskMetricGroup> {
        public OperatorIOMetricGroup getIOMetricGroup() {
                return ioMetrics;
        }
-       
+
        // 
------------------------------------------------------------------------
        //  Component Metric Group Specifics
        // 
------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/2b3d284b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskIOMetricGroup.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskIOMetricGroup.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskIOMetricGroup.java
index 38accad..e12ecd7 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskIOMetricGroup.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskIOMetricGroup.java
@@ -24,11 +24,11 @@ import org.apache.flink.metrics.Meter;
 import org.apache.flink.metrics.MeterView;
 import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.metrics.SimpleCounter;
+import org.apache.flink.runtime.executiongraph.IOMetrics;
 import org.apache.flink.runtime.io.network.partition.ResultPartition;
 import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
 import org.apache.flink.runtime.metrics.MetricNames;
 import org.apache.flink.runtime.taskmanager.Task;
-import org.apache.flink.runtime.executiongraph.IOMetrics;
 
 import java.util.ArrayList;
 import java.util.List;
@@ -110,7 +110,7 @@ public class TaskIOMetricGroup extends 
ProxyMetricGroup<TaskMetricGroup> {
        // 
============================================================================================
 
        /**
-        * Initialize Buffer Metrics for a task
+        * Initialize Buffer Metrics for a task.
         */
        public void initializeBufferMetrics(Task task) {
                final MetricGroup buffers = addGroup("buffers");

http://git-wip-us.apache.org/repos/asf/flink/blob/2b3d284b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskManagerJobMetricGroup.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskManagerJobMetricGroup.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskManagerJobMetricGroup.java
index 79a87d0..3921553 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskManagerJobMetricGroup.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskManagerJobMetricGroup.java
@@ -15,6 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.runtime.metrics.groups;
 
 import org.apache.flink.annotation.Internal;
@@ -25,6 +26,7 @@ import org.apache.flink.runtime.metrics.MetricRegistry;
 import org.apache.flink.util.AbstractID;
 
 import javax.annotation.Nullable;
+
 import java.util.HashMap;
 import java.util.Map;
 
@@ -39,7 +41,7 @@ import static 
org.apache.flink.util.Preconditions.checkNotNull;
 @Internal
 public class TaskManagerJobMetricGroup extends 
JobMetricGroup<TaskManagerMetricGroup> {
 
-       /** Map from execution attempt ID (task identifier) to task metrics */
+       /** Map from execution attempt ID (task identifier) to task metrics. */
        private final Map<AbstractID, TaskMetricGroup> tasks = new HashMap<>();
 
        // 
------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/2b3d284b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskManagerMetricGroup.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskManagerMetricGroup.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskManagerMetricGroup.java
index 92c509a..d85e868 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskManagerMetricGroup.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskManagerMetricGroup.java
@@ -46,7 +46,6 @@ public class TaskManagerMetricGroup extends 
ComponentMetricGroup<TaskManagerMetr
 
        private final String taskManagerId;
 
-
        public TaskManagerMetricGroup(MetricRegistry registry, String hostname, 
String taskManagerId) {
                super(registry, 
registry.getScopeFormats().getTaskManagerFormat().formatScope(hostname, 
taskManagerId), null);
                this.hostname = hostname;

http://git-wip-us.apache.org/repos/asf/flink/blob/2b3d284b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskMetricGroup.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskMetricGroup.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskMetricGroup.java
index 43e8e1b..cb7aaa0 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskMetricGroup.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskMetricGroup.java
@@ -26,6 +26,7 @@ import org.apache.flink.runtime.metrics.scope.ScopeFormat;
 import org.apache.flink.util.AbstractID;
 
 import javax.annotation.Nullable;
+
 import java.util.HashMap;
 import java.util.Map;
 
@@ -33,7 +34,7 @@ import static 
org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
  * Special {@link org.apache.flink.metrics.MetricGroup} representing a Flink 
runtime Task.
- * 
+ *
  * <p>Contains extra logic for adding operators.
  */
 @Internal
@@ -42,13 +43,13 @@ public class TaskMetricGroup extends 
ComponentMetricGroup<TaskManagerJobMetricGr
        private final Map<String, OperatorMetricGroup> operators = new 
HashMap<>();
 
        private final TaskIOMetricGroup ioMetrics;
-       
-       /** The execution Id uniquely identifying the executed task represented 
by this metrics group */
+
+       /** The execution Id uniquely identifying the executed task represented 
by this metrics group. */
        private final AbstractID executionId;
 
        @Nullable
        protected final AbstractID vertexId;
-       
+
        @Nullable
        private final String taskName;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/2b3d284b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/scope/ScopeFormat.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/scope/ScopeFormat.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/scope/ScopeFormat.java
index f9efb88..18b3a0d 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/scope/ScopeFormat.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/scope/ScopeFormat.java
@@ -52,7 +52,7 @@ public abstract class ScopeFormat {
        /**
         * If the scope format starts with this character, then the parent 
components scope
         * format will be used as a prefix.
-        * 
+        *
         * <p>For example, if the TaskManager's job format is {@code 
"*.<job_name>"}, and the
         * TaskManager format is {@code "<host>"}, then the job's metrics
         * will have {@code "<host>.<job_name>"} as their scope.
@@ -90,16 +90,16 @@ public abstract class ScopeFormat {
        // ----- Operator ----
 
        public static final String SCOPE_OPERATOR_NAME = 
asVariable("operator_name");
-       
+
 
        // 
------------------------------------------------------------------------
        //  Scope Format Base
        // 
------------------------------------------------------------------------
 
-       /** The scope format */
+       /** The scope format. */
        private final String format;
 
-       /** The format, split into components */
+       /** The format, split into components. */
        private final String[] template;
 
        private final int[] templatePos;
@@ -125,7 +125,7 @@ public abstract class ScopeFormat {
 
                        String[] parentTemplate = parent.template;
                        int parentLen = parentTemplate.length;
-                       
+
                        this.template = new String[parentLen + 
rawComponents.length - 1];
                        System.arraycopy(parentTemplate, 0, this.template, 0, 
parentLen);
                        System.arraycopy(rawComponents, 1, this.template, 
parentLen, rawComponents.length - 1);
@@ -137,14 +137,14 @@ public abstract class ScopeFormat {
 
                // --- compute the replacement matrix ---
                // a bit of clumsy Java collections code ;-)
-               
+
                HashMap<String, Integer> varToValuePos = arrayToMap(variables);
                List<Integer> templatePos = new ArrayList<>();
                List<Integer> valuePos = new ArrayList<>();
 
                for (int i = 0; i < template.length; i++) {
                        final String component = template[i];
-                       
+
                        // check if that is a variable
                        if (component != null && component.length() >= 3 &&
                                        component.charAt(0) == '<' && 
component.charAt(component.length() - 1) == '>') {
@@ -188,7 +188,7 @@ public abstract class ScopeFormat {
        public String toString() {
                return "ScopeFormat '" + format + '\'';
        }
-       
+
        // 
------------------------------------------------------------------------
        //  Utilities
        // 
------------------------------------------------------------------------
@@ -233,7 +233,7 @@ public abstract class ScopeFormat {
                }
                return sb.toString();
        }
-       
+
        protected static String valueOrNull(Object value) {
                return (value == null || (value instanceof String && ((String) 
value).isEmpty())) ?
                                "null" : value.toString();

http://git-wip-us.apache.org/repos/asf/flink/blob/2b3d284b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/scope/ScopeFormats.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/scope/ScopeFormats.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/scope/ScopeFormats.java
index bde93be..dc49d32 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/scope/ScopeFormats.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/scope/ScopeFormats.java
@@ -67,8 +67,7 @@ public class ScopeFormats {
                        String taskManagerFormat,
                        String taskManagerJobFormat,
                        String taskFormat,
-                       String operatorFormat)
-       {
+                       String operatorFormat) {
                this.jobManagerFormat = new 
JobManagerScopeFormat(jobManagerFormat);
                this.jobManagerJobFormat = new 
JobManagerJobScopeFormat(jobManagerJobFormat, this.jobManagerFormat);
                this.taskManagerFormat = new 
TaskManagerScopeFormat(taskManagerFormat);
@@ -86,8 +85,7 @@ public class ScopeFormats {
                        TaskManagerScopeFormat taskManagerFormat,
                        TaskManagerJobScopeFormat taskManagerJobFormat,
                        TaskScopeFormat taskFormat,
-                       OperatorScopeFormat operatorFormat)
-       {
+                       OperatorScopeFormat operatorFormat) {
                this.jobManagerFormat = checkNotNull(jobManagerFormat);
                this.jobManagerJobFormat = checkNotNull(jobManagerJobFormat);
                this.taskManagerFormat = checkNotNull(taskManagerFormat);
@@ -129,8 +127,8 @@ public class ScopeFormats {
        // 
------------------------------------------------------------------------
 
        /**
-        * Creates the scope formats as defined in the given configuration
-        * 
+        * Creates the scope formats as defined in the given configuration.
+        *
         * @param config The configuration that defines the formats
         * @return The ScopeFormats parsed from the configuration
         */

http://git-wip-us.apache.org/repos/asf/flink/blob/2b3d284b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/util/MetricUtils.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/util/MetricUtils.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/util/MetricUtils.java
index 4612eaf..2ecde42 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/util/MetricUtils.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/util/MetricUtils.java
@@ -15,12 +15,14 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.runtime.metrics.util;
 
-import org.apache.commons.lang3.text.WordUtils;
 import org.apache.flink.metrics.Gauge;
 import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.runtime.io.network.NetworkEnvironment;
+
+import org.apache.commons.lang3.text.WordUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -35,6 +37,9 @@ import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
 import java.util.List;
 
+/**
+ * Utility class to register pre-defined metric sets.
+ */
 public class MetricUtils {
        private static final Logger LOG = 
LoggerFactory.getLogger(MetricUtils.class);
        private static final String METRIC_GROUP_STATUS_NAME = "Status";

http://git-wip-us.apache.org/repos/asf/flink/blob/2b3d284b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/MetricRegistryTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/MetricRegistryTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/MetricRegistryTest.java
index b9502b2..1de2551 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/MetricRegistryTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/MetricRegistryTest.java
@@ -18,9 +18,6 @@
 
 package org.apache.flink.runtime.metrics;
 
-import akka.actor.ActorNotFound;
-import akka.actor.ActorRef;
-import akka.actor.ActorSystem;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.MetricOptions;
@@ -34,20 +31,27 @@ import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup;
 import org.apache.flink.runtime.metrics.scope.ScopeFormats;
 import org.apache.flink.runtime.metrics.util.TestReporter;
-
 import org.apache.flink.util.TestLogger;
+
+import akka.actor.ActorNotFound;
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
 import org.junit.Assert;
 import org.junit.Test;
-import scala.concurrent.Await;
-import scala.concurrent.duration.FiniteDuration;
 
 import java.util.List;
 import java.util.concurrent.TimeUnit;
 
+import scala.concurrent.Await;
+import scala.concurrent.duration.FiniteDuration;
+
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
+/**
+ * Tests for the {@link MetricRegistry}.
+ */
 public class MetricRegistryTest extends TestLogger {
 
        private static final char GLOBAL_DEFAULT_DELIMITER = '.';
@@ -55,14 +59,14 @@ public class MetricRegistryTest extends TestLogger {
        @Test
        public void testIsShutdown() {
                MetricRegistry metricRegistry = new 
MetricRegistry(MetricRegistryConfiguration.defaultMetricRegistryConfiguration());
-               
+
                Assert.assertFalse(metricRegistry.isShutdown());
-               
+
                metricRegistry.shutdown();
-               
+
                Assert.assertTrue(metricRegistry.isShutdown());
        }
-       
+
        /**
         * Verifies that the reporter class argument is correctly used to 
instantiate and open the reporter.
         */
@@ -82,6 +86,9 @@ public class MetricRegistryTest extends TestLogger {
                metricRegistry.shutdown();
        }
 
+       /**
+        * Reporter that exposes whether open() was called.
+        */
        protected static class TestReporter1 extends TestReporter {
                public static boolean wasOpened = false;
 
@@ -114,6 +121,9 @@ public class MetricRegistryTest extends TestLogger {
                metricRegistry.shutdown();
        }
 
+       /**
+        * Reporter that exposes whether open() was called.
+        */
        protected static class TestReporter11 extends TestReporter {
                public static boolean wasOpened = false;
 
@@ -123,6 +133,9 @@ public class MetricRegistryTest extends TestLogger {
                }
        }
 
+       /**
+        * Reporter that exposes whether open() was called.
+        */
        protected static class TestReporter12 extends TestReporter {
                public static boolean wasOpened = false;
 
@@ -132,6 +145,9 @@ public class MetricRegistryTest extends TestLogger {
                }
        }
 
+       /**
+        * Reporter that exposes whether open() was called.
+        */
        protected static class TestReporter13 extends TestReporter {
                public static boolean wasOpened = false;
 
@@ -156,6 +172,9 @@ public class MetricRegistryTest extends TestLogger {
                new 
MetricRegistry(MetricRegistryConfiguration.fromConfiguration(config)).shutdown();
        }
 
+       /**
+        * Reporter that verifies whether configured arguments were properly 
passed.
+        */
        protected static class TestReporter2 extends TestReporter {
                @Override
                public void open(MetricConfig config) {
@@ -186,9 +205,9 @@ public class MetricRegistryTest extends TestLogger {
                        int reportCount = TestReporter3.reportCount;
                        long curT = System.currentTimeMillis();
                        /**
-                        * Within a given time-frame T only T/500 reports may 
be triggered due to the interval between reports. 
-                        * This value however does not not take the first 
triggered report into account (=> +1). 
-                        * Furthermore we have to account for the mis-alignment 
between reports being triggered and our time 
+                        * Within a given time-frame T only T/500 reports may 
be triggered due to the interval between reports.
+                        * This value however does not take the first 
triggered report into account (=> +1).
+                        * Furthermore we have to account for the mis-alignment 
between reports being triggered and our time
                         * measurement (=> +1); for T=200 a total of 4-6 
reports may have been
                         * triggered depending on whether the end of the 
interval for the first reports ends before
                         * or after T=50.
@@ -201,6 +220,9 @@ public class MetricRegistryTest extends TestLogger {
                registry.shutdown();
        }
 
+       /**
+        * Reporter that exposes how often report() was called.
+        */
        protected static class TestReporter3 extends TestReporter implements 
Scheduled {
                public static int reportCount = 0;
 
@@ -234,6 +256,9 @@ public class MetricRegistryTest extends TestLogger {
                registry.shutdown();
        }
 
+       /**
+        * Reporter that exposes whether it was notified of added or removed 
metrics.
+        */
        protected static class TestReporter6 extends TestReporter {
                public static boolean addCalled = false;
                public static boolean removeCalled = false;
@@ -253,6 +278,9 @@ public class MetricRegistryTest extends TestLogger {
                }
        }
 
+       /**
+        * Reporter that exposes whether it was notified of added or removed 
metrics.
+        */
        protected static class TestReporter7 extends TestReporter {
                public static boolean addCalled = false;
                public static boolean removeCalled = false;
@@ -344,10 +372,10 @@ public class MetricRegistryTest extends TestLogger {
 
                MetricRegistry registry = new 
MetricRegistry(MetricRegistryConfiguration.fromConfiguration(config));
                List<MetricReporter> reporters = registry.getReporters();
-               ((TestReporter8)reporters.get(0)).expectedDelimiter = '_'; 
//test1  reporter
-               ((TestReporter8)reporters.get(1)).expectedDelimiter = '-'; 
//test2 reporter
-               ((TestReporter8)reporters.get(2)).expectedDelimiter = 
GLOBAL_DEFAULT_DELIMITER; //test3 reporter, because 'AA' - not correct delimiter
-               ((TestReporter8)reporters.get(3)).expectedDelimiter = 
GLOBAL_DEFAULT_DELIMITER; //for test4 reporter use global delimiter
+               ((TestReporter8) reporters.get(0)).expectedDelimiter = '_'; 
//test1  reporter
+               ((TestReporter8) reporters.get(1)).expectedDelimiter = '-'; 
//test2 reporter
+               ((TestReporter8) reporters.get(2)).expectedDelimiter = 
GLOBAL_DEFAULT_DELIMITER; //test3 reporter, because 'AA' - not correct delimiter
+               ((TestReporter8) reporters.get(3)).expectedDelimiter = 
GLOBAL_DEFAULT_DELIMITER; //for test4 reporter use global delimiter
 
                TaskManagerMetricGroup group = new 
TaskManagerMetricGroup(registry, "host", "id");
                group.counter("C");
@@ -383,6 +411,9 @@ public class MetricRegistryTest extends TestLogger {
                }
        }
 
+       /**
+        * Reporter that verifies that the configured delimiter is applied correctly when generating the metric identifier.
+        */
        public static class TestReporter8 extends TestReporter {
                char expectedDelimiter;
                public static int numCorrectDelimitersForRegister = 0;

http://git-wip-us.apache.org/repos/asf/flink/blob/2b3d284b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/TaskManagerMetricsTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/TaskManagerMetricsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/TaskManagerMetricsTest.java
index d6fc48c..5e6bbce 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/TaskManagerMetricsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/TaskManagerMetricsTest.java
@@ -15,12 +15,8 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.flink.runtime.metrics;
 
-import akka.actor.ActorRef;
-import akka.actor.ActorSystem;
-import akka.actor.Props;
-import akka.testkit.JavaTestKit;
+package org.apache.flink.runtime.metrics;
 
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.akka.AkkaUtils;
@@ -34,18 +30,25 @@ import 
org.apache.flink.runtime.taskexecutor.TaskManagerConfiguration;
 import org.apache.flink.runtime.taskexecutor.TaskManagerServices;
 import org.apache.flink.runtime.taskexecutor.TaskManagerServicesConfiguration;
 import org.apache.flink.runtime.taskmanager.TaskManager;
-
 import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.apache.flink.util.TestLogger;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.actor.Props;
+import akka.testkit.JavaTestKit;
 import org.junit.Assert;
 import org.junit.Test;
 
-import scala.concurrent.duration.FiniteDuration;
-
 import java.net.InetAddress;
 import java.util.UUID;
 import java.util.concurrent.TimeUnit;
 
+import scala.concurrent.duration.FiniteDuration;
+
+/**
+ * Tests for the behavior of the metric system on a task manager.
+ */
 public class TaskManagerMetricsTest extends TestLogger {
 
        /**
@@ -78,7 +81,7 @@ public class TaskManagerMetricsTest extends TestLogger {
                        final Configuration config = new Configuration();
                        final ResourceID tmResourceID = ResourceID.generate();
 
-                       TaskManagerServicesConfiguration 
taskManagerServicesConfiguration = 
+                       TaskManagerServicesConfiguration 
taskManagerServicesConfiguration =
                                        
TaskManagerServicesConfiguration.fromConfiguration(config, 
InetAddress.getLocalHost(), false);
 
                        TaskManagerConfiguration taskManagerConfiguration = 
TaskManagerConfiguration.fromConfiguration(config);
@@ -87,7 +90,7 @@ public class TaskManagerMetricsTest extends TestLogger {
                                        taskManagerServicesConfiguration, 
tmResourceID);
 
                        final MetricRegistry tmRegistry = 
taskManagerServices.getMetricRegistry();
-                       
+
                        // create the task manager
                        final Props tmProps = TaskManager.getTaskManagerProps(
                                TaskManager.class,

http://git-wip-us.apache.org/repos/asf/flink/blob/2b3d284b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/dump/MetricDumpSerializerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/dump/MetricDumpSerializerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/dump/MetricDumpSerializerTest.java
index 6e3d8f4..d36c469 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/dump/MetricDumpSerializerTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/dump/MetricDumpSerializerTest.java
@@ -15,6 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.runtime.metrics.dump;
 
 import org.apache.flink.api.java.tuple.Tuple2;
@@ -24,6 +25,7 @@ import org.apache.flink.metrics.Histogram;
 import org.apache.flink.metrics.Meter;
 import org.apache.flink.metrics.SimpleCounter;
 import org.apache.flink.runtime.metrics.util.TestingHistogram;
+
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -43,7 +45,9 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
-
+/**
+ * Tests for the {@link MetricDumpSerialization}.
+ */
 public class MetricDumpSerializerTest {
        @Test
        public void testNullGaugeHandling() throws IOException {
@@ -51,20 +55,20 @@ public class MetricDumpSerializerTest {
                MetricDumpSerialization.MetricDumpDeserializer deserializer = 
new MetricDumpSerialization.MetricDumpDeserializer();
 
                Map<Gauge<?>, Tuple2<QueryScopeInfo, String>> gauges = new 
HashMap<>();
-               
+
                gauges.put(new Gauge<Object>() {
                        @Override
                        public Object getValue() {
                                return null;
                        }
                }, new Tuple2<QueryScopeInfo, String>(new 
QueryScopeInfo.JobManagerQueryScopeInfo("A"), "g"));
-               
+
                MetricDumpSerialization.MetricSerializationResult output = 
serializer.serialize(
-                       Collections.<Counter, Tuple2<QueryScopeInfo,String>>emptyMap(),
+                       Collections.<Counter, Tuple2<QueryScopeInfo, String>>emptyMap(),
                        gauges,
                        Collections.<Histogram, Tuple2<QueryScopeInfo, 
String>>emptyMap(),
                        Collections.<Meter, Tuple2<QueryScopeInfo, 
String>>emptyMap());
-               
+
                // no metrics should be serialized
                Assert.assertEquals(0, output.serializedMetrics.length);
 
@@ -80,10 +84,10 @@ public class MetricDumpSerializerTest {
                final ObjectOutputStream oos = new ObjectOutputStream(bos);
 
                oos.writeObject(serializer.serialize(
-                       new HashMap<Counter, Tuple2<QueryScopeInfo,String>>(),
-                       new HashMap<Gauge<?>, Tuple2<QueryScopeInfo,String>>(),
-                       new HashMap<Histogram, Tuple2<QueryScopeInfo,String>>(),
-                       new HashMap<Meter, Tuple2<QueryScopeInfo,String>>()));
+                       new HashMap<Counter, Tuple2<QueryScopeInfo, String>>(),
+                       new HashMap<Gauge<?>, Tuple2<QueryScopeInfo, String>>(),
+                       new HashMap<Histogram, Tuple2<QueryScopeInfo, String>>(),
+                       new HashMap<Meter, Tuple2<QueryScopeInfo, String>>()));
        }
 
        @Test

http://git-wip-us.apache.org/repos/asf/flink/blob/2b3d284b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/dump/MetricDumpTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/dump/MetricDumpTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/dump/MetricDumpTest.java
index 3b65184..c7b9793 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/dump/MetricDumpTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/dump/MetricDumpTest.java
@@ -15,6 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.runtime.metrics.dump;
 
 import org.junit.Test;
@@ -25,6 +26,9 @@ import static 
org.apache.flink.runtime.metrics.dump.MetricDump.METRIC_CATEGORY_H
 import static 
org.apache.flink.runtime.metrics.dump.MetricDump.METRIC_CATEGORY_METER;
 import static org.junit.Assert.assertEquals;
 
+/**
+ * Tests for the {@link MetricDump} classes.
+ */
 public class MetricDumpTest {
        @Test
        public void testDumpedCounter() {

http://git-wip-us.apache.org/repos/asf/flink/blob/2b3d284b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/dump/MetricQueryServiceTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/dump/MetricQueryServiceTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/dump/MetricQueryServiceTest.java
index 2243495..5c33ad6 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/dump/MetricQueryServiceTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/dump/MetricQueryServiceTest.java
@@ -15,13 +15,9 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.runtime.metrics.dump;
 
-import akka.actor.ActorRef;
-import akka.actor.ActorSystem;
-import akka.actor.Props;
-import akka.actor.UntypedActor;
-import akka.testkit.TestActorRef;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.metrics.Counter;
 import org.apache.flink.metrics.Gauge;
@@ -34,11 +30,20 @@ import 
org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
 import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup;
 import org.apache.flink.runtime.metrics.util.TestingHistogram;
 import org.apache.flink.util.TestLogger;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.actor.Props;
+import akka.actor.UntypedActor;
+import akka.testkit.TestActorRef;
 import org.junit.Test;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
+/**
+ * Tests for the {@link MetricQueryService}.
+ */
 public class MetricQueryServiceTest extends TestLogger {
        @Test
        public void testCreateDump() throws Exception {

http://git-wip-us.apache.org/repos/asf/flink/blob/2b3d284b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/dump/QueryScopeInfoTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/dump/QueryScopeInfoTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/dump/QueryScopeInfoTest.java
index 597e376..f4f9515 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/dump/QueryScopeInfoTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/dump/QueryScopeInfoTest.java
@@ -15,6 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.runtime.metrics.dump;
 
 import org.junit.Test;
@@ -26,6 +27,9 @@ import static 
org.apache.flink.runtime.metrics.dump.QueryScopeInfo.INFO_CATEGORY
 import static 
org.apache.flink.runtime.metrics.dump.QueryScopeInfo.INFO_CATEGORY_TM;
 import static org.junit.Assert.assertEquals;
 
+/**
+ * Tests for the {@link QueryScopeInfo} classes.
+ */
 public class QueryScopeInfoTest {
        @Test
        public void testJobManagerMetricInfo() {

http://git-wip-us.apache.org/repos/asf/flink/blob/2b3d284b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroupTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroupTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroupTest.java
index 04e40ae..648ee47 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroupTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroupTest.java
@@ -15,6 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.runtime.metrics.groups;
 
 import org.apache.flink.configuration.ConfigConstants;
@@ -28,11 +29,15 @@ import org.apache.flink.runtime.metrics.MetricRegistry;
 import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
 import org.apache.flink.runtime.metrics.dump.QueryScopeInfo;
 import org.apache.flink.runtime.metrics.util.TestReporter;
+
 import org.junit.Test;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
+/**
+ * Tests for the {@link AbstractMetricGroup}.
+ */
 public class AbstractMetricGroupTest {
        /**
         * Verifies that no {@link NullPointerException} is thrown when {@link 
AbstractMetricGroup#getAllVariables()} is
@@ -54,7 +59,7 @@ public class AbstractMetricGroupTest {
                        }
                };
                assertTrue(group.getAllVariables().isEmpty());
-               
+
                registry.shutdown();
        }
 
@@ -119,11 +124,15 @@ public class AbstractMetricGroupTest {
 
        }
 
+       /**
+        * Reporter that verifies the scope caching behavior.
+        */
        public static class TestReporter1 extends ScopeCheckingTestReporter {
                @Override
                public String filterCharacters(String input) {
                        return FILTER_B.filterCharacters(input);
                }
+
                @Override
                public void checkScopes(Metric metric, String metricName, 
MetricGroup group) {
                        // the first call determines which filter is applied to 
all future calls; in this case no filter is used at all
@@ -141,6 +150,9 @@ public class AbstractMetricGroupTest {
                }
        }
 
+       /**
+        * Reporter that verifies the scope caching behavior.
+        */
        public static class TestReporter2 extends ScopeCheckingTestReporter {
                @Override
                public String filterCharacters(String input) {
@@ -173,7 +185,7 @@ public class AbstractMetricGroupTest {
                try {
                        TaskManagerMetricGroup group = new 
TaskManagerMetricGroup(testRegistry, "host", "id");
                        assertEquals("MetricReporters list should be empty", 0, 
testRegistry.getReporters().size());
-                       
+
                        // default delimiter should be used
                        assertEquals("A.B.X.D.1", 
group.getMetricIdentifier("1", FILTER_C));
                        // no caching should occur

http://git-wip-us.apache.org/repos/asf/flink/blob/2b3d284b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/JobManagerGroupTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/JobManagerGroupTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/JobManagerGroupTest.java
index 7834755..03341a6 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/JobManagerGroupTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/JobManagerGroupTest.java
@@ -15,6 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.runtime.metrics.groups;
 
 import org.apache.flink.api.common.JobID;
@@ -26,12 +27,16 @@ import 
org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
 import org.apache.flink.runtime.metrics.dump.QueryScopeInfo;
 import org.apache.flink.runtime.metrics.util.DummyCharacterFilter;
 import org.apache.flink.util.TestLogger;
+
 import org.junit.Test;
 
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
+/**
+ * Tests for the {@link JobManagerMetricGroup}.
+ */
 public class JobManagerGroupTest extends TestLogger {
 
        // 
------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/2b3d284b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/JobManagerJobGroupTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/JobManagerJobGroupTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/JobManagerJobGroupTest.java
index d8bd57a..d734dfd 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/JobManagerJobGroupTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/JobManagerJobGroupTest.java
@@ -26,11 +26,15 @@ import 
org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
 import org.apache.flink.runtime.metrics.dump.QueryScopeInfo;
 import org.apache.flink.runtime.metrics.util.DummyCharacterFilter;
 import org.apache.flink.util.TestLogger;
+
 import org.junit.Test;
 
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
 
+/**
+ * Tests for the {@link JobManagerJobMetricGroup}.
+ */
 public class JobManagerJobGroupTest extends TestLogger {
 
        @Test

http://git-wip-us.apache.org/repos/asf/flink/blob/2b3d284b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/MetricGroupRegistrationTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/MetricGroupRegistrationTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/MetricGroupRegistrationTest.java
index ace0236..56ce5fa 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/MetricGroupRegistrationTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/MetricGroupRegistrationTest.java
@@ -15,6 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.runtime.metrics.groups;
 
 import org.apache.flink.configuration.ConfigConstants;
@@ -33,8 +34,11 @@ import org.apache.flink.runtime.metrics.util.TestReporter;
 import org.junit.Assert;
 import org.junit.Test;
 
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertEquals;
 
+/**
+ * Tests for the registration of groups and metrics on a {@link MetricGroup}.
+ */
 public class MetricGroupRegistrationTest {
        /**
         * Verifies that group methods instantiate the correct metric with the 
given name.
@@ -59,7 +63,7 @@ public class MetricGroupRegistrationTest {
                                return null;
                        }
                });
-               
+
                Assert.assertEquals(gauge, TestReporter1.lastPassedMetric);
                assertEquals("gauge", TestReporter1.lastPassedName);
 
@@ -85,8 +89,11 @@ public class MetricGroupRegistrationTest {
                registry.shutdown();
        }
 
+       /**
+        * Reporter that exposes the last name and metric instance it was notified of.
+        */
        public static class TestReporter1 extends TestReporter {
-               
+
                public static Metric lastPassedMetric;
                public static String lastPassedName;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/2b3d284b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/MetricGroupTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/MetricGroupTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/MetricGroupTest.java
index 665abb1..7397c87 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/MetricGroupTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/MetricGroupTest.java
@@ -24,22 +24,29 @@ import org.apache.flink.metrics.Gauge;
 import org.apache.flink.metrics.Metric;
 import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.runtime.metrics.MetricRegistry;
-
 import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
 import org.apache.flink.runtime.metrics.dump.QueryScopeInfo;
 import org.apache.flink.runtime.metrics.util.DummyCharacterFilter;
 import org.apache.flink.util.AbstractID;
 import org.apache.flink.util.TestLogger;
+
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 
+/**
+ * Tests for the {@link MetricGroup}.
+ */
 public class MetricGroupTest extends TestLogger {
 
        private static final MetricRegistryConfiguration 
defaultMetricRegistryConfiguration = 
MetricRegistryConfiguration.defaultMetricRegistryConfiguration();
-       
+
        private MetricRegistry registry;
 
        private final MetricRegistry exceptionOnRegister = new 
ExceptionOnRegisterRegistry();
@@ -63,7 +70,7 @@ public class MetricGroupTest extends TestLogger {
                String groupName = "sometestname";
                MetricGroup subgroup1 = group.addGroup(groupName);
                MetricGroup subgroup2 = group.addGroup(groupName);
-               
+
                assertNotNull(subgroup1);
                assertNotNull(subgroup2);
                assertTrue(subgroup1 == subgroup2);
@@ -82,10 +89,12 @@ public class MetricGroupTest extends TestLogger {
                group.counter("testcounter");
                group.gauge("testgauge", new Gauge<Object>() {
                        @Override
-                       public Object getValue() { return null; }
+                       public Object getValue() {
+                               return null;
+                       }
                });
        }
-       
+
        @Test
        public void closedGroupCreatesClosedGroups() {
                GenericMetricGroup group = new 
GenericMetricGroup(exceptionOnRegister,
@@ -94,7 +103,7 @@ public class MetricGroupTest extends TestLogger {
 
                group.close();
                assertTrue(group.isClosed());
-               
+
                AbstractMetricGroup subgroup = (AbstractMetricGroup) 
group.addGroup("test subgroup");
                assertTrue(subgroup.isClosed());
        }
@@ -104,7 +113,7 @@ public class MetricGroupTest extends TestLogger {
                final String name = "abctestname";
                GenericMetricGroup group = new GenericMetricGroup(
                                registry, new 
DummyAbstractMetricGroup(registry), "testgroup");
-               
+
                assertNotNull(group.counter(name));
                assertNotNull(group.counter(name));
        }
@@ -114,7 +123,7 @@ public class MetricGroupTest extends TestLogger {
                final String name = "abctestname";
                GenericMetricGroup group = new GenericMetricGroup(
                                registry, new 
DummyAbstractMetricGroup(registry), "testgroup");
-               
+
                assertNotNull(group.addGroup(name));
                assertNotNull(group.counter(name));
        }
@@ -143,11 +152,11 @@ public class MetricGroupTest extends TestLogger {
                assertEquals(vid.toString(), info2.vertexID);
                assertEquals(4, info2.subtaskIndex);
        }
-       
+
        // 
------------------------------------------------------------------------
-       
+
        private static class ExceptionOnRegisterRegistry extends MetricRegistry 
{
-               
+
                public ExceptionOnRegisterRegistry() {
                        super(defaultMetricRegistryConfiguration);
                }
@@ -164,7 +173,7 @@ public class MetricGroupTest extends TestLogger {
        }
 
        // 
------------------------------------------------------------------------
-       
+
        private static class DummyAbstractMetricGroup extends 
AbstractMetricGroup {
 
                public DummyAbstractMetricGroup(MetricRegistry registry) {
@@ -182,7 +191,8 @@ public class MetricGroupTest extends TestLogger {
                }
 
                @Override
-               protected void addMetric(String name, Metric metric) {}
+               protected void addMetric(String name, Metric metric) {
+               }
 
                @Override
                public MetricGroup addGroup(String name) {

http://git-wip-us.apache.org/repos/asf/flink/blob/2b3d284b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/OperatorGroupTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/OperatorGroupTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/OperatorGroupTest.java
index af73c27..c232cf2 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/OperatorGroupTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/OperatorGroupTest.java
@@ -26,6 +26,7 @@ import org.apache.flink.runtime.metrics.scope.ScopeFormat;
 import org.apache.flink.runtime.metrics.util.DummyCharacterFilter;
 import org.apache.flink.util.AbstractID;
 import org.apache.flink.util.TestLogger;
+
 import org.junit.Test;
 
 import java.util.Map;
@@ -34,6 +35,9 @@ import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 
+/**
+ * Tests for the {@link OperatorMetricGroup}.
+ */
 public class OperatorGroupTest extends TestLogger {
 
        @Test
@@ -81,7 +85,7 @@ public class OperatorGroupTest extends TestLogger {
                JobID jid = new JobID();
                AbstractID tid = new AbstractID();
                AbstractID eid = new AbstractID();
-               
+
                TaskManagerMetricGroup tmGroup = new 
TaskManagerMetricGroup(registry, "theHostName", "test-tm-id");
                TaskManagerJobMetricGroup jmGroup = new 
TaskManagerJobMetricGroup(registry, tmGroup, jid, "myJobName");
                TaskMetricGroup taskGroup = new TaskMetricGroup(

http://git-wip-us.apache.org/repos/asf/flink/blob/2b3d284b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/QueryScopeInfoTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/QueryScopeInfoTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/QueryScopeInfoTest.java
index 1ff804a..f6a1277 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/QueryScopeInfoTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/QueryScopeInfoTest.java
@@ -15,20 +15,25 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.runtime.metrics.groups;
 
 import org.apache.flink.runtime.metrics.dump.QueryScopeInfo;
+
 import org.junit.Test;
 
 import static org.junit.Assert.assertEquals;
 
+/**
+ * Tests for the {@link QueryScopeInfo} classes.
+ */
 public class QueryScopeInfoTest {
        @Test
        public void testJobManagerQueryScopeInfo() {
                QueryScopeInfo.JobManagerQueryScopeInfo info = new 
QueryScopeInfo.JobManagerQueryScopeInfo();
                assertEquals(QueryScopeInfo.INFO_CATEGORY_JM, 
info.getCategory());
                assertEquals("", info.scope);
-               
+
                info = info.copy("world");
                assertEquals(QueryScopeInfo.INFO_CATEGORY_JM, 
info.getCategory());
                assertEquals("world", info.scope);
@@ -53,7 +58,7 @@ public class QueryScopeInfoTest {
                assertEquals(QueryScopeInfo.INFO_CATEGORY_TM, 
info.getCategory());
                assertEquals("world", info.scope);
                assertEquals("tmid", info.taskManagerID);
-               
+
                info = new QueryScopeInfo.TaskManagerQueryScopeInfo("tmid", 
"hello");
                assertEquals(QueryScopeInfo.INFO_CATEGORY_TM, 
info.getCategory());
                assertEquals("hello", info.scope);

http://git-wip-us.apache.org/repos/asf/flink/blob/2b3d284b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskIOMetricGroupTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskIOMetricGroupTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskIOMetricGroupTest.java
index 564a518..bcf77de 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskIOMetricGroupTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskIOMetricGroupTest.java
@@ -15,17 +15,22 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.runtime.metrics.groups;
 
 import org.apache.flink.metrics.Counter;
 import org.apache.flink.metrics.SimpleCounter;
 import org.apache.flink.runtime.executiongraph.IOMetrics;
 import 
org.apache.flink.runtime.operators.testutils.UnregisteredTaskMetricsGroup;
+
 import org.junit.Test;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 
+/**
+ * Tests for the {@link TaskIOMetricGroup}.
+ */
 public class TaskIOMetricGroupTest {
        @Test
        public void testTaskIOMetricGroup() {
@@ -50,7 +55,7 @@ public class TaskIOMetricGroupTest {
                taskIO.getNumBytesInLocalCounter().inc(100L);
                taskIO.getNumBytesInRemoteCounter().inc(150L);
                taskIO.getNumBytesOutCounter().inc(250L);
-               
+
                IOMetrics io = taskIO.createSnapshot();
                assertEquals(32L, io.getNumRecordsIn());
                assertEquals(64L, io.getNumRecordsOut());

Reply via email to