This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 990fc29f5609 feat(flink): add metrics for RLI load time for bucket assign functions (#18762)
990fc29f5609 is described below

commit 990fc29f5609a919bcf6397abbdfe6c7c849262b
Author: Peter Huang <[email protected]>
AuthorDate: Tue May 19 20:41:47 2026 -0700

    feat(flink): add metrics for RLI load time for bucket assign functions (#18762)
---
 .../hudi/metrics/FlinkBucketAssignMetrics.java     |  64 +++++++
 .../hudi/metrics/FlinkIndexBackendMetrics.java     | 111 +++++++++++
 .../sink/partitioner/BucketAssignFunction.java     |   6 +
 .../partitioner/MinibatchBucketAssignFunction.java |  13 ++
 .../index/GlobalRecordLevelIndexBackend.java       |  23 ++-
 .../partitioner/index/IndexBackendFactory.java     |   5 +-
 .../hudi/metrics/TestFlinkBucketAssignMetrics.java | 133 ++++++++++++++
 .../hudi/metrics/TestFlinkIndexBackendMetrics.java | 203 +++++++++++++++++++++
 .../TestMinibatchBucketAssignFunction.java         | 128 ++++++++++++-
 .../index/TestGlobalRecordLevelIndexBackend.java   |  32 ++++
 10 files changed, 714 insertions(+), 4 deletions(-)

diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/metrics/FlinkBucketAssignMetrics.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/metrics/FlinkBucketAssignMetrics.java
new file mode 100644
index 000000000000..fafaeaecc9ac
--- /dev/null
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/metrics/FlinkBucketAssignMetrics.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.metrics;
+
+import com.codahale.metrics.SlidingWindowReservoir;
+import org.apache.flink.dropwizard.metrics.DropwizardHistogramWrapper;
+import org.apache.flink.metrics.Histogram;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.hudi.common.util.VisibleForTesting;
+
+/**
+ * Flink metrics reported by the bucket assign functions.
+ *
+ * <p>The only metric is the {@code recordBufferingTime} histogram: how long, in milliseconds,
+ * the first record of a mini-batch waited in the buffer before the batch was processed. The
+ * instance is created by {@code BucketAssignFunction}, but only
+ * {@code MinibatchBucketAssignFunction} records samples into it.
+ */
+public class FlinkBucketAssignMetrics extends HoodieFlinkMetrics {
+
+  /** Number of most recent samples the histogram keeps for its statistics. */
+  private static final int HISTOGRAM_WINDOW_SIZE = 100;
+
+  /** Timer name passed to the inherited startTimer/stopTimer helpers. */
+  private static final String RECORD_BUFFERING_KEY = "record_buffering";
+
+  /** Buffering time of each processed mini-batch, in milliseconds. */
+  private final Histogram recordBufferingTime = newSlidingWindowHistogram();
+
+  public FlinkBucketAssignMetrics(MetricGroup metricGroup) {
+    super(metricGroup);
+  }
+
+  /** Creates a histogram backed by a sliding window of the last {@code HISTOGRAM_WINDOW_SIZE} samples. */
+  private static Histogram newSlidingWindowHistogram() {
+    com.codahale.metrics.Histogram dropwizardHistogram =
+        new com.codahale.metrics.Histogram(new SlidingWindowReservoir(HISTOGRAM_WINDOW_SIZE));
+    return new DropwizardHistogramWrapper(dropwizardHistogram);
+  }
+
+  @Override
+  public void registerMetrics() {
+    metricGroup.histogram("recordBufferingTime", recordBufferingTime);
+  }
+
+  /** Starts the buffering timer; called when a record enters an empty buffer. */
+  public void startRecordBuffering() {
+    startTimer(RECORD_BUFFERING_KEY);
+  }
+
+  /** Stops the buffering timer and adds the elapsed time as one histogram sample. */
+  public void endRecordBuffering() {
+    final long elapsedMs = stopTimer(RECORD_BUFFERING_KEY);
+    recordBufferingTime.update(elapsedMs);
+  }
+
+  /** Returns the total number of samples recorded, which is not capped by the window size. */
+  @VisibleForTesting
+  public long getRecordBufferingCount() {
+    return recordBufferingTime.getCount();
+  }
+}
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/metrics/FlinkIndexBackendMetrics.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/metrics/FlinkIndexBackendMetrics.java
new file mode 100644
index 000000000000..65fad78e93a7
--- /dev/null
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/metrics/FlinkIndexBackendMetrics.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.metrics;
+
+import com.codahale.metrics.SlidingWindowReservoir;
+import org.apache.flink.dropwizard.metrics.DropwizardHistogramWrapper;
+import org.apache.flink.metrics.Histogram;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.hudi.common.util.VisibleForTesting;
+
+/**
+ * Metrics for the {@link org.apache.hudi.sink.partitioner.index.GlobalRecordLevelIndexBackend}.
+ *
+ * <p>A batched lookup first resolves keys from the local record index cache and then reads the
+ * keys it missed from the metadata table. For each phase this class keeps a latency histogram
+ * (milliseconds) and a histogram of how many keys the phase handled.
+ */
+public class FlinkIndexBackendMetrics extends HoodieFlinkMetrics {
+  private static final int HISTOGRAM_WINDOW_SIZE = 100;
+  private static final String LOCAL_INDEX_LOOKUP_KEY = "local_index_lookup";
+  private static final String REMOTE_INDEX_LOOKUP_KEY = "remote_index_lookup";
+
+  /** Latency of the local (cache) phase of each index lookup, in milliseconds. */
+  private final Histogram localIndexLookupLatency;
+
+  /** Latency of the remote (metadata table) phase of each index lookup, in milliseconds. */
+  private final Histogram remoteIndexLookupLatency;
+
+  /** Number of keys resolved from the local cache per lookup. */
+  private final Histogram localLookupKeysNum;
+
+  /** Number of keys that missed the local cache and were fetched remotely per lookup. */
+  private final Histogram remoteLookupKeysNum;
+
+  public FlinkIndexBackendMetrics(MetricGroup metricGroup) {
+    super(metricGroup);
+    this.localIndexLookupLatency = newSlidingWindowHistogram();
+    this.remoteIndexLookupLatency = newSlidingWindowHistogram();
+    this.localLookupKeysNum = newSlidingWindowHistogram();
+    this.remoteLookupKeysNum = newSlidingWindowHistogram();
+  }
+
+  /**
+   * Creates a histogram whose statistics cover only the most recent
+   * {@code HISTOGRAM_WINDOW_SIZE} samples; {@code getCount()} still reports every update.
+   */
+  private static Histogram newSlidingWindowHistogram() {
+    return new DropwizardHistogramWrapper(
+        new com.codahale.metrics.Histogram(new SlidingWindowReservoir(HISTOGRAM_WINDOW_SIZE)));
+  }
+
+  @Override
+  public void registerMetrics() {
+    metricGroup.histogram("localIndexLookupLatency", localIndexLookupLatency);
+    metricGroup.histogram("remoteIndexLookupLatency", remoteIndexLookupLatency);
+    metricGroup.histogram("localLookupKeysNum", localLookupKeysNum);
+    metricGroup.histogram("remoteLookupKeysNum", remoteLookupKeysNum);
+  }
+
+  /** Starts timing the local (cache) phase of a lookup. */
+  public void startLocalIndexLookup() {
+    startTimer(LOCAL_INDEX_LOOKUP_KEY);
+  }
+
+  /** Stops the local-phase timer and records the elapsed time as one latency sample. */
+  public void endLocalIndexLookup() {
+    localIndexLookupLatency.update(stopTimer(LOCAL_INDEX_LOOKUP_KEY));
+  }
+
+  /** Records how many keys one lookup resolved from the local cache. */
+  public void updateLocalLookupKeysCount(long n) {
+    localLookupKeysNum.update(n);
+  }
+
+  /** Starts timing the remote (metadata table) phase of a lookup. */
+  public void startRemoteIndexLookup() {
+    startTimer(REMOTE_INDEX_LOOKUP_KEY);
+  }
+
+  /** Stops the remote-phase timer and records the elapsed time as one latency sample. */
+  public void endRemoteIndexLookup() {
+    remoteIndexLookupLatency.update(stopTimer(REMOTE_INDEX_LOOKUP_KEY));
+  }
+
+  /** Records how many keys one lookup had to fetch from the metadata table. */
+  public void updateRemoteLookupKeysCount(long n) {
+    remoteLookupKeysNum.update(n);
+  }
+
+  /** Returns the total number of local-phase latency samples recorded. */
+  @VisibleForTesting
+  public long getLocalIndexLookupCount() {
+    return localIndexLookupLatency.getCount();
+  }
+
+  /** Returns the total number of remote-phase latency samples recorded. */
+  @VisibleForTesting
+  public long getRemoteIndexLookupCount() {
+    return remoteIndexLookupLatency.getCount();
+  }
+
+  /** Returns the total number of local key-count samples recorded. */
+  @VisibleForTesting
+  public long getLocalLookupKeysSampleCount() {
+    return localLookupKeysNum.getCount();
+  }
+
+  /** Returns the total number of remote key-count samples recorded. */
+  @VisibleForTesting
+  public long getRemoteLookupKeysSampleCount() {
+    return remoteLookupKeysNum.getCount();
+  }
+}
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssignFunction.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssignFunction.java
index 7faff523e689..6caacbbc3843 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssignFunction.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssignFunction.java
@@ -30,6 +30,7 @@ import org.apache.hudi.configuration.FlinkOptions;
 import org.apache.hudi.configuration.HadoopConfigurations;
 import org.apache.hudi.configuration.OptionsResolver;
 import org.apache.hudi.hadoop.fs.HadoopFSUtils;
+import org.apache.hudi.metrics.FlinkBucketAssignMetrics;
 import org.apache.hudi.sink.event.Correspondent;
 import org.apache.hudi.sink.partitioner.index.GlobalIndexBackend;
 import org.apache.hudi.sink.partitioner.index.IndexBackendFactory;
@@ -87,6 +88,9 @@ public class BucketAssignFunction
   @Getter
   private transient GlobalIndexBackend indexBackend;
 
+  @Getter
+  protected transient FlinkBucketAssignMetrics metrics;
+
   /**
    * Bucket assigner to assign new bucket IDs or reuse existing ones.
    */
@@ -139,6 +143,8 @@ public class BucketAssignFunction
         context,
         writeConfig);
     this.recordProcessor = initRecordProcessor();
+    this.metrics = new 
FlinkBucketAssignMetrics(getRuntimeContext().getMetricGroup());
+    this.metrics.registerMetrics();
   }
 
   @Override
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/MinibatchBucketAssignFunction.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/MinibatchBucketAssignFunction.java
index cdf30c932eb7..c29d538d4e6f 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/MinibatchBucketAssignFunction.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/MinibatchBucketAssignFunction.java
@@ -24,6 +24,7 @@ import org.apache.hudi.common.model.HoodieRecordGlobalLocation;
 import org.apache.hudi.common.model.WriteOperationType;
 import org.apache.hudi.common.util.VisibleForTesting;
 import org.apache.hudi.configuration.FlinkOptions;
