This is an automated email from the ASF dual-hosted git repository.

gyfora pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 76dda5d3cf7 [FLINK-33318] Expose aggregated collector metrics and 
measure timeMsPerSecond
76dda5d3cf7 is described below

commit 76dda5d3cf7c4ecf255370679b27e11dd974a293
Author: Gyula Fora <[email protected]>
AuthorDate: Sun Oct 22 18:33:35 2023 +0200

    [FLINK-33318] Expose aggregated collector metrics and measure 
timeMsPerSecond
---
 docs/content/docs/ops/metrics.md                   | 51 +++++++------
 .../java/org/apache/flink/metrics/MeterView.java   | 39 ++++++++++
 .../metrics/job-manager-metrics.component.ts       |  6 +-
 .../flink/runtime/metrics/util/MetricUtils.java    | 32 ++++++--
 .../runtime/metrics/util/MetricUtilsTest.java      | 86 +++++++++++++++++++++-
 5 files changed, 182 insertions(+), 32 deletions(-)

diff --git a/docs/content/docs/ops/metrics.md b/docs/content/docs/ops/metrics.md
index 2bdb44cac98..ac106f6e6c9 100644
--- a/docs/content/docs/ops/metrics.md
+++ b/docs/content/docs/ops/metrics.md
@@ -315,7 +315,7 @@ public class MyMapper extends RichMapFunction<Long, Long> {
       .getMetricGroup()
       .histogram("myHistogram", new 
DropwizardHistogramWrapper(dropwizardHistogram));
   }
