This is an automated email from the ASF dual-hosted git repository.
gyfora pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 76dda5d3cf7 [FLINK-33318] Expose aggregated collector metrics and
measure timeMsPerSecond
76dda5d3cf7 is described below
commit 76dda5d3cf7c4ecf255370679b27e11dd974a293
Author: Gyula Fora <[email protected]>
AuthorDate: Sun Oct 22 18:33:35 2023 +0200
[FLINK-33318] Expose aggregated collector metrics and measure
timeMsPerSecond
---
docs/content/docs/ops/metrics.md | 51 +++++++------
.../java/org/apache/flink/metrics/MeterView.java | 39 ++++++++++
.../metrics/job-manager-metrics.component.ts | 6 +-
.../flink/runtime/metrics/util/MetricUtils.java | 32 ++++++--
.../runtime/metrics/util/MetricUtilsTest.java | 86 +++++++++++++++++++++-
5 files changed, 182 insertions(+), 32 deletions(-)
diff --git a/docs/content/docs/ops/metrics.md b/docs/content/docs/ops/metrics.md
index 2bdb44cac98..ac106f6e6c9 100644
--- a/docs/content/docs/ops/metrics.md
+++ b/docs/content/docs/ops/metrics.md
@@ -315,7 +315,7 @@ public class MyMapper extends RichMapFunction<Long, Long> {
.getMetricGroup()
.histogram("myHistogram", new
DropwizardHistogramWrapper(dropwizardHistogram));
}
-
+
@Override
public Long map(Long value) throws Exception {
this.histogram.update(value);
@@ -333,12 +333,12 @@ class MyMapper extends RichMapFunction[Long, Long] {
override def open(config: Configuration): Unit = {
val dropwizardHistogram =
new com.codahale.metrics.Histogram(new SlidingWindowReservoir(500))
-
+
histogram = getRuntimeContext()
.getMetricGroup()
.histogram("myHistogram", new
DropwizardHistogramWrapper(dropwizardHistogram))
}
-
+
override def map(value: Long): Long = {
histogram.update(value)
value
@@ -464,7 +464,7 @@ class MyMapper extends RichMapFunction[Long,Long] {
override def open(config: Configuration): Unit = {
val dropwizardMeter: com.codahale.metrics.Meter = new
com.codahale.metrics.Meter()
-
+
meter = getRuntimeContext()
.getMetricGroup()
.meter("myMeter", new DropwizardMeterWrapper(dropwizardMeter))
@@ -687,7 +687,7 @@ Thus, in order to infer the metric identifier:
</table>
### Memory
-The memory-related metrics require Oracle's memory management (also included
in OpenJDK's Hotspot implementation) to be in place.
+The memory-related metrics require Oracle's memory management (also included
in OpenJDK's Hotspot implementation) to be in place.
Some metrics might not be exposed when using other JVM implementations (e.g.
IBM's J9).
<table class="table table-bordered">
<thead>
@@ -715,8 +715,8 @@ Some metrics might not be exposed when using other JVM
implementations (e.g. IBM
<tr>
<td>Heap.Max</td>
<td>The maximum amount of heap memory that can be used for memory
management (in bytes). <br/>
- This value might not be necessarily equal to the maximum value specified
through -Xmx or
- the equivalent Flink configuration parameter. Some GC algorithms
allocate heap memory that won't
+ This value might not be necessarily equal to the maximum value specified
through -Xmx or
+ the equivalent Flink configuration parameter. Some GC algorithms
allocate heap memory that won't
be available to the user code and, therefore, not being exposed through
the heap metrics.</td>
<td>Gauge</td>
</tr>
@@ -829,15 +829,20 @@ Some metrics might not be exposed when using other JVM
implementations (e.g. IBM
</thead>
<tbody>
<tr>
- <th rowspan="2"><strong>Job-/TaskManager</strong></th>
- <td rowspan="2">Status.JVM.GarbageCollector</td>
- <td><GarbageCollector>.Count</td>
- <td>The total number of collections that have occurred.</td>
+ <th rowspan="3"><strong>Job-/TaskManager</strong></th>
+ <td rowspan="3">Status.JVM.GarbageCollector</td>
+ <td><Collector/All>.Count</td>
+ <td>The total number of collections that have occurred for the given (or
all) collector.</td>
+ <td>Gauge</td>
+ </tr>
+ <tr>
+ <td><Collector/All>.Time</td>
+ <td>The total time (in milliseconds) spent performing garbage collection for
the given (or all) collector.</td>
<td>Gauge</td>
</tr>
<tr>
- <td><GarbageCollector>.Time</td>
- <td>The total time spent performing garbage collection.</td>
+ <td><Collector/All>.TimeMsPerSecond</td>
+ <td>The time (in milliseconds) spent garbage collecting per second for
the given (or all) collector.</td>
<td>Gauge</td>
</tr>
</tbody>
@@ -1904,7 +1909,7 @@ Please refer to [Kafka monitoring]({{< ref
"docs/connectors/datastream/kafka" >}
<th rowspan="1">Operator</th>
<td>loopFrequencyHz</td>
<td>stream, shardId</td>
- <td>The number of calls to getRecords in one second.
+ <td>The number of calls to getRecords in one second.
</td>
<td>Gauge</td>
</tr>
@@ -2147,7 +2152,7 @@ Metrics below can be used to measure the effectiveness of
speculative execution.
</tr>
<tr>
<td>numEffectiveSpeculativeExecutions</td>
- <td>Number of effective speculative execution attempts, i.e. speculative
execution attempts which
+ <td>Number of effective speculative execution attempts, i.e. speculative
execution attempts which
finish earlier than their corresponding original attempts.</td>
<td>Counter</td>
</tr>
@@ -2162,7 +2167,7 @@ To enable the latency tracking you must set the
`latencyTrackingInterval` to a p
At the `latencyTrackingInterval`, the sources will periodically emit a special
record, called a `LatencyMarker`.
The marker contains a timestamp from the time when the record has been emitted
at the sources.
-Latency markers can not overtake regular user records, thus if records are
queuing up in front of an operator,
+Latency markers can not overtake regular user records, thus if records are
queuing up in front of an operator,
it will add to the latency tracked by the marker.
Note that the latency markers are not accounting for the time user records
spend in operators as they are
@@ -2170,17 +2175,17 @@ bypassing them. In particular the markers are not
accounting for the time record
Only if operators are not able to accept new records, thus they are queuing
up, the latency measured using
the markers will reflect that.
-The `LatencyMarker`s are used to derive a distribution of the latency between
the sources of the topology and each
-downstream operator. These distributions are reported as histogram metrics.
The granularity of these distributions can
-be controlled in the [Flink configuration]({{< ref "docs/deployment/config"
>}}#metrics-latency-interval). For the highest
-granularity `subtask` Flink will derive the latency distribution between every
source subtask and every downstream
-subtask, which results in quadratic (in the terms of the parallelism) number
of histograms.
+The `LatencyMarker`s are used to derive a distribution of the latency between
the sources of the topology and each
+downstream operator. These distributions are reported as histogram metrics.
The granularity of these distributions can
+be controlled in the [Flink configuration]({{< ref "docs/deployment/config"
>}}#metrics-latency-interval). For the highest
+granularity `subtask` Flink will derive the latency distribution between every
source subtask and every downstream
+subtask, which results in quadratic (in the terms of the parallelism) number
of histograms.
Currently, Flink assumes that the clocks of all machines in the cluster are in
sync. We recommend setting
up an automated clock synchronisation service (like NTP) to avoid false
latency results.
<span class="label label-danger">Warning</span> Enabling latency metrics can
significantly impact the performance
-of the cluster (in particular for `subtask` granularity). It is highly
recommended to only use them for debugging
+of the cluster (in particular for `subtask` granularity). It is highly
recommended to only use them for debugging
purposes.
## State access latency tracking
@@ -2194,7 +2199,7 @@ This configuration has a default value of 100. A smaller
value will get more acc
As the type of this latency metrics is histogram,
`state.backend.latency-track.history-size` will control the maximum number of
recorded values in history, which has the default value of 128.
A larger value of this configuration will require more memory, but will
provide a more accurate result.
-<span class="label label-danger">Warning</span> Enabling state-access-latency
metrics may impact the performance.
+<span class="label label-danger">Warning</span> Enabling state-access-latency
metrics may impact the performance.
It is recommended to only use them for debugging purposes.
## REST API integration
diff --git
a/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/MeterView.java
b/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/MeterView.java
index ddb7e3c4fd3..61a087cfde9 100644
---
a/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/MeterView.java
+++
b/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/MeterView.java
@@ -71,6 +71,10 @@ public class MeterView implements Meter, View {
this.values = new long[this.timeSpanInSeconds /
UPDATE_INTERVAL_SECONDS + 1];
}
+ public MeterView(Gauge<? extends Number> numberGauge) {
+ this(new GaugeWrapper(numberGauge));
+ }
+
@Override
public void markEvent() {
this.counter.inc();
@@ -98,4 +102,39 @@ public class MeterView implements Meter, View {
currentRate =
((double) (values[time] - values[(time + 1) % values.length])
/ timeSpanInSeconds);
}
+
+ /** Simple wrapper to expose number gauges as read-only counters. */
+ static class GaugeWrapper implements Counter {
+
+ final Gauge<? extends Number> numberGauge;
+
+ GaugeWrapper(Gauge<? extends Number> numberGauge) {
+ this.numberGauge = numberGauge;
+ }
+
+ @Override
+ public void inc() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void inc(long n) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void dec() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void dec(long n) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public long getCount() {
+ return numberGauge.getValue().longValue();
+ }
+ }
}
diff --git
a/flink-runtime-web/web-dashboard/src/app/pages/job-manager/metrics/job-manager-metrics.component.ts
b/flink-runtime-web/web-dashboard/src/app/pages/job-manager/metrics/job-manager-metrics.component.ts
index 8888fa314fe..bb759e26940 100644
---
a/flink-runtime-web/web-dashboard/src/app/pages/job-manager/metrics/job-manager-metrics.component.ts
+++
b/flink-runtime-web/web-dashboard/src/app/pages/job-manager/metrics/job-manager-metrics.component.ts
@@ -112,7 +112,11 @@ export class JobManagerMetricsComponent implements OnInit,
OnDestroy {
this.listOfGCMetric = Array.from(
new Set(
this.listOfGCName.map(item =>
- item.replace('Status.JVM.GarbageCollector.',
'').replace('.Count', '').replace('.Time', '')
+ item
+ .replace('Status.JVM.GarbageCollector.', '')
+ .replace('.Count', '')
+ .replace('.TimeMsPerSecond', '')
+ .replace('.Time', '')
)
)
).map(name => {
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/util/MetricUtils.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/util/MetricUtils.java
index 520a54ec603..0074dfbe07e 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/util/MetricUtils.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/util/MetricUtils.java
@@ -24,6 +24,7 @@ import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.MetricOptions;
import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.MeterView;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
@@ -129,7 +130,8 @@ public class MetricUtils {
MetricGroup jvm = metricGroup.addGroup("JVM");
instantiateClassLoaderMetrics(jvm.addGroup("ClassLoader"));
- instantiateGarbageCollectorMetrics(jvm.addGroup("GarbageCollector"));
+ instantiateGarbageCollectorMetrics(
+ jvm.addGroup("GarbageCollector"),
ManagementFactory.getGarbageCollectorMXBeans());
instantiateMemoryMetrics(jvm.addGroup(METRIC_GROUP_MEMORY));
instantiateThreadMetrics(jvm.addGroup("Threads"));
instantiateCPUMetrics(jvm.addGroup("CPU"));
@@ -222,16 +224,32 @@ public class MetricUtils {
metrics.<Long, Gauge<Long>>gauge("ClassesUnloaded",
mxBean::getUnloadedClassCount);
}
- private static void instantiateGarbageCollectorMetrics(MetricGroup
metrics) {
- List<GarbageCollectorMXBean> garbageCollectors =
- ManagementFactory.getGarbageCollectorMXBeans();
-
+ @VisibleForTesting
+ static void instantiateGarbageCollectorMetrics(
+ MetricGroup metrics, List<GarbageCollectorMXBean>
garbageCollectors) {
for (final GarbageCollectorMXBean garbageCollector :
garbageCollectors) {
MetricGroup gcGroup = metrics.addGroup(garbageCollector.getName());
- gcGroup.<Long, Gauge<Long>>gauge("Count",
garbageCollector::getCollectionCount);
- gcGroup.<Long, Gauge<Long>>gauge("Time",
garbageCollector::getCollectionTime);
+ gcGroup.gauge("Count", garbageCollector::getCollectionCount);
+ Gauge<Long> timeGauge = gcGroup.gauge("Time",
garbageCollector::getCollectionTime);
+ gcGroup.meter("TimeMsPerSecond", new MeterView(timeGauge));
}
+ Gauge<Long> totalGcTime =
+ () ->
+ garbageCollectors.stream()
+
.mapToLong(GarbageCollectorMXBean::getCollectionTime)
+ .sum();
+
+ Gauge<Long> totalGcCount =
+ () ->
+ garbageCollectors.stream()
+
.mapToLong(GarbageCollectorMXBean::getCollectionCount)
+ .sum();
+
+ MetricGroup allGroup = metrics.addGroup("All");
+ allGroup.gauge("Count", totalGcCount);
+ Gauge<Long> totalTime = allGroup.gauge("Time", totalGcTime);
+ allGroup.meter("TimeMsPerSecond", new MeterView(totalTime));
}
private static void instantiateMemoryMetrics(MetricGroup metrics) {
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/util/MetricUtilsTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/util/MetricUtilsTest.java
index e0c70638ed9..c98637fc37c 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/util/MetricUtilsTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/util/MetricUtilsTest.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.metrics.util;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.MetricOptions;
import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.MeterView;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.memory.MemoryAllocationException;
@@ -42,10 +43,15 @@ import
org.apache.flink.shaded.guava31.com.google.common.collect.Sets;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test;
+import javax.management.ObjectName;
+
+import java.lang.management.GarbageCollectorMXBean;
import java.lang.management.ManagementFactory;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import java.util.concurrent.TimeUnit;
import static
org.apache.flink.runtime.metrics.util.MetricUtils.METRIC_GROUP_FLINK;
@@ -110,7 +116,6 @@ class MetricUtilsTest {
assertThat(hasMetaspaceMemoryPool())
.withFailMessage("Requires JVM with Metaspace memory pool")
.isTrue();
-
final InterceptingOperatorMetricGroup metaspaceMetrics =
new InterceptingOperatorMetricGroup() {
@Override
@@ -126,6 +131,42 @@ class MetricUtilsTest {
assertThat(metaspaceMetrics.get(MetricNames.MEMORY_MAX)).isNotNull();
}
+ @Test
+ public void testGcMetricCompleteness() {
+ Map<String, InterceptingOperatorMetricGroup> addedGroups = new
HashMap<>();
+ InterceptingOperatorMetricGroup gcGroup =
+ new InterceptingOperatorMetricGroup() {
+ @Override
+ public MetricGroup addGroup(String name) {
+ return addedGroups.computeIfAbsent(
+ name, k -> new
InterceptingOperatorMetricGroup());
+ }
+ };
+
+ List<GarbageCollectorMXBean> garbageCollectors = new ArrayList<>();
+ garbageCollectors.add(new TestGcBean("gc1", 100, 500));
+ garbageCollectors.add(new TestGcBean("gc2", 50, 250));
+
+ MetricUtils.instantiateGarbageCollectorMetrics(gcGroup,
garbageCollectors);
+ assertThat(addedGroups).containsOnlyKeys("gc1", "gc2", "All");
+
+ // Make sure individual collector metrics are correct
+ validateCollectorMetric(addedGroups.get("gc1"), 100, 500L);
+ validateCollectorMetric(addedGroups.get("gc2"), 50L, 250L);
+
+ // Make sure all/total collector metrics are correct
+ validateCollectorMetric(addedGroups.get("All"), 150L, 750L);
+ }
+
+ private static void validateCollectorMetric(
+ InterceptingOperatorMetricGroup group, long count, long time) {
+ assertThat(((Gauge) group.get("Count")).getValue()).isEqualTo(count);
+ assertThat(((Gauge) group.get("Time")).getValue()).isEqualTo(time);
+ MeterView perSecond = ((MeterView) group.get("TimeMsPerSecond"));
+ perSecond.update();
+ assertThat(perSecond.getRate()).isGreaterThan(0.);
+ }
+
@Test
void testHeapMetricsCompleteness() {
final InterceptingOperatorMetricGroup heapMetrics = new
InterceptingOperatorMetricGroup();
@@ -296,4 +337,47 @@ class MetricUtilsTest {
"%s usage metric never changed its value after %d
runs.", name, maxRuns);
fail(msg);
}
+
+ static class TestGcBean implements GarbageCollectorMXBean {
+
+ final String name;
+ final long collectionCount;
+ final long collectionTime;
+
+ public TestGcBean(String name, long collectionCount, long
collectionTime) {
+ this.name = name;
+ this.collectionCount = collectionCount;
+ this.collectionTime = collectionTime;
+ }
+
+ @Override
+ public long getCollectionCount() {
+ return collectionCount;
+ }
+
+ @Override
+ public long getCollectionTime() {
+ return collectionTime;
+ }
+
+ @Override
+ public String getName() {
+ return name;
+ }
+
+ @Override
+ public boolean isValid() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public String[] getMemoryPoolNames() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public ObjectName getObjectName() {
+ throw new UnsupportedOperationException();
+ }
+ }
}