This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new b82a5b2fe835 feat: add metrics for bucketassign.minibatch cache hit ratio (#18761)
b82a5b2fe835 is described below

commit b82a5b2fe83548ad31a6f57e6b0a8f3368518914
Author: Yao Li <[email protected]>
AuthorDate: Thu May 21 01:35:26 2026 -0700

    feat: add metrics for bucketassign.minibatch cache hit ratio (#18761)
    
    Co-authored-by: Cursor <[email protected]>
---
 .../hudi/metrics/FlinkIndexBackendMetrics.java     | 35 ++++++++-
 .../index/GlobalRecordLevelIndexBackend.java       |  6 +-
 .../hudi/metrics/TestFlinkIndexBackendMetrics.java | 52 +++++++++++++
 .../index/TestGlobalRecordLevelIndexBackend.java   | 87 ++++++++++++++++++++++
 4 files changed, 177 insertions(+), 3 deletions(-)

diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/metrics/FlinkIndexBackendMetrics.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/metrics/FlinkIndexBackendMetrics.java
index 65fad78e93a7..4c8fc700c282 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/metrics/FlinkIndexBackendMetrics.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/metrics/FlinkIndexBackendMetrics.java
@@ -20,19 +20,23 @@ package org.apache.hudi.metrics;
 
 import com.codahale.metrics.SlidingWindowReservoir;
 import org.apache.flink.dropwizard.metrics.DropwizardHistogramWrapper;
+import org.apache.flink.metrics.Gauge;
 import org.apache.flink.metrics.Histogram;
 import org.apache.flink.metrics.MetricGroup;
 import org.apache.hudi.common.util.VisibleForTesting;
 
 /**
  * Metrics for the {@link 
org.apache.hudi.sink.partitioner.index.GlobalRecordLevelIndexBackend}.
- * Tracks cache hit/miss counts and the latency of local (cache) vs. remote 
(metadata table) lookups.
+ * Tracks the latency of local (cache) vs. remote (metadata table) lookups, 
the per-lookup key
+ * counts, and the per-mini-batch in-memory cache hit ratio.
  */
 public class FlinkIndexBackendMetrics extends HoodieFlinkMetrics {
   private static final int HISTOGRAM_WINDOW_SIZE = 100;
   private static final String LOCAL_INDEX_LOOKUP_KEY = "local_index_lookup";
   private static final String REMOTE_INDEX_LOOKUP_KEY = "remote_index_lookup";
 
+  public static final String LOOKUP_CACHE_HIT_RATIO = "lookupCacheHitRatio";
+
   /** Latency of the local (cache) phase of each index lookup, in 
milliseconds. */
   private final Histogram localIndexLookupLatency;
 
@@ -45,6 +49,15 @@ public class FlinkIndexBackendMetrics extends HoodieFlinkMetrics {
   /** Number of keys that missed the local cache and were fetched remotely per 
lookup. */
   private final Histogram remoteLookupKeysNum;
 
+  /**
+   * Latest per-mini-batch in-memory cache hit ratio (hits / (hits + misses)).
+   * Set on each {@link #updateLookupCacheHitRatio(long, long)} call; defaults 
to 0.0
+   * and is left unchanged when a lookup observes zero total keys.
+   * Marked {@code volatile} because Flink's reporter thread reads it while the
+   * bucket-assign task thread writes to it.
+   */
+  private volatile double lookupCacheHitRatio = 0.0D;
+
   public FlinkIndexBackendMetrics(MetricGroup metricGroup) {
     super(metricGroup);
     this.localIndexLookupLatency = new DropwizardHistogramWrapper(
@@ -63,6 +76,21 @@ public class FlinkIndexBackendMetrics extends HoodieFlinkMetrics {
     metricGroup.histogram("remoteIndexLookupLatency", 
remoteIndexLookupLatency);
     metricGroup.histogram("localLookupKeysNum", localLookupKeysNum);
     metricGroup.histogram("remoteLookupKeysNum", remoteLookupKeysNum);
+    metricGroup.gauge(LOOKUP_CACHE_HIT_RATIO, (Gauge<Double>) () -> 
lookupCacheHitRatio);
+  }
+
+  /**
+   * Updates the per-mini-batch cache hit-ratio gauge from the hit/miss counts 
already
+   * fed into {@link #updateLocalLookupKeysCount(long)} and {@link 
#updateRemoteLookupKeysCount(long)}.
+   * When the lookup observed no keys, the previous value is preserved so 
dashboards
+   * don't oscillate back to zero on idle mini-batches.
+   */
+  public void updateLookupCacheHitRatio(long hitCount, long missCount) {
+    long total = hitCount + missCount;
+    if (total <= 0L) {
+      return;
+    }
+    lookupCacheHitRatio = (double) hitCount / total;
   }
 
   public void startLocalIndexLookup() {
@@ -108,4 +136,9 @@ public class FlinkIndexBackendMetrics extends HoodieFlinkMetrics {
   public long getRemoteLookupKeysSampleCount() {
     return remoteLookupKeysNum.getCount();
   }
+
+  @VisibleForTesting
+  public double getLookupCacheHitRatio() {
+    return lookupCacheHitRatio;
+  }
 }
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/index/GlobalRecordLevelIndexBackend.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/index/GlobalRecordLevelIndexBackend.java
index 628e20e86b81..1eeef79af9b2 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/index/GlobalRecordLevelIndexBackend.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/index/GlobalRecordLevelIndexBackend.java
@@ -18,7 +18,6 @@
 
 package org.apache.hudi.sink.partitioner.index;
 
-import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
 import org.apache.hudi.client.common.HoodieFlinkEngineContext;
 import org.apache.hudi.common.data.HoodieListData;
@@ -37,6 +36,7 @@ import org.apache.hudi.util.StreamerUtil;
 import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.metrics.MetricGroup;
 
 import java.io.IOException;
 import java.util.ArrayList;
@@ -106,8 +106,10 @@ public class GlobalRecordLevelIndexBackend implements MinibatchIndexBackend {
       }
     }
 
+    int hitCount = recordKeys.size() - missedKeys.size();
     metrics.endLocalIndexLookup();
-    metrics.updateLocalLookupKeysCount(recordKeys.size() - missedKeys.size());
+    metrics.updateLocalLookupKeysCount(hitCount);
+    metrics.updateLookupCacheHitRatio(hitCount, missedKeys.size());
 
     if (!missedKeys.isEmpty()) {
       metrics.startRemoteIndexLookup();
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/metrics/TestFlinkIndexBackendMetrics.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/metrics/TestFlinkIndexBackendMetrics.java
index 3a11348e2431..a41e713a8496 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/metrics/TestFlinkIndexBackendMetrics.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/metrics/TestFlinkIndexBackendMetrics.java
@@ -18,6 +18,7 @@
 
 package org.apache.hudi.metrics;
 
+import org.apache.flink.metrics.Gauge;
 import org.apache.flink.metrics.Histogram;
 import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
 
@@ -54,6 +55,46 @@ class TestFlinkIndexBackendMetrics {
     assertNotNull(metricGroup.getHistogram("remoteLookupKeysNum"));
   }
 
+  @Test
+  void testRegisterMetricsRegistersHitRatioGauge() {
+    Gauge<?> gauge = 
metricGroup.getGauge(FlinkIndexBackendMetrics.LOOKUP_CACHE_HIT_RATIO);
+    assertNotNull(gauge);
+    assertEquals(0.0D, ((Double) gauge.getValue()).doubleValue());
+  }
+
+  @Test
+  void testUpdateLookupCacheHitRatioTracksLatestMiniBatch() {
+    Gauge<?> gauge = 
metricGroup.getGauge(FlinkIndexBackendMetrics.LOOKUP_CACHE_HIT_RATIO);
+
+    metrics.updateLookupCacheHitRatio(3L, 1L);
+    assertEquals(0.75D, ((Double) gauge.getValue()).doubleValue());
+    assertEquals(0.75D, metrics.getLookupCacheHitRatio());
+
+    // gauge reflects the latest mini-batch, not a running average.
+    metrics.updateLookupCacheHitRatio(1L, 3L);
+    assertEquals(0.25D, ((Double) gauge.getValue()).doubleValue());
+
+    metrics.updateLookupCacheHitRatio(5L, 0L);
+    assertEquals(1.0D, ((Double) gauge.getValue()).doubleValue());
+
+    metrics.updateLookupCacheHitRatio(0L, 5L);
+    assertEquals(0.0D, ((Double) gauge.getValue()).doubleValue());
+  }
+
+  @Test
+  void testUpdateLookupCacheHitRatioPreservesPreviousValueOnEmptyBatch() {
+    metrics.updateLookupCacheHitRatio(3L, 1L);
+    assertEquals(0.75D, metrics.getLookupCacheHitRatio());
+
+    // empty mini-batch (no keys looked up) must not reset the ratio to NaN or 
0.
+    metrics.updateLookupCacheHitRatio(0L, 0L);
+    assertEquals(0.75D, metrics.getLookupCacheHitRatio());
+
+    // negative counts are treated as an empty batch (defensive guard).
+    metrics.updateLookupCacheHitRatio(-1L, -2L);
+    assertEquals(0.75D, metrics.getLookupCacheHitRatio());
+  }
+
   @Test
   void testLocalIndexLookupUpdatesHistogramCount() {
     Histogram hist = metricGroup.getHistogram("localIndexLookupLatency");
@@ -189,6 +230,7 @@ class TestFlinkIndexBackendMetrics {
 
   private static class CapturingMetricGroup extends UnregisteredMetricsGroup {
     private final Map<String, Histogram> histograms = new HashMap<>();
+    private final Map<String, Gauge<?>> gauges = new HashMap<>();
 
     @Override
     public <H extends Histogram> H histogram(String name, H histogram) {
@@ -196,8 +238,18 @@ class TestFlinkIndexBackendMetrics {
       return histogram;
     }
 
+    @Override
+    public <T, G extends Gauge<T>> G gauge(String name, G gauge) {
+      gauges.put(name, gauge);
+      return gauge;
+    }
+
     Histogram getHistogram(String name) {
       return histograms.get(name);
     }
+
+    Gauge<?> getGauge(String name) {
+      return gauges.get(name);
+    }
   }
 }
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/partitioner/index/TestGlobalRecordLevelIndexBackend.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/partitioner/index/TestGlobalRecordLevelIndexBackend.java
index 3f1a3b28af1b..edfad3481560 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/partitioner/index/TestGlobalRecordLevelIndexBackend.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/partitioner/index/TestGlobalRecordLevelIndexBackend.java
@@ -24,6 +24,7 @@ import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup;
 import org.apache.hudi.common.config.HoodieMetadataConfig;
 import org.apache.hudi.common.model.HoodieRecordGlobalLocation;
 import org.apache.hudi.configuration.FlinkOptions;
+import org.apache.hudi.metrics.FlinkIndexBackendMetrics;
 import org.apache.hudi.sink.event.Correspondent;
 import org.apache.hudi.util.StreamerUtil;
 import org.apache.hudi.utils.TestConfigurations;
@@ -31,6 +32,8 @@ import org.apache.hudi.utils.TestData;
 import org.apache.hudi.utils.TestUtils;
 
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.MetricGroup;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
@@ -48,8 +51,12 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
 
 import static org.apache.hudi.common.model.HoodieTableType.COPY_ON_WRITE;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
@@ -222,4 +229,84 @@ public class TestGlobalRecordLevelIndexBackend {
       assertEquals(location, 
backend.get(Collections.singletonList("idempotent_key")).get("idempotent_key"));
     }
   }
+
+  @Test
+  void testGetUpdatesCacheHitRatioGauge() throws Exception {
+    TestData.writeData(TestData.DATA_SET_INSERT, conf);
+
+    try (GlobalRecordLevelIndexBackend backend = new 
GlobalRecordLevelIndexBackend(conf, -1)) {
+      Map<String, Gauge<?>> gauges = new HashMap<>();
+      backend.registerMetrics(captureGauges(gauges));
+
+      Gauge<?> hitRatioGauge = 
gauges.get(FlinkIndexBackendMetrics.LOOKUP_CACHE_HIT_RATIO);
+      assertNotNull(hitRatioGauge);
+      // no lookups yet: gauge is at its initial value.
+      assertEquals(0.0D, ((Double) hitRatioGauge.getValue()).doubleValue());
+
+      // first lookup for two known + one unknown key: 0 / 3 hit ratio, then 
the two known keys
+      // are populated into the cache by the metadata table fallback.
+      Map<String, HoodieRecordGlobalLocation> firstLookup =
+          backend.get(Arrays.asList("id1", "id2", "missing_key"));
+      assertEquals(2, firstLookup.size());
+      assertEquals(0.0D, ((Double) hitRatioGauge.getValue()).doubleValue());
+
+      // second lookup: id1 and id2 are now cached so they should be hits; 
missing_key remains a miss.
+      // ratio = 2 hits / (2 hits + 1 miss).
+      backend.get(Arrays.asList("id1", "id2", "missing_key"));
+      assertEquals(2.0D / 3.0D, ((Double) 
hitRatioGauge.getValue()).doubleValue());
+    }
+  }
+
+  @Test
+  void testRegisterMetricsRegistersHitRatioGauge() throws Exception {
+    try (GlobalRecordLevelIndexBackend backend = new 
GlobalRecordLevelIndexBackend(conf, -1)) {
+      Map<String, Gauge<?>> gauges = new HashMap<>();
+      backend.registerMetrics(captureGauges(gauges));
+
+      
assertTrue(gauges.containsKey(FlinkIndexBackendMetrics.LOOKUP_CACHE_HIT_RATIO));
+      assertEquals(0.0D, ((Double) 
gauges.get(FlinkIndexBackendMetrics.LOOKUP_CACHE_HIT_RATIO).getValue()).doubleValue());
+    }
+  }
+
+  @Test
+  void testGetUpdatesCacheHitRatioGaugeForCachedLookup() throws Exception {
+    // Cache-only variant of testGetUpdatesCacheHitRatioGauge: pre-populates 
the cache
+    // so the metadata-table fallback is skipped, which keeps the test from 
depending on
+    // any writer/runtime setup while still exercising the get(List) -> gauge 
wire-up.
+    try (GlobalRecordLevelIndexBackend backend = new 
GlobalRecordLevelIndexBackend(conf, -1)) {
+      Map<String, Gauge<?>> gauges = new HashMap<>();
+      backend.registerMetrics(captureGauges(gauges));
+
+      Gauge<?> hitRatioGauge = 
gauges.get(FlinkIndexBackendMetrics.LOOKUP_CACHE_HIT_RATIO);
+      assertNotNull(hitRatioGauge);
+      // no lookups yet: gauge is at its initial value.
+      assertEquals(0.0D, ((Double) hitRatioGauge.getValue()).doubleValue());
+
+      HoodieRecordGlobalLocation location1 = new 
HoodieRecordGlobalLocation("par1", "000000001", "file-id-1");
+      HoodieRecordGlobalLocation location2 = new 
HoodieRecordGlobalLocation("par1", "000000001", "file-id-2");
+      backend.update("cached_key_1", location1);
+      backend.update("cached_key_2", location2);
+
+      // all keys served from the in-memory cache: ratio = 2 / 2 = 1.0.
+      Map<String, HoodieRecordGlobalLocation> result =
+          backend.get(Arrays.asList("cached_key_1", "cached_key_2"));
+      assertEquals(2, result.size());
+      assertEquals(1.0D, ((Double) hitRatioGauge.getValue()).doubleValue());
+
+      // a second, single-key cached lookup keeps the ratio at 1.0.
+      backend.get(Collections.singletonList("cached_key_1"));
+      assertEquals(1.0D, ((Double) hitRatioGauge.getValue()).doubleValue());
+    }
+  }
+
+  private static MetricGroup captureGauges(Map<String, Gauge<?>> sink) {
+    MetricGroup metricGroup = mock(MetricGroup.class);
+    doAnswer(invocation -> {
+      String name = invocation.getArgument(0);
+      Gauge<?> gauge = invocation.getArgument(1);
+      sink.put(name, gauge);
+      return gauge;
+    }).when(metricGroup).gauge(anyString(), any(Gauge.class));
+    return metricGroup;
+  }
 }

Reply via email to