[FLINK-4192] [metrics] Move metrics classes out of 'flink-core' - moved user-facing API to 'flink-metrics/flink-metrics-core' - moved JMXReporter to 'flink-metrics/flink-metrics-jmx' - moved remaining metric classes to 'flink-runtime'
This closes #2226 Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e3fec1f9 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e3fec1f9 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e3fec1f9 Branch: refs/heads/master Commit: e3fec1f9ad737c01fc5c22bd0c327bfce91938a9 Parents: e4fe89d Author: zentol <[email protected]> Authored: Fri Jul 22 12:33:16 2016 +0200 Committer: zentol <[email protected]> Committed: Tue Jul 26 12:30:15 2016 +0200 ---------------------------------------------------------------------- docs/apis/metrics.md | 4 +- flink-core/pom.xml | 6 + .../apache/flink/metrics/CharacterFilter.java | 36 -- .../java/org/apache/flink/metrics/Counter.java | 59 --- .../java/org/apache/flink/metrics/Gauge.java | 34 -- .../org/apache/flink/metrics/Histogram.java | 52 -- .../flink/metrics/HistogramStatistics.java | 81 --- .../java/org/apache/flink/metrics/Metric.java | 28 -- .../org/apache/flink/metrics/MetricGroup.java | 133 ----- .../apache/flink/metrics/MetricRegistry.java | 273 ----------- .../org/apache/flink/metrics/SimpleCounter.java | 71 --- .../metrics/groups/AbstractMetricGroup.java | 292 ----------- .../metrics/groups/ComponentMetricGroup.java | 78 --- .../metrics/groups/GenericMetricGroup.java | 49 -- .../flink/metrics/groups/IOMetricGroup.java | 52 -- .../groups/JobManagerJobMetricGroup.java | 69 --- .../metrics/groups/JobManagerMetricGroup.java | 104 ---- .../flink/metrics/groups/JobMetricGroup.java | 62 --- .../metrics/groups/OperatorMetricGroup.java | 62 --- .../flink/metrics/groups/ProxyMetricGroup.java | 90 ---- .../groups/TaskManagerJobMetricGroup.java | 122 ----- .../metrics/groups/TaskManagerMetricGroup.java | 134 ----- .../flink/metrics/groups/TaskMetricGroup.java | 169 ------- .../groups/UnregisteredMetricsGroup.java | 84 ---- .../flink/metrics/groups/scope/ScopeFormat.java | 490 ------------------ .../metrics/groups/scope/ScopeFormats.java | 130 ----- .../metrics/reporter/AbstractReporter.java | 78 --- .../flink/metrics/reporter/JMXReporter.java | 491 ------------------- .../flink/metrics/reporter/MetricReporter.java | 75 --- .../flink/metrics/reporter/Scheduled.java | 34 -- .../flink/metrics/MetricRegistryTest.java | 194 -------- .../metrics/groups/JobManagerGroupTest.java | 108 ---- .../metrics/groups/JobManagerJobGroupTest.java | 90 ---- .../groups/MetricGroupRegistrationTest.java | 114 ----- .../flink/metrics/groups/MetricGroupTest.java | 149 ------ .../flink/metrics/groups/OperatorGroupTest.java | 53 -- .../metrics/groups/TaskManagerGroupTest.java | 154 ------ .../metrics/groups/TaskManagerJobGroupTest.java | 94 ---- .../metrics/groups/TaskMetricGroupTest.java | 167 ------- .../flink/metrics/reporter/JMXReporterTest.java | 291 ----------- .../apache/flink/metrics/util/TestReporter.java | 44 -- flink-dist/pom.xml | 7 + flink-metrics/flink-metrics-core/pom.xml | 35 ++ .../apache/flink/metrics/CharacterFilter.java | 36 ++ .../java/org/apache/flink/metrics/Counter.java | 56 +++ .../java/org/apache/flink/metrics/Gauge.java | 32 ++ .../org/apache/flink/metrics/Histogram.java | 49 ++ .../flink/metrics/HistogramStatistics.java | 78 +++ .../java/org/apache/flink/metrics/Metric.java | 25 + .../org/apache/flink/metrics/MetricConfig.java | 62 +++ .../org/apache/flink/metrics/MetricGroup.java | 162 ++++++ .../org/apache/flink/metrics/SimpleCounter.java | 73 +++ .../groups/UnregisteredMetricsGroup.java | 98 ++++ .../metrics/reporter/AbstractReporter.java | 76 +++ .../flink/metrics/reporter/MetricReporter.java | 73 +++ .../flink/metrics/reporter/Scheduled.java | 31 ++ flink-metrics/flink-metrics-dropwizard/pom.xml | 16 +- .../dropwizard/ScheduledDropwizardReporter.java | 12 +- .../ScheduledDropwizardReporterTest.java | 17 +- .../DropwizardFlinkHistogramWrapperTest.java | 7 +- flink-metrics/flink-metrics-ganglia/pom.xml | 2 +- .../flink/metrics/ganglia/GangliaReporter.java | 4 +- flink-metrics/flink-metrics-graphite/pom.xml | 2 +- .../metrics/graphite/GraphiteReporter.java | 4 +- flink-metrics/flink-metrics-jmx/pom.xml | 81 +++ .../apache/flink/metrics/jmx/JMXReporter.java | 491 +++++++++++++++++++ .../flink/metrics/jmx/JMXReporterTest.java | 289 +++++++++++ .../jobmanager/JMXJobManagerMetricTest.java | 119 +++++ .../src/test/resources/log4j-test.properties | 27 + .../src/test/resources/logback-test.xml | 34 ++ flink-metrics/flink-metrics-statsd/pom.xml | 9 +- .../flink/metrics/statsd/StatsDReporter.java | 4 +- .../metrics/statsd/StatsDReporterTest.java | 11 +- flink-metrics/pom.xml | 2 + .../flink/runtime/execution/Environment.java | 2 +- .../api/serialization/RecordSerializer.java | 2 +- .../serialization/SpanningRecordSerializer.java | 2 +- .../io/network/api/writer/RecordWriter.java | 2 +- .../partition/consumer/LocalInputChannel.java | 2 +- .../partition/consumer/RemoteInputChannel.java | 2 +- .../partition/consumer/SingleInputGate.java | 2 +- .../partition/consumer/UnknownInputChannel.java | 2 +- .../flink/runtime/metrics/MetricRegistry.java | 269 ++++++++++ .../metrics/groups/AbstractMetricGroup.java | 292 +++++++++++ .../metrics/groups/ComponentMetricGroup.java | 78 +++ .../metrics/groups/GenericMetricGroup.java | 47 ++ .../runtime/metrics/groups/IOMetricGroup.java | 52 ++ .../groups/JobManagerJobMetricGroup.java | 71 +++ .../metrics/groups/JobManagerMetricGroup.java | 103 ++++ .../runtime/metrics/groups/JobMetricGroup.java | 62 +++ .../metrics/groups/OperatorMetricGroup.java | 64 +++ .../metrics/groups/ProxyMetricGroup.java | 106 ++++ .../groups/TaskManagerJobMetricGroup.java | 125 +++++ .../metrics/groups/TaskManagerMetricGroup.java | 132 +++++ .../runtime/metrics/groups/TaskMetricGroup.java | 170 +++++++ .../metrics/scope/JobManagerJobScopeFormat.java | 46 ++ .../metrics/scope/JobManagerScopeFormat.java | 37 ++ .../metrics/scope/OperatorScopeFormat.java | 60 +++ .../runtime/metrics/scope/ScopeFormat.java | 307 ++++++++++++ .../runtime/metrics/scope/ScopeFormats.java | 153 ++++++ .../scope/TaskManagerJobScopeFormat.java | 48 ++ .../metrics/scope/TaskManagerScopeFormat.java | 38 ++ .../runtime/metrics/scope/TaskScopeFormat.java | 62 +++ .../runtime/taskmanager/RuntimeEnvironment.java | 2 +- .../apache/flink/runtime/taskmanager/Task.java | 2 +- .../flink/runtime/jobmanager/JobManager.scala | 8 +- .../flink/runtime/taskmanager/TaskManager.scala | 11 +- .../testingUtils/TestingJobManager.scala | 2 +- .../ExecutionGraphMetricsTest.java | 12 +- .../jobmanager/JobManagerMetricTest.java | 118 ----- .../runtime/metrics/MetricRegistryTest.java | 197 ++++++++ .../metrics/groups/JobManagerGroupTest.java | 109 ++++ .../metrics/groups/JobManagerJobGroupTest.java | 90 ++++ .../groups/MetricGroupRegistrationTest.java | 114 +++++ .../runtime/metrics/groups/MetricGroupTest.java | 149 ++++++ .../metrics/groups/OperatorGroupTest.java | 53 ++ .../metrics/groups/TaskManagerGroupTest.java | 271 ++++++++++ .../metrics/groups/TaskManagerJobGroupTest.java | 94 ++++ .../metrics/groups/TaskMetricGroupTest.java | 168 +++++++ .../runtime/metrics/util/TestReporter.java | 44 ++ .../operators/testutils/DummyEnvironment.java | 2 +- .../operators/testutils/MockEnvironment.java | 4 +- .../testutils/UnregisteredTaskMetricsGroup.java | 10 +- .../runtime/taskmanager/TaskAsyncCallTest.java | 2 +- .../flink/runtime/taskmanager/TaskStopTest.java | 3 +- .../flink/runtime/taskmanager/TaskTest.java | 2 +- .../flink-connector-kafka-0.8/pom.xml | 7 + .../flink-connector-kafka-0.9/pom.xml | 7 + .../flink-connector-kafka-base/pom.xml | 7 + .../connectors/kafka/KafkaTestBase.java | 3 +- .../runtime/io/StreamInputProcessor.java | 2 +- .../runtime/io/StreamTwoInputProcessor.java | 2 +- .../runtime/tasks/StreamMockEnvironment.java | 2 +- .../streaming/runtime/tasks/StreamTaskTest.java | 2 +- .../flink/yarn/TestingYarnJobManager.scala | 2 +- .../org/apache/flink/yarn/YarnJobManager.scala | 2 +- 136 files changed, 5677 insertions(+), 5088 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/e3fec1f9/docs/apis/metrics.md ---------------------------------------------------------------------- diff --git a/docs/apis/metrics.md b/docs/apis/metrics.md index 75e7a52..e8c2772 100644 --- a/docs/apis/metrics.md +++ b/docs/apis/metrics.md @@ -230,7 +230,7 @@ or by assigning unique names to jobs and operators. Metrics can be exposed to an external system by configuring a reporter in `conf/flink-conf.yaml`. - `metrics.reporter.class`: The class of the reporter to use. - - Example: org.apache.flink.metrics.reporter.JMXReporter + - Example: org.apache.flink.metrics.jmx.JMXReporter - `metrics.reporter.arguments`: A list of named parameters that are passed to the reporter. - Example: --host localhost --port 9010 - `metrics.reporter.interval`: The interval between reports. @@ -241,7 +241,7 @@ If the Reporter should send out reports regularly you have to implement the `Sch The following sections list the supported reporters. -### JMX (org.apache.flink.metrics.reporter.JMXReporter) +### JMX (org.apache.flink.metrics.jmx.JMXReporter) You don't have to include an additional dependency since the JMX reporter is available by default but not activated. http://git-wip-us.apache.org/repos/asf/flink/blob/e3fec1f9/flink-core/pom.xml ---------------------------------------------------------------------- diff --git a/flink-core/pom.xml b/flink-core/pom.xml index 21d00b3..5496f8c 100644 --- a/flink-core/pom.xml +++ b/flink-core/pom.xml @@ -42,6 +42,12 @@ under the License. </dependency> <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-metrics-core</artifactId> + <version>${project.version}</version> + </dependency> + + <dependency> <groupId>com.esotericsoftware.kryo</groupId> <artifactId>kryo</artifactId> <!-- managed version --> http://git-wip-us.apache.org/repos/asf/flink/blob/e3fec1f9/flink-core/src/main/java/org/apache/flink/metrics/CharacterFilter.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/metrics/CharacterFilter.java b/flink-core/src/main/java/org/apache/flink/metrics/CharacterFilter.java deleted file mode 100644 index 1e9fbc4..0000000 --- a/flink-core/src/main/java/org/apache/flink/metrics/CharacterFilter.java +++ /dev/null @@ -1,36 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.metrics; - -/** - * Interface for a character filter function. The filter function is given a string which the filter - * can transform. The returned string is the transformation result. - */ -public interface CharacterFilter { - - /** - * Filter the given string and generate a resulting string from it. - * - * For example, one implementation could filter out invalid characters from the input string. - * - * @param input Input string - * @return Filtered result string - */ - String filterCharacters(String input); -} http://git-wip-us.apache.org/repos/asf/flink/blob/e3fec1f9/flink-core/src/main/java/org/apache/flink/metrics/Counter.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/metrics/Counter.java b/flink-core/src/main/java/org/apache/flink/metrics/Counter.java deleted file mode 100644 index ffb1cc7..0000000 --- a/flink-core/src/main/java/org/apache/flink/metrics/Counter.java +++ /dev/null @@ -1,59 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.metrics; - -import org.apache.flink.annotation.PublicEvolving; - -/** - * A Counter is a {@link Metric} that measures a count. - */ -@PublicEvolving -public interface Counter extends Metric { - - /** - * Increment the current count by 1. - */ - void inc(); - - /** - * Increment the current count by the given value. - * - * @param n value to increment the current count by - */ - void inc(long n); - - /** - * Decrement the current count by 1. - */ - void dec(); - - /** - * Decrement the current count by the given value. - * - * @param n value to decrement the current count by - */ - void dec(long n); - - /** - * Returns the current count. - * - * @return current count - */ - long getCount(); -} http://git-wip-us.apache.org/repos/asf/flink/blob/e3fec1f9/flink-core/src/main/java/org/apache/flink/metrics/Gauge.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/metrics/Gauge.java b/flink-core/src/main/java/org/apache/flink/metrics/Gauge.java deleted file mode 100644 index 740645d..0000000 --- a/flink-core/src/main/java/org/apache/flink/metrics/Gauge.java +++ /dev/null @@ -1,34 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.metrics; - -import org.apache.flink.annotation.PublicEvolving; - -/** - * A Gauge is a {@link Metric} that calculates a specific value at a point in time. - */ -@PublicEvolving -public interface Gauge<T> extends Metric { - /** - * Calculates and returns the measured value. - * - * @return calculated value - */ - T getValue(); -} http://git-wip-us.apache.org/repos/asf/flink/blob/e3fec1f9/flink-core/src/main/java/org/apache/flink/metrics/Histogram.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/metrics/Histogram.java b/flink-core/src/main/java/org/apache/flink/metrics/Histogram.java deleted file mode 100644 index 3fd1253..0000000 --- a/flink-core/src/main/java/org/apache/flink/metrics/Histogram.java +++ /dev/null @@ -1,52 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.metrics; - -import org.apache.flink.annotation.PublicEvolving; - -/** - * Histogram interface to be used with Flink's metrics system. - * - * The histogram allows to record values, get the current count of recorded values and create - * histogram statistics for the currently seen elements. - */ -@PublicEvolving -public interface Histogram extends Metric { - - /** - * Update the histogram with the given value. - * - * @param value Value to update the histogram with - */ - void update(long value); - - /** - * Get the count of seen elements. - * - * @return Count of seen elements - */ - long getCount(); - - /** - * Create statistics for the currently recorded elements. - * - * @return Statistics about the currently recorded elements - */ - HistogramStatistics getStatistics(); -} http://git-wip-us.apache.org/repos/asf/flink/blob/e3fec1f9/flink-core/src/main/java/org/apache/flink/metrics/HistogramStatistics.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/metrics/HistogramStatistics.java b/flink-core/src/main/java/org/apache/flink/metrics/HistogramStatistics.java deleted file mode 100644 index 476580c..0000000 --- a/flink-core/src/main/java/org/apache/flink/metrics/HistogramStatistics.java +++ /dev/null @@ -1,81 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.metrics; - -import org.apache.flink.annotation.PublicEvolving; - -/** - * Histogram statistics represent the current snapshot of elements recorded in the histogram. - * - * The histogram statistics allow to calculate values for quantiles, the mean, the standard - * deviation, the minimum and the maximum. - */ -@PublicEvolving -public abstract class HistogramStatistics { - - /** - * Returns the value for the given quantile based on the represented histogram statistics. - * - * @param quantile Quantile to calculate the value for - * @return Value for the given quantile - */ - public abstract double getQuantile(double quantile); - - /** - * Returns the elements of the statistics' sample - * - * @return Elements of the statistics' sample - */ - public abstract long[] getValues(); - - /** - * Returns the size of the statistics' sample - * - * @return Size of the statistics' sample - */ - public abstract int size(); - - /** - * Returns the mean of the histogram values. - * - * @return Mean of the histogram values - */ - public abstract double getMean(); - - /** - * Returns the standard deviation of the distribution reflected by the histogram statistics. - * - * @return Standard deviation of histogram distribution - */ - public abstract double getStdDev(); - - /** - * Returns the maximum value of the histogram. - * - * @return Maximum value of the histogram - */ - public abstract long getMax(); - - /** - * Returns the minimum value of the histogram. - * - * @return Minimum value of the histogram - */ - public abstract long getMin(); -} http://git-wip-us.apache.org/repos/asf/flink/blob/e3fec1f9/flink-core/src/main/java/org/apache/flink/metrics/Metric.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/metrics/Metric.java b/flink-core/src/main/java/org/apache/flink/metrics/Metric.java deleted file mode 100644 index 8054de0..0000000 --- a/flink-core/src/main/java/org/apache/flink/metrics/Metric.java +++ /dev/null @@ -1,28 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.metrics; - -import org.apache.flink.annotation.PublicEvolving; - -/** - * Common super interface for all metrics. - */ -@PublicEvolving -public interface Metric { -} http://git-wip-us.apache.org/repos/asf/flink/blob/e3fec1f9/flink-core/src/main/java/org/apache/flink/metrics/MetricGroup.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/metrics/MetricGroup.java b/flink-core/src/main/java/org/apache/flink/metrics/MetricGroup.java deleted file mode 100644 index b578cb3..0000000 --- a/flink-core/src/main/java/org/apache/flink/metrics/MetricGroup.java +++ /dev/null @@ -1,133 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.metrics; - -import org.apache.flink.annotation.PublicEvolving; - -/** - * A MetricGroup is a named container for {@link Metric Metrics} and further metric subgroups. - * - * <p>Instances of this class can be used to register new metrics with Flink and to create a nested - * hierarchy based on the group names. - * - * <p>A MetricGroup is uniquely identified by it's place in the hierarchy and name. - */ -@PublicEvolving -public interface MetricGroup { - - // ------------------------------------------------------------------------ - // Metrics - // ------------------------------------------------------------------------ - - /** - * Creates and registers a new {@link org.apache.flink.metrics.Counter} with Flink. - * - * @param name name of the counter - * @return the created counter - */ - Counter counter(int name); - - /** - * Creates and registers a new {@link org.apache.flink.metrics.Counter} with Flink. - * - * @param name name of the counter - * @return the created counter - */ - Counter counter(String name); - - /** - * Registers a {@link org.apache.flink.metrics.Counter} with Flink. - * - * @param name name of the counter - * @param counter counter to register - * @param <C> counter type - * @return the given counter - */ - <C extends Counter> C counter(int name, C counter); - - /** - * Registers a {@link org.apache.flink.metrics.Counter} with Flink. - * - * @param name name of the counter - * @param counter counter to register - * @param <C> counter type - * @return the given counter - */ - <C extends Counter> C counter(String name, C counter); - - /** - * Registers a new {@link org.apache.flink.metrics.Gauge} with Flink. - * - * @param name name of the gauge - * @param gauge gauge to register - * @param <T> return type of the gauge - * @return the given gauge - */ - <T, G extends Gauge<T>> G gauge(int name, G gauge); - - /** - * Registers a new {@link org.apache.flink.metrics.Gauge} with Flink. - * - * @param name name of the gauge - * @param gauge gauge to register - * @param <T> return type of the gauge - * @return the given gauge - */ - <T, G extends Gauge<T>> G gauge(String name, G gauge); - - /** - * Registers a new {@link Histogram} with Flink. - * - * @param name name of the histogram - * @param histogram histogram to register - * @param <H> histogram type - * @return the registered histogram - */ - <H extends Histogram> H histogram(String name, H histogram); - - /** - * Registers a new {@link Histogram} with Flink. - * - * @param name name of the histogram - * @param histogram histogram to register - * @param <H> histogram type - * @return the registered histogram - */ - <H extends Histogram> H histogram(int name, H histogram); - - // ------------------------------------------------------------------------ - // Groups - // ------------------------------------------------------------------------ - - /** - * Creates a new MetricGroup and adds it to this groups sub-groups. - * - * @param name name of the group - * @return the created group - */ - MetricGroup addGroup(int name); - - /** - * Creates a new MetricGroup and adds it to this groups sub-groups. - * - * @param name name of the group - * @return the created group - */ - MetricGroup addGroup(String name); -} http://git-wip-us.apache.org/repos/asf/flink/blob/e3fec1f9/flink-core/src/main/java/org/apache/flink/metrics/MetricRegistry.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/metrics/MetricRegistry.java b/flink-core/src/main/java/org/apache/flink/metrics/MetricRegistry.java deleted file mode 100644 index 274821e..0000000 --- a/flink-core/src/main/java/org/apache/flink/metrics/MetricRegistry.java +++ /dev/null @@ -1,273 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.metrics; - -import org.apache.flink.annotation.Internal; -import org.apache.flink.configuration.ConfigConstants; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.metrics.groups.AbstractMetricGroup; -import org.apache.flink.metrics.groups.scope.ScopeFormat; -import org.apache.flink.metrics.groups.scope.ScopeFormats; -import org.apache.flink.metrics.reporter.MetricReporter; -import org.apache.flink.metrics.reporter.Scheduled; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.TimerTask; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; - -/** - * A MetricRegistry keeps track of all registered {@link Metric Metrics}. It serves as the - * connection between {@link MetricGroup MetricGroups} and {@link MetricReporter MetricReporters}. - */ -@Internal -public class MetricRegistry { - static final Logger LOG = LoggerFactory.getLogger(MetricRegistry.class); - - private final MetricReporter reporter; - private final ScheduledExecutorService executor; - - private final ScopeFormats scopeFormats; - - private final char delimiter; - - /** - * Creates a new MetricRegistry and starts the configured reporter. - */ - public MetricRegistry(Configuration config) { - // first parse the scope formats, these are needed for all reporters - ScopeFormats scopeFormats; - try { - scopeFormats = createScopeConfig(config); - } - catch (Exception e) { - LOG.warn("Failed to parse scope format, using default scope formats", e); - scopeFormats = new ScopeFormats(); - } - this.scopeFormats = scopeFormats; - - char delim; - try { - delim = config.getString(ConfigConstants.METRICS_SCOPE_DELIMITER, ".").charAt(0); - } catch (Exception e) { - LOG.warn("Failed to parse delimiter, using default delimiter.", e); - delim = '.'; - } - this.delimiter = delim; - - // second, instantiate any custom configured reporters - - final String className = config.getString(ConfigConstants.METRICS_REPORTER_CLASS, null); - if (className == null) { - // by default, don't report anything - LOG.info("No metrics reporter configured, no metrics will be exposed/reported."); - this.reporter = null; - this.executor = null; - } - else { - MetricReporter reporter; - ScheduledExecutorService executor = null; - try { - String configuredPeriod = config.getString(ConfigConstants.METRICS_REPORTER_INTERVAL, null); - TimeUnit timeunit = TimeUnit.SECONDS; - long period = 10; - - if (configuredPeriod != null) { - try { - String[] interval = configuredPeriod.split(" "); - period = Long.parseLong(interval[0]); - timeunit = TimeUnit.valueOf(interval[1]); - } - catch (Exception e) { - LOG.error("Cannot parse report interval from config: " + configuredPeriod + - " - please use values like '10 SECONDS' or '500 MILLISECONDS'. " + - "Using default reporting interval."); - } - } - - Configuration reporterConfig = createReporterConfig(config, timeunit, period); - - Class<?> reporterClass = Class.forName(className); - reporter = (MetricReporter) reporterClass.newInstance(); - reporter.open(reporterConfig); - - if (reporter instanceof Scheduled) { - executor = Executors.newSingleThreadScheduledExecutor(); - LOG.info("Periodically reporting metrics in intervals of {} {}", period, timeunit.name()); - - executor.scheduleWithFixedDelay(new ReporterTask((Scheduled) reporter), period, period, timeunit); - } - } - catch (Throwable t) { - shutdownExecutor(); - LOG.error("Could not instantiate metrics reporter. No metrics will be exposed/reported.", t); - reporter = null; - } - - this.reporter = reporter; - this.executor = executor; - } - } - - public char getDelimiter() { - return this.delimiter; - } - - public MetricReporter getReporter() { - return reporter; - } - - /** - * Shuts down this registry and the associated {@link org.apache.flink.metrics.reporter.MetricReporter}. - */ - public void shutdown() { - if (reporter != null) { - try { - reporter.close(); - } catch (Throwable t) { - LOG.warn("Metrics reporter did not shut down cleanly", t); - } - } - shutdownExecutor(); - } - - private void shutdownExecutor() { - if (executor != null) { - executor.shutdown(); - - try { - if (!executor.awaitTermination(1, TimeUnit.SECONDS)) { - executor.shutdownNow(); - } - } catch (InterruptedException e) { - executor.shutdownNow(); - } - } - } - - public ScopeFormats getScopeFormats() { - return scopeFormats; - } - - // ------------------------------------------------------------------------ - // Metrics (de)registration - // ------------------------------------------------------------------------ - - /** - * Registers a new {@link Metric} with this registry. - * - * @param metric the metric that was added - * @param metricName the name of the metric - * @param group the group that contains the metric - */ - public void register(Metric metric, String metricName, AbstractMetricGroup group) { - try { - if (reporter != null) { - reporter.notifyOfAddedMetric(metric, metricName, group); - } - } catch (Exception e) { - LOG.error("Error while registering metric.", e); - } - } - - /** - * Un-registers the given {@link org.apache.flink.metrics.Metric} with this registry. - * - * @param metric the metric that should be removed - * @param metricName the name of the metric - * @param group the group that contains the metric - */ - public void unregister(Metric metric, String metricName, AbstractMetricGroup group) { - try { - if (reporter != null) { - reporter.notifyOfRemovedMetric(metric, metricName, group); - } - } catch (Exception e) { - LOG.error("Error while registering metric.", e); - } - } - - // ------------------------------------------------------------------------ - // Utilities - // ------------------------------------------------------------------------ - - static Configuration createReporterConfig(Configuration config, TimeUnit timeunit, long period) { - Configuration reporterConfig = new Configuration(); - reporterConfig.setLong("period", period); - reporterConfig.setString("timeunit", timeunit.name()); - - String[] arguments = config.getString(ConfigConstants.METRICS_REPORTER_ARGUMENTS, "").split(" "); - if (arguments.length > 1) { - for (int x = 0; x < arguments.length; x += 2) { - reporterConfig.setString(arguments[x].replace("--", ""), arguments[x + 1]); - } - } - return reporterConfig; - } - - static ScopeFormats createScopeConfig(Configuration config) { - String jmFormat = config.getString( - ConfigConstants.METRICS_SCOPE_NAMING_JM, ScopeFormat.DEFAULT_SCOPE_JOBMANAGER_GROUP); - String jmJobFormat = config.getString( - ConfigConstants.METRICS_SCOPE_NAMING_JM_JOB, ScopeFormat.DEFAULT_SCOPE_JOBMANAGER_JOB_GROUP); - String tmFormat = config.getString( - ConfigConstants.METRICS_SCOPE_NAMING_TM, ScopeFormat.DEFAULT_SCOPE_TASKMANAGER_GROUP); - String tmJobFormat = config.getString( - ConfigConstants.METRICS_SCOPE_NAMING_TM_JOB, ScopeFormat.DEFAULT_SCOPE_TASKMANAGER_JOB_GROUP); - String taskFormat = config.getString( - ConfigConstants.METRICS_SCOPE_NAMING_TASK, ScopeFormat.DEFAULT_SCOPE_TASK_GROUP); - String operatorFormat = config.getString( - ConfigConstants.METRICS_SCOPE_NAMING_OPERATOR, ScopeFormat.DEFAULT_SCOPE_OPERATOR_GROUP); - - return new ScopeFormats(jmFormat, jmJobFormat, tmFormat, tmJobFormat, taskFormat, operatorFormat); - } - - // ------------------------------------------------------------------------ - - /** - * This task is explicitly a static class, so that it does not hold any references to the enclosing - * MetricsRegistry instance. - * - * This is a subtle difference, but very important: With this static class, the enclosing class instance - * may become garbage-collectible, whereas with an anonymous inner class, the timer thread - * (which is a GC root) will hold a reference via the timer task and its enclosing instance pointer. - * Making the MetricsRegistry garbage collectible makes the java.util.Timer garbage collectible, - * which acts as a fail-safe to stop the timer thread and prevents resource leaks. - */ - private static final class ReporterTask extends TimerTask { - - private final Scheduled reporter; - - private ReporterTask(Scheduled reporter) { - this.reporter = reporter; - } - - @Override - public void run() { - try { - reporter.report(); - } catch (Throwable t) { - LOG.warn("Error while reporting metrics", t); - } - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/e3fec1f9/flink-core/src/main/java/org/apache/flink/metrics/SimpleCounter.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/metrics/SimpleCounter.java b/flink-core/src/main/java/org/apache/flink/metrics/SimpleCounter.java deleted file mode 100644 index 9720b08..0000000 --- a/flink-core/src/main/java/org/apache/flink/metrics/SimpleCounter.java +++ /dev/null @@ -1,71 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.flink.metrics; - -/** - * A simple low-overhead {@link org.apache.flink.metrics.Counter} that is not thread-safe. - */ -public class SimpleCounter implements Counter { - private long count; - - /** - * Increment the current count by 1. - */ - @Override - public void inc() { - count++; - } - - /** - * Increment the current count by the given value. - * - * @param n value to increment the current count by - */ - @Override - public void inc(long n) { - count += n; - } - - /** - * Decrement the current count by 1. - */ - @Override - public void dec() { - count--; - } - - /** - * Decrement the current count by the given value. - * - * @param n value to decrement the current count by - */ - @Override - public void dec(long n) { - count -= n; - } - - /** - * Returns the current count. - * - * @return current count - */ - @Override - public long getCount() { - return count; - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/e3fec1f9/flink-core/src/main/java/org/apache/flink/metrics/groups/AbstractMetricGroup.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/metrics/groups/AbstractMetricGroup.java b/flink-core/src/main/java/org/apache/flink/metrics/groups/AbstractMetricGroup.java deleted file mode 100644 index dda6e4d..0000000 --- a/flink-core/src/main/java/org/apache/flink/metrics/groups/AbstractMetricGroup.java +++ /dev/null @@ -1,292 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.metrics.groups; - -import org.apache.flink.annotation.Internal; -import org.apache.flink.metrics.CharacterFilter; -import org.apache.flink.metrics.Counter; -import org.apache.flink.metrics.Gauge; -import org.apache.flink.metrics.Histogram; -import org.apache.flink.metrics.Metric; -import org.apache.flink.metrics.MetricGroup; -import org.apache.flink.metrics.MetricRegistry; -import org.apache.flink.metrics.SimpleCounter; - -import org.apache.flink.metrics.groups.scope.ScopeFormat; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.HashMap; -import java.util.Map; - -import static org.apache.flink.util.Preconditions.checkNotNull; - -/** - * Abstract {@link MetricGroup} that contains key functionality for adding metrics and groups. - * - * <p><b>IMPORTANT IMPLEMENTATION NOTE</b> - * - * <p>This class uses locks for adding and removing metrics objects. This is done to - * prevent resource leaks in the presence of concurrently closing a group and adding - * metrics and subgroups. - * Since closing groups recursively closes the subgroups, the lock acquisition order must - * be strictly from parent group to subgroup. If at any point, a subgroup holds its group - * lock and calls a parent method that also acquires the lock, it will create a deadlock - * condition. - * - * <p>An AbstractMetricGroup can be {@link #close() closed}. Upon closing, the group de-register all metrics - * from any metrics reporter and any internal maps. Note that even closed metrics groups - * return Counters, Gauges, etc to the code, to prevent exceptions in the monitored code. - * These metrics simply do not get reported any more, when created on a closed group. - */ -@Internal -public abstract class AbstractMetricGroup implements MetricGroup { - - /** shared logger */ - private static final Logger LOG = LoggerFactory.getLogger(MetricGroup.class); - - // ------------------------------------------------------------------------ - - /** The registry that this metrics group belongs to */ - protected final MetricRegistry registry; - - /** All metrics that are directly contained in this group */ - private final Map<String, Metric> metrics = new HashMap<>(); - - /** All metric subgroups of this group */ - private final Map<String, AbstractMetricGroup> groups = new HashMap<>(); - - /** The metrics scope represented by this group. - * For example ["host-7", "taskmanager-2", "window_word_count", "my-mapper" ]. */ - private final String[] scopeComponents; - - /** The metrics scope represented by this group, as a concatenated string, lazily computed. - * For example: "host-7.taskmanager-2.window_word_count.my-mapper" */ - private String scopeString; - - /** Flag indicating whether this group has been closed */ - private volatile boolean closed; - - // ------------------------------------------------------------------------ - - public AbstractMetricGroup(MetricRegistry registry, String[] scope) { - this.registry = checkNotNull(registry); - this.scopeComponents = checkNotNull(scope); - } - - /** - * Gets the scope as an array of the scope components, for example - * {@code ["host-7", "taskmanager-2", "window_word_count", "my-mapper"]} - */ - public String[] getScopeComponents() { - return scopeComponents; - } - - /** - * Returns the fully qualified metric name, for example - * {@code "host-7.taskmanager-2.window_word_count.my-mapper.metricName"} - * - * @param metricName metric name - * @return fully qualified metric name - */ - public String getMetricIdentifier(String metricName) { - return getMetricIdentifier(metricName, null); - } - - /** - * Returns the fully qualified metric name, for example - * {@code "host-7.taskmanager-2.window_word_count.my-mapper.metricName"} - * - * @param metricName metric name - * @param filter character filter which is applied to the scope components if not null. - * @return fully qualified metric name - */ - public String getMetricIdentifier(String metricName, CharacterFilter filter) { - if (scopeString == null) { - if (filter != null) { - scopeString = ScopeFormat.concat(filter, registry.getDelimiter(), scopeComponents); - } else { - scopeString = ScopeFormat.concat(registry.getDelimiter(), scopeComponents); - } - } - - if (filter != null) { - return scopeString + registry.getDelimiter() + filter.filterCharacters(metricName); - } else { - return scopeString + registry.getDelimiter() + metricName; - } - } - - // ------------------------------------------------------------------------ - // Closing - // ------------------------------------------------------------------------ - - public void close() { - synchronized (this) { - if (!closed) { - closed = true; - - // close all subgroups - for (AbstractMetricGroup group : groups.values()) { - group.close(); - } - groups.clear(); - - // un-register all directly contained metrics - for (Map.Entry<String, Metric> metric : metrics.entrySet()) { - registry.unregister(metric.getValue(), metric.getKey(), this); - } - metrics.clear(); - } - } - } - - public final boolean isClosed() { - return closed; - } - - // ----------------------------------------------------------------------------------------------------------------- - // Metrics - // ----------------------------------------------------------------------------------------------------------------- - - @Override - public Counter counter(int name) { - return counter(String.valueOf(name)); - } - - @Override - public Counter counter(String name) { - return counter(name, new SimpleCounter()); - } - - @Override - public <C extends Counter> C counter(int name, C counter) { - return counter(String.valueOf(name), counter); - } - - @Override - public <C extends Counter> C counter(String name, C counter) { - addMetric(name, counter); - return counter; - } - - @Override - public <T, G extends Gauge<T>> G gauge(int name, G gauge) { - return gauge(String.valueOf(name), gauge); - } - - @Override - public <T, G extends Gauge<T>> G gauge(String name, G gauge) { - addMetric(name, gauge); - return gauge; - } - - @Override - public <H extends Histogram> H histogram(int name, H histogram) { - return histogram(String.valueOf(name), histogram); - } - - @Override - public <H extends Histogram> H histogram(String name, H histogram) { - addMetric(name, histogram); - return histogram; - } - - /** - * Adds the given metric to the group and registers it at the registry, if the group - * is not yet closed, and if no metric with the same name has been registered before. - * - * @param name the name to register the metric under - * @param metric the metric to register - */ - protected void addMetric(String name, Metric metric) { - // add the metric only if the group is still open - synchronized (this) { - if (!closed) { - // immediately put without a 'contains' check to optimize the common case (no collition) - // collisions are resolved later - Metric prior = metrics.put(name, metric); - - // check for collisions with other metric names - if (prior == null) { - // no other metric with this name yet - - if (groups.containsKey(name)) { - // we warn here, rather than failing, because metrics are tools that should not fail the - // program when used incorrectly - LOG.warn("Name collision: Adding a metric with the same name as a metric subgroup: '" + - name + "'. Metric might not get properly reported. (" + scopeString + ')'); - } - - registry.register(metric, name, this); - } - else { - // we had a collision. put back the original value - metrics.put(name, prior); - - // we warn here, rather than failing, because metrics are tools that should not fail the - // program when used incorrectly - LOG.warn("Name collision: Group already contains a Metric with the name '" + - name + "'. Metric will not be reported. (" + scopeString + ')'); - } - } - } - } - - // ------------------------------------------------------------------------ - // Groups - // ------------------------------------------------------------------------ - - @Override - public MetricGroup addGroup(int name) { - return addGroup(String.valueOf(name)); - } - - @Override - public MetricGroup addGroup(String name) { - synchronized (this) { - if (!closed) { - // adding a group with the same name as a metric creates problems in many reporters/dashboards - // we warn here, rather than failing, because metrics are tools that should not fail the - // program when used incorrectly - if (metrics.containsKey(name)) { - LOG.warn("Name collision: Adding a metric subgroup with the same name as an existing metric: '" + - name + "'. Metric might not get properly reported. (" + scopeString + ')'); - } - - AbstractMetricGroup newGroup = new GenericMetricGroup(registry, this, name); - AbstractMetricGroup prior = groups.put(name, newGroup); - if (prior == null) { - // no prior group with that name - return newGroup; - } else { - // had a prior group with that name, add the prior group back - groups.put(name, prior); - return prior; - } - } - else { - // return a non-registered group that is immediately closed already - GenericMetricGroup closedGroup = new GenericMetricGroup(registry, this, name); - closedGroup.close(); - return closedGroup; - } - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/e3fec1f9/flink-core/src/main/java/org/apache/flink/metrics/groups/ComponentMetricGroup.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/metrics/groups/ComponentMetricGroup.java b/flink-core/src/main/java/org/apache/flink/metrics/groups/ComponentMetricGroup.java deleted file mode 100644 index 518d940..0000000 --- a/flink-core/src/main/java/org/apache/flink/metrics/groups/ComponentMetricGroup.java +++ /dev/null @@ -1,78 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.metrics.groups; - -import org.apache.flink.annotation.Internal; -import org.apache.flink.metrics.MetricRegistry; - -/** - * Abstract {@link org.apache.flink.metrics.MetricGroup} for system components (e.g., - * TaskManager, Job, Task, Operator). - * - * <p>Usually, the scope of metrics is simply the hierarchy of the containing groups. For example - * the Metric {@code "MyMetric"} in group {@code "B"} nested in group {@code "A"} would have a - * fully scoped name of {@code "A.B.MyMetric"}, with {@code "A.B"} being the Metric's scope. - * - * <p>Component groups, however, have configurable scopes. This allow users to include or exclude - * certain identifiers from the scope. The scope for metrics belonging to the "Task" - * group could for example include the task attempt number (more fine grained identification), or - * exclude it (for continuity of the namespace across failure and recovery). - */ -@Internal -public abstract class ComponentMetricGroup extends AbstractMetricGroup { - - /** - * Creates a new ComponentMetricGroup. - * - * @param registry registry to register new metrics with - * @param scope the scope of the group - */ - public ComponentMetricGroup( - MetricRegistry registry, - String[] scope) { - - super(registry, scope); - } - - /** - * Closes the component group by removing and closing all metrics and subgroups - * (inherited from {@link AbstractMetricGroup}), plus closing and removing all dedicated - * component subgroups. - */ - @Override - public void close() { - synchronized (this) { - if (!isClosed()) { - // remove all metrics and generic subgroups - super.close(); - - // remove and close all subcomponent metrics - for (ComponentMetricGroup group : subComponents()) { - group.close(); - } - } - } - } - - // ------------------------------------------------------------------------ - // sub components - // ------------------------------------------------------------------------ - - protected abstract Iterable<? extends ComponentMetricGroup> subComponents(); -} http://git-wip-us.apache.org/repos/asf/flink/blob/e3fec1f9/flink-core/src/main/java/org/apache/flink/metrics/groups/GenericMetricGroup.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/metrics/groups/GenericMetricGroup.java b/flink-core/src/main/java/org/apache/flink/metrics/groups/GenericMetricGroup.java deleted file mode 100644 index ddcd73b..0000000 --- a/flink-core/src/main/java/org/apache/flink/metrics/groups/GenericMetricGroup.java +++ /dev/null @@ -1,49 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.metrics.groups; - -import org.apache.flink.annotation.Internal; -import org.apache.flink.metrics.MetricRegistry; - -/** - * A simple named {@link org.apache.flink.metrics.MetricGroup} that is used to hold - * subgroups of metrics. - */ -@Internal -public class GenericMetricGroup extends AbstractMetricGroup { - - public GenericMetricGroup(MetricRegistry registry, AbstractMetricGroup parent, String name) { - super(registry, makeScopeComponents(parent, name)); - } - - // ------------------------------------------------------------------------ - - private static String[] makeScopeComponents(AbstractMetricGroup parent, String name) { - if (parent != null) { - String[] parentComponents = parent.getScopeComponents(); - if (parentComponents != null && parentComponents.length > 0) { - String[] parts = new String[parentComponents.length + 1]; - System.arraycopy(parentComponents, 0, parts, 0, parentComponents.length); - parts[parts.length - 1] = name; - return parts; - } - } - return new String[] { name }; - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/e3fec1f9/flink-core/src/main/java/org/apache/flink/metrics/groups/IOMetricGroup.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/metrics/groups/IOMetricGroup.java b/flink-core/src/main/java/org/apache/flink/metrics/groups/IOMetricGroup.java deleted file mode 100644 index 90bc2a8..0000000 --- a/flink-core/src/main/java/org/apache/flink/metrics/groups/IOMetricGroup.java +++ /dev/null @@ -1,52 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.metrics.groups; - -import org.apache.flink.metrics.Counter; - -/** - * Metric group that contains shareable pre-defined IO-related metrics. The metrics registration is - * forwarded to the parent task metric group. - */ -public class IOMetricGroup extends ProxyMetricGroup<TaskMetricGroup> { - - private final Counter numBytesOut; - private final Counter numBytesInLocal; - private final Counter numBytesInRemote; - - public IOMetricGroup(TaskMetricGroup parent) { - super(parent); - - this.numBytesOut = counter("numBytesOut"); - this.numBytesInLocal = counter("numBytesInLocal"); - this.numBytesInRemote = counter("numBytesInRemote"); - } - - public Counter getBytesOutCounter() { - return numBytesOut; - } - - public Counter getNumBytesInLocalCounter() { - return numBytesInLocal; - } - - public Counter getNumBytesInRemoteCounter() { - return numBytesInRemote; - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/e3fec1f9/flink-core/src/main/java/org/apache/flink/metrics/groups/JobManagerJobMetricGroup.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/metrics/groups/JobManagerJobMetricGroup.java b/flink-core/src/main/java/org/apache/flink/metrics/groups/JobManagerJobMetricGroup.java deleted file mode 100644 index 1dd0439..0000000 --- a/flink-core/src/main/java/org/apache/flink/metrics/groups/JobManagerJobMetricGroup.java +++ /dev/null @@ -1,69 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.flink.metrics.groups; - -import org.apache.flink.annotation.Internal; -import org.apache.flink.api.common.JobID; -import org.apache.flink.metrics.MetricRegistry; -import org.apache.flink.metrics.groups.scope.ScopeFormat.JobManagerJobScopeFormat; - -import javax.annotation.Nullable; -import java.util.Collections; - -import static org.apache.flink.util.Preconditions.checkNotNull; - -/** - * Special {@link org.apache.flink.metrics.MetricGroup} representing everything belonging to - * a specific job, running on the JobManager. - */ -@Internal -public class JobManagerJobMetricGroup extends JobMetricGroup { - - /** The metrics group that contains this group */ - private final JobManagerMetricGroup parent; - - public JobManagerJobMetricGroup( - MetricRegistry registry, - JobManagerMetricGroup parent, - JobID jobId, - @Nullable String jobName) { - - this(registry, checkNotNull(parent), registry.getScopeFormats().getJobManagerJobFormat(), jobId, jobName); - } - - public JobManagerJobMetricGroup( - MetricRegistry registry, - JobManagerMetricGroup parent, - JobManagerJobScopeFormat scopeFormat, - JobID jobId, - @Nullable String jobName) { - - super(registry, jobId, jobName, scopeFormat.formatScope(parent, jobId, jobName)); - - this.parent = checkNotNull(parent); - } - - public final JobManagerMetricGroup parent() { - return parent; - } - - @Override - protected Iterable<? extends ComponentMetricGroup> subComponents() { - return Collections.emptyList(); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/e3fec1f9/flink-core/src/main/java/org/apache/flink/metrics/groups/JobManagerMetricGroup.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/metrics/groups/JobManagerMetricGroup.java b/flink-core/src/main/java/org/apache/flink/metrics/groups/JobManagerMetricGroup.java deleted file mode 100644 index 67e1117..0000000 --- a/flink-core/src/main/java/org/apache/flink/metrics/groups/JobManagerMetricGroup.java +++ /dev/null @@ -1,104 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.flink.metrics.groups; - -import org.apache.flink.annotation.Internal; -import org.apache.flink.api.common.JobID; -import org.apache.flink.metrics.MetricRegistry; -import org.apache.flink.metrics.groups.scope.ScopeFormat.JobManagerScopeFormat; - -import java.util.HashMap; -import java.util.Map; - -/** - * Special {@link org.apache.flink.metrics.MetricGroup} representing a JobManager. - * - * <p>Contains extra logic for adding jobs with tasks, and removing jobs when they do - * not contain tasks any more - */ -@Internal -public class JobManagerMetricGroup extends ComponentMetricGroup { - - private final Map<JobID, JobManagerJobMetricGroup> jobs = new HashMap<>(); - - private final String hostname; - - public JobManagerMetricGroup(MetricRegistry registry, String hostname) { - this(registry, registry.getScopeFormats().getJobManagerFormat(), hostname); - } - - public JobManagerMetricGroup( - MetricRegistry registry, - JobManagerScopeFormat scopeFormat, - String hostname) { - - super(registry, scopeFormat.formatScope(hostname)); - this.hostname = hostname; - } - - public String hostname() { - return hostname; - } - - // ------------------------------------------------------------------------ - // job groups - // ------------------------------------------------------------------------ - - public JobManagerJobMetricGroup addJob( - JobID jobId, - String jobName) { - // get or create a jobs metric group - JobManagerJobMetricGroup currentJobGroup; - synchronized (this) { - if (!isClosed()) { - currentJobGroup = jobs.get(jobId); - - if (currentJobGroup == null || currentJobGroup.isClosed()) { - currentJobGroup = new JobManagerJobMetricGroup(registry, this, jobId, jobName); - jobs.put(jobId, currentJobGroup); - } - return currentJobGroup; - } else { - return null; - } - } - } - - public void removeJob(JobID jobId) { - if (jobId == null) { - return; - } - - synchronized (this) { - JobManagerJobMetricGroup containedGroup = jobs.remove(jobId); - if (containedGroup != null) { - containedGroup.close(); - } - } - } - - public int numRegisteredJobMetricGroups() { - return jobs.size(); - } - - @Override - protected Iterable<? extends ComponentMetricGroup> subComponents() { - return jobs.values(); - } -} - http://git-wip-us.apache.org/repos/asf/flink/blob/e3fec1f9/flink-core/src/main/java/org/apache/flink/metrics/groups/JobMetricGroup.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/metrics/groups/JobMetricGroup.java b/flink-core/src/main/java/org/apache/flink/metrics/groups/JobMetricGroup.java deleted file mode 100644 index f7dfc78..0000000 --- a/flink-core/src/main/java/org/apache/flink/metrics/groups/JobMetricGroup.java +++ /dev/null @@ -1,62 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.metrics.groups; - -import org.apache.flink.annotation.Internal; -import org.apache.flink.api.common.JobID; -import org.apache.flink.metrics.MetricRegistry; - -import javax.annotation.Nullable; - -/** - * Special abstract {@link org.apache.flink.metrics.MetricGroup} representing everything belonging to - * a specific job. - */ -@Internal -public abstract class JobMetricGroup extends ComponentMetricGroup { - - /** The ID of the job represented by this metrics group */ - protected final JobID jobId; - - /** The name of the job represented by this metrics group */ - @Nullable - protected final String jobName; - - // ------------------------------------------------------------------------ - - protected JobMetricGroup( - MetricRegistry registry, - JobID jobId, - @Nullable String jobName, - String[] scope) { - super(registry, scope); - - this.jobId = jobId; - this.jobName = jobName; - } - - public JobID jobId() { - return jobId; - } - - @Nullable - public String jobName() { - return jobName; - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/e3fec1f9/flink-core/src/main/java/org/apache/flink/metrics/groups/OperatorMetricGroup.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/metrics/groups/OperatorMetricGroup.java b/flink-core/src/main/java/org/apache/flink/metrics/groups/OperatorMetricGroup.java deleted file mode 100644 index 6db79ab..0000000 --- a/flink-core/src/main/java/org/apache/flink/metrics/groups/OperatorMetricGroup.java +++ /dev/null @@ -1,62 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.metrics.groups; - -import org.apache.flink.annotation.Internal; -import org.apache.flink.metrics.MetricRegistry; -import org.apache.flink.metrics.groups.scope.ScopeFormat.OperatorScopeFormat; - -import java.util.Collections; - -import static org.apache.flink.util.Preconditions.checkNotNull; - -/** - * Special {@link org.apache.flink.metrics.MetricGroup} representing an Operator. - */ -@Internal -public class OperatorMetricGroup extends ComponentMetricGroup { - - /** The task metric group that contains this operator metric groups */ - private final TaskMetricGroup parent; - - public OperatorMetricGroup(MetricRegistry registry, TaskMetricGroup parent, String operatorName) { - this(registry, parent, registry.getScopeFormats().getOperatorFormat(), operatorName); - } - - public OperatorMetricGroup( - MetricRegistry registry, - TaskMetricGroup parent, - OperatorScopeFormat scopeFormat, - String operatorName) { - - super(registry, scopeFormat.formatScope(parent, operatorName)); - this.parent = checkNotNull(parent); - } - - // ------------------------------------------------------------------------ - - public final TaskMetricGroup parent() { - return parent; - } - - @Override - protected Iterable<? extends ComponentMetricGroup> subComponents() { - return Collections.emptyList(); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/e3fec1f9/flink-core/src/main/java/org/apache/flink/metrics/groups/ProxyMetricGroup.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/metrics/groups/ProxyMetricGroup.java b/flink-core/src/main/java/org/apache/flink/metrics/groups/ProxyMetricGroup.java deleted file mode 100644 index 14ff367..0000000 --- a/flink-core/src/main/java/org/apache/flink/metrics/groups/ProxyMetricGroup.java +++ /dev/null @@ -1,90 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.metrics.groups; - -import org.apache.flink.annotation.Internal; -import org.apache.flink.metrics.Counter; -import org.apache.flink.metrics.Gauge; -import org.apache.flink.metrics.Histogram; -import org.apache.flink.metrics.MetricGroup; -import org.apache.flink.util.Preconditions; - -/** - * Metric group which forwards all registration calls to its parent metric group. - * - * @param <P> Type of the parent metric group - */ -@Internal -public class ProxyMetricGroup<P extends MetricGroup> implements MetricGroup { - private final P parentMetricGroup; - - public ProxyMetricGroup(P parentMetricGroup) { - this.parentMetricGroup = Preconditions.checkNotNull(parentMetricGroup); - } - - @Override - public final Counter counter(int name) { - return parentMetricGroup.counter(name); - } - - @Override - public final Counter counter(String name) { - return parentMetricGroup.counter(name); - } - - @Override - public final <C extends Counter> C counter(int name, C counter) { - return parentMetricGroup.counter(name, counter); - } - - @Override - public final <C extends Counter> C counter(String name, C counter) { - return parentMetricGroup.counter(name, counter); - } - - @Override - public final <T, G extends Gauge<T>> G gauge(int name, G gauge) { - return parentMetricGroup.gauge(name, gauge); - } - - @Override - public final <T, G extends Gauge<T>> G gauge(String name, G gauge) { - return parentMetricGroup.gauge(name, gauge); - } - - @Override - public final <H extends Histogram> H histogram(String name, H histogram) { - return parentMetricGroup.histogram(name, histogram); - } - - @Override - public final <H extends Histogram> H histogram(int name, H histogram) { - return parentMetricGroup.histogram(name, histogram); - } - - @Override - public final MetricGroup addGroup(int name) { - return parentMetricGroup.addGroup(name); - } - - @Override - public final MetricGroup addGroup(String name) { - return parentMetricGroup.addGroup(name); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/e3fec1f9/flink-core/src/main/java/org/apache/flink/metrics/groups/TaskManagerJobMetricGroup.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/metrics/groups/TaskManagerJobMetricGroup.java b/flink-core/src/main/java/org/apache/flink/metrics/groups/TaskManagerJobMetricGroup.java deleted file mode 100644 index fdaf1de..0000000 --- a/flink-core/src/main/java/org/apache/flink/metrics/groups/TaskManagerJobMetricGroup.java +++ /dev/null @@ -1,122 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.flink.metrics.groups; - -import org.apache.flink.annotation.Internal; -import org.apache.flink.api.common.JobID; -import org.apache.flink.metrics.MetricRegistry; -import org.apache.flink.metrics.groups.scope.ScopeFormat.TaskManagerJobScopeFormat; -import org.apache.flink.util.AbstractID; - -import javax.annotation.Nullable; -import java.util.HashMap; -import java.util.Map; - -import static org.apache.flink.util.Preconditions.checkNotNull; - -/** - * Special {@link org.apache.flink.metrics.MetricGroup} representing everything belonging to - * a specific job, running on the TaskManager. - * - * <p>Contains extra logic for adding Tasks ({@link TaskMetricGroup}). - */ -@Internal -public class TaskManagerJobMetricGroup extends JobMetricGroup { - - /** The metrics group that contains this group */ - private final TaskManagerMetricGroup parent; - - /** Map from execution attempt ID (task identifier) to task metrics */ - private final Map<AbstractID, TaskMetricGroup> tasks = new HashMap<>(); - - // ------------------------------------------------------------------------ - - public TaskManagerJobMetricGroup( - MetricRegistry registry, - TaskManagerMetricGroup parent, - JobID jobId, - @Nullable String jobName) { - - this(registry, checkNotNull(parent), registry.getScopeFormats().getTaskManagerJobFormat(), jobId, jobName); - } - - public TaskManagerJobMetricGroup( - MetricRegistry registry, - TaskManagerMetricGroup parent, - TaskManagerJobScopeFormat scopeFormat, - JobID jobId, - @Nullable String jobName) { - - super(registry, jobId, jobName, scopeFormat.formatScope(parent, jobId, jobName)); - - this.parent = checkNotNull(parent); - } - - public final TaskManagerMetricGroup parent() { - return parent; - } - - // ------------------------------------------------------------------------ - // adding / removing tasks - // ------------------------------------------------------------------------ - - public TaskMetricGroup addTask( - AbstractID vertexId, - AbstractID executionId, - String taskName, - int subtaskIndex, - int attemptNumber) { - - checkNotNull(executionId); - - synchronized (this) { - if (!isClosed()) { - TaskMetricGroup task = new TaskMetricGroup(registry, this, - vertexId, executionId, taskName, subtaskIndex, attemptNumber); - tasks.put(executionId, task); - return task; - } else { - return null; - } - } - } - - public void removeTaskMetricGroup(AbstractID executionId) { - checkNotNull(executionId); - - boolean removeFromParent = false; - synchronized (this) { - if (!isClosed() && tasks.remove(executionId) != null && tasks.isEmpty()) { - // this call removed the last task. close this group. - removeFromParent = true; - close(); - } - } - - // IMPORTANT: removing from the parent must not happen while holding the this group's lock, - // because it would violate the "first parent then subgroup" lock acquisition order - if (removeFromParent) { - parent.removeJobMetricsGroup(jobId, this); - } - } - - @Override - protected Iterable<? extends ComponentMetricGroup> subComponents() { - return tasks.values(); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/e3fec1f9/flink-core/src/main/java/org/apache/flink/metrics/groups/TaskManagerMetricGroup.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/metrics/groups/TaskManagerMetricGroup.java b/flink-core/src/main/java/org/apache/flink/metrics/groups/TaskManagerMetricGroup.java deleted file mode 100644 index 2b2b201..0000000 --- a/flink-core/src/main/java/org/apache/flink/metrics/groups/TaskManagerMetricGroup.java +++ /dev/null @@ -1,134 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.metrics.groups; - -import org.apache.flink.annotation.Internal; -import org.apache.flink.api.common.JobID; -import org.apache.flink.metrics.MetricRegistry; -import org.apache.flink.metrics.groups.scope.ScopeFormat.TaskManagerScopeFormat; -import org.apache.flink.util.AbstractID; - -import java.util.HashMap; -import java.util.Map; - -/** - * Special {@link org.apache.flink.metrics.MetricGroup} representing a TaskManager. - * - * <p>Contains extra logic for adding jobs with tasks, and removing jobs when they do - * not contain tasks any more - */ -@Internal -public class TaskManagerMetricGroup extends ComponentMetricGroup { - - private final Map<JobID, TaskManagerJobMetricGroup> jobs = new HashMap<>(); - - private final String hostname; - - private final String taskManagerId; - - - public TaskManagerMetricGroup(MetricRegistry registry, String hostname, String taskManagerId) { - this(registry, registry.getScopeFormats().getTaskManagerFormat(), hostname, taskManagerId); - } - - public TaskManagerMetricGroup( - MetricRegistry registry, - TaskManagerScopeFormat scopeFormat, - String hostname, String taskManagerId) { - - super(registry, scopeFormat.formatScope(hostname, taskManagerId)); - this.hostname = hostname; - this.taskManagerId = taskManagerId; - } - - public String hostname() { - return hostname; - } - - public String taskManagerId() { - return taskManagerId; - } - - // ------------------------------------------------------------------------ - // job groups - // ------------------------------------------------------------------------ - - public TaskMetricGroup addTaskForJob( - JobID jobId, - String jobName, - AbstractID vertexID, - AbstractID executionId, - String taskName, - int subtaskIndex, - int attemptNumber) { - - // we cannot strictly lock both our map modification and the job group modification - // because it might lead to a deadlock - while (true) { - // get or create a jobs metric group - TaskManagerJobMetricGroup currentJobGroup; - synchronized (this) { - currentJobGroup = jobs.get(jobId); - - if (currentJobGroup == null || currentJobGroup.isClosed()) { - currentJobGroup = new TaskManagerJobMetricGroup(registry, this, jobId, jobName); - jobs.put(jobId, currentJobGroup); - } - } - - // try to add another task. this may fail if we found a pre-existing job metrics - // group and it is closed concurrently - TaskMetricGroup taskGroup = currentJobGroup.addTask( - vertexID, executionId, taskName, subtaskIndex, attemptNumber); - - if (taskGroup != null) { - // successfully added the next task - return taskGroup; - } - - // else fall through the loop - } - } - - public void removeJobMetricsGroup(JobID jobId, TaskManagerJobMetricGroup group) { - if (jobId == null || group == null || !group.isClosed()) { - return; - } - - synchronized (this) { - // optimistically remove the currently contained group, and check later if it was correct - TaskManagerJobMetricGroup containedGroup = jobs.remove(jobId); - - // check if another group was actually contained, and restore that one - if (containedGroup != null && containedGroup != group) { - jobs.put(jobId, containedGroup); - } - } - } - - public int numRegisteredJobMetricGroups() { - return jobs.size(); - } - - @Override - protected Iterable<? extends ComponentMetricGroup> subComponents() { - return jobs.values(); - } -} - http://git-wip-us.apache.org/repos/asf/flink/blob/e3fec1f9/flink-core/src/main/java/org/apache/flink/metrics/groups/TaskMetricGroup.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/metrics/groups/TaskMetricGroup.java b/flink-core/src/main/java/org/apache/flink/metrics/groups/TaskMetricGroup.java deleted file mode 100644 index 5849b4b..0000000 --- a/flink-core/src/main/java/org/apache/flink/metrics/groups/TaskMetricGroup.java +++ /dev/null @@ -1,169 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.metrics.groups; - -import org.apache.flink.annotation.Internal; -import org.apache.flink.metrics.MetricRegistry; -import org.apache.flink.metrics.groups.scope.ScopeFormat.TaskScopeFormat; -import org.apache.flink.util.AbstractID; - -import javax.annotation.Nullable; -import java.util.HashMap; -import java.util.Map; - -import static org.apache.flink.util.Preconditions.checkNotNull; - -/** - * Special {@link org.apache.flink.metrics.MetricGroup} representing a Flink runtime Task. - * - * <p>Contains extra logic for adding operators. - */ -@Internal -public class TaskMetricGroup extends ComponentMetricGroup { - - /** The job metrics group containing this task metrics group */ - private final TaskManagerJobMetricGroup parent; - - private final Map<String, OperatorMetricGroup> operators = new HashMap<>(); - - private final IOMetricGroup ioMetrics; - - /** The execution Id uniquely identifying the executed task represented by this metrics group */ - private final AbstractID executionId; - - @Nullable - private final AbstractID vertexId; - - @Nullable - private final String taskName; - - private final int subtaskIndex; - - private final int attemptNumber; - - // ------------------------------------------------------------------------ - - public TaskMetricGroup( - MetricRegistry registry, - TaskManagerJobMetricGroup parent, - @Nullable AbstractID vertexId, - AbstractID executionId, - @Nullable String taskName, - int subtaskIndex, - int attemptNumber) { - - this(registry, parent, registry.getScopeFormats().getTaskFormat(), - vertexId, executionId, taskName, subtaskIndex, attemptNumber); - } - - public TaskMetricGroup( - MetricRegistry registry, - TaskManagerJobMetricGroup parent, - TaskScopeFormat scopeFormat, - @Nullable AbstractID vertexId, - AbstractID executionId, - @Nullable String taskName, - int subtaskIndex, - int attemptNumber) { - - super(registry, scopeFormat.formatScope( - parent, vertexId, executionId, taskName, subtaskIndex, attemptNumber)); - - this.parent = checkNotNull(parent); - this.executionId = checkNotNull(executionId); - this.vertexId = vertexId; - this.taskName = taskName; - this.subtaskIndex = subtaskIndex; - this.attemptNumber = attemptNumber; - - this.ioMetrics = new IOMetricGroup(this); - } - - // ------------------------------------------------------------------------ - // properties - // ------------------------------------------------------------------------ - - public final TaskManagerJobMetricGroup parent() { - return parent; - } - - public AbstractID executionId() { - return executionId; - } - - @Nullable - public AbstractID vertexId() { - return vertexId; - } - - @Nullable - public String taskName() { - return taskName; - } - - public int subtaskIndex() { - return subtaskIndex; - } - - public int attemptNumber() { - return attemptNumber; - } - - /** - * Returns the IOMetricGroup for this task. - * - * @return IOMetricGroup for this task. - */ - public IOMetricGroup getIOMetricGroup() { - return ioMetrics; - } - - // ------------------------------------------------------------------------ - // operators and cleanup - // ------------------------------------------------------------------------ - public OperatorMetricGroup addOperator(String name) { - OperatorMetricGroup operator = new OperatorMetricGroup(this.registry, this, name); - - synchronized (this) { - OperatorMetricGroup previous = operators.put(name, operator); - if (previous == null) { - // no operator group so far - return operator; - } else { - // already had an operator group. restore that one. - operators.put(name, previous); - return previous; - } - } - } - - @Override - public void close() { - super.close(); - - parent.removeTaskMetricGroup(executionId); - } - - // ------------------------------------------------------------------------ - - @Override - protected Iterable<? extends ComponentMetricGroup> subComponents() { - return operators.values(); - } -}