-  
+
   @Override
   public Long map(Long value) throws Exception {
     this.histogram.update(value);
@@ -333,12 +333,12 @@ class MyMapper extends RichMapFunction[Long, Long] {
   override def open(config: Configuration): Unit = {
     val dropwizardHistogram =
       new com.codahale.metrics.Histogram(new SlidingWindowReservoir(500))
-        
+
     histogram = getRuntimeContext()
       .getMetricGroup()
       .histogram("myHistogram", new 
DropwizardHistogramWrapper(dropwizardHistogram))
   }
-  
+
   override def map(value: Long): Long = {
     histogram.update(value)
     value
@@ -464,7 +464,7 @@ class MyMapper extends RichMapFunction[Long,Long] {
 
   override def open(config: Configuration): Unit = {
     val dropwizardMeter: com.codahale.metrics.Meter = new 
com.codahale.metrics.Meter()
-  
+
     meter = getRuntimeContext()
       .getMetricGroup()
       .meter("myMeter", new DropwizardMeterWrapper(dropwizardMeter))
@@ -687,7 +687,7 @@ Thus, in order to infer the metric identifier:
 </table>
 
 ### Memory
-The memory-related metrics require Oracle's memory management (also included 
in OpenJDK's Hotspot implementation) to be in place. 
+The memory-related metrics require Oracle's memory management (also included 
in OpenJDK's Hotspot implementation) to be in place.
 Some metrics might not be exposed when using other JVM implementations (e.g. 
IBM's J9).
 <table class="table table-bordered">                               
   <thead>                                                          
@@ -715,8 +715,8 @@ Some metrics might not be exposed when using other JVM 
implementations (e.g. IBM
     <tr>
       <td>Heap.Max</td>
       <td>The maximum amount of heap memory that can be used for memory 
management (in bytes). <br/>
-      This value might not be necessarily equal to the maximum value specified 
through -Xmx or 
-      the equivalent Flink configuration parameter. Some GC algorithms 
allocate heap memory that won't 
+      This value might not be necessarily equal to the maximum value specified 
through -Xmx or
+      the equivalent Flink configuration parameter. Some GC algorithms 
allocate heap memory that won't
       be available to the user code and, therefore, not being exposed through 
the heap metrics.</td>
       <td>Gauge</td>
     </tr>
@@ -829,15 +829,20 @@ Some metrics might not be exposed when using other JVM 
implementations (e.g. IBM
   </thead>
   <tbody>
     <tr>
-      <th rowspan="2"><strong>Job-/TaskManager</strong></th>
-      <td rowspan="2">Status.JVM.GarbageCollector</td>
-      <td>&lt;GarbageCollector&gt;.Count</td>
-      <td>The total number of collections that have occurred.</td>
+      <th rowspan="3"><strong>Job-/TaskManager</strong></th>
+      <td rowspan="3">Status.JVM.GarbageCollector</td>
+      <td>&lt;Collector/All&gt;.Count</td>
+      <td>The total number of collections that have occurred for the given collector (or, for <code>All</code>, summed across all collectors).</td>
+      <td>Gauge</td>
+    </tr>
+    <tr>
+      <td>&lt;Collector/All&gt;.Time</td>
+      <td>The total time (in milliseconds) spent performing garbage collection for the given collector (or, for <code>All</code>, summed across all collectors).</td>
       <td>Gauge</td>
     </tr>
     <tr>
-      <td>&lt;GarbageCollector&gt;.Time</td>
-      <td>The total time spent performing garbage collection.</td>
+      <td>&lt;Collector/All&gt;.TimeMsPerSecond</td>
+      <td>The time (in milliseconds) spent garbage collecting per second for the given collector (or, for <code>All</code>, across all collectors).</td>
       <td>Gauge</td>
     </tr>
   </tbody>
@@ -1904,7 +1909,7 @@ Please refer to [Kafka monitoring]({{< ref 
"docs/connectors/datastream/kafka" >}
       <th rowspan="1">Operator</th>
       <td>loopFrequencyHz</td>
       <td>stream, shardId</td>
-      <td>The number of calls to getRecords in one second. 
+      <td>The number of calls to getRecords in one second.
       </td>
       <td>Gauge</td>
     </tr>
@@ -2147,7 +2152,7 @@ Metrics below can be used to measure the effectiveness of 
speculative execution.
     </tr>
     <tr>
       <td>numEffectiveSpeculativeExecutions</td>
-      <td>Number of effective speculative execution attempts, i.e. speculative 
execution attempts which 
+      <td>Number of effective speculative execution attempts, i.e. speculative 
execution attempts which
       finish earlier than their corresponding original attempts.</td>
       <td>Counter</td>
     </tr>
@@ -2162,7 +2167,7 @@ To enable the latency tracking you must set the 
`latencyTrackingInterval` to a p
 
 At the `latencyTrackingInterval`, the sources will periodically emit a special 
record, called a `LatencyMarker`.
 The marker contains a timestamp from the time when the record has been emitted 
at the sources.
-Latency markers can not overtake regular user records, thus if records are 
queuing up in front of an operator, 
+Latency markers can not overtake regular user records, thus if records are 
queuing up in front of an operator,
 it will add to the latency tracked by the marker.
 
 Note that the latency markers are not accounting for the time user records 
spend in operators as they are
@@ -2170,17 +2175,17 @@ bypassing them. In particular the markers are not 
accounting for the time record
 Only if operators are not able to accept new records, thus they are queuing 
up, the latency measured using
 the markers will reflect that.
 
-The `LatencyMarker`s are used to derive a distribution of the latency between 
the sources of the topology and each 
-downstream operator. These distributions are reported as histogram metrics. 
The granularity of these distributions can 
-be controlled in the [Flink configuration]({{< ref "docs/deployment/config" 
>}}#metrics-latency-interval). For the highest 
-granularity `subtask` Flink will derive the latency distribution between every 
source subtask and every downstream 
-subtask, which results in quadratic (in the terms of the parallelism) number 
of histograms. 
+The `LatencyMarker`s are used to derive a distribution of the latency between 
the sources of the topology and each
+downstream operator. These distributions are reported as histogram metrics. 
The granularity of these distributions can
+be controlled in the [Flink configuration]({{< ref "docs/deployment/config" 
>}}#metrics-latency-interval). For the highest
+granularity `subtask` Flink will derive the latency distribution between every 
source subtask and every downstream
+subtask, which results in quadratic (in the terms of the parallelism) number 
of histograms.
 
 Currently, Flink assumes that the clocks of all machines in the cluster are in 
sync. We recommend setting
 up an automated clock synchronisation service (like NTP) to avoid false 
latency results.
 
 <span class="label label-danger">Warning</span> Enabling latency metrics can 
significantly impact the performance
-of the cluster (in particular for `subtask` granularity). It is highly 
recommended to only use them for debugging 
+of the cluster (in particular for `subtask` granularity). It is highly 
recommended to only use them for debugging
 purposes.
 
 ## State access latency tracking
@@ -2194,7 +2199,7 @@ This configuration has a default value of 100. A smaller 
value will get more acc
 As the type of this latency metrics is histogram, 
`state.backend.latency-track.history-size` will control the maximum number of 
recorded values in history, which has the default value of 128.
 A larger value of this configuration will require more memory, but will 
provide a more accurate result.
 
-<span class="label label-danger">Warning</span> Enabling state-access-latency 
metrics may impact the performance. 
+<span class="label label-danger">Warning</span> Enabling state-access-latency 
metrics may impact the performance.
 It is recommended to only use them for debugging purposes.
 
 ## REST API integration
diff --git 
a/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/MeterView.java
 
b/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/MeterView.java
index ddb7e3c4fd3..61a087cfde9 100644
--- 
a/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/MeterView.java
+++ 
b/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/MeterView.java
@@ -71,6 +71,10 @@ public class MeterView implements Meter, View {
         this.values = new long[this.timeSpanInSeconds / 
UPDATE_INTERVAL_SECONDS + 1];
     }
 
+    /**
+     * Creates a meter that derives its rate from the value of the given numeric gauge, which is
+     * wrapped in a read-only {@link Counter} adapter.
+     *
+     * <p>NOTE(review): {@link #markEvent()} increments the meter's counter; assuming the delegated
+     * constructor installs the wrapper as that counter, marking events on a meter built this way
+     * throws {@link UnsupportedOperationException} — confirm against the Counter-based constructor.
+     *
+     * @param numberGauge gauge whose value is used as the meter's count; presumably expected to be
+     *     cumulative, since the rate is computed from differences between sampled counts
+     */
+    public MeterView(Gauge<? extends Number> numberGauge) {
+        this(new GaugeWrapper(numberGauge));
+    }
+
     @Override
     public void markEvent() {
         this.counter.inc();
@@ -98,4 +102,39 @@ public class MeterView implements Meter, View {
         currentRate =
                 ((double) (values[time] - values[(time + 1) % values.length]) 
/ timeSpanInSeconds);
     }
+
+    /**
+     * Read-only {@link Counter} adapter that reports the current value of a numeric gauge as its
+     * count, allowing a {@link MeterView} to derive a rate from a gauge. All mutating methods
+     * throw {@link UnsupportedOperationException}.
+     */
+    static class GaugeWrapper implements Counter {
+
+        /** Gauge whose current value is returned by {@link #getCount()}. */
+        final Gauge<? extends Number> numberGauge;
+
+        GaugeWrapper(Gauge<? extends Number> numberGauge) {
+            this.numberGauge = numberGauge;
+        }
+
+        // The count is owned by the wrapped gauge, so it cannot be modified through this wrapper.
+        @Override
+        public void inc() {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public void inc(long n) {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public void dec() {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public void dec(long n) {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public long getCount() {
+            // Narrows the gauge's Number to a long (any fractional part is lost); a null gauge
+            // value results in a NullPointerException here.
+            return numberGauge.getValue().longValue();
+        }
+    }
 }
diff --git 
a/flink-runtime-web/web-dashboard/src/app/pages/job-manager/metrics/job-manager-metrics.component.ts
 
b/flink-runtime-web/web-dashboard/src/app/pages/job-manager/metrics/job-manager-metrics.component.ts
index 8888fa314fe..bb759e26940 100644
--- 
a/flink-runtime-web/web-dashboard/src/app/pages/job-manager/metrics/job-manager-metrics.component.ts
+++ 
b/flink-runtime-web/web-dashboard/src/app/pages/job-manager/metrics/job-manager-metrics.component.ts
@@ -112,7 +112,11 @@ export class JobManagerMetricsComponent implements OnInit, 
OnDestroy {
           this.listOfGCMetric = Array.from(
             new Set(
               this.listOfGCName.map(item =>
-                item.replace('Status.JVM.GarbageCollector.', 
'').replace('.Count', '').replace('.Time', '')
+                item
+                  .replace('Status.JVM.GarbageCollector.', '')
+                  .replace('.Count', '')
+                  .replace('.TimeMsPerSecond', '')
+                  .replace('.Time', '')
               )
             )
           ).map(name => {
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/util/MetricUtils.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/util/MetricUtils.java
index 520a54ec603..0074dfbe07e 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/util/MetricUtils.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/util/MetricUtils.java
@@ -24,6 +24,7 @@ import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.MetricOptions;
 import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.MeterView;
 import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
@@ -129,7 +130,8 @@ public class MetricUtils {
         MetricGroup jvm = metricGroup.addGroup("JVM");
 
         instantiateClassLoaderMetrics(jvm.addGroup("ClassLoader"));
-        instantiateGarbageCollectorMetrics(jvm.addGroup("GarbageCollector"));
+        instantiateGarbageCollectorMetrics(
+                jvm.addGroup("GarbageCollector"), 
ManagementFactory.getGarbageCollectorMXBeans());
         instantiateMemoryMetrics(jvm.addGroup(METRIC_GROUP_MEMORY));
         instantiateThreadMetrics(jvm.addGroup("Threads"));
         instantiateCPUMetrics(jvm.addGroup("CPU"));
@@ -222,16 +224,32 @@ public class MetricUtils {
         metrics.<Long, Gauge<Long>>gauge("ClassesUnloaded", 
mxBean::getUnloadedClassCount);
     }
 
-    private static void instantiateGarbageCollectorMetrics(MetricGroup metrics) {
-        List<GarbageCollectorMXBean> garbageCollectors =
-                ManagementFactory.getGarbageCollectorMXBeans();
-
+    /**
+     * Registers {@code Count} and {@code Time} gauges plus a {@code TimeMsPerSecond} meter (derived
+     * from {@code Time}) for every given collector, and the same three metrics aggregated over all
+     * collectors under the {@code All} group.
+     *
+     * <p>{@link GarbageCollectorMXBean} reports {@code -1} when a value is undefined for a
+     * collector. Such values are exposed unchanged on the per-collector gauges, but are skipped
+     * when computing the {@code All} aggregates so that one undefined collector cannot skew the
+     * totals.
+     *
+     * @param metrics group under which the per-collector and {@code All} groups are created
+     * @param garbageCollectors collectors to expose metrics for
+     */
+    @VisibleForTesting
+    static void instantiateGarbageCollectorMetrics(
+            MetricGroup metrics, List<GarbageCollectorMXBean> garbageCollectors) {
         for (final GarbageCollectorMXBean garbageCollector : garbageCollectors) {
             MetricGroup gcGroup = metrics.addGroup(garbageCollector.getName());
 
-            gcGroup.<Long, Gauge<Long>>gauge("Count", garbageCollector::getCollectionCount);
-            gcGroup.<Long, Gauge<Long>>gauge("Time", garbageCollector::getCollectionTime);
+            gcGroup.gauge("Count", garbageCollector::getCollectionCount);
+            Gauge<Long> timeGauge = gcGroup.gauge("Time", garbageCollector::getCollectionTime);
+            // Rate of the cumulative Time gauge, i.e. GC milliseconds spent per second.
+            gcGroup.meter("TimeMsPerSecond", new MeterView(timeGauge));
         }
+        Gauge<Long> totalGcTime =
+                () ->
+                        garbageCollectors.stream()
+                                .mapToLong(GarbageCollectorMXBean::getCollectionTime)
+                                // -1 means "undefined" for that collector; exclude it.
+                                .filter(time -> time >= 0)
+                                .sum();
+
+        Gauge<Long> totalGcCount =
+                () ->
+                        garbageCollectors.stream()
+                                .mapToLong(GarbageCollectorMXBean::getCollectionCount)
+                                // -1 means "undefined" for that collector; exclude it.
+                                .filter(count -> count >= 0)
+                                .sum();
+
+        MetricGroup allGroup = metrics.addGroup("All");
+        allGroup.gauge("Count", totalGcCount);
+        Gauge<Long> totalTime = allGroup.gauge("Time", totalGcTime);
+        allGroup.meter("TimeMsPerSecond", new MeterView(totalTime));
     }
 
     private static void instantiateMemoryMetrics(MetricGroup metrics) {
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/util/MetricUtilsTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/util/MetricUtilsTest.java
index e0c70638ed9..c98637fc37c 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/util/MetricUtilsTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/util/MetricUtilsTest.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.metrics.util;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.MetricOptions;
 import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.MeterView;
 import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.memory.MemoryAllocationException;
@@ -42,10 +43,15 @@ import 
org.apache.flink.shaded.guava31.com.google.common.collect.Sets;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.Test;
 
+import javax.management.ObjectName;
+
+import java.lang.management.GarbageCollectorMXBean;
 import java.lang.management.ManagementFactory;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.TimeUnit;
 
 import static 
org.apache.flink.runtime.metrics.util.MetricUtils.METRIC_GROUP_FLINK;
@@ -110,7 +116,6 @@ class MetricUtilsTest {
         assertThat(hasMetaspaceMemoryPool())
                 .withFailMessage("Requires JVM with Metaspace memory pool")
                 .isTrue();
-
         final InterceptingOperatorMetricGroup metaspaceMetrics =
                 new InterceptingOperatorMetricGroup() {
                     @Override
@@ -126,6 +131,42 @@ class MetricUtilsTest {
         assertThat(metaspaceMetrics.get(MetricNames.MEMORY_MAX)).isNotNull();
     }
 
+    @Test
+    void testGcMetricCompleteness() {
+        // Record every sub-group created under the GC group so the per-collector and aggregated
+        // metrics can be inspected by group name.
+        Map<String, InterceptingOperatorMetricGroup> addedGroups = new HashMap<>();
+        InterceptingOperatorMetricGroup gcGroup =
+                new InterceptingOperatorMetricGroup() {
+                    @Override
+                    public MetricGroup addGroup(String name) {
+                        return addedGroups.computeIfAbsent(
+                                name, k -> new InterceptingOperatorMetricGroup());
+                    }
+                };
+
+        List<GarbageCollectorMXBean> garbageCollectors = new ArrayList<>();
+        garbageCollectors.add(new TestGcBean("gc1", 100, 500));
+        garbageCollectors.add(new TestGcBean("gc2", 50, 250));
+
+        MetricUtils.instantiateGarbageCollectorMetrics(gcGroup, garbageCollectors);
+        assertThat(addedGroups).containsOnlyKeys("gc1", "gc2", "All");
+
+        // Make sure individual collector metrics are correct
+        validateCollectorMetric(addedGroups.get("gc1"), 100L, 500L);
+        validateCollectorMetric(addedGroups.get("gc2"), 50L, 250L);
+
+        // Make sure all/total collector metrics are correct (sums over gc1 and gc2)
+        validateCollectorMetric(addedGroups.get("All"), 150L, 750L);
+    }
+
+    /**
+     * Asserts that the {@code Count} and {@code Time} gauges of {@code group} report the expected
+     * values and that its {@code TimeMsPerSecond} meter yields a positive rate after one update.
+     */
+    private static void validateCollectorMetric(
+            InterceptingOperatorMetricGroup group, long count, long time) {
+        assertThat(((Gauge<?>) group.get("Count")).getValue()).isEqualTo(count);
+        assertThat(((Gauge<?>) group.get("Time")).getValue()).isEqualTo(time);
+        MeterView perSecond = (MeterView) group.get("TimeMsPerSecond");
+        // Sample the underlying gauge once so the meter has a value to derive a rate from.
+        perSecond.update();
+        assertThat(perSecond.getRate()).isGreaterThan(0.);
+    }
+
     @Test
     void testHeapMetricsCompleteness() {
         final InterceptingOperatorMetricGroup heapMetrics = new 
InterceptingOperatorMetricGroup();
@@ -296,4 +337,47 @@ class MetricUtilsTest {
                         "%s usage metric never changed its value after %d 
runs.", name, maxRuns);
         fail(msg);
     }
+
+    /**
+     * Minimal {@link GarbageCollectorMXBean} stub that returns a fixed name, collection count and
+     * collection time. Only {@link #getName()}, {@link #getCollectionCount()} and {@link
+     * #getCollectionTime()} are implemented; the remaining methods throw {@link
+     * UnsupportedOperationException}.
+     */
+    static class TestGcBean implements GarbageCollectorMXBean {
+
+        /** Collector name, used as the metric sub-group name. */
+        final String name;
+        /** Value returned by {@link #getCollectionCount()}. */
+        final long collectionCount;
+        /** Value returned by {@link #getCollectionTime()}. */
+        final long collectionTime;
+
+        public TestGcBean(String name, long collectionCount, long 
collectionTime) {
+            this.name = name;
+            this.collectionCount = collectionCount;
+            this.collectionTime = collectionTime;
+        }
+
+        @Override
+        public long getCollectionCount() {
+            return collectionCount;
+        }
+
+        @Override
+        public long getCollectionTime() {
+            return collectionTime;
+        }
+
+        @Override
+        public String getName() {
+            return name;
+        }
+
+        // Not needed by the metrics under test.
+        @Override
+        public boolean isValid() {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public String[] getMemoryPoolNames() {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public ObjectName getObjectName() {
+            throw new UnsupportedOperationException();
+        }
+    }
 }

Reply via email to