[
https://issues.apache.org/jira/browse/FLINK-4116?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15350884#comment-15350884
]
ASF GitHub Bot commented on FLINK-4116:
---------------------------------------
Github user tillrohrmann commented on a diff in the pull request:
https://github.com/apache/flink/pull/2158#discussion_r68564324
--- Diff: docs/apis/common/index.md ---
@@ -1350,3 +1350,211 @@ You may specify program arguments before the job is
executed. The plan visualiza
the execution plan before executing the Flink job.
{% top %}
+
+Metrics
+-------------------
+
+Flink exposes a metric system that allows users to gather and expose metrics
to external systems.
+
+### Registering metrics
+
+You can access the metric system from any user function that extends
[RichFunction]({{ site.baseurl }}/apis/common/index.html#rich-functions) by
calling getRuntimeContext().getMetricGroup().
+This method returns a MetricGroup object on which you can create and
register new metrics.
+
+If you want to count the number of records your user function has received
you could use a Counter like this:
+
+{% highlight java %}
+
+public class MyMapper extends RichMapFunction<String, Integer> {
+ private Counter counter;
+
+ @Override
+ public void open(Configuration config) {
+ // create and register a counter
+ this.counter =
getRuntimeContext().getMetricGroup().counter("myCounter");
+ ...
+ }
+
+ @Override
+ public Integer map(String value) throws Exception {
+ // increment counter
+ this.counter.inc();
+ ...
+ }
+}
+
+{% endhighlight %}
+
+### Metric types
+
+Flink supports Counters, Gauges and Histograms.
+
+A Counter is used to count something. The current value can be in- or
decremented using `inc([long n])` or `dec([long n])`.
+You can create and register a Counter by calling `counter(String name)`
on a MetricGroup. Alternatively, you can pass an instance of your own Counter
implementation along with the name.
+
+A Gauge provides a value of any type on demand. In order to use a Gauge
you must first create a class that implements the
`org.apache.flink.metrics.Gauge` interface.
+There is no restriction on the type of the returned value.
+You can register a gauge by calling `gauge(String name, Gauge gauge)` on a
MetricGroup.
+
+A Histogram measures the distribution of long values.
+You can register one by calling `histogram(String name, Histogram
histogram)` on a MetricGroup.
+
+Flink only provides an interface for Histograms, but offers a wrapper
(`org.apache.flink.dropwizard.metrics.DropWizardHistogramWrapper`) that allows
usage of Codahale/DropWizard Histograms.
+
+### Scope
+
+Every registered metric has an automatically assigned scope which
represents the entities it is tied to. By default a metric that is registered in
a user function will be scoped to the Operator in which the function runs, the
Task/Job it belongs to and the TaskManager/Host it is executed on. This is
referred to as the "system scope".
+
+You can define an additional "user scope" by calling the
MetricGroup#addGroup((int/String) name) method.
+
+{% highlight java %}
+
+counter2 =
getRuntimeContext().getMetricGroup().addGroup("MyMetrics").counter("myCounter2");
+
+{% endhighlight %}
+
+The name under which a metric is exported is based on both scopes and the
name passed in the `counter()` call. The order is always
\<system_scope>\<user_scope>\<name>.
+
+The system scope allows the reported name to contain contextual
information like the name of the job it was registered in without requiring the
user to pass this information manually.
+
+How the system scope affects the reported name for a metric can be
modified by setting the following keys in the flink-conf.yaml.
+Each of these keys expects a format string that may contain constants (e.g.
"taskmanager") and variables (e.g. "\<task_id>") which will be replaced at
runtime.
+
+- `metrics.scope.jm`
+ - Default: \<host>.jobmanager
+- `metrics.scope.jm.job`
+ - Default: \<host>.jobmanager.\<job_name>
+- `metrics.scope.tm`
+ - Default: \<host>.taskmanager.\<tm_id>
+- `metrics.scope.tm.job`
+ - Default: \<host>.taskmanager.\<tm_id>.\<job_name>
+- `metrics.scope.task`
+ - Default:
\<host>.taskmanager.\<tm_id>.\<job_name>.\<task_name>.\<subtask_index>
+- `metrics.scope.operator`
+ - Default:
\<host>.taskmanager.\<tm_id>.\<job_name>.\<operator_name>.\<subtask_index>
+
+The following is a list of all variables available:
+
+- JobManager: \<host>
+- TaskManager: \<host>, \<tm_id>
+- Job: \<job_id>, \<job_name>
+- Task: \<task_id>, \<task_name>, \<task_attempt_id>, \<task_attempt_num>,
\<subtask_index>
+- Operator: \<operator_name>, \<subtask_index>
+
+There is no restriction on the order of variables.
+
+Which format is applied to a metric depends on which entities it was
scoped to.
+The `metrics.scope.operator` format will be applied to all metrics that
were scoped to an Operator.
+The `metrics.scope.tm.job` format will be applied to all metrics that were
only scoped to a Job and a TaskManager.
+
+This allows you to define explicitly how metrics on every level should be
reported.
+
+### Reporter
+
+Metrics can be exposed to an external system by configuring a reporter in
the `conf/flink-conf.yaml`.
+
+- `metrics.reporter.class`: The class of the reporter to use.
+ - Example: org.apache.flink.metrics.reporter.JMXReporter
+- `metrics.reporter.arguments`: A list of named parameters that are passed
to the reporter.
+ - Example: --host localhost --port 9010
+- `metrics.reporter.interval`: The interval between reports.
+ - Example: 10 SECONDS
+
+By default Flink uses JMX to expose metrics.
+
+The port for JMX can be configured by setting the `metrics.jmx.port` key.
This parameter expects either a single port
+or a port range, with the default being 9010-9025. Which port is used in
the end is shown in the relevant Job-/TaskManager log.
+
+Reporters other than the JMXReporter are not part of the distribution and have
to be added to the classpath manually (usually by putting the jar into /lib).
+
+You can write your own Reporter by implementing the
`org.apache.flink.metrics.reporter.MetricReporter` interface.
+If the Reporter should send out reports regularly you have to implement
the Scheduled interface as well.
+
+Flink supports the following systems with their respective configuration
parameters:
+
+#### Ganglia (org.apache.flink.metrics.ganglia.GangliaReporter)
+- `host`
+- `port`
+- `dmax`
+- `tmax`
+- `ttl`
+- `addressingMode` (UNICAST/MULTICAST)
+
+#### Graphite (org.apache.flink.metrics.graphite.GraphiteReporter)
+- `host`
+- `port`
+
+#### StatsD (org.apache.flink.metrics.statsd.StatsDReporter)
+- `host`
+- `port`
+
+### System metrics
+
+Flink exposes the following system metrics:
+
+<table class="table table-bordered">
+ <thead>
+ <tr>
+ <th class="text-left" style="width: 20%">Scope</th>
+ <th class="text-center">Metrics</th>
+ </tr>
+ </thead>
+
+ <tbody>
+ <tr>
+ <td><strong>JobManager</strong></td>
+ <td>
+ </td>
+ </tr>
+ <tr>
+ <td><strong>TaskManager</strong></td>
+ <td>
+ <p>Status.JVM.ClassLoader.ClassesLoaded</p>
+ <p>Status.JVM.ClassLoader.ClassesUnloaded</p>
+ <p></p>
+ <p>Status.JVM.GarbageCollector.&lt;garbageCollector&gt;.Count</p>
+ <p>Status.JVM.GarbageCollector.&lt;garbageCollector&gt;.Time</p>
+ <p></p>
+ <p>Status.JVM.Memory.Heap.Used</p>
+ <p>Status.JVM.Memory.Heap.Committed</p>
+ <p>Status.JVM.Memory.Heap.Max</p>
+ <p>Status.JVM.Memory.NonHeap.Used</p>
+ <p>Status.JVM.Memory.NonHeap.Committed</p>
+ <p>Status.JVM.Memory.NonHeap.Max</p>
+ <p>Status.JVM.Memory.Direct.Count</p>
+ <p>Status.JVM.Memory.Direct.MemoryUsed</p>
+ <p>Status.JVM.Memory.Direct.TotalCapacity</p>
+ <p>Status.JVM.Memory.Mapped.Count</p>
+ <p>Status.JVM.Memory.Mapped.MemoryUsed</p>
+ <p>Status.JVM.Memory.Mapped.TotalCapacity</p>
+ <p></p>
+ <p>Status.JVM.Threads.Count</p>
+ <p></p>
+ <p>Status.JVM.CPU.Load</p>
+ <p>Status.JVM.CPU.Time</p>
+ </td>
+ </tr>
+ <tr>
+ <td><strong>Job</strong></td>
+ <td>
+ </td>
+ </tr>
+ <tr>
+ <td><strong>Task</strong></td>
+ <td>
+ <p>currentLowWatermark</p>
+ <p>lastCheckpointSize</p>
+ <p>numBytesInLocal</p>
+ <p>numBytesInRemote</p>
+ <p>numBytesOut</p>
+ </td>
+ </tr>
+ <tr>
+ <td><strong>Operator</strong></td>
+ <td>
+ <p>numRecordsIn</p>
+ <p>numRecordsOut</p>
+ <p>numSplitsProcessed</p>
+ </td>
+ </tr>
+ </tbody>
+</table>
--- End diff --
Explanation of metrics could be helpful.
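For the scope section in particular, a small illustration of how a format string turns into a reported name might help readers. The following is only a minimal sketch, assuming plain string substitution of the `<variable>` placeholders and a "." delimiter between system scope, user scope and metric name. `ScopeFormatSketch`, `resolve` and all sample values are hypothetical and are not Flink's actual implementation.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class ScopeFormatSketch {

    /** Replaces every "<variable>" placeholder in the format string with its value. */
    public static String resolve(String format, Map<String, String> variables) {
        String result = format;
        for (Map.Entry<String, String> entry : variables.entrySet()) {
            result = result.replace("<" + entry.getKey() + ">", entry.getValue());
        }
        return result;
    }

    public static void main(String[] args) {
        // Hypothetical runtime values for the variables of an operator-scoped metric.
        Map<String, String> variables = new LinkedHashMap<>();
        variables.put("host", "worker-1");
        variables.put("tm_id", "tm-42");
        variables.put("job_name", "WordCount");
        variables.put("operator_name", "MyMapper");
        variables.put("subtask_index", "0");

        // Default value of metrics.scope.operator as listed in the docs.
        String format = "<host>.taskmanager.<tm_id>.<job_name>.<operator_name>.<subtask_index>";
        String systemScope = resolve(format, variables);

        // Assumption: system scope, user scope and metric name are joined with ".".
        String reportedName = systemScope + "." + "MyMetrics" + "." + "myCounter2";
        System.out.println(reportedName);
    }
}
```

Constants such as "taskmanager" pass through unchanged, and only the bracketed variables are replaced. This is the behavior the docs describe for the `metrics.scope.*` keys.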
> Document metrics
> ----------------
>
> Key: FLINK-4116
> URL: https://issues.apache.org/jira/browse/FLINK-4116
> Project: Flink
> Issue Type: Improvement
> Components: Documentation, Metrics
> Affects Versions: 1.1.0
> Reporter: Chesnay Schepler
> Assignee: Chesnay Schepler
> Fix For: 1.1.0
>
>
> The metric system is currently not documented, which should be fixed before
> the 1.1 release.