[FLINK-1502] [core] Cleanups, robustness, and performance improvements in the metrics system
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/7ad8375a Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/7ad8375a Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/7ad8375a Branch: refs/heads/master Commit: 7ad8375a89374bec80571029e9166f1336bdea8e Parents: d3e3bd5 Author: Stephan Ewen <se...@apache.org> Authored: Wed May 25 20:34:44 2016 +0200 Committer: Stephan Ewen <se...@apache.org> Committed: Mon May 30 14:54:10 2016 +0200 ---------------------------------------------------------------------- flink-contrib/flink-storm/pom.xml | 22 +- .../flink/storm/wrappers/BoltWrapperTest.java | 13 +- .../common/operators/CollectionExecutor.java | 32 +- .../java/org/apache/flink/metrics/Counter.java | 5 +- .../java/org/apache/flink/metrics/Gauge.java | 3 +- .../java/org/apache/flink/metrics/Metric.java | 3 +- .../apache/flink/metrics/MetricRegistry.java | 92 ++--- .../metrics/groups/AbstractMetricGroup.java | 98 +++-- .../metrics/groups/ComponentMetricGroup.java | 109 +---- .../metrics/groups/GenericMetricGroup.java | 45 +-- .../flink/metrics/groups/IOMetricGroup.java | 27 +- .../flink/metrics/groups/JobMetricGroup.java | 88 ++-- .../groups/NonRegisteringMetricsGroup.java | 87 ---- .../metrics/groups/OperatorMetricGroup.java | 35 +- .../org/apache/flink/metrics/groups/Scope.java | 119 ------ .../metrics/groups/TaskManagerMetricGroup.java | 68 ++-- .../flink/metrics/groups/TaskMetricGroup.java | 126 ++++-- .../groups/UnregisteredMetricsGroup.java | 73 ++++ .../flink/metrics/groups/scope/ScopeFormat.java | 399 +++++++++++++++++++ .../metrics/groups/scope/ScopeFormats.java | 105 +++++ .../metrics/reporter/AbstractReporter.java | 40 +- .../flink/metrics/reporter/JMXReporter.java | 194 ++++++--- .../flink/metrics/reporter/MetricReporter.java | 53 +-- .../flink/metrics/reporter/Scheduled.java | 8 +- .../functions/util/RuntimeUDFContextTest.java | 33 +- .../api/common/io/RichInputFormatTest.java | 
18 +- .../api/common/io/RichOutputFormatTest.java | 19 +- .../operators/GenericDataSinkBaseTest.java | 16 +- .../operators/GenericDataSourceBaseTest.java | 18 +- .../base/FlatMapOperatorCollectionTest.java | 9 +- .../base/InnerJoinOperatorBaseTest.java | 15 +- .../common/operators/base/MapOperatorTest.java | 36 +- .../base/PartitionMapOperatorTest.java | 19 +- .../flink/metrics/MetricRegistryTest.java | 78 +--- .../flink/metrics/groups/JobGroupTest.java | 73 ++-- .../groups/MetricGroupRegistrationTest.java | 7 +- .../flink/metrics/groups/MetricGroupTest.java | 32 +- .../flink/metrics/groups/OperatorGroupTest.java | 66 +-- .../flink/metrics/groups/TaskGroupTest.java | 103 +++-- .../metrics/groups/TaskManagerGroupTest.java | 58 +-- .../flink/metrics/reporter/JMXReporterTest.java | 35 +- .../flink/metrics/util/DummyJobMetricGroup.java | 47 --- .../flink/metrics/util/DummyMetricGroup.java | 57 --- .../flink/metrics/util/DummyMetricRegistry.java | 29 -- .../metrics/util/DummyOperatorMetricGroup.java | 37 -- .../flink/metrics/util/DummyReporter.java | 47 --- .../util/DummyTaskManagerMetricGroup.java | 42 -- .../metrics/util/DummyTaskMetricGroup.java | 42 -- .../apache/flink/metrics/util/TestReporter.java | 11 +- .../base/CoGroupOperatorCollectionTest.java | 6 +- .../operators/base/GroupReduceOperatorTest.java | 17 +- .../base/InnerJoinOperatorBaseTest.java | 24 +- .../operators/base/ReduceOperatorTest.java | 23 +- .../dropwizard/ScheduledDropwizardReporter.java | 104 +++-- .../flink/dropwizard/metrics/GaugeWrapper.java | 8 + .../flink/metrics/ganglia/GangliaReporter.java | 79 ++++ .../flink/metrics/graphite/GangliaReporter.java | 73 ---- .../metrics/graphite/GraphiteReporter.java | 17 +- .../flink/metrics/statsd/StatsDReporter.java | 79 ++-- .../flink/runtime/taskmanager/TaskManager.scala | 3 +- .../operators/drivers/TestTaskContext.java | 4 +- .../testutils/BinaryOperatorTestBase.java | 6 +- .../operators/testutils/DriverTestBase.java | 31 +- 
.../operators/testutils/DummyEnvironment.java | 28 +- .../operators/testutils/MockEnvironment.java | 3 +- .../testutils/UnregisteredTaskMetricsGroup.java | 68 ++++ ...AlignedProcessingTimeWindowOperatorTest.java | 20 +- ...AlignedProcessingTimeWindowOperatorTest.java | 14 +- .../runtime/tasks/StreamMockEnvironment.java | 4 +- 69 files changed, 1893 insertions(+), 1509 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/7ad8375a/flink-contrib/flink-storm/pom.xml ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-storm/pom.xml b/flink-contrib/flink-storm/pom.xml index a080a03..590f33d 100644 --- a/flink-contrib/flink-storm/pom.xml +++ b/flink-contrib/flink-storm/pom.xml @@ -35,6 +35,9 @@ under the License. <packaging>jar</packaging> <dependencies> + + <!-- core dependencies --> + <dependency> <!-- Together with the dependency management section in flink-parent, this pins the Kryo version of transitive dependencies to the Flink Kryo version --> @@ -49,14 +52,6 @@ under the License. </dependency> <dependency> - <groupId>org.apache.flink</groupId> - <artifactId>flink-core</artifactId> - <version>${project.version}</version> - <type>test-jar</type> - <scope>test</scope> - </dependency> - - <dependency> <groupId>org.apache.storm</groupId> <artifactId>storm-core</artifactId> <version>0.9.4</version> @@ -77,6 +72,17 @@ under the License. 
<artifactId>guava</artifactId> <version>${guava.version}</version> </dependency> + + <!-- test dependencies --> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-runtime_2.10</artifactId> + <version>${project.version}</version> + <type>test-jar</type> + <scope>test</scope> + </dependency> + </dependencies> <build> http://git-wip-us.apache.org/repos/asf/flink/blob/7ad8375a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/BoltWrapperTest.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/BoltWrapperTest.java b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/BoltWrapperTest.java index cb9ac1c..5da12ef 100644 --- a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/BoltWrapperTest.java +++ b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/BoltWrapperTest.java @@ -32,10 +32,9 @@ import org.apache.flink.api.java.tuple.Tuple; import org.apache.flink.api.java.tuple.Tuple1; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.UnmodifiableConfiguration; -import org.apache.flink.metrics.util.DummyMetricGroup; -import org.apache.flink.metrics.util.DummyTaskMetricGroup; +import org.apache.flink.metrics.groups.UnregisteredMetricsGroup; import org.apache.flink.runtime.execution.Environment; -import org.apache.flink.runtime.taskmanager.RuntimeEnvironment; +import org.apache.flink.runtime.operators.testutils.UnregisteredTaskMetricsGroup; import org.apache.flink.storm.util.AbstractTest; import org.apache.flink.storm.util.SplitStreamType; import org.apache.flink.storm.util.StormConfig; @@ -144,7 +143,7 @@ public class BoltWrapperTest extends AbstractTest { final StreamingRuntimeContext taskContext = mock(StreamingRuntimeContext.class); when(taskContext.getExecutionConfig()).thenReturn(mock(ExecutionConfig.class)); 
when(taskContext.getTaskName()).thenReturn("name"); - when(taskContext.getMetricGroup()).thenReturn(new DummyMetricGroup()); + when(taskContext.getMetricGroup()).thenReturn(new UnregisteredMetricsGroup()); final IRichBolt bolt = mock(IRichBolt.class); @@ -229,7 +228,7 @@ public class BoltWrapperTest extends AbstractTest { final StreamingRuntimeContext taskContext = mock(StreamingRuntimeContext.class); when(taskContext.getExecutionConfig()).thenReturn(taskConfig); when(taskContext.getTaskName()).thenReturn("name"); - when(taskContext.getMetricGroup()).thenReturn(new DummyMetricGroup()); + when(taskContext.getMetricGroup()).thenReturn(new UnregisteredMetricsGroup()); final SetupOutputFieldsDeclarer declarer = new SetupOutputFieldsDeclarer(); declarer.declare(new Fields("dummy")); @@ -294,7 +293,7 @@ public class BoltWrapperTest extends AbstractTest { final StreamingRuntimeContext taskContext = mock(StreamingRuntimeContext.class); when(taskContext.getExecutionConfig()).thenReturn(taskConfig); when(taskContext.getTaskName()).thenReturn("name"); - when(taskContext.getMetricGroup()).thenReturn(new DummyMetricGroup()); + when(taskContext.getMetricGroup()).thenReturn(new UnregisteredMetricsGroup()); final IRichBolt bolt = mock(IRichBolt.class); BoltWrapper<Object, Object> wrapper = new BoltWrapper<Object, Object>(bolt); @@ -367,7 +366,7 @@ public class BoltWrapperTest extends AbstractTest { Environment env = mock(Environment.class); when(env.getTaskInfo()).thenReturn(new TaskInfo("Mock Task", 0, 1, 0)); when(env.getUserClassLoader()).thenReturn(BoltWrapperTest.class.getClassLoader()); - when(env.getMetricGroup()).thenReturn(new DummyTaskMetricGroup()); + when(env.getMetricGroup()).thenReturn(new UnregisteredTaskMetricsGroup()); StreamTask<?, ?> mockTask = mock(StreamTask.class); when(mockTask.getCheckpointLock()).thenReturn(new Object()); 
http://git-wip-us.apache.org/repos/asf/flink/blob/7ad8375a/flink-core/src/main/java/org/apache/flink/api/common/operators/CollectionExecutor.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/CollectionExecutor.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/CollectionExecutor.java index 97f0c5e..913b205 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/operators/CollectionExecutor.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/CollectionExecutor.java @@ -36,7 +36,6 @@ import org.apache.flink.annotation.Internal; import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.InvalidProgramException; import org.apache.flink.api.common.JobExecutionResult; -import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.Plan; import org.apache.flink.api.common.TaskInfo; import org.apache.flink.api.common.accumulators.Accumulator; @@ -62,7 +61,7 @@ import org.apache.flink.api.common.typeutils.TypeComparator; import org.apache.flink.core.fs.Path; import org.apache.flink.core.fs.local.LocalFileSystem; import org.apache.flink.metrics.MetricGroup; -import org.apache.flink.metrics.groups.NonRegisteringMetricsGroup; +import org.apache.flink.metrics.groups.UnregisteredMetricsGroup; import org.apache.flink.types.Value; import org.apache.flink.util.Visitor; @@ -72,8 +71,6 @@ import org.apache.flink.util.Visitor; @Internal public class CollectionExecutor { - private static final boolean DEFAULT_MUTABLE_OBJECT_SAFE_MODE = true; - private final Map<Operator<?>, List<?>> intermediateResults; private final Map<String, Accumulator<?, ?>> accumulators; @@ -109,11 +106,6 @@ public class CollectionExecutor { public JobExecutionResult execute(Plan program) throws Exception { long startTime = System.currentTimeMillis(); - - JobID jobID = program.getJobId(); - if (jobID == null) { - jobID = 
new JobID(); - } initCache(program.getCachedFiles()); Collection<? extends GenericDataSinkBase<?>> sinks = program.getDataSinks(); @@ -194,7 +186,7 @@ public class CollectionExecutor { TaskInfo taskInfo = new TaskInfo(typedSink.getName(), 0, 1, 0); RuntimeUDFContext ctx; - MetricGroup metrics = NonRegisteringMetricsGroup.get(); + MetricGroup metrics = new UnregisteredMetricsGroup(); if (RichOutputFormat.class.isAssignableFrom(typedSink.getUserCodeWrapper().getUserCodeClass())) { ctx = superStep == 0 ? new RuntimeUDFContext(taskInfo, classLoader, executionConfig, cachedFiles, accumulators, metrics) : @@ -215,7 +207,7 @@ public class CollectionExecutor { RuntimeUDFContext ctx; - MetricGroup metrics = NonRegisteringMetricsGroup.get(); + MetricGroup metrics = new UnregisteredMetricsGroup(); if (RichInputFormat.class.isAssignableFrom(typedSource.getUserCodeWrapper().getUserCodeClass())) { ctx = superStep == 0 ? new RuntimeUDFContext(taskInfo, classLoader, executionConfig, cachedFiles, accumulators, metrics) : new IterationRuntimeUDFContext(taskInfo, classLoader, executionConfig, cachedFiles, accumulators, metrics); @@ -241,7 +233,7 @@ public class CollectionExecutor { TaskInfo taskInfo = new TaskInfo(typedOp.getName(), 0, 1, 0); RuntimeUDFContext ctx; - MetricGroup metrics = NonRegisteringMetricsGroup.get(); + MetricGroup metrics = new UnregisteredMetricsGroup(); if (RichFunction.class.isAssignableFrom(typedOp.getUserCodeWrapper().getUserCodeClass())) { ctx = superStep == 0 ? 
new RuntimeUDFContext(taskInfo, classLoader, executionConfig, cachedFiles, accumulators, metrics) : new IterationRuntimeUDFContext(taskInfo, classLoader, executionConfig, cachedFiles, accumulators, metrics); @@ -253,10 +245,8 @@ public class CollectionExecutor { } else { ctx = null; } - - List<OUT> result = typedOp.executeOnCollections(inputData, ctx, executionConfig); - - return result; + + return typedOp.executeOnCollections(inputData, ctx, executionConfig); } private <IN1, IN2, OUT> List<OUT> executeBinaryOperator(DualInputOperator<?, ?, ?, ?> operator, int superStep) throws Exception { @@ -283,7 +273,7 @@ public class CollectionExecutor { TaskInfo taskInfo = new TaskInfo(typedOp.getName(), 0, 1, 0); RuntimeUDFContext ctx; - MetricGroup metrics = NonRegisteringMetricsGroup.get(); + MetricGroup metrics = new UnregisteredMetricsGroup(); if (RichFunction.class.isAssignableFrom(typedOp.getUserCodeWrapper().getUserCodeClass())) { ctx = superStep == 0 ? new RuntimeUDFContext(taskInfo, classLoader, executionConfig, cachedFiles, accumulators, metrics) : @@ -296,10 +286,8 @@ public class CollectionExecutor { } else { ctx = null; } - - List<OUT> result = typedOp.executeOnCollections(inputData1, inputData2, ctx, executionConfig); - - return result; + + return typedOp.executeOnCollections(inputData1, inputData2, ctx, executionConfig); } @SuppressWarnings("unchecked") @@ -448,7 +436,7 @@ public class CollectionExecutor { solutionMap.put(wrapper, delta); } - currentWorkset = (List<?>) execute(iteration.getNextWorkset(), superstep); + currentWorkset = execute(iteration.getNextWorkset(), superstep); if (currentWorkset.isEmpty()) { break; http://git-wip-us.apache.org/repos/asf/flink/blob/7ad8375a/flink-core/src/main/java/org/apache/flink/metrics/Counter.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/metrics/Counter.java b/flink-core/src/main/java/org/apache/flink/metrics/Counter.java index 
201a613..acc37cf 100644 --- a/flink-core/src/main/java/org/apache/flink/metrics/Counter.java +++ b/flink-core/src/main/java/org/apache/flink/metrics/Counter.java @@ -15,6 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.flink.metrics; import org.apache.flink.annotation.PublicEvolving; @@ -24,9 +25,9 @@ import org.apache.flink.annotation.PublicEvolving; */ @PublicEvolving public final class Counter implements Metric { - + private long count; - + /** * Increment the current count by 1. */ http://git-wip-us.apache.org/repos/asf/flink/blob/7ad8375a/flink-core/src/main/java/org/apache/flink/metrics/Gauge.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/metrics/Gauge.java b/flink-core/src/main/java/org/apache/flink/metrics/Gauge.java index cca105e..aad8deb 100644 --- a/flink-core/src/main/java/org/apache/flink/metrics/Gauge.java +++ b/flink-core/src/main/java/org/apache/flink/metrics/Gauge.java @@ -15,6 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.flink.metrics; import org.apache.flink.annotation.PublicEvolving; @@ -24,7 +25,7 @@ import org.apache.flink.annotation.PublicEvolving; */ @PublicEvolving public abstract class Gauge<T> implements Metric { - + /** * Calculates and returns the measured value. 
* http://git-wip-us.apache.org/repos/asf/flink/blob/7ad8375a/flink-core/src/main/java/org/apache/flink/metrics/Metric.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/metrics/Metric.java b/flink-core/src/main/java/org/apache/flink/metrics/Metric.java index 11cfcc6..8054de0 100644 --- a/flink-core/src/main/java/org/apache/flink/metrics/Metric.java +++ b/flink-core/src/main/java/org/apache/flink/metrics/Metric.java @@ -15,12 +15,13 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.flink.metrics; import org.apache.flink.annotation.PublicEvolving; /** - * Common interface for all metrics. + * Common super interface for all metrics. */ @PublicEvolving public interface Metric { http://git-wip-us.apache.org/repos/asf/flink/blob/7ad8375a/flink-core/src/main/java/org/apache/flink/metrics/MetricRegistry.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/metrics/MetricRegistry.java b/flink-core/src/main/java/org/apache/flink/metrics/MetricRegistry.java index 7e06217..e5d3477 100644 --- a/flink-core/src/main/java/org/apache/flink/metrics/MetricRegistry.java +++ b/flink-core/src/main/java/org/apache/flink/metrics/MetricRegistry.java @@ -21,7 +21,8 @@ package org.apache.flink.metrics; import org.apache.flink.annotation.Internal; import org.apache.flink.configuration.Configuration; import org.apache.flink.metrics.groups.AbstractMetricGroup; -import org.apache.flink.metrics.groups.Scope; +import org.apache.flink.metrics.groups.scope.ScopeFormat; +import org.apache.flink.metrics.groups.scope.ScopeFormats; import org.apache.flink.metrics.reporter.JMXReporter; import org.apache.flink.metrics.reporter.MetricReporter; import org.apache.flink.metrics.reporter.Scheduled; @@ -32,11 +33,6 @@ import org.slf4j.LoggerFactory; import java.util.TimerTask; 
import java.util.concurrent.TimeUnit; -import static org.apache.flink.metrics.groups.JobMetricGroup.DEFAULT_SCOPE_JOB; -import static org.apache.flink.metrics.groups.OperatorMetricGroup.DEFAULT_SCOPE_OPERATOR; -import static org.apache.flink.metrics.groups.TaskManagerMetricGroup.DEFAULT_SCOPE_TM; -import static org.apache.flink.metrics.groups.TaskMetricGroup.DEFAULT_SCOPE_TASK; - /** * A MetricRegistry keeps track of all registered {@link Metric Metrics}. It serves as the * connection between {@link MetricGroup MetricGroups} and {@link MetricReporter MetricReporters}. @@ -52,10 +48,10 @@ public class MetricRegistry { public static final String KEY_METRICS_REPORTER_ARGUMENTS = "metrics.reporter.arguments"; public static final String KEY_METRICS_REPORTER_INTERVAL = "metrics.reporter.interval"; - public static final String KEY_METRICS_SCOPE_NAMING_TM = "metrics.scope.tm"; - public static final String KEY_METRICS_SCOPE_NAMING_JOB = "metrics.scope.job"; - public static final String KEY_METRICS_SCOPE_NAMING_TASK = "metrics.scope.task"; - public static final String KEY_METRICS_SCOPE_NAMING_OPERATOR = "metrics.scope.operator"; + public static final String KEY_METRICS_SCOPE_NAMING_TM = "metrics.scopeName.tm"; + public static final String KEY_METRICS_SCOPE_NAMING_JOB = "metrics.scopeName.job"; + public static final String KEY_METRICS_SCOPE_NAMING_TASK = "metrics.scopeName.task"; + public static final String KEY_METRICS_SCOPE_NAMING_OPERATOR = "metrics.scopeName.operator"; // ------------------------------------------------------------------------ // configuration keys @@ -66,23 +62,22 @@ public class MetricRegistry { private final MetricReporter reporter; private final java.util.Timer timer; - private final Scope.ScopeFormat scopeConfig; + private final ScopeFormats scopeFormats; /** * Creates a new MetricRegistry and starts the configured reporter. 
*/ public MetricRegistry(Configuration config) { - // first parse the scope formats, these are needed for all reporters - - Scope.ScopeFormat scopeFormat; + // first parse the scopeName formats, these are needed for all reporters + ScopeFormats scopeFormats; try { - scopeFormat = createScopeConfig(config); + scopeFormats = createScopeConfig(config); } catch (Exception e) { - scopeFormat = createScopeConfig(new Configuration()); - LOG.warn("Failed to parse scope format, using default scope formats", e); + LOG.warn("Failed to parse scopeName format, using default scopeName formats", e); + scopeFormats = new ScopeFormats(); } - this.scopeConfig = scopeFormat; + this.scopeFormats = scopeFormats; // second, instantiate any custom configured reporters @@ -157,8 +152,8 @@ public class MetricRegistry { } } - public Scope.ScopeFormat getScopeConfig() { - return this.scopeConfig; + public ScopeFormats getScopeFormats() { + return scopeFormats; } // ------------------------------------------------------------------------ @@ -166,35 +161,36 @@ public class MetricRegistry { // ------------------------------------------------------------------------ /** - * Registers a new {@link org.apache.flink.metrics.Metric} with this registry. + * Registers a new {@link Metric} with this registry. 
* - * @param metric metric to register - * @param name name of the metric - * @param parent group that contains the metric + * @param metric the metric that was added + * @param metricName the name of the metric + * @param group the group that contains the metric */ - public void register(Metric metric, String name, AbstractMetricGroup parent) { - String metricName = reporter.generateName(name, parent.generateScope()); - this.reporter.notifyOfAddedMetric(metric, metricName); + public void register(Metric metric, String metricName, AbstractMetricGroup group) { + if (reporter != null) { + reporter.notifyOfAddedMetric(metric, metricName, group); + } } /** * Un-registers the given {@link org.apache.flink.metrics.Metric} with this registry. * - * @param metric metric to un-register - * @param name name of the metric - * @param parent group that contains the metric + * @param metric the metric that should be removed + * @param metricName the name of the metric + * @param group the group that contains the metric */ - public void unregister(Metric metric, String name, AbstractMetricGroup parent) { - String metricName = reporter.generateName(name, parent.generateScope()); - - this.reporter.notifyOfRemovedMetric(metric, metricName); + public void unregister(Metric metric, String metricName, AbstractMetricGroup group) { + if (reporter != null) { + reporter.notifyOfRemovedMetric(metric, metricName, group); + } } // ------------------------------------------------------------------------ // Utilities // ------------------------------------------------------------------------ - private static Configuration createReporterConfig(Configuration config, TimeUnit timeunit, long period) { + static Configuration createReporterConfig(Configuration config, TimeUnit timeunit, long period) { Configuration reporterConfig = new Configuration(); reporterConfig.setLong("period", period); reporterConfig.setString("timeunit", timeunit.name()); @@ -208,19 +204,17 @@ public class MetricRegistry { 
return reporterConfig; } - private static Scope.ScopeFormat createScopeConfig(Configuration config) { - String tmFormat = config.getString(KEY_METRICS_SCOPE_NAMING_TM, DEFAULT_SCOPE_TM); - String jobFormat = config.getString(KEY_METRICS_SCOPE_NAMING_JOB, DEFAULT_SCOPE_JOB); - String taskFormat = config.getString(KEY_METRICS_SCOPE_NAMING_TASK, DEFAULT_SCOPE_TASK); - String operatorFormat = config.getString(KEY_METRICS_SCOPE_NAMING_OPERATOR, DEFAULT_SCOPE_OPERATOR); - - - Scope.ScopeFormat format = new Scope.ScopeFormat(); - format.setTaskManagerFormat(tmFormat); - format.setJobFormat(jobFormat); - format.setTaskFormat(taskFormat); - format.setOperatorFormat(operatorFormat); - return format; + static ScopeFormats createScopeConfig(Configuration config) { + String tmFormat = config.getString( + KEY_METRICS_SCOPE_NAMING_TM, ScopeFormat.DEFAULT_SCOPE_TASKMANAGER_GROUP); + String jobFormat = config.getString( + KEY_METRICS_SCOPE_NAMING_JOB, ScopeFormat.DEFAULT_SCOPE_TASKMANAGER_JOB_GROUP); + String taskFormat = config.getString( + KEY_METRICS_SCOPE_NAMING_TASK, ScopeFormat.DEFAULT_SCOPE_TASK_GROUP); + String operatorFormat = config.getString( + KEY_METRICS_SCOPE_NAMING_OPERATOR, ScopeFormat.DEFAULT_SCOPE_OPERATOR_GROUP); + + return new ScopeFormats(tmFormat, jobFormat, taskFormat, operatorFormat); } // ------------------------------------------------------------------------ @@ -236,7 +230,7 @@ public class MetricRegistry { * which acts as a fail-safe to stop the timer thread and prevents resource leaks. 
*/ private static final class ReporterTask extends TimerTask { - + private final Scheduled reporter; private ReporterTask(Scheduled reporter) { http://git-wip-us.apache.org/repos/asf/flink/blob/7ad8375a/flink-core/src/main/java/org/apache/flink/metrics/groups/AbstractMetricGroup.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/metrics/groups/AbstractMetricGroup.java b/flink-core/src/main/java/org/apache/flink/metrics/groups/AbstractMetricGroup.java index cad241d..032fa04 100644 --- a/flink-core/src/main/java/org/apache/flink/metrics/groups/AbstractMetricGroup.java +++ b/flink-core/src/main/java/org/apache/flink/metrics/groups/AbstractMetricGroup.java @@ -25,14 +25,12 @@ import org.apache.flink.metrics.Metric; import org.apache.flink.metrics.MetricGroup; import org.apache.flink.metrics.MetricRegistry; +import org.apache.flink.metrics.groups.scope.ScopeFormat; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.HashMap; -import java.util.List; import java.util.Map; -import java.util.regex.Matcher; -import java.util.regex.Pattern; import static org.apache.flink.util.Preconditions.checkNotNull; @@ -55,31 +53,58 @@ public abstract class AbstractMetricGroup implements MetricGroup { /** shared logger */ private static final Logger LOG = LoggerFactory.getLogger(MetricGroup.class); - private static final String METRIC_NAME_REGEX = "[a-zA-Z0-9]*"; - - /** The pattern that metric and group names have to match */ - private static final Pattern METRIC_NAME_PATTERN = Pattern.compile(METRIC_NAME_REGEX); - // ------------------------------------------------------------------------ /** The registry that this metrics group belongs to */ protected final MetricRegistry registry; /** All metrics that are directly contained in this group */ - protected final Map<String, Metric> metrics = new HashMap<>(); + private final Map<String, Metric> metrics = new HashMap<>(); /** All metric 
subgroups of this group */ - protected final Map<String, MetricGroup> groups = new HashMap<>(); + private final Map<String, MetricGroup> groups = new HashMap<>(); + + /** The metrics scope represented by this group. + * For example ["host-7", "taskmanager-2", "window_word_count", "my-mapper" ]. */ + private final String[] scopeComponents; + + /** The metrics scope represented by this group, as a concatenated string, lazily computed. + * For example: "host-7.taskmanager-2.window_word_count.my-mapper" */ + private String scopeString; /** Flag indicating whether this group has been closed */ private volatile boolean closed; // ------------------------------------------------------------------------ - - public AbstractMetricGroup(MetricRegistry registry) { + + public AbstractMetricGroup(MetricRegistry registry, String[] scope) { this.registry = checkNotNull(registry); + this.scopeComponents = checkNotNull(scope); } + /** + * Gets the scope as an array of the scope components, for example + * {@code ["host-7", "taskmanager-2", "window_word_count", "my-mapper"]} + * + * @see #getScopeString() + */ + public String[] getScopeComponents() { + return scopeComponents; + } + + /** + * Gets the scope as a single delimited string, for example + * {@code "host-7.taskmanager-2.window_word_count.my-mapper"} + * + * @see #getScopeComponents() + */ + public String getScopeString() { + if (scopeString == null) { + scopeString = ScopeFormat.concat(scopeComponents); + } + return scopeString; + } + // ------------------------------------------------------------------------ // Closing // ------------------------------------------------------------------------ @@ -111,25 +136,6 @@ public abstract class AbstractMetricGroup implements MetricGroup { } // ----------------------------------------------------------------------------------------------------------------- - // Scope - // 
----------------------------------------------------------------------------------------------------------------- - - /** - * Generates the full scope based on the default/configured format that applies to all metrics within this group. - * - * @return generated scope - */ - public abstract List<String> generateScope(); - - /** - * Generates the full scope based on the given format that applies to all metrics within this group. - * - * @param format format string - * @return generated scope - */ - public abstract List<String> generateScope(Scope.ScopeFormat format); - - // ----------------------------------------------------------------------------------------------------------------- // Metrics // ----------------------------------------------------------------------------------------------------------------- @@ -164,11 +170,8 @@ public abstract class AbstractMetricGroup implements MetricGroup { * @param metric the metric to register */ protected void addMetric(String name, Metric metric) { - Matcher nameMatcher = METRIC_NAME_PATTERN.matcher(name); - if (!nameMatcher.matches()) { - throw new IllegalArgumentException("Metric names may not contain special characters or spaces. " + - "Allowed is: " + METRIC_NAME_REGEX); - } + // early reject names that will later cause issues + checkAllowedCharacters(name); // add the metric only if the group is still open synchronized (this) { @@ -185,7 +188,7 @@ public abstract class AbstractMetricGroup implements MetricGroup { // we warn here, rather than failing, because metrics are tools that should not fail the // program when used incorrectly LOG.warn("Name collision: Adding a metric with the same name as a metric subgroup: '" + - name + "'. Metric might not get properly reported. (" + generateScope() + ')'); + name + "'. Metric might not get properly reported. 
(" + scopeString + ')'); } registry.register(metric, name, this); @@ -197,7 +200,7 @@ public abstract class AbstractMetricGroup implements MetricGroup { // we warn here, rather than failing, because metrics are tools that should not fail the // program when used incorrectly LOG.warn("Name collision: Group already contains a Metric with the name '" + - name + "'. Metric will not be reported. (" + generateScope() + ')'); + name + "'. Metric will not be reported. (" + scopeString + ')'); } } } @@ -221,7 +224,7 @@ public abstract class AbstractMetricGroup implements MetricGroup { // program when used incorrectly if (metrics.containsKey(name)) { LOG.warn("Name collision: Adding a metric subgroup with the same name as an existing metric: '" + - name + "'. Metric might not get properly reported. (" + generateScope() + ')'); + name + "'. Metric might not get properly reported. (" + scopeString + ')'); } MetricGroup newGroup = new GenericMetricGroup(registry, this, name); @@ -243,4 +246,21 @@ public abstract class AbstractMetricGroup implements MetricGroup { } } } + + // ------------------------------------------------------------------------ + // Utilities + // ------------------------------------------------------------------------ + + /** + * Fast implementation to check if a string has only alphanumeric characters. + * Compared to a regular expression, this is about an order of magnitude faster. 
+ */ + private static void checkAllowedCharacters(String name) { + for (int i = 0; i < name.length(); i++) { + final char c = name.charAt(i); + if (c < 0x30 || (c >= 0x3a && c <= 0x40) || (c > 0x5a && c <= 0x60) || c > 0x7a) { + throw new IllegalArgumentException("Metric names may only contain [a-zA-Z0-9]."); + } + } + } } http://git-wip-us.apache.org/repos/asf/flink/blob/7ad8375a/flink-core/src/main/java/org/apache/flink/metrics/groups/ComponentMetricGroup.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/metrics/groups/ComponentMetricGroup.java b/flink-core/src/main/java/org/apache/flink/metrics/groups/ComponentMetricGroup.java index c68cdc1..518d940 100644 --- a/flink-core/src/main/java/org/apache/flink/metrics/groups/ComponentMetricGroup.java +++ b/flink-core/src/main/java/org/apache/flink/metrics/groups/ComponentMetricGroup.java @@ -21,46 +21,33 @@ package org.apache.flink.metrics.groups; import org.apache.flink.annotation.Internal; import org.apache.flink.metrics.MetricRegistry; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -import static org.apache.flink.metrics.groups.Scope.SCOPE_WILDCARD; - /** * Abstract {@link org.apache.flink.metrics.MetricGroup} for system components (e.g., * TaskManager, Job, Task, Operator). * - * <p>The components metric groups contain functionality to define alias names for - * the component identifier. For example, while Tasks are registered under a Task Attempt ID, - * the metrics name scope may use the task name instead. Using these aliases makes - * Metric scope names stable across jobs, recovery attempts, etc. + * <p>Usually, the scope of metrics is simply the hierarchy of the containing groups. 
For example + * the Metric {@code "MyMetric"} in group {@code "B"} nested in group {@code "A"} would have a + * fully scoped name of {@code "A.B.MyMetric"}, with {@code "A.B"} being the Metric's scope. + * + * <p>Component groups, however, have configurable scopes. This allows users to include or exclude + * certain identifiers from the scope. The scope for metrics belonging to the "Task" + * group could for example include the task attempt number (more fine grained identification), or + * exclude it (for continuity of the namespace across failure and recovery). */ @Internal public abstract class ComponentMetricGroup extends AbstractMetricGroup { - private final ComponentMetricGroup parent; - - private final String format; - - // Map: scope variable -> specific value - protected final Map<String, String> formats; - - // ------------------------------------------------------------------------ - /** * Creates a new ComponentMetricGroup. * - * @param registry registry to register new metrics with - * @param parentGroup parent group, may be null - * @param scopeFormat default format string + * @param registry registry to register new metrics with + * @param scope the scope of the group */ - public ComponentMetricGroup(MetricRegistry registry, ComponentMetricGroup parentGroup, String scopeFormat) { - super(registry); - this.formats = new HashMap<>(); - this.parent = parentGroup; - this.format = scopeFormat; + public ComponentMetricGroup( + MetricRegistry registry, + String[] scope) { + + super(registry, scope); } /** @@ -87,71 +74,5 @@ public abstract class ComponentMetricGroup extends AbstractMetricGroup { // sub components // ------------------------------------------------------------------------ - protected ComponentMetricGroup parent() { - return parent; - } - protected abstract Iterable<? 
extends ComponentMetricGroup> subComponents(); - - // ------------------------------------------------------------------------ - // scope format - // ------------------------------------------------------------------------ - - protected abstract String getScopeFormat(Scope.ScopeFormat format); - - @Override - public List<String> generateScope() { - return generateScope(format); - } - - @Override - public List<String> generateScope(Scope.ScopeFormat format) { - return generateScope(getScopeFormat(format)); - } - - private List<String> generateScope(String format) { - String[] components = Scope.split(format); - - List<String> scope = new ArrayList<>(); - if (components[0].equals(SCOPE_WILDCARD)) { - if (this.parent != null) { - scope = this.parent.generateScope(); - } - this.replaceFormats(components); - addToList(scope, components, 1); - } else { - if (this.parent != null) { - this.parent.replaceFormats(components); - } - this.replaceFormats(components); - addToList(scope, components, 0); - } - return scope; - } - - private void replaceFormats(String[] components) { - if (this.parent != null) { - this.parent.replaceFormats(components); - } - for (int x = 0; x < components.length; x++) { - if (components[x].startsWith("<")) { - if (this.formats.containsKey(components[x])) { - components[x] = this.formats.get(components[x]); - } - } - } - } - - /** - * Adds all elements from the given array, starting from the given index, to the given list. 
- * - * @param list destination - * @param array source - * @param startIndex array index to start from - */ - private static void addToList(List<String> list, String[] array, int startIndex) { - for (int x = startIndex; x < array.length; x++) { - list.add(array[x]); - } - } } http://git-wip-us.apache.org/repos/asf/flink/blob/7ad8375a/flink-core/src/main/java/org/apache/flink/metrics/groups/GenericMetricGroup.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/metrics/groups/GenericMetricGroup.java b/flink-core/src/main/java/org/apache/flink/metrics/groups/GenericMetricGroup.java index eedb0fa..ddcd73b 100644 --- a/flink-core/src/main/java/org/apache/flink/metrics/groups/GenericMetricGroup.java +++ b/flink-core/src/main/java/org/apache/flink/metrics/groups/GenericMetricGroup.java @@ -15,44 +15,35 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.flink.metrics.groups; import org.apache.flink.annotation.Internal; import org.apache.flink.metrics.MetricRegistry; -import java.util.List; - /** - * A simple named {@link org.apache.flink.metrics.MetricGroup} with no special properties. + * A simple named {@link org.apache.flink.metrics.MetricGroup} that is used to hold + * subgroups of metrics. 
*/ @Internal public class GenericMetricGroup extends AbstractMetricGroup { - - private final AbstractMetricGroup parent; - - private final String name; - - protected GenericMetricGroup(MetricRegistry registry, AbstractMetricGroup parent, int name) { - this(registry, parent, String.valueOf(name)); - } - - protected GenericMetricGroup(MetricRegistry registry, AbstractMetricGroup parent, String name) { - super(registry); - this.parent = parent; - this.name = name; - } - @Override - public List<String> generateScope() { - List<String> scope = parent.generateScope(); - scope.add(name); - return scope; + public GenericMetricGroup(MetricRegistry registry, AbstractMetricGroup parent, String name) { + super(registry, makeScopeComponents(parent, name)); } - @Override - public List<String> generateScope(Scope.ScopeFormat format) { - List<String> scope = parent.generateScope(format); - scope.add(name); - return scope; + // ------------------------------------------------------------------------ + + private static String[] makeScopeComponents(AbstractMetricGroup parent, String name) { + if (parent != null) { + String[] parentComponents = parent.getScopeComponents(); + if (parentComponents != null && parentComponents.length > 0) { + String[] parts = new String[parentComponents.length + 1]; + System.arraycopy(parentComponents, 0, parts, 0, parentComponents.length); + parts[parts.length - 1] = name; + return parts; + } + } + return new String[] { name }; } } http://git-wip-us.apache.org/repos/asf/flink/blob/7ad8375a/flink-core/src/main/java/org/apache/flink/metrics/groups/IOMetricGroup.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/metrics/groups/IOMetricGroup.java b/flink-core/src/main/java/org/apache/flink/metrics/groups/IOMetricGroup.java index dea5650..b34c844 100644 --- a/flink-core/src/main/java/org/apache/flink/metrics/groups/IOMetricGroup.java +++ 
b/flink-core/src/main/java/org/apache/flink/metrics/groups/IOMetricGroup.java @@ -15,19 +15,16 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.flink.metrics.groups; import org.apache.flink.metrics.Counter; import org.apache.flink.metrics.MetricRegistry; -import java.util.List; - /** * Special {@link org.apache.flink.metrics.MetricGroup} that contains shareable pre-defined IO-related metrics. */ public class IOMetricGroup extends AbstractMetricGroup { - - private final TaskMetricGroup parent; private final Counter numBytesIn; private final Counter numBytesOut; @@ -35,37 +32,27 @@ public class IOMetricGroup extends AbstractMetricGroup { private final Counter numRecordsOut; public IOMetricGroup(MetricRegistry registry, TaskMetricGroup parent) { - super(registry); - this.parent = parent; + super(registry, parent.getScopeComponents()); + this.numBytesIn = parent.counter("numBytesIn"); this.numBytesOut = parent.counter("numBytesOut"); this.numRecordsIn = parent.counter("numRecordsIn"); this.numRecordsOut = parent.counter("numRecordsOut"); } - @Override - public List<String> generateScope() { - return parent.generateScope(); - } - - @Override - public List<String> generateScope(Scope.ScopeFormat format) { - return parent.generateScope(format); - } - public Counter getBytesInCounter() { - return this.numBytesIn; + return numBytesIn; } public Counter getBytesOutCounter() { - return this.numBytesOut; + return numBytesOut; } public Counter getRecordsInCounter() { - return this.numRecordsIn; + return numRecordsIn; } public Counter getRecordsOutCounter() { - return this.numRecordsOut; + return numRecordsOut; } } http://git-wip-us.apache.org/repos/asf/flink/blob/7ad8375a/flink-core/src/main/java/org/apache/flink/metrics/groups/JobMetricGroup.java ---------------------------------------------------------------------- diff --git 
a/flink-core/src/main/java/org/apache/flink/metrics/groups/JobMetricGroup.java b/flink-core/src/main/java/org/apache/flink/metrics/groups/JobMetricGroup.java index f4f634a..f816278 100644 --- a/flink-core/src/main/java/org/apache/flink/metrics/groups/JobMetricGroup.java +++ b/flink-core/src/main/java/org/apache/flink/metrics/groups/JobMetricGroup.java @@ -21,57 +21,92 @@ package org.apache.flink.metrics.groups; import org.apache.flink.annotation.Internal; import org.apache.flink.api.common.JobID; import org.apache.flink.metrics.MetricRegistry; +import org.apache.flink.metrics.groups.scope.ScopeFormat.TaskManagerJobScopeFormat; import org.apache.flink.util.AbstractID; +import javax.annotation.Nullable; import java.util.HashMap; import java.util.Map; -import static org.apache.flink.metrics.groups.TaskManagerMetricGroup.DEFAULT_SCOPE_TM; import static org.apache.flink.util.Preconditions.checkNotNull; /** - * Special {@link org.apache.flink.metrics.MetricGroup} representing a Job. + * Special {@link org.apache.flink.metrics.MetricGroup} representing everything belonging to + * a specific job, running on the TaskManager. * - * <p>Contains extra logic for adding tasks. + * <p>Contains extra logic for adding Tasks ({@link TaskMetricGroup}). 
*/ @Internal public class JobMetricGroup extends ComponentMetricGroup { - public static final String SCOPE_JOB_DESCRIPTOR = "job"; - public static final String SCOPE_JOB_ID = Scope.format("job_id"); - public static final String SCOPE_JOB_NAME = Scope.format("job_name"); - public static final String DEFAULT_SCOPE_JOB_COMPONENT = Scope.concat(SCOPE_JOB_NAME); - public static final String DEFAULT_SCOPE_JOB = Scope.concat(DEFAULT_SCOPE_TM, DEFAULT_SCOPE_JOB_COMPONENT); - - // ------------------------------------------------------------------------ + /** The metrics group that contains this group */ + private final TaskManagerMetricGroup parent; /** Map from execution attempt ID (task identifier) to task metrics */ private final Map<AbstractID, TaskMetricGroup> tasks = new HashMap<>(); - + + /** The ID of the job represented by this metrics group */ private final JobID jobId; + /** The name of the job represented by this metrics group */ + @Nullable + private final String jobName; + // ------------------------------------------------------------------------ - public JobMetricGroup(MetricRegistry registry, TaskManagerMetricGroup taskManager, JobID jobId, String jobName) { - super(registry, taskManager, registry.getScopeConfig().getJobFormat()); + public JobMetricGroup( + MetricRegistry registry, + TaskManagerMetricGroup parent, + JobID jobId, + @Nullable String jobName) { + this(registry, checkNotNull(parent), registry.getScopeFormats().getJobFormat(), jobId, jobName); + } + + public JobMetricGroup( + MetricRegistry registry, + TaskManagerMetricGroup parent, + TaskManagerJobScopeFormat scopeFormat, + JobID jobId, + @Nullable String jobName) { + + super(registry, scopeFormat.formatScope(parent, jobId, jobName)); + + this.parent = checkNotNull(parent); this.jobId = checkNotNull(jobId); - this.formats.put(SCOPE_JOB_ID, jobId.toString()); - this.formats.put(SCOPE_JOB_NAME, checkNotNull(jobName)); + this.jobName = jobName; + } + + public final TaskManagerMetricGroup parent() 
{ + return parent; + } + + public JobID jobId() { + return jobId; + } + + @Nullable + public String jobName() { + return jobName; } // ------------------------------------------------------------------------ // adding / removing tasks // ------------------------------------------------------------------------ - public TaskMetricGroup addTask(AbstractID vertexId, AbstractID executionId, int subtaskIndex, String name) { - checkNotNull(vertexId); + public TaskMetricGroup addTask( + AbstractID vertexId, + AbstractID executionId, + String taskName, + int subtaskIndex, + int attemptNumber) { + checkNotNull(executionId); - checkNotNull(name); synchronized (this) { if (!isClosed()) { - TaskMetricGroup task = new TaskMetricGroup(registry, this, vertexId, executionId, subtaskIndex, name); + TaskMetricGroup task = new TaskMetricGroup(registry, this, + vertexId, executionId, taskName, subtaskIndex, attemptNumber); tasks.put(executionId, task); return task; } else { @@ -82,7 +117,7 @@ public class JobMetricGroup extends ComponentMetricGroup { public void removeTaskMetricGroup(AbstractID executionId) { checkNotNull(executionId); - + boolean removeFromParent = false; synchronized (this) { if (!isClosed() && tasks.remove(executionId) != null && tasks.isEmpty()) { @@ -91,25 +126,16 @@ public class JobMetricGroup extends ComponentMetricGroup { close(); } } - + // IMPORTANT: removing from the parent must not happen while holding this group's lock, // because it would violate the "first parent then subgroup" lock acquisition order if (removeFromParent) { - ((TaskManagerMetricGroup) parent()).removeJobMetricsGroup(jobId, this); + parent.removeJobMetricsGroup(jobId, this); } } - // ------------------------------------------------------------------------ - // component group behavior - // ------------------------------------------------------------------------ - @Override protected Iterable<? 
extends ComponentMetricGroup> subComponents() { return tasks.values(); } - - @Override - protected String getScopeFormat(Scope.ScopeFormat format) { - return format.getJobFormat(); - } } http://git-wip-us.apache.org/repos/asf/flink/blob/7ad8375a/flink-core/src/main/java/org/apache/flink/metrics/groups/NonRegisteringMetricsGroup.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/metrics/groups/NonRegisteringMetricsGroup.java b/flink-core/src/main/java/org/apache/flink/metrics/groups/NonRegisteringMetricsGroup.java deleted file mode 100644 index 1bfcfe3..0000000 --- a/flink-core/src/main/java/org/apache/flink/metrics/groups/NonRegisteringMetricsGroup.java +++ /dev/null @@ -1,87 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.metrics.groups; - -import org.apache.flink.annotation.Internal; -import org.apache.flink.metrics.Counter; -import org.apache.flink.metrics.Gauge; -import org.apache.flink.metrics.MetricGroup; - -/** - * Metrics group that does not register any metrics. 
- */ -@Internal -public class NonRegisteringMetricsGroup implements MetricGroup { - - // ------------------------------------------------------------------------ - // singleton - // ------------------------------------------------------------------------ - - private static final NonRegisteringMetricsGroup INSTANCE = new NonRegisteringMetricsGroup(); - - public static NonRegisteringMetricsGroup get() { - return INSTANCE; - } - - /** Private constructor to prevent instantiation */ - private NonRegisteringMetricsGroup() {} - - // ------------------------------------------------------------------------ - // metrics group - // ------------------------------------------------------------------------ - - @Override - public void close() {} - - @Override - public boolean isClosed() { - return false; - } - - @Override - public Counter counter(int name) { - return new Counter(); - } - - @Override - public Counter counter(String name) { - return new Counter(); - } - - @Override - public <T> Gauge<T> gauge(int name, Gauge<T> gauge) { - return gauge; - } - - @Override - public <T> Gauge<T> gauge(String name, Gauge<T> gauge) { - return gauge; - } - - - @Override - public MetricGroup addGroup(int name) { - return addGroup(String.valueOf(name)); - } - - @Override - public MetricGroup addGroup(String name) { - return new NonRegisteringMetricsGroup(); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/7ad8375a/flink-core/src/main/java/org/apache/flink/metrics/groups/OperatorMetricGroup.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/metrics/groups/OperatorMetricGroup.java b/flink-core/src/main/java/org/apache/flink/metrics/groups/OperatorMetricGroup.java index 390b55b..6db79ab 100644 --- a/flink-core/src/main/java/org/apache/flink/metrics/groups/OperatorMetricGroup.java +++ b/flink-core/src/main/java/org/apache/flink/metrics/groups/OperatorMetricGroup.java @@ -15,14 +15,16 @@ * See the License 
for the specific language governing permissions and * limitations under the License. */ + package org.apache.flink.metrics.groups; import org.apache.flink.annotation.Internal; import org.apache.flink.metrics.MetricRegistry; +import org.apache.flink.metrics.groups.scope.ScopeFormat.OperatorScopeFormat; import java.util.Collections; -import static org.apache.flink.metrics.groups.JobMetricGroup.DEFAULT_SCOPE_JOB; +import static org.apache.flink.util.Preconditions.checkNotNull; /** * Special {@link org.apache.flink.metrics.MetricGroup} representing an Operator. @@ -30,26 +32,29 @@ import static org.apache.flink.metrics.groups.JobMetricGroup.DEFAULT_SCOPE_JOB; @Internal public class OperatorMetricGroup extends ComponentMetricGroup { - public static final String SCOPE_OPERATOR_DESCRIPTOR = "operator"; - public static final String SCOPE_OPERATOR_NAME = Scope.format("operator_name"); - public static final String SCOPE_OPERATOR_SUBTASK_INDEX = Scope.format("subtask_index"); - public static final String DEFAULT_SCOPE_OPERATOR_COMPONENT = Scope.concat(SCOPE_OPERATOR_NAME, SCOPE_OPERATOR_SUBTASK_INDEX); - public static final String DEFAULT_SCOPE_OPERATOR = Scope.concat(DEFAULT_SCOPE_JOB, DEFAULT_SCOPE_OPERATOR_COMPONENT); - - protected OperatorMetricGroup(MetricRegistry registry, TaskMetricGroup task, String name, int subTaskIndex) { - super(registry, task, registry.getScopeConfig().getOperatorFormat()); + /** The task metric group that contains this operator metric groups */ + private final TaskMetricGroup parent; - this.formats.put(SCOPE_OPERATOR_NAME, name); - this.formats.put(SCOPE_OPERATOR_SUBTASK_INDEX, String.valueOf(subTaskIndex)); + public OperatorMetricGroup(MetricRegistry registry, TaskMetricGroup parent, String operatorName) { + this(registry, parent, registry.getScopeFormats().getOperatorFormat(), operatorName); } - // ------------------------------------------------------------------------ + public OperatorMetricGroup( + MetricRegistry registry, + TaskMetricGroup 
parent, + OperatorScopeFormat scopeFormat, + String operatorName) { - @Override - protected String getScopeFormat(Scope.ScopeFormat format) { - return format.getOperatorFormat(); + super(registry, scopeFormat.formatScope(parent, operatorName)); + this.parent = checkNotNull(parent); } + // ------------------------------------------------------------------------ + + public final TaskMetricGroup parent() { + return parent; + } + @Override protected Iterable<? extends ComponentMetricGroup> subComponents() { return Collections.emptyList(); http://git-wip-us.apache.org/repos/asf/flink/blob/7ad8375a/flink-core/src/main/java/org/apache/flink/metrics/groups/Scope.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/metrics/groups/Scope.java b/flink-core/src/main/java/org/apache/flink/metrics/groups/Scope.java deleted file mode 100644 index 83013e2..0000000 --- a/flink-core/src/main/java/org/apache/flink/metrics/groups/Scope.java +++ /dev/null @@ -1,119 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.flink.metrics.groups; - -import org.apache.flink.annotation.Internal; - -/** - * This class provides utility-functions for handling scopes. - */ -@Internal -public class Scope { - public static final String SCOPE_WILDCARD = "*"; - - private static final String SCOPE_PREFIX = "<"; - private static final String SCOPE_SUFFIX = ">"; - private static final String SCOPE_SPLIT = "."; - - private Scope() { - } - - /** - * Modifies the given string to resemble a scope variable. - * - * @param scope string to format - * @return formatted string - */ - public static String format(String scope) { - return SCOPE_PREFIX + scope + SCOPE_SUFFIX; - } - - /** - * Joins the given components into a single scope. - * - * @param components components to join - * @return joined scoped - */ - public static String concat(String... components) { - StringBuilder sb = new StringBuilder(); - sb.append(components[0]); - for (int x = 1; x < components.length; x++) { - sb.append(SCOPE_SPLIT); - sb.append(components[x]); - } - return sb.toString(); - } - - /** - * Splits the given scope into it's individual components. - * - * @param scope scope to split - * @return array of components - */ - public static String[] split(String scope) { - return scope.split("\\" + SCOPE_SPLIT); - } - - /** - * Simple container for component scope format strings. 
- */ - public static class ScopeFormat { - - private String operatorFormat = OperatorMetricGroup.DEFAULT_SCOPE_OPERATOR; - private String taskFormat = TaskMetricGroup.DEFAULT_SCOPE_TASK; - private String jobFormat = JobMetricGroup.DEFAULT_SCOPE_JOB; - private String taskManagerFormat = TaskManagerMetricGroup.DEFAULT_SCOPE_TM; - - - public ScopeFormat setOperatorFormat(String format) { - this.operatorFormat = format; - return this; - } - - public ScopeFormat setTaskFormat(String format) { - this.taskFormat = format; - return this; - } - - public ScopeFormat setJobFormat(String format) { - this.jobFormat = format; - return this; - } - - public ScopeFormat setTaskManagerFormat(String format) { - this.taskManagerFormat = format; - return this; - } - - public String getOperatorFormat() { - return this.operatorFormat; - } - - public String getTaskFormat() { - return this.taskFormat; - } - - public String getJobFormat() { - return this.jobFormat; - } - - public String getTaskManagerFormat() { - return this.taskManagerFormat; - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/7ad8375a/flink-core/src/main/java/org/apache/flink/metrics/groups/TaskManagerMetricGroup.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/metrics/groups/TaskManagerMetricGroup.java b/flink-core/src/main/java/org/apache/flink/metrics/groups/TaskManagerMetricGroup.java index bfb9362..3cb3936 100644 --- a/flink-core/src/main/java/org/apache/flink/metrics/groups/TaskManagerMetricGroup.java +++ b/flink-core/src/main/java/org/apache/flink/metrics/groups/TaskManagerMetricGroup.java @@ -21,13 +21,12 @@ package org.apache.flink.metrics.groups; import org.apache.flink.annotation.Internal; import org.apache.flink.api.common.JobID; import org.apache.flink.metrics.MetricRegistry; +import org.apache.flink.metrics.groups.scope.ScopeFormat.TaskManagerScopeFormat; import org.apache.flink.util.AbstractID; import 
java.util.HashMap; import java.util.Map; -import static org.apache.flink.util.Preconditions.checkNotNull; - /** * Special {@link org.apache.flink.metrics.MetricGroup} representing a TaskManager. * @@ -37,22 +36,33 @@ import static org.apache.flink.util.Preconditions.checkNotNull; @Internal public class TaskManagerMetricGroup extends ComponentMetricGroup { - public static final String SCOPE_HOST_DESCRIPTOR = "host"; - public static final String SCOPE_TM_DESCRIPTOR = "taskmanager"; - public static final String SCOPE_TM_HOST = Scope.format("host"); - public static final String SCOPE_TM_ID = Scope.format("tm_id"); - public static final String DEFAULT_SCOPE_TM_COMPONENT = Scope.concat(SCOPE_TM_HOST, "taskmanager", SCOPE_TM_ID); - public static final String DEFAULT_SCOPE_TM = DEFAULT_SCOPE_TM_COMPONENT; - - // ------------------------------------------------------------------------ - private final Map<JobID, JobMetricGroup> jobs = new HashMap<>(); - public TaskManagerMetricGroup(MetricRegistry registry, String host, String taskManagerId) { - super(registry, null, registry.getScopeConfig().getTaskManagerFormat()); + private final String hostname; + + private final String taskManagerId; - this.formats.put(SCOPE_TM_HOST, checkNotNull(host)); - this.formats.put(SCOPE_TM_ID, checkNotNull(taskManagerId)); + + public TaskManagerMetricGroup(MetricRegistry registry, String hostname, String taskManagerId) { + this(registry, registry.getScopeFormats().getTaskManagerFormat(), hostname, taskManagerId); + } + + public TaskManagerMetricGroup( + MetricRegistry registry, + TaskManagerScopeFormat scopeFormat, + String hostname, String taskManagerId) { + + super(registry, scopeFormat.formatScope(hostname, taskManagerId)); + this.hostname = hostname; + this.taskManagerId = taskManagerId; + } + + public String hostname() { + return hostname; + } + + public String taskManagerId() { + return taskManagerId; } // ------------------------------------------------------------------------ @@ -64,9 
+74,10 @@ public class TaskManagerMetricGroup extends ComponentMetricGroup { String jobName, AbstractID vertexID, AbstractID executionId, + String taskName, int subtaskIndex, - String taskName) { - + int attemptNumber) { + // we cannot strictly lock both our map modification and the job group modification // because it might lead to a deadlock while (true) { @@ -80,28 +91,30 @@ public class TaskManagerMetricGroup extends ComponentMetricGroup { jobs.put(jobId, currentJobGroup); } } - + // try to add another task. this may fail if we found a pre-existing job metrics // group and it is closed concurrently - TaskMetricGroup taskGroup = currentJobGroup.addTask(vertexID, executionId, subtaskIndex, taskName); + TaskMetricGroup taskGroup = currentJobGroup.addTask( + vertexID, executionId, taskName, subtaskIndex, attemptNumber); + if (taskGroup != null) { // successfully added the next task return taskGroup; } - + // else fall through the loop } } - + public void removeJobMetricsGroup(JobID jobId, JobMetricGroup group) { if (jobId == null || group == null || !group.isClosed()) { return; } - + synchronized (this) { // optimistically remove the currently contained group, and check later if it was correct JobMetricGroup containedGroup = jobs.remove(jobId); - + // check if another group was actually contained, and restore that one if (containedGroup != null && containedGroup != group) { jobs.put(jobId, containedGroup); @@ -113,15 +126,6 @@ public class TaskManagerMetricGroup extends ComponentMetricGroup { return jobs.size(); } - // ------------------------------------------------------------------------ - // component group behavior - // ------------------------------------------------------------------------ - - @Override - protected String getScopeFormat(Scope.ScopeFormat format) { - return format.getTaskManagerFormat(); - } - @Override protected Iterable<? 
extends ComponentMetricGroup> subComponents() { return jobs.values(); http://git-wip-us.apache.org/repos/asf/flink/blob/7ad8375a/flink-core/src/main/java/org/apache/flink/metrics/groups/TaskMetricGroup.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/metrics/groups/TaskMetricGroup.java b/flink-core/src/main/java/org/apache/flink/metrics/groups/TaskMetricGroup.java index 316c84f..784578b 100644 --- a/flink-core/src/main/java/org/apache/flink/metrics/groups/TaskMetricGroup.java +++ b/flink-core/src/main/java/org/apache/flink/metrics/groups/TaskMetricGroup.java @@ -20,12 +20,13 @@ package org.apache.flink.metrics.groups; import org.apache.flink.annotation.Internal; import org.apache.flink.metrics.MetricRegistry; +import org.apache.flink.metrics.groups.scope.ScopeFormat.TaskScopeFormat; import org.apache.flink.util.AbstractID; +import javax.annotation.Nullable; import java.util.HashMap; import java.util.Map; -import static org.apache.flink.metrics.groups.JobMetricGroup.DEFAULT_SCOPE_JOB; import static org.apache.flink.util.Preconditions.checkNotNull; /** @@ -35,47 +36,109 @@ import static org.apache.flink.util.Preconditions.checkNotNull; */ @Internal public class TaskMetricGroup extends ComponentMetricGroup { - - public static final String SCOPE_TASK_DESCRIPTOR = "task"; - public static final String SCOPE_TASK_ID = Scope.format("task_id"); - public static final String SCOPE_TASK_NAME = Scope.format("task_name"); - public static final String SCOPE_TASK_ATTEMPT = Scope.format("task_attempt"); - public static final String SCOPE_TASK_SUBTASK_INDEX = Scope.format("subtask_index"); - public static final String DEFAULT_SCOPE_TASK_COMPONENT = SCOPE_TASK_NAME; - public static final String DEFAULT_SCOPE_TASK = Scope.concat(DEFAULT_SCOPE_JOB, DEFAULT_SCOPE_TASK_COMPONENT); + /** The job metrics group containing this task metrics group */ + private final JobMetricGroup parent; private final 
Map<String, OperatorMetricGroup> operators = new HashMap<>(); private final IOMetricGroup ioMetrics; - + + /** The execution Id uniquely identifying the executed task represented by this metrics group */ private final AbstractID executionId; + + @Nullable + private final AbstractID vertexId; + @Nullable + private final String taskName; + private final int subtaskIndex; - protected TaskMetricGroup( + private final int attemptNumber; + + // ------------------------------------------------------------------------ + + public TaskMetricGroup( MetricRegistry registry, JobMetricGroup parent, - AbstractID taskId, + @Nullable AbstractID vertexId, AbstractID executionId, + @Nullable String taskName, int subtaskIndex, - String name) { + int attemptNumber) { + + this(registry, parent, registry.getScopeFormats().getTaskFormat(), + vertexId, executionId, taskName, subtaskIndex, attemptNumber); + } - super(registry, parent, registry.getScopeConfig().getTaskFormat()); + public TaskMetricGroup( + MetricRegistry registry, + JobMetricGroup parent, + TaskScopeFormat scopeFormat, + @Nullable AbstractID vertexId, + AbstractID executionId, + @Nullable String taskName, + int subtaskIndex, + int attemptNumber) { - this.executionId = executionId; + super(registry, scopeFormat.formatScope( + parent, vertexId, executionId, taskName, subtaskIndex, attemptNumber)); + + this.parent = checkNotNull(parent); + this.executionId = checkNotNull(executionId); + this.vertexId = vertexId; + this.taskName = taskName; this.subtaskIndex = subtaskIndex; - - this.formats.put(SCOPE_TASK_ID, taskId.toString()); - this.formats.put(SCOPE_TASK_ATTEMPT, executionId.toString()); - this.formats.put(SCOPE_TASK_NAME, checkNotNull(name)); - this.formats.put(SCOPE_TASK_SUBTASK_INDEX, String.valueOf(subtaskIndex)); + this.attemptNumber = attemptNumber; this.ioMetrics = new IOMetricGroup(registry, this); } + // ------------------------------------------------------------------------ + // properties + // 
------------------------------------------------------------------------ + + public final JobMetricGroup parent() { + return parent; + } + + public AbstractID executionId() { + return executionId; + } + + @Nullable + public AbstractID vertexId() { + return vertexId; + } + + @Nullable + public String taskName() { + return taskName; + } + + public int subtaskIndex() { + return subtaskIndex; + } + + public int attemptNumber() { + return attemptNumber; + } + + /** + * Returns the IOMetricGroup for this task. + * + * @return IOMetricGroup for this task. + */ + public IOMetricGroup getIOMetricGroup() { + return ioMetrics; + } + + // ------------------------------------------------------------------------ + // operators and cleanup + // ------------------------------------------------------------------------ public OperatorMetricGroup addOperator(String name) { - OperatorMetricGroup operator = new OperatorMetricGroup(this.registry, this, name, this.subtaskIndex); + OperatorMetricGroup operator = new OperatorMetricGroup(this.registry, this, name); synchronized (this) { OperatorMetricGroup previous = operators.put(name, operator); @@ -93,27 +156,10 @@ public class TaskMetricGroup extends ComponentMetricGroup { @Override public void close() { super.close(); - parent().removeTaskMetricGroup(executionId); - } - - /** - * Returns the IOMetricGroup for this task. - * - * @return IOMetricGroup for this task. - */ - public IOMetricGroup getIOMetricGroup() { - return this.ioMetrics; - } - - @Override - protected JobMetricGroup parent() { - return (JobMetricGroup) super.parent(); + parent.removeTaskMetricGroup(executionId); } - @Override - protected String getScopeFormat(Scope.ScopeFormat format) { - return format.getTaskFormat(); - } + // ------------------------------------------------------------------------ @Override protected Iterable<? 
extends ComponentMetricGroup> subComponents() { http://git-wip-us.apache.org/repos/asf/flink/blob/7ad8375a/flink-core/src/main/java/org/apache/flink/metrics/groups/UnregisteredMetricsGroup.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/metrics/groups/UnregisteredMetricsGroup.java b/flink-core/src/main/java/org/apache/flink/metrics/groups/UnregisteredMetricsGroup.java new file mode 100644 index 0000000..961bcce --- /dev/null +++ b/flink-core/src/main/java/org/apache/flink/metrics/groups/UnregisteredMetricsGroup.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.metrics.groups; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.metrics.Counter; +import org.apache.flink.metrics.Gauge; +import org.apache.flink.metrics.MetricGroup; + +/** + * A special {@link MetricGroup} that does not register any metrics at the metrics registry + * and any reporters. + * + * <p>This metrics group appears always closed ({@link #isClosed()}). 
+ */ +@Internal +public class UnregisteredMetricsGroup implements MetricGroup { + + @Override + public void close() {} + + @Override + public boolean isClosed() { + return true; + } + + @Override + public Counter counter(int name) { + return new Counter(); + } + + @Override + public Counter counter(String name) { + return new Counter(); + } + + @Override + public <T> Gauge<T> gauge(int name, Gauge<T> gauge) { + return gauge; + } + + @Override + public <T> Gauge<T> gauge(String name, Gauge<T> gauge) { + return gauge; + } + + + @Override + public MetricGroup addGroup(int name) { + return addGroup(String.valueOf(name)); + } + + @Override + public MetricGroup addGroup(String name) { + return new UnregisteredMetricsGroup(); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/7ad8375a/flink-core/src/main/java/org/apache/flink/metrics/groups/scope/ScopeFormat.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/metrics/groups/scope/ScopeFormat.java b/flink-core/src/main/java/org/apache/flink/metrics/groups/scope/ScopeFormat.java new file mode 100644 index 0000000..9637f65 --- /dev/null +++ b/flink-core/src/main/java/org/apache/flink/metrics/groups/scope/ScopeFormat.java @@ -0,0 +1,399 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.metrics.groups.scope; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.metrics.groups.JobMetricGroup; +import org.apache.flink.metrics.groups.TaskManagerMetricGroup; +import org.apache.flink.metrics.groups.TaskMetricGroup; +import org.apache.flink.util.AbstractID; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * This class represents the format according to which the "scope" (or namespace) of the various + * component metric groups is built. Component metric groups + * ({@link org.apache.flink.metrics.groups.ComponentMetricGroup}) are for example + * "TaskManager", "Task", or "Operator". + * + * <p>User defined scope formats allow users to include or exclude + * certain identifiers from the scope. The scope for metrics belonging to the "Task" + * group could for example include the task attempt number (more fine grained identification), or + * exclude it (continuity of the namespace across failure and recovery). + */ +public abstract class ScopeFormat { + + // ------------------------------------------------------------------------ + // Scope Format Special Characters + // ------------------------------------------------------------------------ + + /** + * If the scope format starts with this character, then the parent component's scope + * format will be used as a prefix. + * + * <p>For example, if the {@link JobMetricGroup} format is {@code "*.<job_name>"}, and the + * {@link TaskManagerMetricGroup} format is {@code "<host>"}, then the job's metrics + * will have {@code "<host>.<job_name>"} as their scope. 
+ */ + public static final String SCOPE_INHERIT_PARENT = "*"; + + public static final String SCOPE_SEPARATOR = "."; + + private static final String SCOPE_VARIABLE_PREFIX = "<"; + private static final String SCOPE_VARIABLE_SUFFIX = ">"; + + // ------------------------------------------------------------------------ + // Scope Variables + // ------------------------------------------------------------------------ + + // ----- Task Manager ---- + + public static final String SCOPE_TASKMANAGER_HOST = asVariable("host"); + public static final String SCOPE_TASKMANAGER_ID = asVariable("tm_id"); + + /** The default scope format of the TaskManager component: {@code "<host>.taskmanager.<tm_id>"} */ + public static final String DEFAULT_SCOPE_TASKMANAGER_COMPONENT = + concat(SCOPE_TASKMANAGER_HOST, "taskmanager", SCOPE_TASKMANAGER_ID); + + /** The default scope format of TaskManager metrics: {@code "<host>.taskmanager.<tm_id>"} */ + public static final String DEFAULT_SCOPE_TASKMANAGER_GROUP = DEFAULT_SCOPE_TASKMANAGER_COMPONENT; + + // ----- Job on Task Manager ---- + + public static final String SCOPE_JOB_ID = asVariable("job_id"); + public static final String SCOPE_JOB_NAME = asVariable("job_name"); + + /** The default scope format for the job component: {@code "<job_name>"} */ + public static final String DEFAULT_SCOPE_TASKMANAGER_JOB_COMPONENT = SCOPE_JOB_NAME; + + /** The default scope format for all job metrics: {@code "<host>.taskmanager.<tm_id>.<job_name>"} */ + public static final String DEFAULT_SCOPE_TASKMANAGER_JOB_GROUP = + concat(DEFAULT_SCOPE_TASKMANAGER_COMPONENT, DEFAULT_SCOPE_TASKMANAGER_JOB_COMPONENT); + + // ----- Task ---- + + public static final String SCOPE_TASK_VERTEX_ID = asVariable("task_id"); + public static final String SCOPE_TASK_NAME = asVariable("task_name"); + public static final String SCOPE_TASK_ATTEMPT_ID = asVariable("task_attempt_id"); + public static final String SCOPE_TASK_ATTEMPT_NUM = asVariable("task_attempt_num"); + public static final 
String SCOPE_TASK_SUBTASK_INDEX = asVariable("subtask_index"); + + /** Default scope of the task component: {@code "<task_name>.<subtask_index>"} */ + public static final String DEFAULT_SCOPE_TASK_COMPONENT = + concat(SCOPE_TASK_NAME, SCOPE_TASK_SUBTASK_INDEX); + + /** The default scope format for all task metrics: + * {@code "<host>.taskmanager.<tm_id>.<job_name>.<task_name>.<subtask_index>"} */ + public static final String DEFAULT_SCOPE_TASK_GROUP = + concat(DEFAULT_SCOPE_TASKMANAGER_JOB_GROUP, DEFAULT_SCOPE_TASK_COMPONENT); + + // ----- Operator ---- + + public static final String SCOPE_OPERATOR_NAME = asVariable("operator_name"); + + /** The default scope added by the operator component: {@code "<operator_name>.<subtask_index>"} */ + public static final String DEFAULT_SCOPE_OPERATOR_COMPONENT = + concat(SCOPE_OPERATOR_NAME, SCOPE_TASK_SUBTASK_INDEX); + + /** The default scope format for all operator metrics: + * {@code "<host>.taskmanager.<tm_id>.<job_name>.<operator_name>.<subtask_index>"} */ + public static final String DEFAULT_SCOPE_OPERATOR_GROUP = + concat(DEFAULT_SCOPE_TASKMANAGER_JOB_GROUP, DEFAULT_SCOPE_OPERATOR_COMPONENT); + + // ------------------------------------------------------------------------ + // Formatters for the individual component types + // ------------------------------------------------------------------------ + + /** + * The scope format for the {@link TaskManagerMetricGroup}. 
+ */ + public static class TaskManagerScopeFormat extends ScopeFormat { + + public TaskManagerScopeFormat(String format) { + super(format, null, new String[] { + SCOPE_TASKMANAGER_HOST, + SCOPE_TASKMANAGER_ID + }); + } + + public String[] formatScope(String hostname, String taskManagerId) { + final String[] template = copyTemplate(); + final String[] values = { hostname, taskManagerId }; + return bindVariables(template, values); + } + } + + // ------------------------------------------------------------------------ + + /** + * The scope format for the {@link JobMetricGroup}. + */ + public static class TaskManagerJobScopeFormat extends ScopeFormat { + + public TaskManagerJobScopeFormat(String format, TaskManagerScopeFormat parentFormat) { + super(format, parentFormat, new String[] { + SCOPE_TASKMANAGER_HOST, + SCOPE_TASKMANAGER_ID, + SCOPE_JOB_ID, + SCOPE_JOB_NAME + }); + } + + public String[] formatScope(TaskManagerMetricGroup parent, JobID jid, String jobName) { + final String[] template = copyTemplate(); + final String[] values = { + parent.hostname(), + parent.taskManagerId(), + valueOrNull(jid), + valueOrNull(jobName) + }; + return bindVariables(template, values); + } + } + + // ------------------------------------------------------------------------ + + /** + * The scope format for the {@link TaskMetricGroup}. 
+ */ + public static class TaskScopeFormat extends ScopeFormat { + + public TaskScopeFormat(String format, TaskManagerJobScopeFormat parentFormat) { + super(format, parentFormat, new String[] { + SCOPE_TASKMANAGER_HOST, + SCOPE_TASKMANAGER_ID, + SCOPE_JOB_ID, + SCOPE_JOB_NAME, + SCOPE_TASK_VERTEX_ID, + SCOPE_TASK_ATTEMPT_ID, + SCOPE_TASK_NAME, + SCOPE_TASK_SUBTASK_INDEX, + SCOPE_TASK_ATTEMPT_NUM + }); + } + + public String[] formatScope( + JobMetricGroup parent, + AbstractID vertexId, AbstractID attemptId, + String taskName, int subtask, int attemptNumber) { + + final String[] template = copyTemplate(); + final String[] values = { + parent.parent().hostname(), + parent.parent().taskManagerId(), + valueOrNull(parent.jobId()), + valueOrNull(parent.jobName()), + valueOrNull(vertexId), + valueOrNull(attemptId), + valueOrNull(taskName), + String.valueOf(subtask), + String.valueOf(attemptNumber) + }; + return bindVariables(template, values); + } + } + + // ------------------------------------------------------------------------ + + /** + * The scope format for the {@link org.apache.flink.metrics.groups.OperatorMetricGroup}. 
+ */ + public static class OperatorScopeFormat extends ScopeFormat { + + public OperatorScopeFormat(String format, TaskScopeFormat parentFormat) { + super(format, parentFormat, new String[] { + SCOPE_TASKMANAGER_HOST, + SCOPE_TASKMANAGER_ID, + SCOPE_JOB_ID, + SCOPE_JOB_NAME, + SCOPE_TASK_VERTEX_ID, + SCOPE_TASK_ATTEMPT_ID, + SCOPE_TASK_NAME, + SCOPE_TASK_SUBTASK_INDEX, + SCOPE_TASK_ATTEMPT_NUM, + SCOPE_OPERATOR_NAME + }); + } + + public String[] formatScope(TaskMetricGroup parent, String operatorName) { + + final String[] template = copyTemplate(); + final String[] values = { + parent.parent().parent().hostname(), + parent.parent().parent().taskManagerId(), + valueOrNull(parent.parent().jobId()), + valueOrNull(parent.parent().jobName()), + valueOrNull(parent.vertexId()), + valueOrNull(parent.executionId()), + valueOrNull(parent.taskName()), + String.valueOf(parent.subtaskIndex()), + String.valueOf(parent.attemptNumber()), + valueOrNull(operatorName) + }; + return bindVariables(template, values); + } + } + + // ------------------------------------------------------------------------ + // Scope Format Base + // ------------------------------------------------------------------------ + + /** The scope format */ + private final String format; + + /** The format, split into components */ + private final String[] template; + + private final int[] templatePos; + + private final int[] valuePos; + + // ------------------------------------------------------------------------ + + protected ScopeFormat(String format, ScopeFormat parent, String[] variables) { + checkNotNull(format, "format is null"); + + final String[] rawComponents = format.split("\\" + SCOPE_SEPARATOR); + + // compute the template array + final boolean parentAsPrefix = rawComponents.length > 0 && rawComponents[0].equals(SCOPE_INHERIT_PARENT); + if (parentAsPrefix) { + if (parent == null) { + throw new IllegalArgumentException("Component scope format requires parent prefix (starts with '" + + 
SCOPE_INHERIT_PARENT + "'), but this component has no parent (is root component)."); + } + + this.format = format.length() > 2 ? format.substring(2) : "<empty>"; + + String[] parentTemplate = parent.template; + int parentLen = parentTemplate.length; + + this.template = new String[parentLen + rawComponents.length - 1]; + System.arraycopy(parentTemplate, 0, this.template, 0, parentLen); + System.arraycopy(rawComponents, 1, this.template, parentLen, rawComponents.length - 1); + } + else { + this.format = format.isEmpty() ? "<empty>" : format; + this.template = rawComponents; + } + + // --- compute the replacement matrix --- + // a bit of clumsy Java collections code ;-) + + HashMap<String, Integer> varToValuePos = arrayToMap(variables); + List<Integer> templatePos = new ArrayList<>(); + List<Integer> valuePos = new ArrayList<>(); + + for (int i = 0; i < template.length; i++) { + final String component = template[i]; + + // check if that is a variable + if (component != null && component.length() >= 3 && + component.charAt(0) == '<' && component.charAt(component.length() - 1) == '>') { + + // this is a variable + Integer replacementPos = varToValuePos.get(component); + if (replacementPos != null) { + templatePos.add(i); + valuePos.add(replacementPos); + } + } + } + + this.templatePos = integerListToArray(templatePos); + this.valuePos = integerListToArray(valuePos); + } + + // ------------------------------------------------------------------------ + + public String format() { + return format; + } + + protected final String[] copyTemplate() { + String[] copy = new String[template.length]; + System.arraycopy(template, 0, copy, 0, template.length); + return copy; + } + + protected final String[] bindVariables(String[] template, String[] values) { + final int len = templatePos.length; + for (int i = 0; i < len; i++) { + template[templatePos[i]] = values[valuePos[i]]; + } + return template; + } + + // ------------------------------------------------------------------------ 
+ + @Override + public String toString() { + return "ScopeFormat '" + format + '\''; + } + + // ------------------------------------------------------------------------ + // Utilities + // ------------------------------------------------------------------------ + + /** + * Formats the given string to resemble a scope variable. + * + * @param scope The string to format + * @return The formatted string + */ + public static String asVariable(String scope) { + return SCOPE_VARIABLE_PREFIX + scope + SCOPE_VARIABLE_SUFFIX; + } + + public static String concat(String... components) { + StringBuilder sb = new StringBuilder(); + sb.append(components[0]); + for (int x = 1; x < components.length; x++) { + sb.append(SCOPE_SEPARATOR); + sb.append(components[x]); + } + return sb.toString(); + } + + static String valueOrNull(Object value) { + return (value == null || (value instanceof String && ((String) value).isEmpty())) ? + "null" : value.toString(); + } + + static HashMap<String, Integer> arrayToMap(String[] array) { + HashMap<String, Integer> map = new HashMap<>(array.length); + for (int i = 0; i < array.length; i++) { + map.put(array[i], i); + } + return map; + } + + private static int[] integerListToArray(List<Integer> list) { + int[] array = new int[list.size()]; + int pos = 0; + for (Integer i : list) { + array[pos++] = i; + } + return array; + } +}