+import org.apache.hudi.metrics.FlinkBucketAssignMetrics;
 import org.apache.hudi.sink.event.Correspondent;
 import org.apache.hudi.sink.partitioner.index.MinibatchIndexBackend;
 
@@ -120,6 +121,10 @@ public class MinibatchBucketAssignFunction
       // handle index records immediately, do not need buffering
       delegateFunction.processIndexRecord(record, record.getRecordKey());
     } else {
+      // Start buffering timer when first record enters an empty buffer
+      if (recordBuffer.isEmpty()) {
+        delegateFunction.getMetrics().startRecordBuffering();
+      }
       // Add data records to the buffer
       recordBuffer.add(record);
       // Process the buffer if it reaches the configured size
@@ -136,6 +141,9 @@ public class MinibatchBucketAssignFunction
     if (recordBuffer.isEmpty()) {
       return;
     }
+
+    // Record how long the oldest record in the batch was buffered
+    delegateFunction.getMetrics().endRecordBuffering();
     // process batch of records.
     minibatchProcessor.process(recordBuffer, out);
     // Clear the buffer after processing
@@ -199,6 +207,15 @@ public class MinibatchBucketAssignFunction
     this.delegateFunction.setCorrespondent(correspondent);
   }
 
+  /**
+   * Returns the {@link FlinkBucketAssignMetrics} held by the wrapped delegate function, so tests can
+   * inspect the record-buffering histogram that this function updates.
+   */
+  @VisibleForTesting
+  public FlinkBucketAssignMetrics getDelegateMetrics() {
+    return delegateFunction.getMetrics();
+  }
+
   @Override
   public void notifyCheckpointComplete(long checkpointId) throws Exception {
     // Refresh the table state when there are new commits.
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/index/GlobalRecordLevelIndexBackend.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/index/GlobalRecordLevelIndexBackend.java
index a95e18e863a1..628e20e86b81 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/index/GlobalRecordLevelIndexBackend.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/index/GlobalRecordLevelIndexBackend.java
@@ -18,6 +18,8 @@
 
 package org.apache.hudi.sink.partitioner.index;
 
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
 import org.apache.hudi.client.common.HoodieFlinkEngineContext;
 import org.apache.hudi.common.data.HoodieListData;
 import org.apache.hudi.common.data.HoodiePairData;
@@ -28,6 +30,7 @@ import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.configuration.FlinkOptions;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.metadata.HoodieTableMetadata;
+import org.apache.hudi.metrics.FlinkIndexBackendMetrics;
 import org.apache.hudi.sink.event.Correspondent;
 import org.apache.hudi.util.StreamerUtil;
 
@@ -56,6 +59,7 @@ public class GlobalRecordLevelIndexBackend implements MinibatchIndexBackend {
   private final Configuration conf;
   private final HoodieTableMetaClient metaClient;
   private HoodieTableMetadata metadataTable;
+  private FlinkIndexBackendMetrics metrics;
 
   /**
    * Creates a global RLI backend with a checkpoint-aware cache.
@@ -67,9 +71,23 @@ public class GlobalRecordLevelIndexBackend implements MinibatchIndexBackend {
     this.metaClient = StreamerUtil.createMetaClient(conf);
     this.conf = conf;
     this.recordIndexCache = new RecordIndexCache(conf, initCheckpointId);
+    // Start with Flink's UnregisteredMetricsGroup so that get(List) always has a non-null
+    // metrics instance; registerMetrics(MetricGroup) can rebind it to a real group later.
+    // NOTE(review): this calls an overridable public method from the constructor.
+    registerMetrics(new UnregisteredMetricsGroup());
     reloadMetadataTable();
   }
 
+  /**
+   * Replaces this backend's metrics with a new {@link FlinkIndexBackendMetrics} bound to
+   * {@code metricGroup} and registers its histograms on that group.
+   */
+  @Override
+  public void registerMetrics(MetricGroup metricGroup) {
+    this.metrics = new FlinkIndexBackendMetrics(metricGroup);
+    this.metrics.registerMetrics();
+  }
+
   @Override
   public HoodieRecordGlobalLocation get(String recordKey) throws IOException {
     throw new UnsupportedOperationException(this.getClass().getSimpleName() + " doesn't support lookup with a single key.");
@@ -84,7 +95,11 @@ public class GlobalRecordLevelIndexBackend implements MinibatchIndexBackend {
   public Map<String, HoodieRecordGlobalLocation> get(List<String> recordKeys) throws IOException {
     Map<String, HoodieRecordGlobalLocation> keysAndLocations = new HashMap<>();
     List<String> missedKeys = new ArrayList<>();
-    for (String key: recordKeys) {
+
+    // Local phase: answer as many keys as possible from recordIndexCache and collect the
+    // misses for the remote phase below.
+    metrics.startLocalIndexLookup();
+    for (String key : recordKeys) {
       HoodieRecordGlobalLocation location = recordIndexCache.get(key);
       if (location == null) {
         missedKeys.add(key);
@@ -92,13 +107,26 @@ public class GlobalRecordLevelIndexBackend implements MinibatchIndexBackend {
         keysAndLocations.put(key, location);
       }
     }
+
+    metrics.endLocalIndexLookup();
+    // Keys served by the cache = requested keys minus cache misses.
+    metrics.updateLocalLookupKeysCount(recordKeys.size() - missedKeys.size());
+
     if (!missedKeys.isEmpty()) {
+      // Remote phase: read only the missed keys from the metadata table and back-fill the cache.
+      // It is timed and counted only when at least one key missed.
+      // NOTE(review): if the read or the cache back-fill throws, endRemoteIndexLookup() is never
+      // reached and the failed attempt records no latency sample.
+      metrics.startRemoteIndexLookup();
       HoodiePairData<String, HoodieRecordGlobalLocation> recordIndexData =
           metadataTable.readRecordIndexLocationsWithKeys(HoodieListData.eager(missedKeys));
       recordIndexData.forEach(keyAndLocation -> {
         recordIndexCache.update(keyAndLocation.getKey(), keyAndLocation.getValue());
         keysAndLocations.put(keyAndLocation.getKey(), keyAndLocation.getValue());
       });
+
+      metrics.endRemoteIndexLookup();
+      metrics.updateRemoteLookupKeysCount(missedKeys.size());
     }
     return keysAndLocations;
   }
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/index/IndexBackendFactory.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/index/IndexBackendFactory.java
index 01a2e04224a5..9c61dbe0acb6 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/index/IndexBackendFactory.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/index/IndexBackendFactory.java
@@ -53,7 +53,10 @@ public class IndexBackendFactory {
    * @param runtimeContext Flink runtime context for job and attempt metadata
    * @return global index backend for record-key lookups
    */
-  public static GlobalIndexBackend create(Configuration conf, 
FunctionInitializationContext context, RuntimeContext runtimeContext) throws 
Exception {
+  public static GlobalIndexBackend create(
+          Configuration conf,
+          FunctionInitializationContext context,
+          RuntimeContext runtimeContext) throws Exception {
     HoodieIndex.IndexType indexType = OptionsResolver.getIndexType(conf);
     switch (indexType) {
       case FLINK_STATE:
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/metrics/TestFlinkBucketAssignMetrics.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/metrics/TestFlinkBucketAssignMetrics.java
new file mode 100644
index 000000000000..e4ff46e31474
--- /dev/null
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/metrics/TestFlinkBucketAssignMetrics.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.metrics;
+
+import org.apache.flink.metrics.Histogram;
+import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Tests for {@link FlinkBucketAssignMetrics}.
+ */
+class TestFlinkBucketAssignMetrics {
+
+  private CapturingMetricGroup metricGroup;
+  private FlinkBucketAssignMetrics metrics;
+
+  @BeforeEach
+  void setUp() {
+    metricGroup = new CapturingMetricGroup();
+    metrics = new FlinkBucketAssignMetrics(metricGroup);
+    metrics.registerMetrics();
+  }
+
+  @Test
+  void testRegisterMetricsRegistersHistograms() {
+    assertNotNull(metricGroup.getHistogram("recordBufferingTime"));
+  }
+
+  @Test
+  void testRecordBufferingUpdatesHistogramCount() {
+    Histogram hist = metricGroup.getHistogram("recordBufferingTime");
+    assertEquals(0, hist.getCount());
+
+    metrics.startRecordBuffering();
+    metrics.endRecordBuffering();
+    assertEquals(1, hist.getCount());
+  }
+
+  @Test
+  void testRecordBufferingTimeIsNonNegative() {
+    metrics.startRecordBuffering();
+    metrics.endRecordBuffering();
+
+    Histogram hist = metricGroup.getHistogram("recordBufferingTime");
+    assertTrue(hist.getStatistics().getMin() >= 0);
+  }
+
+  @Test
+  void testEndRecordBufferingWithoutStartRecordsZero() {
+    metrics.endRecordBuffering();
+
+    Histogram hist = metricGroup.getHistogram("recordBufferingTime");
+    assertEquals(1, hist.getCount());
+    assertEquals(0, hist.getStatistics().getMax());
+  }
+
+  @Test
+  void testVisibleForTestingGetters() {
+    assertEquals(0, metrics.getRecordBufferingCount());
+
+    metrics.startRecordBuffering();
+    metrics.endRecordBuffering();
+    assertEquals(1, metrics.getRecordBufferingCount());
+  }
+
+  @Test
+  void testMultipleConsecutiveBufferingCycles() {
+    for (int i = 0; i < 5; i++) {
+      metrics.startRecordBuffering();
+      metrics.endRecordBuffering();
+    }
+    assertEquals(5, metrics.getRecordBufferingCount());
+  }
+
+  @Test
+  void testTimerRestartBeforeStop() {
+    // Calling startRecordBuffering twice before stopping should override the first start.
+    // The end call still records exactly one sample.
+    metrics.startRecordBuffering();
+    metrics.startRecordBuffering();
+    metrics.endRecordBuffering();
+    assertEquals(1, metrics.getRecordBufferingCount());
+  }
+
+  @Test
+  void testSlidingWindowCapAtHundred() {
+    // SlidingWindowReservoir(100) retains only the 100 most recent samples for the statistics,
+    // while getCount() keeps counting every update.
+    for (int i = 0; i < 110; i++) {
+      metrics.startRecordBuffering();
+      metrics.endRecordBuffering();
+    }
+    assertEquals(110, metrics.getRecordBufferingCount());
+
+    Histogram hist = metricGroup.getHistogram("recordBufferingTime");
+    assertEquals(100, hist.getStatistics().size());
+  }
+
+  /** Metric group that remembers registered histograms so tests can read them back by name. */
+  private static class CapturingMetricGroup extends UnregisteredMetricsGroup {
+    private final Map<String, Histogram> histograms = new HashMap<>();
+
+    @Override
+    public <H extends Histogram> H histogram(String name, H histogram) {
+      histograms.put(name, histogram);
+      return histogram;
+    }
+
+    Histogram getHistogram(String name) {
+      return histograms.get(name);
+    }
+  }
+}
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/metrics/TestFlinkIndexBackendMetrics.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/metrics/TestFlinkIndexBackendMetrics.java
new file mode 100644
index 000000000000..3a11348e2431
--- /dev/null
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/metrics/TestFlinkIndexBackendMetrics.java
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.metrics;
+
+import org.apache.flink.metrics.Histogram;
+import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Unit tests for {@link FlinkIndexBackendMetrics}: histogram registration, latency timers and
+ * key-count samples for the local (cache) and remote (metadata table) lookup phases.
+ */
+class TestFlinkIndexBackendMetrics {
+
+  private static final String LOCAL_LATENCY = "localIndexLookupLatency";
+  private static final String REMOTE_LATENCY = "remoteIndexLookupLatency";
+  private static final String LOCAL_KEYS = "localLookupKeysNum";
+  private static final String REMOTE_KEYS = "remoteLookupKeysNum";
+
+  private CapturingMetricGroup metricGroup;
+  private FlinkIndexBackendMetrics metrics;
+
+  @BeforeEach
+  void setUp() {
+    metricGroup = new CapturingMetricGroup();
+    metrics = new FlinkIndexBackendMetrics(metricGroup);
+    metrics.registerMetrics();
+  }
+
+  @Test
+  void testRegisterMetricsRegistersHistograms() {
+    for (String name : new String[] {LOCAL_LATENCY, REMOTE_LATENCY, LOCAL_KEYS, REMOTE_KEYS}) {
+      assertNotNull(registered(name));
+    }
+  }
+
+  @Test
+  void testLocalIndexLookupUpdatesHistogramCount() {
+    Histogram latency = registered(LOCAL_LATENCY);
+    assertEquals(0, latency.getCount());
+
+    for (int round = 1; round <= 2; round++) {
+      metrics.startLocalIndexLookup();
+      metrics.endLocalIndexLookup();
+      assertEquals(round, latency.getCount());
+    }
+  }
+
+  @Test
+  void testRemoteIndexLookupUpdatesHistogramCount() {
+    Histogram latency = registered(REMOTE_LATENCY);
+    assertEquals(0, latency.getCount());
+
+    metrics.startRemoteIndexLookup();
+    metrics.endRemoteIndexLookup();
+    assertEquals(1, latency.getCount());
+  }
+
+  @Test
+  void testLocalLookupKeysNumUpdatesHistogram() {
+    Histogram keys = registered(LOCAL_KEYS);
+    assertEquals(0, keys.getCount());
+
+    metrics.updateLocalLookupKeysCount(5);
+    assertEquals(1, keys.getCount());
+    assertEquals(5, keys.getStatistics().getMax());
+
+    metrics.updateLocalLookupKeysCount(3);
+    assertEquals(2, keys.getCount());
+  }
+
+  @Test
+  void testRemoteLookupKeysNumUpdatesHistogram() {
+    Histogram keys = registered(REMOTE_KEYS);
+    assertEquals(0, keys.getCount());
+
+    metrics.updateRemoteLookupKeysCount(10);
+    assertEquals(1, keys.getCount());
+    assertEquals(10, keys.getStatistics().getMax());
+  }
+
+  @Test
+  void testLocalIndexLookupLatencyIsNonNegative() {
+    metrics.startLocalIndexLookup();
+    metrics.endLocalIndexLookup();
+    assertTrue(registered(LOCAL_LATENCY).getStatistics().getMin() >= 0);
+  }
+
+  @Test
+  void testRemoteIndexLookupLatencyIsNonNegative() {
+    metrics.startRemoteIndexLookup();
+    metrics.endRemoteIndexLookup();
+    assertTrue(registered(REMOTE_LATENCY).getStatistics().getMin() >= 0);
+  }
+
+  @Test
+  void testEndLocalIndexLookupWithoutStartRecordsZero() {
+    metrics.endLocalIndexLookup();
+    assertSingleZeroSample(registered(LOCAL_LATENCY));
+  }
+
+  @Test
+  void testEndRemoteIndexLookupWithoutStartRecordsZero() {
+    metrics.endRemoteIndexLookup();
+    assertSingleZeroSample(registered(REMOTE_LATENCY));
+  }
+
+  @Test
+  void testVisibleForTestingGetters() {
+    assertEquals(0, metrics.getLocalIndexLookupCount());
+    assertEquals(0, metrics.getRemoteIndexLookupCount());
+    assertEquals(0, metrics.getLocalLookupKeysSampleCount());
+    assertEquals(0, metrics.getRemoteLookupKeysSampleCount());
+
+    metrics.startLocalIndexLookup();
+    metrics.endLocalIndexLookup();
+    assertEquals(1, metrics.getLocalIndexLookupCount());
+
+    metrics.startRemoteIndexLookup();
+    metrics.endRemoteIndexLookup();
+    assertEquals(1, metrics.getRemoteIndexLookupCount());
+
+    metrics.updateLocalLookupKeysCount(4);
+    assertEquals(1, metrics.getLocalLookupKeysSampleCount());
+
+    metrics.updateRemoteLookupKeysCount(2);
+    assertEquals(1, metrics.getRemoteLookupKeysSampleCount());
+  }
+
+  @Test
+  void testCombinedLocalAndRemoteLookupInOneRound() {
+    // Same call order as one lookup that hits the cache for some keys and misses others.
+    metrics.startLocalIndexLookup();
+    metrics.endLocalIndexLookup();
+    metrics.updateLocalLookupKeysCount(3);
+    metrics.startRemoteIndexLookup();
+    metrics.endRemoteIndexLookup();
+    metrics.updateRemoteLookupKeysCount(2);
+
+    assertEquals(1, metrics.getLocalIndexLookupCount());
+    assertEquals(1, metrics.getRemoteIndexLookupCount());
+    assertEquals(1, metrics.getLocalLookupKeysSampleCount());
+    assertEquals(1, metrics.getRemoteLookupKeysSampleCount());
+  }
+
+  @Test
+  void testUpdateLookupKeyCountsWithZero() {
+    metrics.updateLocalLookupKeysCount(0);
+    metrics.updateRemoteLookupKeysCount(0);
+
+    assertSingleZeroSample(registered(LOCAL_KEYS));
+    assertSingleZeroSample(registered(REMOTE_KEYS));
+  }
+
+  /** Looks up a histogram that {@link FlinkIndexBackendMetrics#registerMetrics()} registered. */
+  private Histogram registered(String name) {
+    return metricGroup.getHistogram(name);
+  }
+
+  /** Asserts the histogram holds exactly one sample and that its value is zero. */
+  private static void assertSingleZeroSample(Histogram histogram) {
+    assertEquals(1, histogram.getCount());
+    assertEquals(0, histogram.getStatistics().getMax());
+  }
+
+  /** Metric group that remembers registered histograms so tests can read them back by name. */
+  private static class CapturingMetricGroup extends UnregisteredMetricsGroup {
+    private final Map<String, Histogram> histograms = new HashMap<>();
+
+    @Override
+    public <H extends Histogram> H histogram(String name, H histogram) {
+      histograms.put(name, histogram);
+      return histogram;
+    }
+
+    Histogram getHistogram(String name) {
+      return histograms.get(name);
+    }
+  }
+}
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/partitioner/TestMinibatchBucketAssignFunction.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/partitioner/TestMinibatchBucketAssignFunction.java
index a98a95529fc1..0676aa89222d 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/partitioner/TestMinibatchBucketAssignFunction.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/partitioner/TestMinibatchBucketAssignFunction.java
@@ -20,8 +20,10 @@ package org.apache.hudi.sink.partitioner;
 
 import org.apache.hudi.client.model.HoodieFlinkInternalRow;
 import org.apache.hudi.common.config.HoodieMetadataConfig;
+import org.apache.hudi.common.model.WriteOperationType;
 import org.apache.hudi.configuration.FlinkOptions;
 import org.apache.hudi.index.HoodieIndex;
+import org.apache.hudi.metrics.FlinkBucketAssignMetrics;
 import org.apache.hudi.utils.TestConfigurations;
 import org.apache.hudi.utils.TestData;
 
@@ -41,6 +43,7 @@ import java.util.List;
 
 import static org.apache.hudi.utils.TestData.insertRow;
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
 /**
@@ -48,6 +51,7 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
  */
 public class TestMinibatchBucketAssignFunction {
   private OneInputStreamOperatorTestHarness<HoodieFlinkInternalRow, 
HoodieFlinkInternalRow> testHarness;
+  private MinibatchBucketAssignFunction function;
   private static Configuration conf;
 
   @TempDir
@@ -65,7 +69,7 @@ public class TestMinibatchBucketAssignFunction {
   @BeforeEach
   public void beforeEach() throws Exception {
     // Create the MinibatchBucketAssignFunction
-    MinibatchBucketAssignFunction function = new 
MinibatchBucketAssignFunction(conf);
+    function = new MinibatchBucketAssignFunction(conf);
     // Set up test harness
     testHarness = new OneInputStreamOperatorTestHarness<>(new 
MiniBatchBucketAssignOperator(function, new OperatorID()), 1, 1, 0);
     testHarness.open();
@@ -240,8 +244,128 @@ public class TestMinibatchBucketAssignFunction {
     HoodieFlinkInternalRow record = new HoodieFlinkInternalRow("id1", "par1", 
"I",
         insertRow(StringData.fromString("id1"), 
StringData.fromString("Danny"), 23, TimestampData.fromEpochMillis(1), 
StringData.fromString("par1")));
     testHarness.processElement(new StreamRecord<>(record));
-    
+
     // Close should not throw any exceptions
     testHarness.close();
   }
+
+  @Test
+  public void testInsertOperationFlushedByEndInput() throws Exception {
+    // With OPERATION=INSERT, endInput() should flush the partial buffer.
+    Configuration insertConf = Configuration.fromMap(conf.toMap());
+    insertConf.set(FlinkOptions.OPERATION, WriteOperationType.INSERT.value());
+
+    MinibatchBucketAssignFunction insertFunction = new 
MinibatchBucketAssignFunction(insertConf);
+    OneInputStreamOperatorTestHarness<HoodieFlinkInternalRow, 
HoodieFlinkInternalRow> insertHarness =
+        new OneInputStreamOperatorTestHarness<>(new 
MiniBatchBucketAssignOperator(insertFunction, new OperatorID()), 1, 1, 0);
+    insertHarness.open();
+    try {
+      for (int i = 0; i < 3; i++) {
+        String key = "ins_key_" + i;
+        HoodieFlinkInternalRow record = new HoodieFlinkInternalRow(key, 
"par9", "I",
+            insertRow(StringData.fromString(key), 
StringData.fromString("Name"), 30,
+                TimestampData.fromEpochMillis(1), 
StringData.fromString("par9")));
+        insertHarness.processElement(new StreamRecord<>(record));
+      }
+
+      // Buffer holds 3 records, no flush yet.
+      assertEquals(0, insertHarness.extractOutputValues().size(), "Records 
should still be buffered");
+
+      insertHarness.endInput();
+
+      List<HoodieFlinkInternalRow> output = 
insertHarness.extractOutputValues();
+      assertEquals(3, output.size(), "endInput should flush all buffered 
records");
+      for (HoodieFlinkInternalRow row : output) {
+        assertTrue(row.getFileId() != null && !row.getFileId().isEmpty(), 
"File ID should be assigned");
+        assertEquals("I", row.getInstantTime(), "All records should be INSERT 
bucket assignments");
+      }
+    } finally {
+      insertHarness.close();
+    }
+  }
+
+  @Test
+  public void testEmptyBufferPrepareSnapshotPreBarrierEmitsNothing() throws 
Exception {
+    // prepareSnapshotPreBarrier on an empty buffer should not throw and 
should emit no records.
+    testHarness.prepareSnapshotPreBarrier(1L);
+
+    List<HoodieFlinkInternalRow> output = testHarness.extractOutputValues();
+    assertEquals(0, output.size(), "Empty buffer should produce no output 
during snapshot");
+  }
+
+  @Test
+  public void testMultipleCheckpointFlushes() throws Exception {
+    // Every pre-barrier flush should emit only what arrived after the previous flush,
+    // so the (cumulative) harness output grows by exactly one record per checkpoint here.
+    HoodieFlinkInternalRow first = new HoodieFlinkInternalRow("id1", "par1", "I",
+        insertRow(StringData.fromString("id1"), StringData.fromString("Danny"), 23,
+            TimestampData.fromEpochMillis(1), StringData.fromString("par1")));
+    HoodieFlinkInternalRow second = new HoodieFlinkInternalRow("id2", "par1", "I",
+        insertRow(StringData.fromString("id2"), StringData.fromString("Stephen"), 33,
+            TimestampData.fromEpochMillis(2), StringData.fromString("par1")));
+
+    // Checkpoint 1: the single buffered record is flushed.
+    testHarness.processElement(new StreamRecord<>(first));
+    testHarness.prepareSnapshotPreBarrier(1L);
+    assertEquals(1, testHarness.extractOutputValues().size(), "First checkpoint should flush 1 record");
+
+    // Checkpoint 2: one more record brings the cumulative output to two.
+    testHarness.processElement(new StreamRecord<>(second));
+    testHarness.prepareSnapshotPreBarrier(2L);
+
+    List<HoodieFlinkInternalRow> cumulative = testHarness.extractOutputValues();
+    assertEquals(2, cumulative.size(), "Second checkpoint should have flushed 1 more record (2 cumulative)");
+    for (HoodieFlinkInternalRow assigned : cumulative) {
+      String fileId = assigned.getFileId();
+      assertTrue(fileId != null && !fileId.isEmpty(), "File ID should be assigned");
+      assertEquals("U", assigned.getInstantTime(), "Previously written records should be updates");
+    }
+  }
+
+  @Test
+  public void testCustomMinibatchSizeAboveDefault() {
+    // When the configured size exceeds the default minimum, it should be used 
as-is.
+    Configuration customConf = Configuration.fromMap(conf.toMap());
+    customConf.set(FlinkOptions.INDEX_RLI_LOOKUP_MINIBATCH_SIZE, 2000);
+    MinibatchBucketAssignFunction customFunction = new 
MinibatchBucketAssignFunction(customConf);
+    assertEquals(2000, customFunction.getMiniBatchSize(),
+        "Should use the configured size when it is above the default minimum");
+  }
+
+  @Test
+  public void testEndInputOnEmptyBufferDoesNotThrow() throws Exception {
+    // endInput with an empty buffer must not throw and should emit no records.
+    testHarness.endInput();
+    assertEquals(0, testHarness.extractOutputValues().size(), "No output 
expected when buffer was already empty");
+  }
+
+  @Test
+  public void testDelegateMetricsNonNullAfterOpen() {
+    FlinkBucketAssignMetrics delegateMetrics = function.getDelegateMetrics();
+    assertNotNull(delegateMetrics, "Delegate metrics should be initialized 
after open");
+  }
+
+  @Test
+  public void testBufferingMetricIncrementedAfterFullBatchFlush() throws Exception {
+    // Processing exactly one mini-batch worth of records triggers one automatic flush,
+    // recording one buffering sample. Use the function's effective batch size rather than the
+    // option's default value: the shared test conf may override it, and the function may adjust
+    // the configured value (see testCustomMinibatchSizeAboveDefault), either of which would make
+    // a default-sized loop under- or over-fill the buffer.
+    final long batchSize = function.getMiniBatchSize();
+    for (int i = 0; i < batchSize; i++) {
+      String key = "batch_key_" + i;
+      HoodieFlinkInternalRow record = new HoodieFlinkInternalRow(key, "par5", "I",
+          insertRow(StringData.fromString(key), StringData.fromString("Name"), 25,
+              TimestampData.fromEpochMillis(1), StringData.fromString("par5")));
+      testHarness.processElement(new StreamRecord<>(record));
+    }
+    assertEquals(1, function.getDelegateMetrics().getRecordBufferingCount(),
+        "One buffering cycle should be recorded after a full batch flush");
+  }
+
+  @Test
+  public void testBufferingMetricIncrementedAfterCheckpointFlush() throws Exception {
+    // A single buffered record drained by the checkpoint barrier counts as one buffering cycle.
+    testHarness.processElement(new StreamRecord<>(new HoodieFlinkInternalRow("id1", "par1", "I",
+        insertRow(StringData.fromString("id1"), StringData.fromString("Danny"), 23,
+            TimestampData.fromEpochMillis(1), StringData.fromString("par1")))));
+    testHarness.prepareSnapshotPreBarrier(1L);
+
+    FlinkBucketAssignMetrics metrics = function.getDelegateMetrics();
+    assertEquals(1, metrics.getRecordBufferingCount(),
+        "One buffering cycle should be recorded after a checkpoint flush");
+  }
 }
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/partitioner/index/TestGlobalRecordLevelIndexBackend.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/partitioner/index/TestGlobalRecordLevelIndexBackend.java
index 3b3f72a9fcb5..3f1a3b28af1b 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/partitioner/index/TestGlobalRecordLevelIndexBackend.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/partitioner/index/TestGlobalRecordLevelIndexBackend.java
@@ -18,6 +18,9 @@
 
 package org.apache.hudi.sink.partitioner.index;
 
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.metrics.NoOpMetricRegistry;
+import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup;
 import org.apache.hudi.common.config.HoodieMetadataConfig;
 import org.apache.hudi.common.model.HoodieRecordGlobalLocation;
 import org.apache.hudi.configuration.FlinkOptions;
@@ -56,6 +59,7 @@ import static org.mockito.Mockito.when;
 public class TestGlobalRecordLevelIndexBackend {
 
   private Configuration conf;
+  private NoOpMetricRegistry registry = new NoOpMetricRegistry();
 
   @TempDir
   File tempFile;
@@ -75,6 +79,7 @@ public class TestGlobalRecordLevelIndexBackend {
     String firstCommitTime = 
TestUtils.getLastCompleteInstant(tempFile.toURI().toString());
 
     try (GlobalRecordLevelIndexBackend globalRecordLevelIndexBackend = new 
GlobalRecordLevelIndexBackend(conf, -1)) {
+      
globalRecordLevelIndexBackend.registerMetrics(TaskManagerMetricGroup.createTaskManagerMetricGroup(registry,
 "localhost", ResourceID.generate()));
       // get record location
       HoodieRecordGlobalLocation location = 
globalRecordLevelIndexBackend.get(Collections.singletonList("id1")).get("id1");
       assertNotNull(location);
@@ -190,4 +195,31 @@ public class TestGlobalRecordLevelIndexBackend {
       assertEquals("par1", 
globalRecordLevelIndexBackend.get(Collections.singletonList("id4_0")).get("id4_0").getPartitionPath());
     }
   }
+
+  @Test
+  void testLookupWithoutMetricsRegistrationIsNullSafe() throws Exception {
+    // registerMetrics is deliberately never invoked here: a plain update/get round trip
+    // must still succeed when no metrics have been wired into the backend.
+    try (GlobalRecordLevelIndexBackend backend = new GlobalRecordLevelIndexBackend(conf, -1)) {
+      String recordKey = "null_metrics_key";
+      HoodieRecordGlobalLocation expected = new HoodieRecordGlobalLocation("par1", "000000001", "file-id-1");
+      backend.update(recordKey, expected);
+      HoodieRecordGlobalLocation actual = backend.get(Collections.singletonList(recordKey)).get(recordKey);
+      assertEquals(expected, actual);
+    }
+  }
+
+  @Test
+  void testRegisterMetricsIsIdempotent() throws Exception {
+    // Registering against the same group twice must be tolerated, and lookups must keep working afterwards.
+    try (GlobalRecordLevelIndexBackend backend = new GlobalRecordLevelIndexBackend(conf, -1)) {
+      TaskManagerMetricGroup group =
+          TaskManagerMetricGroup.createTaskManagerMetricGroup(registry, "localhost", ResourceID.generate());
+      for (int attempt = 0; attempt < 2; attempt++) {
+        backend.registerMetrics(group);
+      }
+
+      String recordKey = "idempotent_key";
+      HoodieRecordGlobalLocation expected = new HoodieRecordGlobalLocation("par2", "000000002", "file-id-2");
+      backend.update(recordKey, expected);
+      assertEquals(expected, backend.get(Collections.singletonList(recordKey)).get(recordKey));
+    }
+  }
 }


Reply via email to