[FLINK-6793] Activate checkstyle for runtime/metrics This closes #4037.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/2b3d284b Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/2b3d284b Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/2b3d284b Branch: refs/heads/master Commit: 2b3d284bf09a30edafa4dadf50492156bb47027b Parents: cd5b4a6 Author: zentol <[email protected]> Authored: Wed May 31 16:39:52 2017 +0200 Committer: zentol <[email protected]> Committed: Fri Jun 2 15:13:55 2017 +0200 ---------------------------------------------------------------------- flink-runtime/pom.xml | 1 - .../flink/runtime/metrics/MetricNames.java | 4 ++ .../flink/runtime/metrics/MetricRegistry.java | 22 ++++--- .../metrics/MetricRegistryConfiguration.java | 11 ++-- .../flink/runtime/metrics/ViewUpdater.java | 1 + .../flink/runtime/metrics/dump/MetricDump.java | 4 +- .../metrics/dump/MetricDumpSerialization.java | 31 ++++++---- .../metrics/dump/MetricQueryService.java | 16 ++--- .../runtime/metrics/dump/QueryScopeInfo.java | 5 +- .../metrics/groups/AbstractMetricGroup.java | 52 ++++++++-------- .../metrics/groups/ComponentMetricGroup.java | 8 +-- .../metrics/groups/GenericMetricGroup.java | 2 +- .../groups/JobManagerJobMetricGroup.java | 2 + .../metrics/groups/JobManagerMetricGroup.java | 1 + .../runtime/metrics/groups/JobMetricGroup.java | 9 +-- .../metrics/groups/OperatorIOMetricGroup.java | 3 +- .../metrics/groups/OperatorMetricGroup.java | 4 +- .../metrics/groups/TaskIOMetricGroup.java | 4 +- .../groups/TaskManagerJobMetricGroup.java | 4 +- .../metrics/groups/TaskManagerMetricGroup.java | 1 - .../runtime/metrics/groups/TaskMetricGroup.java | 9 +-- .../runtime/metrics/scope/ScopeFormat.java | 18 +++--- .../runtime/metrics/scope/ScopeFormats.java | 10 ++- .../flink/runtime/metrics/util/MetricUtils.java | 7 ++- .../runtime/metrics/MetricRegistryTest.java | 65 +++++++++++++++----- .../runtime/metrics/TaskManagerMetricsTest.java | 23 ++++--- .../metrics/dump/MetricDumpSerializerTest.java | 22 ++++--- .../runtime/metrics/dump/MetricDumpTest.java | 4 ++ .../metrics/dump/MetricQueryServiceTest.java | 15 +++-- .../metrics/dump/QueryScopeInfoTest.java | 4 ++ .../metrics/groups/AbstractMetricGroupTest.java | 16 ++++- .../metrics/groups/JobManagerGroupTest.java | 5 ++ .../metrics/groups/JobManagerJobGroupTest.java | 4 ++ .../groups/MetricGroupRegistrationTest.java | 13 +++- .../runtime/metrics/groups/MetricGroupTest.java | 38 +++++++----- .../metrics/groups/OperatorGroupTest.java | 6 +- .../metrics/groups/QueryScopeInfoTest.java | 9 ++- .../metrics/groups/TaskIOMetricGroupTest.java | 7 ++- .../metrics/groups/TaskManagerGroupTest.java | 24 ++++---- .../metrics/groups/TaskManagerJobGroupTest.java | 5 +- .../metrics/groups/TaskMetricGroupTest.java | 10 ++- .../metrics/util/DummyCharacterFilter.java | 4 ++ .../runtime/metrics/util/TestReporter.java | 3 + .../runtime/metrics/util/TestingHistogram.java | 4 ++ 44 files changed, 331 insertions(+), 179 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/2b3d284b/flink-runtime/pom.xml ---------------------------------------------------------------------- diff --git a/flink-runtime/pom.xml b/flink-runtime/pom.xml index 2272bc7..d83d671 100644 --- a/flink-runtime/pom.xml +++ b/flink-runtime/pom.xml @@ -451,7 +451,6 @@ under the License. **/runtime/leaderretrieval/**, **/runtime/memory/**, **/runtime/messages/**, - **/runtime/metrics/**, **/runtime/minicluster/**, **/runtime/net/**, **/runtime/operators/**, http://git-wip-us.apache.org/repos/asf/flink/blob/2b3d284b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricNames.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricNames.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricNames.java index 9202ca1..300b4b0 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricNames.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricNames.java @@ -15,8 +15,12 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.flink.runtime.metrics; +/** + * Collection of metric names. + */ public class MetricNames { private MetricNames() { } http://git-wip-us.apache.org/repos/asf/flink/blob/2b3d284b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistry.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistry.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistry.java index 9f46d47..5018cbb 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistry.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistry.java @@ -18,10 +18,6 @@ package org.apache.flink.runtime.metrics; -import akka.actor.ActorRef; -import akka.actor.ActorSystem; -import akka.actor.Kill; -import akka.pattern.Patterns; import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.configuration.ConfigConstants; @@ -38,11 +34,13 @@ import org.apache.flink.runtime.metrics.groups.AbstractMetricGroup; import org.apache.flink.runtime.metrics.groups.FrontMetricGroup; import org.apache.flink.runtime.metrics.scope.ScopeFormats; import org.apache.flink.util.Preconditions; + +import akka.actor.ActorRef; +import akka.actor.ActorSystem; +import akka.actor.Kill; +import akka.pattern.Patterns; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import scala.concurrent.Await; -import scala.concurrent.Future; -import scala.concurrent.duration.FiniteDuration; import java.util.ArrayList; import java.util.List; @@ -53,6 +51,10 @@ import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; +import scala.concurrent.Await; +import scala.concurrent.Future; +import scala.concurrent.duration.FiniteDuration; + /** * A MetricRegistry keeps track of all registered {@link Metric Metrics}. It serves as the * connection between {@link MetricGroup MetricGroups} and {@link MetricReporter MetricReporters}. @@ -154,7 +156,7 @@ public class MetricRegistry { /** * Initializes the MetricQueryService. - * + * * @param actorSystem ActorSystem to create the MetricQueryService on * @param resourceID resource ID used to disambiguate the actor name */ @@ -250,7 +252,7 @@ public class MetricRegistry { } } } - + private void shutdownExecutor() { if (executor != null) { executor.shutdown(); @@ -361,7 +363,7 @@ public class MetricRegistry { * This task is explicitly a static class, so that it does not hold any references to the enclosing * MetricsRegistry instance. * - * This is a subtle difference, but very important: With this static class, the enclosing class instance + * <p>This is a subtle difference, but very important: With this static class, the enclosing class instance * may become garbage-collectible, whereas with an anonymous inner class, the timer thread * (which is a GC root) will hold a reference via the timer task and its enclosing instance pointer. * Making the MetricsRegistry garbage collectible makes the java.util.Timer garbage collectible, http://git-wip-us.apache.org/repos/asf/flink/blob/2b3d284b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistryConfiguration.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistryConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistryConfiguration.java index 3475f04..6f6db86 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistryConfiguration.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistryConfiguration.java @@ -25,6 +25,7 @@ import org.apache.flink.configuration.DelegatingConfiguration; import org.apache.flink.configuration.MetricOptions; import org.apache.flink.runtime.metrics.scope.ScopeFormats; import org.apache.flink.util.Preconditions; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -40,7 +41,7 @@ public class MetricRegistryConfiguration { private static final Logger LOG = LoggerFactory.getLogger(MetricRegistryConfiguration.class); - private static volatile MetricRegistryConfiguration DEFAULT_CONFIGURATION; + private static volatile MetricRegistryConfiguration defaultConfiguration; // regex pattern to split the defined reporters private static final Pattern splitPattern = Pattern.compile("\\s*,\\s*"); @@ -148,15 +149,15 @@ public class MetricRegistryConfiguration { public static MetricRegistryConfiguration defaultMetricRegistryConfiguration() { // create the default metric registry configuration only once - if (DEFAULT_CONFIGURATION == null) { + if (defaultConfiguration == null) { synchronized (MetricRegistryConfiguration.class) { - if (DEFAULT_CONFIGURATION == null) { - DEFAULT_CONFIGURATION = fromConfiguration(new Configuration()); + if (defaultConfiguration == null) { + defaultConfiguration = fromConfiguration(new Configuration()); } } } - return DEFAULT_CONFIGURATION; + return defaultConfiguration; } } http://git-wip-us.apache.org/repos/asf/flink/blob/2b3d284b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/ViewUpdater.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/ViewUpdater.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/ViewUpdater.java index e4d0596..77bd0d5 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/ViewUpdater.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/ViewUpdater.java @@ -15,6 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.flink.runtime.metrics; import org.apache.flink.metrics.View; http://git-wip-us.apache.org/repos/asf/flink/blob/2b3d284b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricDump.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricDump.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricDump.java index 2239b50..202e453 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricDump.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricDump.java @@ -15,6 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.flink.runtime.metrics.dump; import org.apache.flink.util.Preconditions; @@ -121,7 +122,7 @@ public abstract class MetricDump { } /** - * Container for the rate of a {@link org.apache.flink.metrics.Meter}. + * Container for the rate of a {@link org.apache.flink.metrics.Meter}. */ public static class MeterDump extends MetricDump { public final double rate; @@ -130,6 +131,7 @@ public abstract class MetricDump { super(scopeInfo, name); this.rate = rate; } + @Override public byte getCategory() { return METRIC_CATEGORY_METER; http://git-wip-us.apache.org/repos/asf/flink/blob/2b3d284b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricDumpSerialization.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricDumpSerialization.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricDumpSerialization.java index e57a0d8..e173522 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricDumpSerialization.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricDumpSerialization.java @@ -15,6 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.flink.runtime.metrics.dump; import org.apache.flink.api.java.tuple.Tuple2; @@ -27,6 +28,7 @@ import org.apache.flink.metrics.Meter; import org.apache.flink.runtime.util.DataInputDeserializer; import org.apache.flink.runtime.util.DataOutputSerializer; import org.apache.flink.util.Preconditions; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -56,14 +58,14 @@ public class MetricDumpSerialization { /** * This class encapsulates all serialized metrics and a count for each metric type. - * - * The counts are stored separately from the metrics since the final count for any given type can only be + * + * <p>The counts are stored separately from the metrics since the final count for any given type can only be * determined after all metrics of that type were serialized. Storing them together in a single byte[] would * require an additional copy of all serialized metrics, as you would first have to serialize the metrics into a * temporary buffer to calculate the counts, write the counts to the final output and copy all metrics from the * temporary buffer. - * - * Note that while one could implement the serialization in such a way so that at least 1 byte (a validity flag) + * + * <p>Note that while one could implement the serialization in such a way so that at least 1 byte (a validity flag) * is written for each metric, this would require more bandwidth due to the sheer number of metrics. */ public static class MetricSerializationResult implements Serializable { @@ -75,12 +77,12 @@ public class MetricDumpSerialization { public final int numGauges; public final int numMeters; public final int numHistograms; - + public MetricSerializationResult(byte[] serializedMetrics, int numCounters, int numGauges, int numMeters, int numHistograms) { Preconditions.checkNotNull(serializedMetrics); Preconditions.checkArgument(numCounters >= 0); Preconditions.checkArgument(numGauges >= 0); - Preconditions.checkArgument(numMeters >= 0); + Preconditions.checkArgument(numMeters >= 0); Preconditions.checkArgument(numHistograms >= 0); this.serializedMetrics = serializedMetrics; this.numCounters = numCounters; @@ -94,18 +96,21 @@ public class MetricDumpSerialization { // Serialization //------------------------------------------------------------------------- + /** + * Serializes a set of metrics into a {@link MetricSerializationResult}. + */ public static class MetricDumpSerializer { private DataOutputSerializer buffer = new DataOutputSerializer(1024 * 32); /** * Serializes the given metrics and returns the resulting byte array. - * - * Should a {@link Metric} accessed in this method throw an exception it will be omitted from the returned + * + * <p>Should a {@link Metric} accessed in this method throw an exception it will be omitted from the returned * {@link MetricSerializationResult}. - * - * If the serialization of any primitive or String fails then the returned {@link MetricSerializationResult} - * is partially corrupted. Such a result can be deserialized safely by + * + * <p>If the serialization of any primitive or String fails then the returned {@link MetricSerializationResult} + * is partially corrupted. Such a result can be deserialized safely by * {@link MetricDumpDeserializer#deserialize(MetricSerializationResult)}; however only metrics that were * fully serialized before the failure will be returned. * @@ -263,6 +268,9 @@ public class MetricDumpSerialization { // Deserialization //------------------------------------------------------------------------- + /** + * Deserializer for reading a list of {@link MetricDump MetricDumps} from a {@link MetricSerializationResult}. + */ public static class MetricDumpDeserializer { /** * De-serializes metrics from the given byte array and returns them as a list of {@link MetricDump}. @@ -311,7 +319,6 @@ public class MetricDumpSerialization { } } - private static MetricDump.CounterDump deserializeCounter(DataInputView dis) throws IOException { QueryScopeInfo scope = deserializeMetricInfo(dis); String name = dis.readUTF(); http://git-wip-us.apache.org/repos/asf/flink/blob/2b3d284b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricQueryService.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricQueryService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricQueryService.java index 2229139..8821e0d 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricQueryService.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricQueryService.java @@ -15,13 +15,9 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.flink.runtime.metrics.dump; -import akka.actor.ActorRef; -import akka.actor.ActorSystem; -import akka.actor.Props; -import akka.actor.Status; -import akka.actor.UntypedActor; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.metrics.CharacterFilter; import org.apache.flink.metrics.Counter; @@ -31,6 +27,12 @@ import org.apache.flink.metrics.Meter; import org.apache.flink.metrics.Metric; import org.apache.flink.runtime.clusterframework.types.ResourceID; import org.apache.flink.runtime.metrics.groups.AbstractMetricGroup; + +import akka.actor.ActorRef; +import akka.actor.ActorSystem; +import akka.actor.Props; +import akka.actor.Status; +import akka.actor.UntypedActor; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -44,7 +46,7 @@ import static org.apache.flink.runtime.metrics.dump.MetricDumpSerialization.Metr /** * The MetricQueryService creates a key-value representation of all metrics currently registered with Flink when queried. * - * It is realized as an actor and can be notified of + * <p>It is realized as an actor and can be notified of * - an added metric by calling {@link MetricQueryService#notifyOfAddedMetric(ActorRef, Metric, String, AbstractMetricGroup)} * - a removed metric by calling {@link MetricQueryService#notifyOfRemovedMetric(ActorRef, Metric)} * - a metric dump request by sending the return value of {@link MetricQueryService#getCreateDump()} @@ -217,6 +219,6 @@ public class MetricQueryService extends UntypedActor { } private static class CreateDump implements Serializable { - private static CreateDump INSTANCE = new CreateDump(); + private static final CreateDump INSTANCE = new CreateDump(); } } http://git-wip-us.apache.org/repos/asf/flink/blob/2b3d284b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/QueryScopeInfo.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/QueryScopeInfo.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/QueryScopeInfo.java index 6572ca0..9af9d78 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/QueryScopeInfo.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/QueryScopeInfo.java @@ -15,6 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.flink.runtime.metrics.dump; /** @@ -28,7 +29,7 @@ public abstract class QueryScopeInfo { public static final byte INFO_CATEGORY_TASK = 3; public static final byte INFO_CATEGORY_OPERATOR = 4; - /** The remaining scope not covered by specific fields */ + /** The remaining scope not covered by specific fields. */ public final String scope; private QueryScopeInfo(String scope) { @@ -45,7 +46,7 @@ public abstract class QueryScopeInfo { /** * Returns the category for this QueryScopeInfo. - * + * * @return category */ public abstract byte getCategory(); http://git-wip-us.apache.org/repos/asf/flink/blob/2b3d284b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroup.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroup.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroup.java index a19970d..c67c5ea 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroup.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroup.java @@ -30,6 +30,7 @@ import org.apache.flink.metrics.SimpleCounter; import org.apache.flink.runtime.metrics.MetricRegistry; import org.apache.flink.runtime.metrics.dump.QueryScopeInfo; import org.apache.flink.runtime.metrics.scope.ScopeFormat; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -41,9 +42,9 @@ import static org.apache.flink.util.Preconditions.checkNotNull; /** * Abstract {@link MetricGroup} that contains key functionality for adding metrics and groups. - * + * * <p><b>IMPORTANT IMPLEMENTATION NOTE</b> - * + * * <p>This class uses locks for adding and removing metrics objects. This is done to * prevent resource leaks in the presence of concurrently closing a group and adding * metrics and subgroups. @@ -56,30 +57,29 @@ import static org.apache.flink.util.Preconditions.checkNotNull; * from any metrics reporter and any internal maps. Note that even closed metrics groups * return Counters, Gauges, etc to the code, to prevent exceptions in the monitored code. * These metrics simply do not get reported any more, when created on a closed group. - * + * * @param <A> The type of the parent MetricGroup */ @Internal public abstract class AbstractMetricGroup<A extends AbstractMetricGroup<?>> implements MetricGroup { - /** shared logger */ private static final Logger LOG = LoggerFactory.getLogger(MetricGroup.class); // ------------------------------------------------------------------------ - /** The parent group containing this group */ + /** The parent group containing this group. */ protected final A parent; /** The map containing all variables and their associated values, lazily computed. */ protected volatile Map<String, String> variables; - - /** The registry that this metrics group belongs to */ + + /** The registry that this metrics group belongs to. */ protected final MetricRegistry registry; - /** All metrics that are directly contained in this group */ + /** All metrics that are directly contained in this group. */ private final Map<String, Metric> metrics = new HashMap<>(); - /** All metric subgroups of this group */ + /** All metric subgroups of this group. */ private final Map<String, AbstractMetricGroup> groups = new HashMap<>(); /** The metrics scope represented by this group. @@ -97,7 +97,7 @@ public abstract class AbstractMetricGroup<A extends AbstractMetricGroup<?>> impl /** The metrics query service scope represented by this group, lazily computed. */ protected QueryScopeInfo queryServiceScopeInfo; - /** Flag indicating whether this group has been closed */ + /** Flag indicating whether this group has been closed. */ private volatile boolean closed; // ------------------------------------------------------------------------ @@ -111,7 +111,7 @@ public abstract class AbstractMetricGroup<A extends AbstractMetricGroup<?>> impl public Map<String, String> getAllVariables() { if (variables == null) { // avoid synchronization for common case - synchronized(this) { + synchronized (this) { if (variables == null) { if (parent != null) { variables = parent.getAllVariables(); @@ -126,7 +126,7 @@ public abstract class AbstractMetricGroup<A extends AbstractMetricGroup<?>> impl /** * Returns the logical scope of this group, for example - * {@code "taskmanager.job.task"} + * {@code "taskmanager.job.task"}. * * @param filter character filter which is applied to the scope components * @return logical scope @@ -137,7 +137,7 @@ public abstract class AbstractMetricGroup<A extends AbstractMetricGroup<?>> impl /** * Returns the logical scope of this group, for example - * {@code "taskmanager.job.task"} + * {@code "taskmanager.job.task"}. * * @param filter character filter which is applied to the scope components * @return logical scope @@ -155,7 +155,7 @@ public abstract class AbstractMetricGroup<A extends AbstractMetricGroup<?>> impl /** * Returns the name for this group, meaning what kind of entity it represents, for example "taskmanager". - * + * * @param filter character filter which is applied to the name * @return logical name for this group */ @@ -163,9 +163,9 @@ public abstract class AbstractMetricGroup<A extends AbstractMetricGroup<?>> impl /** * Gets the scope as an array of the scope components, for example - * {@code ["host-7", "taskmanager-2", "window_word_count", "my-mapper"]} - * - * @see #getMetricIdentifier(String) + * {@code ["host-7", "taskmanager-2", "window_word_count", "my-mapper"]}. + * + * @see #getMetricIdentifier(String) */ public String[] getScopeComponents() { return scopeComponents; @@ -173,7 +173,7 @@ public abstract class AbstractMetricGroup<A extends AbstractMetricGroup<?>> impl /** * Returns the metric query service scope for this group. - * + * * @param filter character filter * @return query service scope */ @@ -194,8 +194,8 @@ public abstract class AbstractMetricGroup<A extends AbstractMetricGroup<?>> impl /** * Returns the fully qualified metric name, for example - * {@code "host-7.taskmanager-2.window_word_count.my-mapper.metricName"} - * + * {@code "host-7.taskmanager-2.window_word_count.my-mapper.metricName"}. + * * @param metricName metric name * @return fully qualified metric name */ @@ -205,7 +205,7 @@ public abstract class AbstractMetricGroup<A extends AbstractMetricGroup<?>> impl /** * Returns the fully qualified metric name, for example - * {@code "host-7.taskmanager-2.window_word_count.my-mapper.metricName"} + * {@code "host-7.taskmanager-2.window_word_count.my-mapper.metricName"}. * * @param metricName metric name * @param filter character filter which is applied to the scope components if not null. @@ -217,7 +217,7 @@ public abstract class AbstractMetricGroup<A extends AbstractMetricGroup<?>> impl /** * Returns the fully qualified metric name using the configured delimiter for the reporter with the given index, for example - * {@code "host-7.taskmanager-2.window_word_count.my-mapper.metricName"} + * {@code "host-7.taskmanager-2.window_word_count.my-mapper.metricName"}. * * @param metricName metric name * @param filter character filter which is applied to the scope components if not null. @@ -250,7 +250,7 @@ public abstract class AbstractMetricGroup<A extends AbstractMetricGroup<?>> impl return scopeStrings[reporterIndex] + delimiter + metricName; } } - + // ------------------------------------------------------------------------ // Closing // ------------------------------------------------------------------------ @@ -292,7 +292,7 @@ public abstract class AbstractMetricGroup<A extends AbstractMetricGroup<?>> impl public Counter counter(String name) { return counter(name, new SimpleCounter()); } - + @Override public <C extends Counter> C counter(int name, C counter) { return counter(String.valueOf(name), counter); @@ -340,7 +340,7 @@ public abstract class AbstractMetricGroup<A extends AbstractMetricGroup<?>> impl /** * Adds the given metric to the group and registers it at the registry, if the group * is not yet closed, and if no metric with the same name has been registered before. - * + * * @param name the name to register the metric under * @param metric the metric to register */ @@ -372,7 +372,7 @@ public abstract class AbstractMetricGroup<A extends AbstractMetricGroup<?>> impl else { // we had a collision. put back the original value metrics.put(name, prior); - + // we warn here, rather than failing, because metrics are tools that should not fail the // program when used incorrectly LOG.warn("Name collision: Group already contains a Metric with the name '" + http://git-wip-us.apache.org/repos/asf/flink/blob/2b3d284b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/ComponentMetricGroup.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/ComponentMetricGroup.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/ComponentMetricGroup.java index 9f0f483..0cd9942 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/ComponentMetricGroup.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/ComponentMetricGroup.java @@ -25,9 +25,9 @@ import java.util.HashMap; import java.util.Map; /** - * Abstract {@link org.apache.flink.metrics.MetricGroup} for system components (e.g., + * Abstract {@link org.apache.flink.metrics.MetricGroup} for system components (e.g., * TaskManager, Job, Task, Operator). - * + * * <p>Usually, the scope of metrics is simply the hierarchy of the containing groups. For example * the Metric {@code "MyMetric"} in group {@code "B"} nested in group {@code "A"} would have a * fully scoped name of {@code "A.B.MyMetric"}, with {@code "A.B"} being the Metric's scope. @@ -36,7 +36,7 @@ import java.util.Map; * certain identifiers from the scope. The scope for metrics belonging to the "Task" * group could for example include the task attempt number (more fine grained identification), or * exclude it (for continuity of the namespace across failure and recovery). - * + * * @param <P> The type of the parent MetricGroup. */ @Internal @@ -101,7 +101,7 @@ public abstract class ComponentMetricGroup<P extends AbstractMetricGroup<?>> ext /** * Gets all component metric groups that are contained in this component metric group. - * + * * @return All component metric groups that are contained in this component metric group. */ protected abstract Iterable<? extends ComponentMetricGroup> subComponents(); http://git-wip-us.apache.org/repos/asf/flink/blob/2b3d284b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/GenericMetricGroup.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/GenericMetricGroup.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/GenericMetricGroup.java index 5978f2d..ee7d1ca 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/GenericMetricGroup.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/GenericMetricGroup.java @@ -29,7 +29,7 @@ import org.apache.flink.runtime.metrics.dump.QueryScopeInfo; */ @Internal public class GenericMetricGroup extends AbstractMetricGroup<AbstractMetricGroup<?>> { - /** The name of this group */ + /** The name of this group. */ private String name; public GenericMetricGroup(MetricRegistry registry, AbstractMetricGroup parent, String name) { http://git-wip-us.apache.org/repos/asf/flink/blob/2b3d284b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/JobManagerJobMetricGroup.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/JobManagerJobMetricGroup.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/JobManagerJobMetricGroup.java index c4902e8..b62c7b6 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/JobManagerJobMetricGroup.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/JobManagerJobMetricGroup.java @@ -15,6 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.flink.runtime.metrics.groups; import org.apache.flink.annotation.Internal; @@ -22,6 +23,7 @@ import org.apache.flink.api.common.JobID; import org.apache.flink.runtime.metrics.MetricRegistry; import javax.annotation.Nullable; + import java.util.Collections; import static org.apache.flink.util.Preconditions.checkNotNull; http://git-wip-us.apache.org/repos/asf/flink/blob/2b3d284b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/JobManagerMetricGroup.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/JobManagerMetricGroup.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/JobManagerMetricGroup.java index 5a35110..e09051d 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/JobManagerMetricGroup.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/JobManagerMetricGroup.java @@ -15,6 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.flink.runtime.metrics.groups; import org.apache.flink.api.common.JobID; http://git-wip-us.apache.org/repos/asf/flink/blob/2b3d284b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/JobMetricGroup.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/JobMetricGroup.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/JobMetricGroup.java index 17f6189..876e794 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/JobMetricGroup.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/JobMetricGroup.java @@ -26,21 +26,22 @@ import org.apache.flink.runtime.metrics.dump.QueryScopeInfo; import org.apache.flink.runtime.metrics.scope.ScopeFormat; import javax.annotation.Nullable; + import java.util.Map; /** * Special abstract {@link org.apache.flink.metrics.MetricGroup} representing everything belonging to * a specific job. - * + * * @param <C> The type of the parent ComponentMetricGroup. */ @Internal public abstract class JobMetricGroup<C extends ComponentMetricGroup<C>> extends ComponentMetricGroup<C> { - /** The ID of the job represented by this metrics group */ + /** The ID of the job represented by this metrics group. */ protected final JobID jobId; - /** The name of the job represented by this metrics group */ + /** The name of the job represented by this metrics group. */ @Nullable protected final String jobName; @@ -53,7 +54,7 @@ public abstract class JobMetricGroup<C extends ComponentMetricGroup<C>> extends @Nullable String jobName, String[] scope) { super(registry, scope, parent); - + this.jobId = jobId; this.jobName = jobName; } http://git-wip-us.apache.org/repos/asf/flink/blob/2b3d284b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/OperatorIOMetricGroup.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/OperatorIOMetricGroup.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/OperatorIOMetricGroup.java index 5bf7d1f..6bf3c6c 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/OperatorIOMetricGroup.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/OperatorIOMetricGroup.java @@ -15,6 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.flink.runtime.metrics.groups; import org.apache.flink.metrics.Counter; @@ -64,7 +65,7 @@ public class OperatorIOMetricGroup extends ProxyMetricGroup<OperatorMetricGroup> public void reuseInputMetricsForTask() { TaskIOMetricGroup taskIO = parentMetricGroup.parent().getIOMetricGroup(); taskIO.reuseRecordsInputCounter(this.numRecordsIn); - + } /** http://git-wip-us.apache.org/repos/asf/flink/blob/2b3d284b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/OperatorMetricGroup.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/OperatorMetricGroup.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/OperatorMetricGroup.java index 37c9dd8..2313873 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/OperatorMetricGroup.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/OperatorMetricGroup.java @@ -46,7 +46,7 @@ public class OperatorMetricGroup extends ComponentMetricGroup<TaskMetricGroup> { } // ------------------------------------------------------------------------ - + public final TaskMetricGroup parent() { return parent; } @@ -68,7 +68,7 @@ public class OperatorMetricGroup extends ComponentMetricGroup<TaskMetricGroup> { public OperatorIOMetricGroup getIOMetricGroup() { return ioMetrics; } - + // ------------------------------------------------------------------------ // Component Metric Group Specifics // ------------------------------------------------------------------------ http://git-wip-us.apache.org/repos/asf/flink/blob/2b3d284b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskIOMetricGroup.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskIOMetricGroup.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskIOMetricGroup.java index 38accad..e12ecd7 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskIOMetricGroup.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskIOMetricGroup.java @@ -24,11 +24,11 @@ import org.apache.flink.metrics.Meter; import org.apache.flink.metrics.MeterView; import org.apache.flink.metrics.MetricGroup; import org.apache.flink.metrics.SimpleCounter; +import org.apache.flink.runtime.executiongraph.IOMetrics; import org.apache.flink.runtime.io.network.partition.ResultPartition; import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate; import org.apache.flink.runtime.metrics.MetricNames; import org.apache.flink.runtime.taskmanager.Task; -import org.apache.flink.runtime.executiongraph.IOMetrics; import java.util.ArrayList; import java.util.List; @@ -110,7 +110,7 @@ public class TaskIOMetricGroup extends ProxyMetricGroup<TaskMetricGroup> { // ============================================================================================ /** - * Initialize Buffer Metrics for a task + * Initialize Buffer Metrics for a task. */ public void initializeBufferMetrics(Task task) { final MetricGroup buffers = addGroup("buffers"); http://git-wip-us.apache.org/repos/asf/flink/blob/2b3d284b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskManagerJobMetricGroup.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskManagerJobMetricGroup.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskManagerJobMetricGroup.java index 79a87d0..3921553 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskManagerJobMetricGroup.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskManagerJobMetricGroup.java @@ -15,6 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.flink.runtime.metrics.groups; import org.apache.flink.annotation.Internal; @@ -25,6 +26,7 @@ import org.apache.flink.runtime.metrics.MetricRegistry; import org.apache.flink.util.AbstractID; import javax.annotation.Nullable; + import java.util.HashMap; import java.util.Map; @@ -39,7 +41,7 @@ import static org.apache.flink.util.Preconditions.checkNotNull; @Internal public class TaskManagerJobMetricGroup extends JobMetricGroup<TaskManagerMetricGroup> { - /** Map from execution attempt ID (task identifier) to task metrics */ + /** Map from execution attempt ID (task identifier) to task metrics. */ private final Map<AbstractID, TaskMetricGroup> tasks = new HashMap<>(); // ------------------------------------------------------------------------ http://git-wip-us.apache.org/repos/asf/flink/blob/2b3d284b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskManagerMetricGroup.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskManagerMetricGroup.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskManagerMetricGroup.java index 92c509a..d85e868 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskManagerMetricGroup.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskManagerMetricGroup.java @@ -46,7 +46,6 @@ public class TaskManagerMetricGroup extends ComponentMetricGroup<TaskManagerMetr private final String taskManagerId; - public TaskManagerMetricGroup(MetricRegistry registry, String hostname, String taskManagerId) { super(registry, registry.getScopeFormats().getTaskManagerFormat().formatScope(hostname, taskManagerId), null); this.hostname = hostname; http://git-wip-us.apache.org/repos/asf/flink/blob/2b3d284b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskMetricGroup.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskMetricGroup.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskMetricGroup.java index 43e8e1b..cb7aaa0 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskMetricGroup.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskMetricGroup.java @@ -26,6 +26,7 @@ import org.apache.flink.runtime.metrics.scope.ScopeFormat; import org.apache.flink.util.AbstractID; import javax.annotation.Nullable; + import java.util.HashMap; import java.util.Map; @@ -33,7 +34,7 @@ import static org.apache.flink.util.Preconditions.checkNotNull; /** * Special {@link org.apache.flink.metrics.MetricGroup} representing a Flink runtime Task. - * + * * <p>Contains extra logic for adding operators. */ @Internal @@ -42,13 +43,13 @@ public class TaskMetricGroup extends ComponentMetricGroup<TaskManagerJobMetricGr private final Map<String, OperatorMetricGroup> operators = new HashMap<>(); private final TaskIOMetricGroup ioMetrics; - - /** The execution Id uniquely identifying the executed task represented by this metrics group */ + + /** The execution Id uniquely identifying the executed task represented by this metrics group. */ private final AbstractID executionId; @Nullable protected final AbstractID vertexId; - + @Nullable private final String taskName; http://git-wip-us.apache.org/repos/asf/flink/blob/2b3d284b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/scope/ScopeFormat.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/scope/ScopeFormat.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/scope/ScopeFormat.java index f9efb88..18b3a0d 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/scope/ScopeFormat.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/scope/ScopeFormat.java @@ -52,7 +52,7 @@ public abstract class ScopeFormat { /** * If the scope format starts with this character, then the parent components scope * format will be used as a prefix. - * + * * <p>For example, if the TaskManager's job format is {@code "*.<job_name>"}, and the * TaskManager format is {@code "<host>"}, then the job's metrics * will have {@code "<host>.<job_name>"} as their scope. @@ -90,16 +90,16 @@ public abstract class ScopeFormat { // ----- Operator ---- public static final String SCOPE_OPERATOR_NAME = asVariable("operator_name"); - + // ------------------------------------------------------------------------ // Scope Format Base // ------------------------------------------------------------------------ - /** The scope format */ + /** The scope format. */ private final String format; - /** The format, split into components */ + /** The format, split into components. */ private final String[] template; private final int[] templatePos; @@ -125,7 +125,7 @@ public abstract class ScopeFormat { String[] parentTemplate = parent.template; int parentLen = parentTemplate.length; - + this.template = new String[parentLen + rawComponents.length - 1]; System.arraycopy(parentTemplate, 0, this.template, 0, parentLen); System.arraycopy(rawComponents, 1, this.template, parentLen, rawComponents.length - 1); @@ -137,14 +137,14 @@ public abstract class ScopeFormat { // --- compute the replacement matrix --- // a bit of clumsy Java collections code ;-) - + HashMap<String, Integer> varToValuePos = arrayToMap(variables); List<Integer> templatePos = new ArrayList<>(); List<Integer> valuePos = new ArrayList<>(); for (int i = 0; i < template.length; i++) { final String component = template[i]; - + // check if that is a variable if (component != null && component.length() >= 3 && component.charAt(0) == '<' && component.charAt(component.length() - 1) == '>') { @@ -188,7 +188,7 @@ public abstract class ScopeFormat { public String toString() { return "ScopeFormat '" + format + '\''; } - + // ------------------------------------------------------------------------ // Utilities // ------------------------------------------------------------------------ @@ -233,7 +233,7 @@ public abstract class ScopeFormat { } return sb.toString(); } - + protected static String valueOrNull(Object value) { return (value == null || (value instanceof String && ((String) value).isEmpty())) ? "null" : value.toString(); http://git-wip-us.apache.org/repos/asf/flink/blob/2b3d284b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/scope/ScopeFormats.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/scope/ScopeFormats.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/scope/ScopeFormats.java index bde93be..dc49d32 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/scope/ScopeFormats.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/scope/ScopeFormats.java @@ -67,8 +67,7 @@ public class ScopeFormats { String taskManagerFormat, String taskManagerJobFormat, String taskFormat, - String operatorFormat) - { + String operatorFormat) { this.jobManagerFormat = new JobManagerScopeFormat(jobManagerFormat); this.jobManagerJobFormat = new JobManagerJobScopeFormat(jobManagerJobFormat, this.jobManagerFormat); this.taskManagerFormat = new TaskManagerScopeFormat(taskManagerFormat); @@ -86,8 +85,7 @@ public class ScopeFormats { TaskManagerScopeFormat taskManagerFormat, TaskManagerJobScopeFormat taskManagerJobFormat, TaskScopeFormat taskFormat, - OperatorScopeFormat operatorFormat) - { + OperatorScopeFormat operatorFormat) { this.jobManagerFormat = checkNotNull(jobManagerFormat); this.jobManagerJobFormat = checkNotNull(jobManagerJobFormat); this.taskManagerFormat = checkNotNull(taskManagerFormat); @@ -129,8 +127,8 @@ public class ScopeFormats { // ------------------------------------------------------------------------ /** - * Creates the scope formats as defined in the given configuration - * + * Creates the scope formats as defined in the given configuration. + * * @param config The configuration that defines the formats * @return The ScopeFormats parsed from the configuration */ http://git-wip-us.apache.org/repos/asf/flink/blob/2b3d284b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/util/MetricUtils.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/util/MetricUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/util/MetricUtils.java index 4612eaf..2ecde42 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/util/MetricUtils.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/util/MetricUtils.java @@ -15,12 +15,14 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.flink.runtime.metrics.util; -import org.apache.commons.lang3.text.WordUtils; import org.apache.flink.metrics.Gauge; import org.apache.flink.metrics.MetricGroup; import org.apache.flink.runtime.io.network.NetworkEnvironment; + +import org.apache.commons.lang3.text.WordUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -35,6 +37,9 @@ import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; import java.util.List; +/** + * Utility class to register pre-defined metric sets. + */ public class MetricUtils { private static final Logger LOG = LoggerFactory.getLogger(MetricUtils.class); private static final String METRIC_GROUP_STATUS_NAME = "Status"; http://git-wip-us.apache.org/repos/asf/flink/blob/2b3d284b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/MetricRegistryTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/MetricRegistryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/MetricRegistryTest.java index b9502b2..1de2551 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/MetricRegistryTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/MetricRegistryTest.java @@ -18,9 +18,6 @@ package org.apache.flink.runtime.metrics; -import akka.actor.ActorNotFound; -import akka.actor.ActorRef; -import akka.actor.ActorSystem; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.MetricOptions; @@ -34,20 +31,27 @@ import org.apache.flink.runtime.akka.AkkaUtils; import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup; import org.apache.flink.runtime.metrics.scope.ScopeFormats; import org.apache.flink.runtime.metrics.util.TestReporter; - import org.apache.flink.util.TestLogger; + +import akka.actor.ActorNotFound; +import akka.actor.ActorRef; +import akka.actor.ActorSystem; import org.junit.Assert; import org.junit.Test; -import scala.concurrent.Await; -import scala.concurrent.duration.FiniteDuration; import java.util.List; import java.util.concurrent.TimeUnit; +import scala.concurrent.Await; +import scala.concurrent.duration.FiniteDuration; + import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; +/** + * Tests for the {@link MetricRegistry}. + */ public class MetricRegistryTest extends TestLogger { private static final char GLOBAL_DEFAULT_DELIMITER = '.'; @@ -55,14 +59,14 @@ public class MetricRegistryTest extends TestLogger { @Test public void testIsShutdown() { MetricRegistry metricRegistry = new MetricRegistry(MetricRegistryConfiguration.defaultMetricRegistryConfiguration()); - + Assert.assertFalse(metricRegistry.isShutdown()); - + metricRegistry.shutdown(); - + Assert.assertTrue(metricRegistry.isShutdown()); } - + /** * Verifies that the reporter class argument is correctly used to instantiate and open the reporter. */ @@ -82,6 +86,9 @@ public class MetricRegistryTest extends TestLogger { metricRegistry.shutdown(); } + /** + * Reporter that exposes whether open() was called. + */ protected static class TestReporter1 extends TestReporter { public static boolean wasOpened = false; @@ -114,6 +121,9 @@ public class MetricRegistryTest extends TestLogger { metricRegistry.shutdown(); } + /** + * Reporter that exposes whether open() was called. + */ protected static class TestReporter11 extends TestReporter { public static boolean wasOpened = false; @@ -123,6 +133,9 @@ public class MetricRegistryTest extends TestLogger { } } + /** + * Reporter that exposes whether open() was called. + */ protected static class TestReporter12 extends TestReporter { public static boolean wasOpened = false; @@ -132,6 +145,9 @@ public class MetricRegistryTest extends TestLogger { } } + /** + * Reporter that exposes whether open() was called. + */ protected static class TestReporter13 extends TestReporter { public static boolean wasOpened = false; @@ -156,6 +172,9 @@ public class MetricRegistryTest extends TestLogger { new MetricRegistry(MetricRegistryConfiguration.fromConfiguration(config)).shutdown(); } + /** + * Reporter that verifies whether configured arguments were properly passed. + */ protected static class TestReporter2 extends TestReporter { @Override public void open(MetricConfig config) { @@ -186,9 +205,9 @@ public class MetricRegistryTest extends TestLogger { int reportCount = TestReporter3.reportCount; long curT = System.currentTimeMillis(); /** - * Within a given time-frame T only T/500 reports may be triggered due to the interval between reports. - * This value however does not not take the first triggered report into account (=> +1). - * Furthermore we have to account for the mis-alignment between reports being triggered and our time + * Within a given time-frame T only T/500 reports may be triggered due to the interval between reports. + * This value however does not not take the first triggered report into account (=> +1). + * Furthermore we have to account for the mis-alignment between reports being triggered and our time * measurement (=> +1); for T=200 a total of 4-6 reports may have been * triggered depending on whether the end of the interval for the first reports ends before * or after T=50. @@ -201,6 +220,9 @@ public class MetricRegistryTest extends TestLogger { registry.shutdown(); } + /** + * Reporter that exposes how often report() was called. + */ protected static class TestReporter3 extends TestReporter implements Scheduled { public static int reportCount = 0; @@ -234,6 +256,9 @@ public class MetricRegistryTest extends TestLogger { registry.shutdown(); } + /** + * Reporter that exposes whether it was notified of added or removed metrics. + */ protected static class TestReporter6 extends TestReporter { public static boolean addCalled = false; public static boolean removeCalled = false; @@ -253,6 +278,9 @@ public class MetricRegistryTest extends TestLogger { } } + /** + * Reporter that exposes whether it was notified of added or removed metrics. + */ protected static class TestReporter7 extends TestReporter { public static boolean addCalled = false; public static boolean removeCalled = false; @@ -344,10 +372,10 @@ public class MetricRegistryTest extends TestLogger { MetricRegistry registry = new MetricRegistry(MetricRegistryConfiguration.fromConfiguration(config)); List<MetricReporter> reporters = registry.getReporters(); - ((TestReporter8)reporters.get(0)).expectedDelimiter = '_'; //test1 reporter - ((TestReporter8)reporters.get(1)).expectedDelimiter = '-'; //test2 reporter - ((TestReporter8)reporters.get(2)).expectedDelimiter = GLOBAL_DEFAULT_DELIMITER; //test3 reporter, because 'AA' - not correct delimiter - ((TestReporter8)reporters.get(3)).expectedDelimiter = GLOBAL_DEFAULT_DELIMITER; //for test4 reporter use global delimiter + ((TestReporter8) reporters.get(0)).expectedDelimiter = '_'; //test1 reporter + ((TestReporter8) reporters.get(1)).expectedDelimiter = '-'; //test2 reporter + ((TestReporter8) reporters.get(2)).expectedDelimiter = GLOBAL_DEFAULT_DELIMITER; //test3 reporter, because 'AA' - not correct delimiter + ((TestReporter8) reporters.get(3)).expectedDelimiter = GLOBAL_DEFAULT_DELIMITER; //for test4 reporter use global delimiter TaskManagerMetricGroup group = new TaskManagerMetricGroup(registry, "host", "id"); group.counter("C"); @@ -383,6 +411,9 @@ public class MetricRegistryTest extends TestLogger { } } + /** + * Reporter that verifies that the configured delimiter is applied correctly when generating the metric identifier. + */ public static class TestReporter8 extends TestReporter { char expectedDelimiter; public static int numCorrectDelimitersForRegister = 0; http://git-wip-us.apache.org/repos/asf/flink/blob/2b3d284b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/TaskManagerMetricsTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/TaskManagerMetricsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/TaskManagerMetricsTest.java index d6fc48c..5e6bbce 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/TaskManagerMetricsTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/TaskManagerMetricsTest.java @@ -15,12 +15,8 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.flink.runtime.metrics; -import akka.actor.ActorRef; -import akka.actor.ActorSystem; -import akka.actor.Props; -import akka.testkit.JavaTestKit; +package org.apache.flink.runtime.metrics; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.akka.AkkaUtils; @@ -34,18 +30,25 @@ import org.apache.flink.runtime.taskexecutor.TaskManagerConfiguration; import org.apache.flink.runtime.taskexecutor.TaskManagerServices; import org.apache.flink.runtime.taskexecutor.TaskManagerServicesConfiguration; import org.apache.flink.runtime.taskmanager.TaskManager; - import org.apache.flink.runtime.testingUtils.TestingUtils; import org.apache.flink.util.TestLogger; + +import akka.actor.ActorRef; +import akka.actor.ActorSystem; +import akka.actor.Props; +import akka.testkit.JavaTestKit; import org.junit.Assert; import org.junit.Test; -import scala.concurrent.duration.FiniteDuration; - import java.net.InetAddress; import java.util.UUID; import java.util.concurrent.TimeUnit; +import scala.concurrent.duration.FiniteDuration; + +/** + * Tests for the behavior of the metric system on a task manager. + */ public class TaskManagerMetricsTest extends TestLogger { /** @@ -78,7 +81,7 @@ public class TaskManagerMetricsTest extends TestLogger { final Configuration config = new Configuration(); final ResourceID tmResourceID = ResourceID.generate(); - TaskManagerServicesConfiguration taskManagerServicesConfiguration = + TaskManagerServicesConfiguration taskManagerServicesConfiguration = TaskManagerServicesConfiguration.fromConfiguration(config, InetAddress.getLocalHost(), false); TaskManagerConfiguration taskManagerConfiguration = TaskManagerConfiguration.fromConfiguration(config); @@ -87,7 +90,7 @@ public class TaskManagerMetricsTest extends TestLogger { taskManagerServicesConfiguration, tmResourceID); final MetricRegistry tmRegistry = taskManagerServices.getMetricRegistry(); - + // create the task manager final Props tmProps = TaskManager.getTaskManagerProps( TaskManager.class, http://git-wip-us.apache.org/repos/asf/flink/blob/2b3d284b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/dump/MetricDumpSerializerTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/dump/MetricDumpSerializerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/dump/MetricDumpSerializerTest.java index 6e3d8f4..d36c469 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/dump/MetricDumpSerializerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/dump/MetricDumpSerializerTest.java @@ -15,6 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.flink.runtime.metrics.dump; import org.apache.flink.api.java.tuple.Tuple2; @@ -24,6 +25,7 @@ import org.apache.flink.metrics.Histogram; import org.apache.flink.metrics.Meter; import org.apache.flink.metrics.SimpleCounter; import org.apache.flink.runtime.metrics.util.TestingHistogram; + import org.junit.Assert; import org.junit.Test; @@ -43,7 +45,9 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; - +/** + * Tests for the {@link MetricDumpSerialization}. + */ public class MetricDumpSerializerTest { @Test public void testNullGaugeHandling() throws IOException { @@ -51,20 +55,20 @@ public class MetricDumpSerializerTest { MetricDumpSerialization.MetricDumpDeserializer deserializer = new MetricDumpSerialization.MetricDumpDeserializer(); Map<Gauge<?>, Tuple2<QueryScopeInfo, String>> gauges = new HashMap<>(); - + gauges.put(new Gauge<Object>() { @Override public Object getValue() { return null; } }, new Tuple2<QueryScopeInfo, String>(new QueryScopeInfo.JobManagerQueryScopeInfo("A"), "g")); - + MetricDumpSerialization.MetricSerializationResult output = serializer.serialize( - Collections.<Counter, Tuple2<QueryScopeInfo,String>>emptyMap(), + Collections.<Counter, Tuple2<QueryScopeInfo, String>>emptyMap(), gauges, Collections.<Histogram, Tuple2<QueryScopeInfo, String>>emptyMap(), Collections.<Meter, Tuple2<QueryScopeInfo, String>>emptyMap()); - + // no metrics should be serialized Assert.assertEquals(0, output.serializedMetrics.length); @@ -80,10 +84,10 @@ public class MetricDumpSerializerTest { final ObjectOutputStream oos = new ObjectOutputStream(bos); oos.writeObject(serializer.serialize( - new HashMap<Counter, Tuple2<QueryScopeInfo,String>>(), - new HashMap<Gauge<?>, Tuple2<QueryScopeInfo,String>>(), - new HashMap<Histogram, Tuple2<QueryScopeInfo,String>>(), - new HashMap<Meter, Tuple2<QueryScopeInfo,String>>())); + new HashMap<Counter, Tuple2<QueryScopeInfo, String>>(), + new HashMap<Gauge<?>, Tuple2<QueryScopeInfo, String>>(), + new HashMap<Histogram, Tuple2<QueryScopeInfo, String>>(), + new HashMap<Meter, Tuple2<QueryScopeInfo, String>>())); } @Test http://git-wip-us.apache.org/repos/asf/flink/blob/2b3d284b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/dump/MetricDumpTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/dump/MetricDumpTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/dump/MetricDumpTest.java index 3b65184..c7b9793 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/dump/MetricDumpTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/dump/MetricDumpTest.java @@ -15,6 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.flink.runtime.metrics.dump; import org.junit.Test; @@ -25,6 +26,9 @@ import static org.apache.flink.runtime.metrics.dump.MetricDump.METRIC_CATEGORY_H import static org.apache.flink.runtime.metrics.dump.MetricDump.METRIC_CATEGORY_METER; import static org.junit.Assert.assertEquals; +/** + * Tests for the {@link MetricDump} classes. + */ public class MetricDumpTest { @Test public void testDumpedCounter() { http://git-wip-us.apache.org/repos/asf/flink/blob/2b3d284b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/dump/MetricQueryServiceTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/dump/MetricQueryServiceTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/dump/MetricQueryServiceTest.java index 2243495..5c33ad6 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/dump/MetricQueryServiceTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/dump/MetricQueryServiceTest.java @@ -15,13 +15,9 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.flink.runtime.metrics.dump; -import akka.actor.ActorRef; -import akka.actor.ActorSystem; -import akka.actor.Props; -import akka.actor.UntypedActor; -import akka.testkit.TestActorRef; import org.apache.flink.configuration.Configuration; import org.apache.flink.metrics.Counter; import org.apache.flink.metrics.Gauge; @@ -34,11 +30,20 @@ import org.apache.flink.runtime.metrics.MetricRegistryConfiguration; import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup; import org.apache.flink.runtime.metrics.util.TestingHistogram; import org.apache.flink.util.TestLogger; + +import akka.actor.ActorRef; +import akka.actor.ActorSystem; +import akka.actor.Props; +import akka.actor.UntypedActor; +import akka.testkit.TestActorRef; import org.junit.Test; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; +/** + * Tests for the {@link MetricQueryService}. + */ public class MetricQueryServiceTest extends TestLogger { @Test public void testCreateDump() throws Exception { http://git-wip-us.apache.org/repos/asf/flink/blob/2b3d284b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/dump/QueryScopeInfoTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/dump/QueryScopeInfoTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/dump/QueryScopeInfoTest.java index 597e376..f4f9515 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/dump/QueryScopeInfoTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/dump/QueryScopeInfoTest.java @@ -15,6 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.flink.runtime.metrics.dump; import org.junit.Test; @@ -26,6 +27,9 @@ import static org.apache.flink.runtime.metrics.dump.QueryScopeInfo.INFO_CATEGORY import static org.apache.flink.runtime.metrics.dump.QueryScopeInfo.INFO_CATEGORY_TM; import static org.junit.Assert.assertEquals; +/** + * Tests for the {@link QueryScopeInfo} classes. + */ public class QueryScopeInfoTest { @Test public void testJobManagerMetricInfo() { http://git-wip-us.apache.org/repos/asf/flink/blob/2b3d284b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroupTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroupTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroupTest.java index 04e40ae..648ee47 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroupTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroupTest.java @@ -15,6 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.flink.runtime.metrics.groups; import org.apache.flink.configuration.ConfigConstants; @@ -28,11 +29,15 @@ import org.apache.flink.runtime.metrics.MetricRegistry; import org.apache.flink.runtime.metrics.MetricRegistryConfiguration; import org.apache.flink.runtime.metrics.dump.QueryScopeInfo; import org.apache.flink.runtime.metrics.util.TestReporter; + import org.junit.Test; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; +/** + * Tests for the {@link AbstractMetricGroup}. + */ public class AbstractMetricGroupTest { /** * Verifies that no {@link NullPointerException} is thrown when {@link AbstractMetricGroup#getAllVariables()} is @@ -54,7 +59,7 @@ public class AbstractMetricGroupTest { } }; assertTrue(group.getAllVariables().isEmpty()); - + registry.shutdown(); } @@ -119,11 +124,15 @@ public class AbstractMetricGroupTest { } + /** + * Reporter that verifies the scope caching behavior. + */ public static class TestReporter1 extends ScopeCheckingTestReporter { @Override public String filterCharacters(String input) { return FILTER_B.filterCharacters(input); } + @Override public void checkScopes(Metric metric, String metricName, MetricGroup group) { // the first call determines which filter is applied to all future calls; in this case no filter is used at all @@ -141,6 +150,9 @@ public class AbstractMetricGroupTest { } } + /** + * Reporter that verifies the scope caching behavior. + */ public static class TestReporter2 extends ScopeCheckingTestReporter { @Override public String filterCharacters(String input) { @@ -173,7 +185,7 @@ public class AbstractMetricGroupTest { try { TaskManagerMetricGroup group = new TaskManagerMetricGroup(testRegistry, "host", "id"); assertEquals("MetricReporters list should be empty", 0, testRegistry.getReporters().size()); - + // default delimiter should be used assertEquals("A.B.X.D.1", group.getMetricIdentifier("1", FILTER_C)); // no caching should occur http://git-wip-us.apache.org/repos/asf/flink/blob/2b3d284b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/JobManagerGroupTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/JobManagerGroupTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/JobManagerGroupTest.java index 7834755..03341a6 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/JobManagerGroupTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/JobManagerGroupTest.java @@ -15,6 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.flink.runtime.metrics.groups; import org.apache.flink.api.common.JobID; @@ -26,12 +27,16 @@ import org.apache.flink.runtime.metrics.MetricRegistryConfiguration; import org.apache.flink.runtime.metrics.dump.QueryScopeInfo; import org.apache.flink.runtime.metrics.util.DummyCharacterFilter; import org.apache.flink.util.TestLogger; + import org.junit.Test; import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; +/** + * Tests for the {@link JobManagerMetricGroup}. + */ public class JobManagerGroupTest extends TestLogger { // ------------------------------------------------------------------------ http://git-wip-us.apache.org/repos/asf/flink/blob/2b3d284b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/JobManagerJobGroupTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/JobManagerJobGroupTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/JobManagerJobGroupTest.java index d8bd57a..d734dfd 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/JobManagerJobGroupTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/JobManagerJobGroupTest.java @@ -26,11 +26,15 @@ import org.apache.flink.runtime.metrics.MetricRegistryConfiguration; import org.apache.flink.runtime.metrics.dump.QueryScopeInfo; import org.apache.flink.runtime.metrics.util.DummyCharacterFilter; import org.apache.flink.util.TestLogger; + import org.junit.Test; import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; +/** + * Tests for the {@link JobManagerJobMetricGroup}. + */ public class JobManagerJobGroupTest extends TestLogger { @Test http://git-wip-us.apache.org/repos/asf/flink/blob/2b3d284b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/MetricGroupRegistrationTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/MetricGroupRegistrationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/MetricGroupRegistrationTest.java index ace0236..56ce5fa 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/MetricGroupRegistrationTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/MetricGroupRegistrationTest.java @@ -15,6 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.flink.runtime.metrics.groups; import org.apache.flink.configuration.ConfigConstants; @@ -33,8 +34,11 @@ import org.apache.flink.runtime.metrics.util.TestReporter; import org.junit.Assert; import org.junit.Test; -import static org.junit.Assert.*; +import static org.junit.Assert.assertEquals; +/** + * Tests for the registration of groups and metrics on a {@link MetricGroup}. + */ public class MetricGroupRegistrationTest { /** * Verifies that group methods instantiate the correct metric with the given name. @@ -59,7 +63,7 @@ public class MetricGroupRegistrationTest { return null; } }); - + Assert.assertEquals(gauge, TestReporter1.lastPassedMetric); assertEquals("gauge", TestReporter1.lastPassedName); @@ -85,8 +89,11 @@ public class MetricGroupRegistrationTest { registry.shutdown(); } + /** + * Reporter that exposes the last name and metric instance it was notified of. + */ public static class TestReporter1 extends TestReporter { - + public static Metric lastPassedMetric; public static String lastPassedName; http://git-wip-us.apache.org/repos/asf/flink/blob/2b3d284b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/MetricGroupTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/MetricGroupTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/MetricGroupTest.java index 665abb1..7397c87 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/MetricGroupTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/MetricGroupTest.java @@ -24,22 +24,29 @@ import org.apache.flink.metrics.Gauge; import org.apache.flink.metrics.Metric; import org.apache.flink.metrics.MetricGroup; import org.apache.flink.runtime.metrics.MetricRegistry; - import org.apache.flink.runtime.metrics.MetricRegistryConfiguration; import org.apache.flink.runtime.metrics.dump.QueryScopeInfo; import org.apache.flink.runtime.metrics.util.DummyCharacterFilter; import org.apache.flink.util.AbstractID; import org.apache.flink.util.TestLogger; + import org.junit.After; import org.junit.Before; import org.junit.Test; -import static org.junit.Assert.*; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; +/** + * Tests for the {@link MetricGroup}. + */ public class MetricGroupTest extends TestLogger { private static final MetricRegistryConfiguration defaultMetricRegistryConfiguration = MetricRegistryConfiguration.defaultMetricRegistryConfiguration(); - + private MetricRegistry registry; private final MetricRegistry exceptionOnRegister = new ExceptionOnRegisterRegistry(); @@ -63,7 +70,7 @@ public class MetricGroupTest extends TestLogger { String groupName = "sometestname"; MetricGroup subgroup1 = group.addGroup(groupName); MetricGroup subgroup2 = group.addGroup(groupName); - + assertNotNull(subgroup1); assertNotNull(subgroup2); assertTrue(subgroup1 == subgroup2); @@ -82,10 +89,12 @@ public class MetricGroupTest extends TestLogger { group.counter("testcounter"); group.gauge("testgauge", new Gauge<Object>() { @Override - public Object getValue() { return null; } + public Object getValue() { + return null; + } }); } - + @Test public void closedGroupCreatesClosedGroups() { GenericMetricGroup group = new GenericMetricGroup(exceptionOnRegister, @@ -94,7 +103,7 @@ public class MetricGroupTest extends TestLogger { group.close(); assertTrue(group.isClosed()); - + AbstractMetricGroup subgroup = (AbstractMetricGroup) group.addGroup("test subgroup"); assertTrue(subgroup.isClosed()); } @@ -104,7 +113,7 @@ public class MetricGroupTest extends TestLogger { final String name = "abctestname"; GenericMetricGroup group = new GenericMetricGroup( registry, new DummyAbstractMetricGroup(registry), "testgroup"); - + assertNotNull(group.counter(name)); assertNotNull(group.counter(name)); } @@ -114,7 +123,7 @@ public class MetricGroupTest extends TestLogger { final String name = "abctestname"; GenericMetricGroup group = new GenericMetricGroup( registry, new DummyAbstractMetricGroup(registry), "testgroup"); - + assertNotNull(group.addGroup(name)); assertNotNull(group.counter(name)); } @@ -143,11 +152,11 @@ public class MetricGroupTest extends TestLogger { assertEquals(vid.toString(), info2.vertexID); assertEquals(4, info2.subtaskIndex); } - + // ------------------------------------------------------------------------ - + private static class ExceptionOnRegisterRegistry extends MetricRegistry { - + public ExceptionOnRegisterRegistry() { super(defaultMetricRegistryConfiguration); } @@ -164,7 +173,7 @@ public class MetricGroupTest extends TestLogger { } // ------------------------------------------------------------------------ - + private static class DummyAbstractMetricGroup extends AbstractMetricGroup { public DummyAbstractMetricGroup(MetricRegistry registry) { @@ -182,7 +191,8 @@ public class MetricGroupTest extends TestLogger { } @Override - protected void addMetric(String name, Metric metric) {} + protected void addMetric(String name, Metric metric) { + } @Override public MetricGroup addGroup(String name) { http://git-wip-us.apache.org/repos/asf/flink/blob/2b3d284b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/OperatorGroupTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/OperatorGroupTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/OperatorGroupTest.java index af73c27..c232cf2 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/OperatorGroupTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/OperatorGroupTest.java @@ -26,6 +26,7 @@ import org.apache.flink.runtime.metrics.scope.ScopeFormat; import org.apache.flink.runtime.metrics.util.DummyCharacterFilter; import org.apache.flink.util.AbstractID; import org.apache.flink.util.TestLogger; + import org.junit.Test; import java.util.Map; @@ -34,6 +35,9 @@ import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; +/** + * Tests for the {@link OperatorMetricGroup}. + */ public class OperatorGroupTest extends TestLogger { @Test @@ -81,7 +85,7 @@ public class OperatorGroupTest extends TestLogger { JobID jid = new JobID(); AbstractID tid = new AbstractID(); AbstractID eid = new AbstractID(); - + TaskManagerMetricGroup tmGroup = new TaskManagerMetricGroup(registry, "theHostName", "test-tm-id"); TaskManagerJobMetricGroup jmGroup = new TaskManagerJobMetricGroup(registry, tmGroup, jid, "myJobName"); TaskMetricGroup taskGroup = new TaskMetricGroup( http://git-wip-us.apache.org/repos/asf/flink/blob/2b3d284b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/QueryScopeInfoTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/QueryScopeInfoTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/QueryScopeInfoTest.java index 1ff804a..f6a1277 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/QueryScopeInfoTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/QueryScopeInfoTest.java @@ -15,20 +15,25 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.flink.runtime.metrics.groups; import org.apache.flink.runtime.metrics.dump.QueryScopeInfo; + import org.junit.Test; import static org.junit.Assert.assertEquals; +/** + * Tests for the {@link QueryScopeInfo} classes. + */ public class QueryScopeInfoTest { @Test public void testJobManagerQueryScopeInfo() { QueryScopeInfo.JobManagerQueryScopeInfo info = new QueryScopeInfo.JobManagerQueryScopeInfo(); assertEquals(QueryScopeInfo.INFO_CATEGORY_JM, info.getCategory()); assertEquals("", info.scope); - + info = info.copy("world"); assertEquals(QueryScopeInfo.INFO_CATEGORY_JM, info.getCategory()); assertEquals("world", info.scope); @@ -53,7 +58,7 @@ public class QueryScopeInfoTest { assertEquals(QueryScopeInfo.INFO_CATEGORY_TM, info.getCategory()); assertEquals("world", info.scope); assertEquals("tmid", info.taskManagerID); - + info = new QueryScopeInfo.TaskManagerQueryScopeInfo("tmid", "hello"); assertEquals(QueryScopeInfo.INFO_CATEGORY_TM, info.getCategory()); assertEquals("hello", info.scope); http://git-wip-us.apache.org/repos/asf/flink/blob/2b3d284b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskIOMetricGroupTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskIOMetricGroupTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskIOMetricGroupTest.java index 564a518..bcf77de 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskIOMetricGroupTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskIOMetricGroupTest.java @@ -15,17 +15,22 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.flink.runtime.metrics.groups; import org.apache.flink.metrics.Counter; import org.apache.flink.metrics.SimpleCounter; import org.apache.flink.runtime.executiongraph.IOMetrics; import org.apache.flink.runtime.operators.testutils.UnregisteredTaskMetricsGroup; + import org.junit.Test; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; +/** + * Tests for the {@link TaskIOMetricGroup}. + */ public class TaskIOMetricGroupTest { @Test public void testTaskIOMetricGroup() { @@ -50,7 +55,7 @@ public class TaskIOMetricGroupTest { taskIO.getNumBytesInLocalCounter().inc(100L); taskIO.getNumBytesInRemoteCounter().inc(150L); taskIO.getNumBytesOutCounter().inc(250L); - + IOMetrics io = taskIO.createSnapshot(); assertEquals(32L, io.getNumRecordsIn()); assertEquals(64L, io.getNumRecordsOut());
