[FLINK-1502] [core] Add basic metric system
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/003ce18e Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/003ce18e Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/003ce18e Branch: refs/heads/master Commit: 003ce18efc0249fae874e56c3df6acf19f5f2429 Parents: 8ed3685 Author: zentol <[email protected]> Authored: Mon May 9 14:54:32 2016 +0200 Committer: Stephan Ewen <[email protected]> Committed: Sun May 22 19:58:17 2016 +0200 ---------------------------------------------------------------------- .../flink-statebackend-rocksdb/pom.xml | 7 + flink-contrib/flink-storm/pom.xml | 8 + .../flink/storm/wrappers/BoltWrapperTest.java | 7 + flink-core/pom.xml | 6 + .../api/common/functions/RuntimeContext.java | 8 + .../util/AbstractRuntimeUDFContext.java | 12 +- .../functions/util/RuntimeUDFContext.java | 6 +- .../common/operators/CollectionExecutor.java | 48 +++- .../java/org/apache/flink/metrics/Counter.java | 69 ++++++ .../java/org/apache/flink/metrics/Gauge.java | 33 +++ .../java/org/apache/flink/metrics/Metric.java | 27 +++ .../org/apache/flink/metrics/MetricGroup.java | 99 +++++++++ .../apache/flink/metrics/MetricRegistry.java | 170 +++++++++++++++ .../metrics/groups/AbstractMetricGroup.java | 145 +++++++++++++ .../metrics/groups/ComponentMetricGroup.java | 113 ++++++++++ .../metrics/groups/GenericMetricGroup.java | 57 +++++ .../flink/metrics/groups/IOMetricGroup.java | 70 ++++++ .../flink/metrics/groups/JobMetricGroup.java | 71 ++++++ .../metrics/groups/OperatorMetricGroup.java | 46 ++++ .../org/apache/flink/metrics/groups/Scope.java | 120 ++++++++++ .../metrics/groups/TaskManagerMetricGroup.java | 70 ++++++ .../flink/metrics/groups/TaskMetricGroup.java | 87 ++++++++ .../metrics/reporter/AbstractReporter.java | 48 ++++ .../flink/metrics/reporter/JMXReporter.java | 167 ++++++++++++++ .../flink/metrics/reporter/MetricReporter.java | 73 +++++++ .../flink/metrics/reporter/Scheduled.java | 32 
+++ .../functions/util/RuntimeUDFContextTest.java | 11 +- .../api/common/io/RichInputFormatTest.java | 3 +- .../api/common/io/RichOutputFormatTest.java | 3 +- .../operators/GenericDataSinkBaseTest.java | 5 +- .../operators/GenericDataSourceBaseTest.java | 5 +- .../base/FlatMapOperatorCollectionTest.java | 3 +- .../base/InnerJoinOperatorBaseTest.java | 5 +- .../common/operators/base/MapOperatorTest.java | 5 +- .../base/PartitionMapOperatorTest.java | 5 +- .../apache/flink/metrics/MetricGroupTest.java | 93 ++++++++ .../flink/metrics/MetricRegistryTest.java | 217 +++++++++++++++++++ .../flink/metrics/groups/JobGroupTest.java | 71 ++++++ .../flink/metrics/groups/OperatorGroupTest.java | 86 ++++++++ .../flink/metrics/groups/TaskGroupTest.java | 80 +++++++ .../metrics/groups/TaskManagerGroupTest.java | 70 ++++++ .../flink/metrics/reporter/JMXReporterTest.java | 43 ++++ .../flink/metrics/util/DummyJobMetricGroup.java | 50 +++++ .../flink/metrics/util/DummyMetricGroup.java | 57 +++++ .../flink/metrics/util/DummyMetricRegistry.java | 34 +++ .../metrics/util/DummyOperatorMetricGroup.java | 43 ++++ .../flink/metrics/util/DummyReporter.java | 47 ++++ .../util/DummyTaskManagerMetricGroup.java | 48 ++++ .../metrics/util/DummyTaskMetricGroup.java | 48 ++++ .../apache/flink/metrics/util/TestReporter.java | 40 ++++ flink-dist/src/main/flink-bin/bin/config.sh | 6 + .../src/main/flink-bin/bin/flink-daemon.sh | 18 +- .../base/CoGroupOperatorCollectionTest.java | 3 +- .../operators/base/GroupReduceOperatorTest.java | 5 +- .../base/InnerJoinOperatorBaseTest.java | 5 +- .../operators/base/ReduceOperatorTest.java | 5 +- flink-metrics/flink-metrics-dropwizard/pom.xml | 72 ++++++ .../dropwizard/ScheduledDropwizardReporter.java | 99 +++++++++ .../dropwizard/metrics/CounterWrapper.java | 33 +++ .../flink/dropwizard/metrics/GaugeWrapper.java | 33 +++ flink-metrics/flink-metrics-ganglia/pom.xml | 90 ++++++++ .../flink/metrics/graphite/GangliaReporter.java | 73 +++++++ 
flink-metrics/flink-metrics-graphite/pom.xml | 84 +++++++ .../metrics/graphite/GraphiteReporter.java | 70 ++++++ flink-metrics/flink-metrics-statsd/pom.xml | 43 ++++ .../flink/metrics/statsd/StatsDReporter.java | 132 +++++++++++ flink-metrics/pom.xml | 42 ++++ flink-runtime/pom.xml | 4 - .../deployment/TaskDeploymentDescriptor.java | 7 + .../flink/runtime/execution/Environment.java | 8 + .../runtime/executiongraph/ExecutionVertex.java | 1 + .../api/reader/AbstractRecordReader.java | 8 + .../io/network/api/reader/BufferReader.java | 5 + .../io/network/api/reader/ReaderBase.java | 8 + .../AdaptiveSpanningRecordDeserializer.java | 20 ++ .../api/serialization/RecordDeserializer.java | 8 + .../api/serialization/RecordSerializer.java | 8 + .../serialization/SpanningRecordSerializer.java | 19 ++ ...llingAdaptiveSpanningRecordDeserializer.java | 21 ++ .../io/network/api/writer/RecordWriter.java | 11 + .../iterative/task/AbstractIterativeTask.java | 9 +- .../flink/runtime/operators/BatchTask.java | 19 +- .../flink/runtime/operators/DataSinkTask.java | 5 +- .../flink/runtime/operators/DataSourceTask.java | 5 +- .../runtime/operators/GroupReduceDriver.java | 2 +- .../flink/runtime/operators/TaskContext.java | 3 + .../operators/chaining/ChainedDriver.java | 8 +- .../util/DistributedRuntimeUDFContext.java | 5 +- .../runtime/taskmanager/RuntimeEnvironment.java | 11 +- .../apache/flink/runtime/taskmanager/Task.java | 12 +- .../flink/runtime/taskmanager/TaskManager.scala | 169 ++++++++++++++- .../TaskDeploymentDescriptorTest.java | 3 +- .../network/api/reader/AbstractReaderTest.java | 5 + .../operators/drivers/TestTaskContext.java | 7 + .../testutils/BinaryOperatorTestBase.java | 7 + .../operators/testutils/DriverTestBase.java | 7 + .../operators/testutils/DummyEnvironment.java | 7 + .../operators/testutils/MockEnvironment.java | 7 + .../testutils/UnaryOperatorTestBase.java | 6 + .../runtime/taskmanager/TaskAsyncCallTest.java | 6 +- .../runtime/taskmanager/TaskManagerTest.java | 
27 +-- .../flink/runtime/taskmanager/TaskStopTest.java | 3 +- .../flink/runtime/taskmanager/TaskTest.java | 6 +- .../flink/streaming/api/graph/StreamConfig.java | 9 + .../api/graph/StreamingJobGraphGenerator.java | 11 +- .../api/operators/AbstractStreamOperator.java | 9 + .../streaming/api/operators/StreamOperator.java | 3 + .../api/operators/StreamingRuntimeContext.java | 3 +- .../runtime/io/StreamInputProcessor.java | 12 + .../runtime/io/StreamTwoInputProcessor.java | 12 + .../runtime/tasks/OneInputStreamTask.java | 1 + .../streaming/runtime/tasks/OperatorChain.java | 1 + .../runtime/tasks/TwoInputStreamTask.java | 1 + ...AlignedProcessingTimeWindowOperatorTest.java | 2 + ...AlignedProcessingTimeWindowOperatorTest.java | 2 + .../runtime/tasks/StreamMockEnvironment.java | 7 + .../streaming/runtime/tasks/StreamTaskTest.java | 6 +- pom.xml | 2 + 118 files changed, 3936 insertions(+), 96 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/003ce18e/flink-contrib/flink-statebackend-rocksdb/pom.xml ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-statebackend-rocksdb/pom.xml b/flink-contrib/flink-statebackend-rocksdb/pom.xml index b966371..cccdc20 100644 --- a/flink-contrib/flink-statebackend-rocksdb/pom.xml +++ b/flink-contrib/flink-statebackend-rocksdb/pom.xml @@ -54,6 +54,13 @@ under the License. 
</dependency> <dependency> <groupId>org.apache.flink</groupId> + <artifactId>flink-core</artifactId> + <version>${project.version}</version> + <type>test-jar</type> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.flink</groupId> <artifactId>flink-runtime_2.10</artifactId> <version>${project.version}</version> <type>test-jar</type> http://git-wip-us.apache.org/repos/asf/flink/blob/003ce18e/flink-contrib/flink-storm/pom.xml ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-storm/pom.xml b/flink-contrib/flink-storm/pom.xml index 24b0b4b..a080a03 100644 --- a/flink-contrib/flink-storm/pom.xml +++ b/flink-contrib/flink-storm/pom.xml @@ -49,6 +49,14 @@ under the License. </dependency> <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-core</artifactId> + <version>${project.version}</version> + <type>test-jar</type> + <scope>test</scope> + </dependency> + + <dependency> <groupId>org.apache.storm</groupId> <artifactId>storm-core</artifactId> <version>0.9.4</version> http://git-wip-us.apache.org/repos/asf/flink/blob/003ce18e/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/BoltWrapperTest.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/BoltWrapperTest.java b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/BoltWrapperTest.java index e65dc45..cb9ac1c 100644 --- a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/BoltWrapperTest.java +++ b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/BoltWrapperTest.java @@ -32,7 +32,10 @@ import org.apache.flink.api.java.tuple.Tuple; import org.apache.flink.api.java.tuple.Tuple1; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.UnmodifiableConfiguration; +import 
org.apache.flink.metrics.util.DummyMetricGroup; +import org.apache.flink.metrics.util.DummyTaskMetricGroup; import org.apache.flink.runtime.execution.Environment; +import org.apache.flink.runtime.taskmanager.RuntimeEnvironment; import org.apache.flink.storm.util.AbstractTest; import org.apache.flink.storm.util.SplitStreamType; import org.apache.flink.storm.util.StormConfig; @@ -141,6 +144,7 @@ public class BoltWrapperTest extends AbstractTest { final StreamingRuntimeContext taskContext = mock(StreamingRuntimeContext.class); when(taskContext.getExecutionConfig()).thenReturn(mock(ExecutionConfig.class)); when(taskContext.getTaskName()).thenReturn("name"); + when(taskContext.getMetricGroup()).thenReturn(new DummyMetricGroup()); final IRichBolt bolt = mock(IRichBolt.class); @@ -225,6 +229,7 @@ public class BoltWrapperTest extends AbstractTest { final StreamingRuntimeContext taskContext = mock(StreamingRuntimeContext.class); when(taskContext.getExecutionConfig()).thenReturn(taskConfig); when(taskContext.getTaskName()).thenReturn("name"); + when(taskContext.getMetricGroup()).thenReturn(new DummyMetricGroup()); final SetupOutputFieldsDeclarer declarer = new SetupOutputFieldsDeclarer(); declarer.declare(new Fields("dummy")); @@ -289,6 +294,7 @@ public class BoltWrapperTest extends AbstractTest { final StreamingRuntimeContext taskContext = mock(StreamingRuntimeContext.class); when(taskContext.getExecutionConfig()).thenReturn(taskConfig); when(taskContext.getTaskName()).thenReturn("name"); + when(taskContext.getMetricGroup()).thenReturn(new DummyMetricGroup()); final IRichBolt bolt = mock(IRichBolt.class); BoltWrapper<Object, Object> wrapper = new BoltWrapper<Object, Object>(bolt); @@ -361,6 +367,7 @@ public class BoltWrapperTest extends AbstractTest { Environment env = mock(Environment.class); when(env.getTaskInfo()).thenReturn(new TaskInfo("Mock Task", 0, 1, 0)); when(env.getUserClassLoader()).thenReturn(BoltWrapperTest.class.getClassLoader()); + 
when(env.getMetricGroup()).thenReturn(new DummyTaskMetricGroup()); StreamTask<?, ?> mockTask = mock(StreamTask.class); when(mockTask.getCheckpointLock()).thenReturn(new Object()); http://git-wip-us.apache.org/repos/asf/flink/blob/003ce18e/flink-core/pom.xml ---------------------------------------------------------------------- diff --git a/flink-core/pom.xml b/flink-core/pom.xml index eb55bdd..0e0b54b 100644 --- a/flink-core/pom.xml +++ b/flink-core/pom.xml @@ -47,6 +47,12 @@ under the License. <!-- managed version --> </dependency> + <dependency> + <groupId>io.dropwizard.metrics</groupId> + <artifactId>metrics-core</artifactId> + <version>${metrics.version}</version> + </dependency> + <!-- Avro is needed for the interoperability with Avro types for serialization --> <dependency> <groupId>org.apache.avro</groupId> http://git-wip-us.apache.org/repos/asf/flink/blob/003ce18e/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java index 3225c00..ed2f613 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java @@ -39,6 +39,7 @@ import org.apache.flink.api.common.state.ReducingStateDescriptor; import org.apache.flink.api.common.state.ValueState; import org.apache.flink.api.common.state.ValueStateDescriptor; import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.metrics.MetricGroup; /** * A RuntimeContext contains information about the context in which functions are executed. Each parallel instance @@ -59,6 +60,13 @@ public interface RuntimeContext { String getTaskName(); /** + * Returns the metric group for this parallel subtask. 
+ * + * @return The metric group for this parallel subtask. + */ + MetricGroup getMetricGroup(); + + /** * Gets the parallelism with which the parallel task runs. * * @return The parallelism with which the parallel task runs. http://git-wip-us.apache.org/repos/asf/flink/blob/003ce18e/flink-core/src/main/java/org/apache/flink/api/common/functions/util/AbstractRuntimeUDFContext.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/util/AbstractRuntimeUDFContext.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/util/AbstractRuntimeUDFContext.java index 74b78df..6645964 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/functions/util/AbstractRuntimeUDFContext.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/util/AbstractRuntimeUDFContext.java @@ -43,6 +43,7 @@ import org.apache.flink.api.common.state.ValueState; import org.apache.flink.api.common.state.ValueStateDescriptor; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.core.fs.Path; +import org.apache.flink.metrics.MetricGroup; import static org.apache.flink.util.Preconditions.checkNotNull; @@ -61,17 +62,21 @@ public abstract class AbstractRuntimeUDFContext implements RuntimeContext { private final Map<String, Accumulator<?, ?>> accumulators; private final DistributedCache distributedCache; + + private final MetricGroup metrics; public AbstractRuntimeUDFContext(TaskInfo taskInfo, ClassLoader userCodeClassLoader, ExecutionConfig executionConfig, Map<String, Accumulator<?,?>> accumulators, - Map<String, Future<Path>> cpTasks) { + Map<String, Future<Path>> cpTasks, + MetricGroup metrics) { this.taskInfo = checkNotNull(taskInfo); this.userCodeClassLoader = userCodeClassLoader; this.executionConfig = executionConfig; this.distributedCache = new DistributedCache(checkNotNull(cpTasks)); this.accumulators = 
checkNotNull(accumulators); + this.metrics = metrics; } @Override @@ -93,6 +98,11 @@ public abstract class AbstractRuntimeUDFContext implements RuntimeContext { public int getIndexOfThisSubtask() { return taskInfo.getIndexOfThisSubtask(); } + + @Override + public MetricGroup getMetricGroup() { + return metrics; + } @Override public int getAttemptNumber() { http://git-wip-us.apache.org/repos/asf/flink/blob/003ce18e/flink-core/src/main/java/org/apache/flink/api/common/functions/util/RuntimeUDFContext.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/util/RuntimeUDFContext.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/util/RuntimeUDFContext.java index 2337afa..6571d0d 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/functions/util/RuntimeUDFContext.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/util/RuntimeUDFContext.java @@ -30,6 +30,7 @@ import org.apache.flink.api.common.accumulators.Accumulator; import org.apache.flink.api.common.functions.BroadcastVariableInitializer; import org.apache.flink.api.common.functions.RuntimeContext; import org.apache.flink.core.fs.Path; +import org.apache.flink.metrics.MetricGroup; /** * A standalone implementation of the {@link RuntimeContext}, created by runtime UDF operators. 
@@ -42,8 +43,9 @@ public class RuntimeUDFContext extends AbstractRuntimeUDFContext { private final HashMap<String, List<?>> uninitializedBroadcastVars = new HashMap<String, List<?>>(); public RuntimeUDFContext(TaskInfo taskInfo, ClassLoader userCodeClassLoader, ExecutionConfig executionConfig, - Map<String, Future<Path>> cpTasks, Map<String, Accumulator<?,?>> accumulators) { - super(taskInfo, userCodeClassLoader, executionConfig, accumulators, cpTasks); + Map<String, Future<Path>> cpTasks, Map<String, Accumulator<?, ?>> accumulators, + MetricGroup metrics) { + super(taskInfo, userCodeClassLoader, executionConfig, accumulators, cpTasks, metrics); } @Override http://git-wip-us.apache.org/repos/asf/flink/blob/003ce18e/flink-core/src/main/java/org/apache/flink/api/common/operators/CollectionExecutor.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/CollectionExecutor.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/CollectionExecutor.java index 44042c4..7e5269e 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/operators/CollectionExecutor.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/CollectionExecutor.java @@ -36,6 +36,7 @@ import org.apache.flink.annotation.Internal; import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.InvalidProgramException; import org.apache.flink.api.common.JobExecutionResult; +import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.Plan; import org.apache.flink.api.common.TaskInfo; import org.apache.flink.api.common.accumulators.Accumulator; @@ -58,9 +59,15 @@ import org.apache.flink.api.common.operators.util.TypeComparable; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.common.typeutils.CompositeType; import org.apache.flink.api.common.typeutils.TypeComparator; +import 
org.apache.flink.configuration.Configuration; import org.apache.flink.core.fs.Path; import org.apache.flink.core.fs.local.LocalFileSystem; +import org.apache.flink.metrics.MetricGroup; +import org.apache.flink.metrics.MetricRegistry; +import org.apache.flink.metrics.groups.JobMetricGroup; +import org.apache.flink.metrics.groups.TaskManagerMetricGroup; import org.apache.flink.types.Value; +import org.apache.flink.util.AbstractID; import org.apache.flink.util.Visitor; /** @@ -86,6 +93,8 @@ public class CollectionExecutor { private final ExecutionConfig executionConfig; private int iterationSuperstep; + + private JobMetricGroup jobMetricGroup; // -------------------------------------------------------------------------------------------- @@ -106,6 +115,14 @@ public class CollectionExecutor { public JobExecutionResult execute(Plan program) throws Exception { long startTime = System.currentTimeMillis(); + + JobID jobID = program.getJobId(); + if (jobID == null) { + jobID = new JobID(); + } + this.jobMetricGroup = + new TaskManagerMetricGroup(new MetricRegistry(new Configuration()), "localhost", new AbstractID().toString()) + .addJob(jobID, program.getJobName()); initCache(program.getCachedFiles()); Collection<? extends GenericDataSinkBase<?>> sinks = program.getDataSinks(); for (Operator<?> sink : sinks) { @@ -184,9 +201,12 @@ public class CollectionExecutor { // build the runtime context and compute broadcast variables, if necessary TaskInfo taskInfo = new TaskInfo(typedSink.getName(), 0, 1, 0); RuntimeUDFContext ctx; + + MetricGroup metrics = this.jobMetricGroup.addTask(new AbstractID(), new AbstractID(), 0, typedSink.getName()); + if (RichOutputFormat.class.isAssignableFrom(typedSink.getUserCodeWrapper().getUserCodeClass())) { - ctx = superStep == 0 ? new RuntimeUDFContext(taskInfo, classLoader, executionConfig, cachedFiles, accumulators) : - new IterationRuntimeUDFContext(taskInfo, classLoader, executionConfig, cachedFiles, accumulators); + ctx = superStep == 0 ? 
new RuntimeUDFContext(taskInfo, classLoader, executionConfig, cachedFiles, accumulators, metrics) : + new IterationRuntimeUDFContext(taskInfo, classLoader, executionConfig, cachedFiles, accumulators, metrics); } else { ctx = null; } @@ -200,10 +220,13 @@ public class CollectionExecutor { GenericDataSourceBase<OUT, ?> typedSource = (GenericDataSourceBase<OUT, ?>) source; // build the runtime context and compute broadcast variables, if necessary TaskInfo taskInfo = new TaskInfo(typedSource.getName(), 0, 1, 0); + RuntimeUDFContext ctx; + + MetricGroup metrics = this.jobMetricGroup.addTask(new AbstractID(), new AbstractID(), 0, source.getName()); if (RichInputFormat.class.isAssignableFrom(typedSource.getUserCodeWrapper().getUserCodeClass())) { - ctx = superStep == 0 ? new RuntimeUDFContext(taskInfo, classLoader, executionConfig, cachedFiles, accumulators) : - new IterationRuntimeUDFContext(taskInfo, classLoader, executionConfig, cachedFiles, accumulators); + ctx = superStep == 0 ? new RuntimeUDFContext(taskInfo, classLoader, executionConfig, cachedFiles, accumulators, metrics) : + new IterationRuntimeUDFContext(taskInfo, classLoader, executionConfig, cachedFiles, accumulators, metrics); } else { ctx = null; } @@ -225,9 +248,11 @@ public class CollectionExecutor { // build the runtime context and compute broadcast variables, if necessary TaskInfo taskInfo = new TaskInfo(typedOp.getName(), 0, 1, 0); RuntimeUDFContext ctx; + + MetricGroup metrics = this.jobMetricGroup.addTask(new AbstractID(), new AbstractID(), 0, typedOp.getName()); if (RichFunction.class.isAssignableFrom(typedOp.getUserCodeWrapper().getUserCodeClass())) { - ctx = superStep == 0 ? new RuntimeUDFContext(taskInfo, classLoader, executionConfig, cachedFiles, accumulators) : - new IterationRuntimeUDFContext(taskInfo, classLoader, executionConfig, cachedFiles, accumulators); + ctx = superStep == 0 ? 
new RuntimeUDFContext(taskInfo, classLoader, executionConfig, cachedFiles, accumulators, metrics) : + new IterationRuntimeUDFContext(taskInfo, classLoader, executionConfig, cachedFiles, accumulators, metrics); for (Map.Entry<String, Operator<?>> bcInputs : operator.getBroadcastInputs().entrySet()) { List<?> bcData = execute(bcInputs.getValue()); @@ -265,9 +290,11 @@ public class CollectionExecutor { // build the runtime context and compute broadcast variables, if necessary TaskInfo taskInfo = new TaskInfo(typedOp.getName(), 0, 1, 0); RuntimeUDFContext ctx; + + MetricGroup metrics = this.jobMetricGroup.addTask(new AbstractID(), new AbstractID(), 0, typedOp.getName()); if (RichFunction.class.isAssignableFrom(typedOp.getUserCodeWrapper().getUserCodeClass())) { - ctx = superStep == 0 ? new RuntimeUDFContext(taskInfo, classLoader, executionConfig, cachedFiles, accumulators) : - new IterationRuntimeUDFContext(taskInfo, classLoader, executionConfig, cachedFiles, accumulators); + ctx = superStep == 0 ? 
new RuntimeUDFContext(taskInfo, classLoader, executionConfig, cachedFiles, accumulators, metrics) : + new IterationRuntimeUDFContext(taskInfo, classLoader, executionConfig, cachedFiles, accumulators, metrics); for (Map.Entry<String, Operator<?>> bcInputs : operator.getBroadcastInputs().entrySet()) { List<?> bcData = execute(bcInputs.getValue()); @@ -523,8 +550,9 @@ public class CollectionExecutor { private class IterationRuntimeUDFContext extends RuntimeUDFContext implements IterationRuntimeContext { public IterationRuntimeUDFContext(TaskInfo taskInfo, ClassLoader classloader, ExecutionConfig executionConfig, - Map<String, Future<Path>> cpTasks, Map<String, Accumulator<?,?>> accumulators) { - super(taskInfo, classloader, executionConfig, cpTasks, accumulators); + Map<String, Future<Path>> cpTasks, Map<String, Accumulator<?, ?>> accumulators, + MetricGroup metrics) { + super(taskInfo, classloader, executionConfig, cpTasks, accumulators, metrics); } @Override http://git-wip-us.apache.org/repos/asf/flink/blob/003ce18e/flink-core/src/main/java/org/apache/flink/metrics/Counter.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/metrics/Counter.java b/flink-core/src/main/java/org/apache/flink/metrics/Counter.java new file mode 100644 index 0000000..b18da4f --- /dev/null +++ b/flink-core/src/main/java/org/apache/flink/metrics/Counter.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.metrics; + +import org.apache.flink.annotation.PublicEvolving; + +/** + * A Counter is a {@link org.apache.flink.metrics.Metric} that measures a count. + */ +@PublicEvolving +public final class Counter implements Metric { + private long count = 0; + + /** + * Increment the current count by 1. + */ + public void inc() { + count++; + } + + /** + * Increment the current count by the given value. + * + * @param n value to increment the current count by + */ + public void inc(long n) { + count += n; + } + + /** + * Decrement the current count by 1. + */ + public void dec() { + count--; + } + + /** + * Decrement the current count by the given value. + * + * @param n value to decrement the current count by + */ + public void dec(long n) { + count -= n; + } + + /** + * Returns the current count. + * + * @return current count + */ + public long getCount() { + return count; + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/003ce18e/flink-core/src/main/java/org/apache/flink/metrics/Gauge.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/metrics/Gauge.java b/flink-core/src/main/java/org/apache/flink/metrics/Gauge.java new file mode 100644 index 0000000..455587a --- /dev/null +++ b/flink-core/src/main/java/org/apache/flink/metrics/Gauge.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.metrics; + +import org.apache.flink.annotation.PublicEvolving; + +/** + * A Gauge is a {@link org.apache.flink.metrics.Metric} that calculates a specific value at a point in time. + */ +@PublicEvolving +public abstract class Gauge<T> implements Metric { + /** + * Calculates and returns the measured value. + * + * @return calculated value + */ + public abstract T getValue(); +} http://git-wip-us.apache.org/repos/asf/flink/blob/003ce18e/flink-core/src/main/java/org/apache/flink/metrics/Metric.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/metrics/Metric.java b/flink-core/src/main/java/org/apache/flink/metrics/Metric.java new file mode 100644 index 0000000..11cfcc6 --- /dev/null +++ b/flink-core/src/main/java/org/apache/flink/metrics/Metric.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.metrics; + +import org.apache.flink.annotation.PublicEvolving; + +/** + * Common interface for all metrics. + */ +@PublicEvolving +public interface Metric { +} http://git-wip-us.apache.org/repos/asf/flink/blob/003ce18e/flink-core/src/main/java/org/apache/flink/metrics/MetricGroup.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/metrics/MetricGroup.java b/flink-core/src/main/java/org/apache/flink/metrics/MetricGroup.java new file mode 100644 index 0000000..4036129 --- /dev/null +++ b/flink-core/src/main/java/org/apache/flink/metrics/MetricGroup.java @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.metrics; + +import org.apache.flink.annotation.PublicEvolving; + +/** + * A MetricGroup is a named container for {@link org.apache.flink.metrics.Metric}s and + * {@link org.apache.flink.metrics.MetricGroup}s. + * <p> + * Instances of this class can be used to register new metrics with Flink and to create a nested hierarchy based on the + * group names. + * <p> + * A MetricGroup is uniquely identified by its place in the hierarchy and name. + */ +@PublicEvolving +public interface MetricGroup { + + /** + * Recursively unregisters all {@link org.apache.flink.metrics.Metric}s contained in this + * {@link org.apache.flink.metrics.MetricGroup} + */ + void close(); + + // ----------------------------------------------------------------------------------------------------------------- + // Metrics + // ----------------------------------------------------------------------------------------------------------------- + + /** + * Creates and registers a new {@link org.apache.flink.metrics.Counter} with Flink. + * + * @param name name of the counter + * @return the registered counter + */ + Counter counter(int name); + + /** + * Creates and registers a new {@link org.apache.flink.metrics.Counter} with Flink. + * + * @param name name of the counter + * @return the registered counter + */ + Counter counter(String name); + + /** + * Registers a new {@link org.apache.flink.metrics.Gauge} with Flink. + * + * @param name name of the gauge + * @param gauge gauge to register + * @param <T> return type of the gauge + * @return the registered gauge + */ + <T> Gauge<T> gauge(int name, Gauge<T> gauge); + + /** + * Registers a new {@link org.apache.flink.metrics.Gauge} with Flink. 
+ * + * @param name name of the gauge + * @param gauge gauge to register + * @param <T> return type of the gauge + * @return the registered gauge + */ + <T> Gauge<T> gauge(String name, Gauge<T> gauge); + + // ----------------------------------------------------------------------------------------------------------------- + // Groups + // ----------------------------------------------------------------------------------------------------------------- + + /** + * Creates a new {@link org.apache.flink.metrics.MetricGroup} and adds it to this groups sub-groups. + * + * @param name name of the group + * @return the created group + */ + MetricGroup addGroup(int name); + + /** + * Creates a new {@link org.apache.flink.metrics.MetricGroup} and adds it to this groups sub-groups. + * + * @param name name of the group + * @return the created group + */ + MetricGroup addGroup(String name); +} http://git-wip-us.apache.org/repos/asf/flink/blob/003ce18e/flink-core/src/main/java/org/apache/flink/metrics/MetricRegistry.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/metrics/MetricRegistry.java b/flink-core/src/main/java/org/apache/flink/metrics/MetricRegistry.java new file mode 100644 index 0000000..0e8b0d5 --- /dev/null +++ b/flink-core/src/main/java/org/apache/flink/metrics/MetricRegistry.java @@ -0,0 +1,170 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.metrics; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.metrics.groups.AbstractMetricGroup; +import org.apache.flink.metrics.groups.Scope; +import org.apache.flink.metrics.reporter.JMXReporter; +import org.apache.flink.metrics.reporter.MetricReporter; +import org.apache.flink.metrics.reporter.Scheduled; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.TimerTask; +import java.util.concurrent.TimeUnit; + +import static org.apache.flink.metrics.groups.JobMetricGroup.DEFAULT_SCOPE_JOB; +import static org.apache.flink.metrics.groups.OperatorMetricGroup.DEFAULT_SCOPE_OPERATOR; +import static org.apache.flink.metrics.groups.TaskManagerMetricGroup.DEFAULT_SCOPE_TM; +import static org.apache.flink.metrics.groups.TaskMetricGroup.DEFAULT_SCOPE_TASK; + +/** + * A MetricRegistry keeps track of all registered {@link org.apache.flink.metrics.Metric}s. It serves as the + * connection between {@link org.apache.flink.metrics.MetricGroup}s and {@link org.apache.flink.metrics.reporter.MetricReporter}s. 
+ */ +@Internal +public class MetricRegistry { + private static final Logger LOG = LoggerFactory.getLogger(MetricRegistry.class); + + private final MetricReporter reporter; + private java.util.Timer timer; + + public static final String KEY_METRICS_REPORTER_CLASS = "metrics.reporter.class"; + public static final String KEY_METRICS_REPORTER_ARGUMENTS = "metrics.reporter.arguments"; + public static final String KEY_METRICS_REPORTER_INTERVAL = "metrics.reporter.interval"; + + public static final String KEY_METRICS_SCOPE_NAMING_TM = "metrics.scope.tm"; + public static final String KEY_METRICS_SCOPE_NAMING_JOB = "metrics.scope.job"; + public static final String KEY_METRICS_SCOPE_NAMING_TASK = "metrics.scope.task"; + public static final String KEY_METRICS_SCOPE_NAMING_OPERATOR = "metrics.scope.operator"; + + private final Scope.ScopeFormat scopeConfig; + + /** + * Creates a new {@link MetricRegistry} and starts the configured reporter. + */ + public MetricRegistry(Configuration config) { + try { + String className = config.getString(KEY_METRICS_REPORTER_CLASS, null); + if (className == null) { + LOG.info("No reporter class name defined in flink-conf.yaml, defaulting to " + JMXReporter.class.getName() + "."); + className = JMXReporter.class.getName(); + } + + this.scopeConfig = createScopeConfig(config); + + Configuration reporterConfig = createReporterConfig(config); + Class<?> reporterClass = Class.forName(className); + reporter = (MetricReporter) reporterClass.newInstance(); + reporter.open(reporterConfig); + + if (reporter instanceof Scheduled) { + String[] interval = config.getString(KEY_METRICS_REPORTER_INTERVAL, "10 SECONDS").split(" "); + long millis = TimeUnit.valueOf(interval[1]).toMillis(Long.parseLong(interval[0])); + timer = new java.util.Timer(true); + timer.schedule(new TimerTask() { + @Override + public void run() { + ((Scheduled) reporter).report(); + } + }, millis, millis); + } + } catch (InstantiationException | ClassNotFoundException e) { + throw new 
RuntimeException("Error while instantiating reporter.", e); + } catch (IllegalAccessException e) { + throw new RuntimeException("Implementation error.", e); + } + } + + private static Configuration createReporterConfig(Configuration config) { + String[] interval = config.getString(KEY_METRICS_REPORTER_INTERVAL, "10 SECONDS").split(" "); + + String[] arguments = config.getString(KEY_METRICS_REPORTER_ARGUMENTS, "").split(" "); + + Configuration reporterConfig = new Configuration(); + reporterConfig.setString("period", interval[0]); + reporterConfig.setString("timeunit", interval[1]); + + if (arguments.length > 1) { + for (int x = 0; x < arguments.length; x += 2) { + reporterConfig.setString(arguments[x].replace("--", ""), arguments[x + 1]); + } + } + return reporterConfig; + } + + private static Scope.ScopeFormat createScopeConfig(Configuration config) { + String tmFormat = config.getString(KEY_METRICS_SCOPE_NAMING_TM, DEFAULT_SCOPE_TM); + String jobFormat = config.getString(KEY_METRICS_SCOPE_NAMING_JOB, DEFAULT_SCOPE_JOB); + String taskFormat = config.getString(KEY_METRICS_SCOPE_NAMING_TASK, DEFAULT_SCOPE_TASK); + String operatorFormat = config.getString(KEY_METRICS_SCOPE_NAMING_OPERATOR, DEFAULT_SCOPE_OPERATOR); + + + Scope.ScopeFormat format = new Scope.ScopeFormat(); + format.setTaskManagerFormat(tmFormat); + format.setJobFormat(jobFormat); + format.setTaskFormat(taskFormat); + format.setOperatorFormat(operatorFormat); + return format; + } + + public Scope.ScopeFormat getScopeConfig() { + return this.scopeConfig; + } + + /** + * Shuts down this registry and the associated {@link org.apache.flink.metrics.reporter.MetricReporter}. + */ + public void shutdown() { + if (timer != null) { + timer.cancel(); + } + if (reporter != null) { + reporter.close(); + } + } + + /** + * Registers a new {@link org.apache.flink.metrics.Metric} with this registry. 
+ * + * @param metric metric to register + * @param name name of the metric + * @param parent group that contains the metric + */ + public void register(Metric metric, String name, AbstractMetricGroup parent) { + String metricName = reporter.generateName(name, parent.generateScope()); + + + this.reporter.notifyOfAddedMetric(metric, metricName); + } + + /** + * Un-registers the given {@link org.apache.flink.metrics.Metric} with this registry. + * + * @param metric metric to un-register + * @param name name of the metric + * @param parent group that contains the metric + */ + public void unregister(Metric metric, String name, AbstractMetricGroup parent) { + String metricName = reporter.generateName(name, parent.generateScope()); + + this.reporter.notifyOfRemovedMetric(metric, metricName); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/003ce18e/flink-core/src/main/java/org/apache/flink/metrics/groups/AbstractMetricGroup.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/metrics/groups/AbstractMetricGroup.java b/flink-core/src/main/java/org/apache/flink/metrics/groups/AbstractMetricGroup.java new file mode 100644 index 0000000..373ac09 --- /dev/null +++ b/flink-core/src/main/java/org/apache/flink/metrics/groups/AbstractMetricGroup.java @@ -0,0 +1,145 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.metrics.groups; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.metrics.Counter; +import org.apache.flink.metrics.Gauge; +import org.apache.flink.metrics.Metric; +import org.apache.flink.metrics.MetricGroup; +import org.apache.flink.metrics.MetricRegistry; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** + * Abstract {@link org.apache.flink.metrics.MetricGroup} that contains key functionality for adding metrics and groups. + */ +@Internal +public abstract class AbstractMetricGroup implements MetricGroup { + private static final Logger LOG = LoggerFactory.getLogger(MetricGroup.class); + protected final MetricRegistry registry; + + // all metrics that are directly contained in this group + protected final Map<String, Metric> metrics = new HashMap<>(); + // all generic groups that are directly contained in this group + protected final Map<String, MetricGroup> groups = new HashMap<>(); + + public AbstractMetricGroup(MetricRegistry registry) { + this.registry = registry; + } + + @Override + public void close() { + for (MetricGroup group : groups.values()) { + group.close(); + } + this.groups.clear(); + for (Map.Entry<String, Metric> metric : metrics.entrySet()) { + registry.unregister(metric.getValue(), metric.getKey(), this); + } + this.metrics.clear(); + } + + // ----------------------------------------------------------------------------------------------------------------- + // Scope + // 
----------------------------------------------------------------------------------------------------------------- + + /** + * Generates the full scope based on the default/configured format that applies to all metrics within this group. + * + * @return generated scope + */ + public abstract List<String> generateScope(); + + /** + * Generates the full scope based on the given format that applies to all metrics within this group. + * + * @param format format string + * @return generated scope + */ + public abstract List<String> generateScope(Scope.ScopeFormat format); + + // ----------------------------------------------------------------------------------------------------------------- + // Metrics + // ----------------------------------------------------------------------------------------------------------------- + + @Override + public Counter counter(int name) { + return counter("" + name); + } + + @Override + public Counter counter(String name) { + Counter counter = new Counter(); + addMetric(name, counter); + return counter; + } + + @Override + public <T> Gauge<T> gauge(int name, Gauge<T> gauge) { + return gauge("" + name, gauge); + } + + @Override + public <T> Gauge<T> gauge(String name, Gauge<T> gauge) { + addMetric(name, gauge); + return gauge; + } + + /** + * Stores the given metric in this group under the given name and registers it with the registry. + * <p> + * A name that is already used by a metric or a sub-group of this group is only reported with a warning; + * the new metric still replaces the previous map entry and is registered. 
+ * + * @param name name of the metric, may only consist of letters and digits + * @param metric metric to add + * @return this group + * @throws IllegalArgumentException if the name contains any other character + */ + protected MetricGroup addMetric(String name, Metric metric) { + if (!name.matches("[a-zA-Z0-9]*")) { + throw new IllegalArgumentException("Metric names may not contain special characters."); + } + if (metrics.containsKey(name)) { + LOG.warn("Detected metric name collision. This group already contains a metric for the given metric name. " + + this.generateScope().toString() + "." + name); + } + if (groups.containsKey(name)) { + LOG.warn("Detected metric name collision. This group already contains a group for the given metric name. " + + this.generateScope().toString() + "." 
+ name); + } + metrics.put(name, metric); + registry.register(metric, name, this); + return this; + } + + // ----------------------------------------------------------------------------------------------------------------- + // Groups + // ----------------------------------------------------------------------------------------------------------------- + + @Override + public MetricGroup addGroup(int name) { + return addGroup("" + name); + } + + @Override + public MetricGroup addGroup(String name) { + if (metrics.containsKey(name)) { + LOG.warn("Detected metric name collision. This group already contains a metric for the given group name. " + + this.generateScope().toString() + "." + name); + } + if (!groups.containsKey(name)) { + groups.put(name, new GenericMetricGroup(registry, this, name)); + } + return groups.get(name); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/003ce18e/flink-core/src/main/java/org/apache/flink/metrics/groups/ComponentMetricGroup.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/metrics/groups/ComponentMetricGroup.java b/flink-core/src/main/java/org/apache/flink/metrics/groups/ComponentMetricGroup.java new file mode 100644 index 0000000..81851e2 --- /dev/null +++ b/flink-core/src/main/java/org/apache/flink/metrics/groups/ComponentMetricGroup.java @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.metrics.groups; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.metrics.MetricRegistry; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static org.apache.flink.metrics.groups.Scope.SCOPE_WILDCARD; + +/** + * Abstract {@link org.apache.flink.metrics.MetricGroup} that contains key functionality for modifying the scope. + */ +@Internal +public abstract class ComponentMetricGroup extends AbstractMetricGroup { + private final ComponentMetricGroup parent; + private final String format; + + + // Map: scope variable -> specific value + protected final Map<String, String> formats; + + /** + * Creates a new ComponentMetricGroup. 
+ * + * @param registry registry to register new metrics with + * @param parentGroup parent group, may be null + * @param scopeFormat default format string + */ + public ComponentMetricGroup(MetricRegistry registry, ComponentMetricGroup parentGroup, String scopeFormat) { + super(registry); + this.formats = new HashMap<>(); + this.parent = parentGroup; + this.format = scopeFormat; + } + + @Override + public List<String> generateScope() { + return this.generateScope(this.format); + } + + @Override + public List<String> generateScope(Scope.ScopeFormat format) { + return generateScope(getScopeFormat(format)); + } + + protected abstract String getScopeFormat(Scope.ScopeFormat format); + + private List<String> generateScope(String format) { + String[] components = Scope.split(format); + + List<String> scope = new ArrayList<>(); + if (components[0].equals(SCOPE_WILDCARD)) { + if (this.parent != null) { + scope = this.parent.generateScope(); + } + this.replaceFormats(components); + addToList(scope, components, 1); + } else { + if (this.parent != null) { + this.parent.replaceFormats(components); + } + this.replaceFormats(components); + addToList(scope, components, 0); + } + return scope; + } + + private void replaceFormats(String[] components) { + if (this.parent != null) { + this.parent.replaceFormats(components); + } + for (int x = 0; x < components.length; x++) { + if (components[x].startsWith("<")) { + if (this.formats.containsKey(components[x])) { + components[x] = this.formats.get(components[x]); + } + } + } + } + + /** + * Adds all elements from the given array, starting from the given index, to the given list. 
+ * + * @param list destination + * @param array source + * @param startIndex array index to start from + */ + private static void addToList(List<String> list, String[] array, int startIndex) { + for (int x = startIndex; x < array.length; x++) { + list.add(array[x]); + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/003ce18e/flink-core/src/main/java/org/apache/flink/metrics/groups/GenericMetricGroup.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/metrics/groups/GenericMetricGroup.java b/flink-core/src/main/java/org/apache/flink/metrics/groups/GenericMetricGroup.java new file mode 100644 index 0000000..5886312 --- /dev/null +++ b/flink-core/src/main/java/org/apache/flink/metrics/groups/GenericMetricGroup.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.metrics.groups; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.metrics.MetricRegistry; + +import java.util.List; + +/** + * A simple named {@link org.apache.flink.metrics.MetricGroup} with no special properties. 
+ */ +@Internal +public class GenericMetricGroup extends AbstractMetricGroup { + private final AbstractMetricGroup parent; + + private final String name; + + protected GenericMetricGroup(MetricRegistry registry, AbstractMetricGroup parent, int name) { + this(registry, parent, "" + name); + } + + protected GenericMetricGroup(MetricRegistry registry, AbstractMetricGroup parent, String name) { + super(registry); + this.parent = parent; + this.name = name; + } + + @Override + public List<String> generateScope() { + List<String> scope = parent.generateScope(); + scope.add(name); + return scope; + } + + @Override + public List<String> generateScope(Scope.ScopeFormat format) { + List<String> scope = parent.generateScope(format); + scope.add(name); + return scope; + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/003ce18e/flink-core/src/main/java/org/apache/flink/metrics/groups/IOMetricGroup.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/metrics/groups/IOMetricGroup.java b/flink-core/src/main/java/org/apache/flink/metrics/groups/IOMetricGroup.java new file mode 100644 index 0000000..68d91c4 --- /dev/null +++ b/flink-core/src/main/java/org/apache/flink/metrics/groups/IOMetricGroup.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.metrics.groups; + +import org.apache.flink.metrics.Counter; +import org.apache.flink.metrics.MetricRegistry; + +import java.util.List; + +/** + * Special {@link org.apache.flink.metrics.MetricGroup} that contains shareable pre-defined IO-related metrics. + */ +public class IOMetricGroup extends AbstractMetricGroup { + private final TaskMetricGroup parent; + + private transient final Counter numBytesIn; + private transient final Counter numBytesOut; + private transient final Counter numRecordsIn; + private transient final Counter numRecordsOut; + + public IOMetricGroup(MetricRegistry registry, TaskMetricGroup parent) { + super(registry); + this.parent = parent; + this.numBytesIn = parent.counter("numBytesIn"); + this.numBytesOut = parent.counter("numBytesOut"); + this.numRecordsIn = parent.counter("numRecordsIn"); + this.numRecordsOut = parent.counter("numRecordsOut"); + } + + @Override + public List<String> generateScope() { + return parent.generateScope(); + } + + @Override + public List<String> generateScope(Scope.ScopeFormat format) { + return parent.generateScope(format); + } + + public Counter getBytesInCounter() { + return this.numBytesIn; + } + + public Counter getBytesOutCounter() { + return this.numBytesOut; + } + + public Counter getRecordsInCounter() { + return this.numRecordsIn; + } + + public Counter getRecordsOutCounter() { + return this.numRecordsOut; + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/003ce18e/flink-core/src/main/java/org/apache/flink/metrics/groups/JobMetricGroup.java 
---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/metrics/groups/JobMetricGroup.java b/flink-core/src/main/java/org/apache/flink/metrics/groups/JobMetricGroup.java new file mode 100644 index 0000000..35a01f8 --- /dev/null +++ b/flink-core/src/main/java/org/apache/flink/metrics/groups/JobMetricGroup.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.metrics.groups; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.JobID; +import org.apache.flink.metrics.MetricGroup; +import org.apache.flink.metrics.MetricRegistry; +import org.apache.flink.util.AbstractID; + +import java.util.HashMap; +import java.util.Map; + +import static org.apache.flink.metrics.groups.TaskManagerMetricGroup.DEFAULT_SCOPE_TM; + +/** + * Special {@link org.apache.flink.metrics.MetricGroup} representing a Job. + * <p> + * Contains extra logic for adding tasks. 
+ */ +@Internal +public class JobMetricGroup extends ComponentMetricGroup { + public static final String SCOPE_JOB_DESCRIPTOR = "job"; + public static final String SCOPE_JOB_ID = Scope.format("job_id"); + public static final String SCOPE_JOB_NAME = Scope.format("job_name"); + public static final String DEFAULT_SCOPE_JOB_COMPONENT = Scope.concat(SCOPE_JOB_NAME); + public static final String DEFAULT_SCOPE_JOB = Scope.concat(DEFAULT_SCOPE_TM, DEFAULT_SCOPE_JOB_COMPONENT); + + private Map<AbstractID, TaskMetricGroup> tasks = new HashMap<>(); + + public JobMetricGroup(MetricRegistry registry, TaskManagerMetricGroup taskManager, JobID id, String name) { + super(registry, taskManager, registry.getScopeConfig().getJobFormat()); + this.formats.put(SCOPE_JOB_ID, id.toString()); + this.formats.put(SCOPE_JOB_NAME, name); + } + + public TaskMetricGroup addTask(AbstractID id, AbstractID attemptID, int subtaskIndex, String name) { + TaskMetricGroup task = new TaskMetricGroup(this.registry, this, id, attemptID, subtaskIndex, name); + tasks.put(id, task); + return task; + } + + @Override + public void close() { + super.close(); + for (MetricGroup group : tasks.values()) { + group.close(); + } + tasks.clear(); + } + + @Override + protected String getScopeFormat(Scope.ScopeFormat format) { + return format.getJobFormat(); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/003ce18e/flink-core/src/main/java/org/apache/flink/metrics/groups/OperatorMetricGroup.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/metrics/groups/OperatorMetricGroup.java b/flink-core/src/main/java/org/apache/flink/metrics/groups/OperatorMetricGroup.java new file mode 100644 index 0000000..6475eec --- /dev/null +++ b/flink-core/src/main/java/org/apache/flink/metrics/groups/OperatorMetricGroup.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.metrics.groups; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.metrics.MetricRegistry; + +import static org.apache.flink.metrics.groups.JobMetricGroup.DEFAULT_SCOPE_JOB; + +/** + * Special {@link org.apache.flink.metrics.MetricGroup} representing an Operator. 
+ */ +@Internal +public class OperatorMetricGroup extends ComponentMetricGroup { + public static final String SCOPE_OPERATOR_DESCRIPTOR = "operator"; + public static final String SCOPE_OPERATOR_NAME = Scope.format("operator_name"); + public static final String SCOPE_OPERATOR_SUBTASK_INDEX = Scope.format("subtask_index"); + public static final String DEFAULT_SCOPE_OPERATOR_COMPONENT = Scope.concat(SCOPE_OPERATOR_NAME, SCOPE_OPERATOR_SUBTASK_INDEX); + public static final String DEFAULT_SCOPE_OPERATOR = Scope.concat(DEFAULT_SCOPE_JOB, DEFAULT_SCOPE_OPERATOR_COMPONENT); + + protected OperatorMetricGroup(MetricRegistry registry, TaskMetricGroup task, String name, int subTaskIndex) { + super(registry, task, registry.getScopeConfig().getOperatorFormat()); + this.formats.put(SCOPE_OPERATOR_NAME, name); + this.formats.put(SCOPE_OPERATOR_SUBTASK_INDEX, "" + subTaskIndex); + } + + @Override + protected String getScopeFormat(Scope.ScopeFormat format) { + return format.getOperatorFormat(); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/003ce18e/flink-core/src/main/java/org/apache/flink/metrics/groups/Scope.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/metrics/groups/Scope.java b/flink-core/src/main/java/org/apache/flink/metrics/groups/Scope.java new file mode 100644 index 0000000..47bae37 --- /dev/null +++ b/flink-core/src/main/java/org/apache/flink/metrics/groups/Scope.java @@ -0,0 +1,120 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.metrics.groups; + +import org.apache.flink.annotation.Internal; + +/** + * This class provides utility functions for handling scopes. + */ +@Internal +public class Scope { + public static final String SCOPE_WILDCARD = "*"; + + private static final String SCOPE_PREFIX = "<"; + private static final String SCOPE_SUFFIX = ">"; + private static final String SCOPE_SPLIT = "."; + + private Scope() { + } + + /** + * Modifies the given string to resemble a scope variable by wrapping it in "<" and ">". + * + * @param scope string to format + * @return formatted string + */ + public static String format(String scope) { + return SCOPE_PREFIX + scope + SCOPE_SUFFIX; + } + + /** + * Joins the given components into a single scope, separated by ".". + * + * @param components components to join + * @return joined scope, or an empty string if no components are given + */ + public static String concat(String... components) { + if (components.length == 0) { + // Nothing to join; avoids indexing into an empty varargs array. + return ""; + } + StringBuilder sb = new StringBuilder(); + sb.append(components[0]); + for (int x = 1; x < components.length; x++) { + sb.append(SCOPE_SPLIT); + sb.append(components[x]); + } + return sb.toString(); + } + + /** + * Splits the given scope into its individual components at each ".". + * + * @param scope scope to split + * @return array of components + */ + public static String[] split(String scope) { + return scope.split("\\" + SCOPE_SPLIT); + } + + /** + * Simple container for component scope format strings. 
+ */ + public static class ScopeFormat { + private String operatorFormat = OperatorMetricGroup.DEFAULT_SCOPE_OPERATOR; + private String taskFormat = TaskMetricGroup.DEFAULT_SCOPE_TASK; + private String jobFormat = JobMetricGroup.DEFAULT_SCOPE_JOB; + private String taskManagerFormat = TaskManagerMetricGroup.DEFAULT_SCOPE_TM; + + public ScopeFormat() { + } + + public ScopeFormat setOperatorFormat(String format) { + this.operatorFormat = format; + return this; + } + + public ScopeFormat setTaskFormat(String format) { + this.taskFormat = format; + return this; + } + + public ScopeFormat setJobFormat(String format) { + this.jobFormat = format; + return this; + } + + public ScopeFormat setTaskManagerFormat(String format) { + this.taskManagerFormat = format; + return this; + } + + public String getOperatorFormat() { + return this.operatorFormat; + } + + public String getTaskFormat() { + return this.taskFormat; + } + + public String getJobFormat() { + return this.jobFormat; + } + + public String getTaskManagerFormat() { + return this.taskManagerFormat; + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/003ce18e/flink-core/src/main/java/org/apache/flink/metrics/groups/TaskManagerMetricGroup.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/metrics/groups/TaskManagerMetricGroup.java b/flink-core/src/main/java/org/apache/flink/metrics/groups/TaskManagerMetricGroup.java new file mode 100644 index 0000000..e199ca7 --- /dev/null +++ b/flink-core/src/main/java/org/apache/flink/metrics/groups/TaskManagerMetricGroup.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.metrics.groups; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.JobID; +import org.apache.flink.metrics.MetricGroup; +import org.apache.flink.metrics.MetricRegistry; + +import java.util.HashMap; +import java.util.Map; + +/** + * Special {@link org.apache.flink.metrics.MetricGroup} representing a TaskManager. + * <p> + * Contains extra logic for adding jobs. 
+ */ +@Internal +public class TaskManagerMetricGroup extends ComponentMetricGroup { + public static final String SCOPE_HOST_DESCRIPTOR = "host"; + public static final String SCOPE_TM_DESCRIPTOR = "taskmanager"; + public static final String SCOPE_TM_HOST = Scope.format("host"); + public static final String SCOPE_TM_ID = Scope.format("tm_id"); + public static final String DEFAULT_SCOPE_TM_COMPONENT = Scope.concat(SCOPE_TM_HOST, "taskmanager", SCOPE_TM_ID); + public static final String DEFAULT_SCOPE_TM = DEFAULT_SCOPE_TM_COMPONENT; + + private Map<JobID, JobMetricGroup> jobs = new HashMap<>(); + + public TaskManagerMetricGroup(MetricRegistry registry, String host, String id) { + super(registry, null, registry.getScopeConfig().getTaskManagerFormat()); + this.formats.put(SCOPE_TM_HOST, host); + this.formats.put(SCOPE_TM_ID, id); + } + + public JobMetricGroup addJob(JobID id, String name) { + JobMetricGroup task = new JobMetricGroup(this.registry, this, id, name); + jobs.put(id, task); + return task; + } + + @Override + public void close() { + super.close(); + for (MetricGroup group : jobs.values()) { + group.close(); + } + jobs.clear(); + } + + @Override + protected String getScopeFormat(Scope.ScopeFormat format) { + return format.getTaskManagerFormat(); + } +} + http://git-wip-us.apache.org/repos/asf/flink/blob/003ce18e/flink-core/src/main/java/org/apache/flink/metrics/groups/TaskMetricGroup.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/metrics/groups/TaskMetricGroup.java b/flink-core/src/main/java/org/apache/flink/metrics/groups/TaskMetricGroup.java new file mode 100644 index 0000000..4f8e010 --- /dev/null +++ b/flink-core/src/main/java/org/apache/flink/metrics/groups/TaskMetricGroup.java @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.metrics.groups; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.metrics.MetricGroup; +import org.apache.flink.metrics.MetricRegistry; +import org.apache.flink.util.AbstractID; + +import java.util.HashMap; +import java.util.Map; + +import static org.apache.flink.metrics.groups.JobMetricGroup.DEFAULT_SCOPE_JOB; + +/** + * Special {@link org.apache.flink.metrics.MetricGroup} representing a Task. + * <p> + * Contains extra logic for adding operators. 
+ */ +@Internal +public class TaskMetricGroup extends ComponentMetricGroup { + public static final String SCOPE_TASK_DESCRIPTOR = "task"; + public static final String SCOPE_TASK_ID = Scope.format("task_id"); + public static final String SCOPE_TASK_NAME = Scope.format("task_name"); + public static final String SCOPE_TASK_ATTEMPT = Scope.format("task_attempt"); + public static final String SCOPE_TASK_SUBTASK_INDEX = Scope.format("subtask_index"); + public static final String DEFAULT_SCOPE_TASK_COMPONENT = SCOPE_TASK_NAME; + public static final String DEFAULT_SCOPE_TASK = Scope.concat(DEFAULT_SCOPE_JOB, DEFAULT_SCOPE_TASK_COMPONENT); + private final int subtaskIndex; + + private Map<String, OperatorMetricGroup> operators = new HashMap<>(); + private IOMetricGroup ioMetrics; + + protected TaskMetricGroup(MetricRegistry registry, JobMetricGroup job, AbstractID id, AbstractID attemptID, int subtaskIndex, String name) { + super(registry, job, registry.getScopeConfig().getTaskFormat()); + this.formats.put(SCOPE_TASK_ID, id.toString()); + this.formats.put(SCOPE_TASK_ATTEMPT, attemptID.toString()); + this.formats.put(SCOPE_TASK_NAME, name); + this.formats.put(SCOPE_TASK_SUBTASK_INDEX, "" + subtaskIndex); + this.subtaskIndex = subtaskIndex; + this.ioMetrics = new IOMetricGroup(registry, this); + } + + @Override + public void close() { + super.close(); + for (MetricGroup group : operators.values()) { + group.close(); + } + operators.clear(); + } + + public OperatorMetricGroup addOperator(String name) { + OperatorMetricGroup operator = new OperatorMetricGroup(this.registry, this, name, this.subtaskIndex); + operators.put(name, operator); + return operator; + } + + /** + * Returns the IOMetricGroup for this task. + * + * @return IOMetricGroup for this task. 
+ */ + public IOMetricGroup getIOMetricGroup() { + return this.ioMetrics; + } + + @Override + protected String getScopeFormat(Scope.ScopeFormat format) { + return format.getTaskFormat(); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/003ce18e/flink-core/src/main/java/org/apache/flink/metrics/reporter/AbstractReporter.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/metrics/reporter/AbstractReporter.java b/flink-core/src/main/java/org/apache/flink/metrics/reporter/AbstractReporter.java new file mode 100644 index 0000000..43f09b2 --- /dev/null +++ b/flink-core/src/main/java/org/apache/flink/metrics/reporter/AbstractReporter.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.metrics.reporter; + +import org.apache.flink.hadoop.shaded.org.jboss.netty.util.internal.ConcurrentHashMap; +import org.apache.flink.metrics.Counter; +import org.apache.flink.metrics.Gauge; +import org.apache.flink.metrics.Metric; + +import java.util.Map; + +public abstract class AbstractReporter implements MetricReporter { + protected Map<String, Gauge> gauges = new ConcurrentHashMap<>(); + protected Map<String, Counter> counters = new ConcurrentHashMap<>(); + + @Override + public void notifyOfAddedMetric(Metric metric, String name) { + if (metric instanceof Counter) { + counters.put(name, (Counter) metric); + } else if (metric instanceof Gauge) { + gauges.put(name, (Gauge) metric); + } + } + + @Override + public void notifyOfRemovedMetric(Metric metric, String name) { + if (metric instanceof Counter) { + counters.remove(name); + } else if (metric instanceof Gauge) { + gauges.remove(name); + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/003ce18e/flink-core/src/main/java/org/apache/flink/metrics/reporter/JMXReporter.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/metrics/reporter/JMXReporter.java b/flink-core/src/main/java/org/apache/flink/metrics/reporter/JMXReporter.java new file mode 100644 index 0000000..0b2efe4 --- /dev/null +++ b/flink-core/src/main/java/org/apache/flink/metrics/reporter/JMXReporter.java @@ -0,0 +1,167 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.metrics.reporter; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.metrics.Counter; +import org.apache.flink.metrics.Gauge; +import org.apache.flink.metrics.Metric; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.management.InstanceAlreadyExistsException; +import javax.management.InstanceNotFoundException; +import javax.management.MBeanRegistrationException; +import javax.management.MBeanServer; +import javax.management.MalformedObjectNameException; +import javax.management.NotCompliantMBeanException; +import javax.management.ObjectName; +import java.lang.management.ManagementFactory; +import java.util.List; + +/** + * {@link org.apache.flink.metrics.reporter.MetricReporter} that exports {@link org.apache.flink.metrics.Metric}s via JMX. 
+ * + * Largely based on the JmxReporter class of the dropwizard metrics library + * https://github.com/dropwizard/metrics/blob/master/metrics-core/src/main/java/io/dropwizard/metrics/JmxReporter.java + */ +@Internal +public class JMXReporter implements MetricReporter { + private static final Logger LOG = LoggerFactory.getLogger(JMXReporter.class); + + private MBeanServer mBeanServer; + + private static final String PREFIX = "org.apache.flink.metrics:"; + private static final String KEY_PREFIX = "key"; + + public JMXReporter() { + this.mBeanServer = ManagementFactory.getPlatformMBeanServer(); + } + + @Override + public void notifyOfAddedMetric(Metric metric, String name) { + AbstractBean jmxMetric; + ObjectName jmxName; + try { + jmxName = new ObjectName(name); + } catch (MalformedObjectNameException e) { + throw new IllegalArgumentException("Metric name did not conform to JMX ObjectName rules: " + name, e); + } + + if (metric instanceof Gauge) { + jmxMetric = new JmxGauge((Gauge<?>) metric); + } else if (metric instanceof Counter) { + jmxMetric = new JmxCounter((Counter) metric); + } else { + throw new IllegalArgumentException("Unknown metric type: " + metric.getClass()); + } + + try { + mBeanServer.registerMBean(jmxMetric, jmxName); + } catch (NotCompliantMBeanException e) { //implementation error on our side + LOG.error("Metric did not comply with JMX MBean naming rules.", e); + } catch (InstanceAlreadyExistsException e) { + LOG.error("A metric with the name " + jmxName + " was already registered.", e); + } catch (MBeanRegistrationException e) { + LOG.error("Failed to register metric.", e); + } + } + + @Override + public void notifyOfRemovedMetric(Metric metric, String name) { + try { + mBeanServer.unregisterMBean(new ObjectName(name)); + } catch (MBeanRegistrationException e) { + LOG.error("Un-registering metric failed.", e); + } catch (MalformedObjectNameException e) { + LOG.error("Un-registering metric failed due to invalid name.", e); + } catch 
(InstanceNotFoundException e) { + //alright then + } + } + + @Override + public void open(Configuration config) { + } + + @Override + public void close() { + } + + @Override + public String generateName(String name, List<String> origin) { + StringBuilder fullName = new StringBuilder(); + + fullName.append(PREFIX); + for (int x = 0; x < origin.size(); x++) { + fullName.append(KEY_PREFIX); + fullName.append(x); + fullName.append("="); + String value = origin.get(x); + value = value.replaceAll("\"", ""); + value = value.replaceAll(" ", "_"); + value = value.replaceAll("[,=;:?'*]", "-"); + fullName.append(value); + fullName.append(","); + } + fullName.append("name=" + name); + + return fullName.toString(); + } + + public interface MetricMBean { + } + + private abstract static class AbstractBean implements MetricMBean { + } + + public interface JmxCounterMBean extends MetricMBean { + long getCount(); + } + + private static class JmxCounter extends AbstractBean implements JmxCounterMBean { + private Counter counter; + + public JmxCounter(Counter counter) { + this.counter = counter; + } + + @Override + public long getCount() { + return counter.getCount(); + } + } + + public interface JmxGaugeMBean extends MetricMBean { + Object getValue(); + } + + private static class JmxGauge extends AbstractBean implements JmxGaugeMBean { + private final Gauge gauge; + + public JmxGauge(Gauge gauge) { + this.gauge = gauge; + } + + @Override + public Object getValue() { + return gauge.getValue(); + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/003ce18e/flink-core/src/main/java/org/apache/flink/metrics/reporter/MetricReporter.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/metrics/reporter/MetricReporter.java b/flink-core/src/main/java/org/apache/flink/metrics/reporter/MetricReporter.java new file mode 100644 index 0000000..2bca606 --- /dev/null +++ 
b/flink-core/src/main/java/org/apache/flink/metrics/reporter/MetricReporter.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.metrics.reporter; + +import com.codahale.metrics.Reporter; +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.metrics.Metric; + +import java.util.List; + +/** + * Reporters are used to export {@link org.apache.flink.metrics.Metric}s to an external backend. + * <p> + * Reporters are instantiated generically and must have a no-argument constructor. + */ +@PublicEvolving +public interface MetricReporter extends Reporter { + /** + * Configures this reporter. Since reporters are instantiated generically and hence parameter-less, + * this method is the place where the reporters set their basic fields based on configuration values. + * <p> + * This method is always called first on a newly instantiated reporter. + * + * @param config The configuration with all parameters. + */ + void open(Configuration config); + + /** + * Closes this reporter. Should be used to close channels, streams and release resources. 
+ */ + void close(); + + /** + * Called when a new {@link org.apache.flink.metrics.Metric} was added. + * + * @param metric metric that was added + * @param name name of the metric + */ + void notifyOfAddedMetric(Metric metric, String name); + + /** + * Called when a {@link org.apache.flink.metrics.Metric} was removed. + * + * @param metric metric that was removed + * @param name name of the metric + */ + void notifyOfRemovedMetric(Metric metric, String name); + + /** + * Generates the reported name of a metric based on its hierarchy/scope and associated name. + * + * @param name name of the metric + * @param scope hierarchy/scope of the metric + * @return reported name + */ + String generateName(String name, List<String> scope); +} http://git-wip-us.apache.org/repos/asf/flink/blob/003ce18e/flink-core/src/main/java/org/apache/flink/metrics/reporter/Scheduled.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/metrics/reporter/Scheduled.java b/flink-core/src/main/java/org/apache/flink/metrics/reporter/Scheduled.java new file mode 100644 index 0000000..3638f7a --- /dev/null +++ b/flink-core/src/main/java/org/apache/flink/metrics/reporter/Scheduled.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.metrics.reporter; + +import org.apache.flink.annotation.PublicEvolving; + +/** + * Interface for reporters that actively send out data periodically. + */ +@PublicEvolving +public interface Scheduled { + /** + * Report the current measurements. + * This method is called at regular intervals. + */ + void report(); +}
