This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 990fc29f5609 feat(flink): add metrics for RLI load time for bucket assign functions (#18762)
990fc29f5609 is described below
commit 990fc29f5609a919bcf6397abbdfe6c7c849262b
Author: Peter Huang <[email protected]>
AuthorDate: Tue May 19 20:41:47 2026 -0700
feat(flink): add metrics for RLI load time for bucket assign functions (#18762)
---
.../hudi/metrics/FlinkBucketAssignMetrics.java | 64 +++++++
.../hudi/metrics/FlinkIndexBackendMetrics.java | 111 +++++++++++
.../sink/partitioner/BucketAssignFunction.java | 6 +
.../partitioner/MinibatchBucketAssignFunction.java | 13 ++
.../index/GlobalRecordLevelIndexBackend.java | 23 ++-
.../partitioner/index/IndexBackendFactory.java | 5 +-
.../hudi/metrics/TestFlinkBucketAssignMetrics.java | 133 ++++++++++++++
.../hudi/metrics/TestFlinkIndexBackendMetrics.java | 203 +++++++++++++++++++++
.../TestMinibatchBucketAssignFunction.java | 128 ++++++++++++-
.../index/TestGlobalRecordLevelIndexBackend.java | 32 ++++
10 files changed, 714 insertions(+), 4 deletions(-)
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/metrics/FlinkBucketAssignMetrics.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/metrics/FlinkBucketAssignMetrics.java
new file mode 100644
index 000000000000..fafaeaecc9ac
--- /dev/null
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/metrics/FlinkBucketAssignMetrics.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.metrics;
+
+import com.codahale.metrics.SlidingWindowReservoir;
+import org.apache.flink.dropwizard.metrics.DropwizardHistogramWrapper;
+import org.apache.flink.metrics.Histogram;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.hudi.common.util.VisibleForTesting;
+
+/**
+ * Metrics for Flink bucket assign functions (BucketAssignFunction, MinibatchBucketAssignFunction,
+ * DynamicBucketAssignFunction). Tracks record buffering time as a sliding-window histogram.
+ */
+public class FlinkBucketAssignMetrics extends HoodieFlinkMetrics {
+ private static final int HISTOGRAM_WINDOW_SIZE = 100;
+ private static final String RECORD_BUFFERING_KEY = "record_buffering";
+
+ /**
+ * Time records spend buffered before being processed, in milliseconds.
+ * Only populated by MinibatchBucketAssignFunction, via start/endRecordBuffering().
+ */
+ private final Histogram recordBufferingTime;
+
+ public FlinkBucketAssignMetrics(MetricGroup metricGroup) {
+ super(metricGroup);
+ this.recordBufferingTime = new DropwizardHistogramWrapper(
+ new com.codahale.metrics.Histogram(new
SlidingWindowReservoir(HISTOGRAM_WINDOW_SIZE)));
+ }
+
+ @Override
+ public void registerMetrics() {
+ metricGroup.histogram("recordBufferingTime", recordBufferingTime);
+ }
+
+ public void startRecordBuffering() {
+ startTimer(RECORD_BUFFERING_KEY);
+ }
+
+ public void endRecordBuffering() {
+ recordBufferingTime.update(stopTimer(RECORD_BUFFERING_KEY));
+ }
+
+ @VisibleForTesting
+ public long getRecordBufferingCount() {
+ return recordBufferingTime.getCount();
+ }
+}
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/metrics/FlinkIndexBackendMetrics.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/metrics/FlinkIndexBackendMetrics.java
new file mode 100644
index 000000000000..65fad78e93a7
--- /dev/null
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/metrics/FlinkIndexBackendMetrics.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.metrics;
+
+import com.codahale.metrics.SlidingWindowReservoir;
+import org.apache.flink.dropwizard.metrics.DropwizardHistogramWrapper;
+import org.apache.flink.metrics.Histogram;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.hudi.common.util.VisibleForTesting;
+
+/**
+ * Metrics for the {@link org.apache.hudi.sink.partitioner.index.GlobalRecordLevelIndexBackend}.
+ * Tracks per-lookup cache hit/miss key counts and local (cache) vs. remote (metadata table) latency.
+ */
+public class FlinkIndexBackendMetrics extends HoodieFlinkMetrics {
+ private static final int HISTOGRAM_WINDOW_SIZE = 100;
+ private static final String LOCAL_INDEX_LOOKUP_KEY = "local_index_lookup";
+ private static final String REMOTE_INDEX_LOOKUP_KEY = "remote_index_lookup";
+
+ /** Latency of the local (cache) phase of each index lookup, in milliseconds. */
+ private final Histogram localIndexLookupLatency;
+
+ /** Latency of the remote (metadata table) phase of each index lookup, in milliseconds. */
+ private final Histogram remoteIndexLookupLatency;
+
+ /** Number of keys resolved from the local cache per lookup. */
+ private final Histogram localLookupKeysNum;
+
+ /** Number of keys that missed the local cache and were fetched remotely per lookup. */
+ private final Histogram remoteLookupKeysNum;
+
+ public FlinkIndexBackendMetrics(MetricGroup metricGroup) {
+ super(metricGroup);
+ this.localIndexLookupLatency = new DropwizardHistogramWrapper(
+ new com.codahale.metrics.Histogram(new
SlidingWindowReservoir(HISTOGRAM_WINDOW_SIZE)));
+ this.remoteIndexLookupLatency = new DropwizardHistogramWrapper(
+ new com.codahale.metrics.Histogram(new
SlidingWindowReservoir(HISTOGRAM_WINDOW_SIZE)));
+ this.localLookupKeysNum = new DropwizardHistogramWrapper(
+ new com.codahale.metrics.Histogram(new
SlidingWindowReservoir(HISTOGRAM_WINDOW_SIZE)));
+ this.remoteLookupKeysNum = new DropwizardHistogramWrapper(
+ new com.codahale.metrics.Histogram(new
SlidingWindowReservoir(HISTOGRAM_WINDOW_SIZE)));
+ }
+
+ @Override
+ public void registerMetrics() {
+ metricGroup.histogram("localIndexLookupLatency", localIndexLookupLatency);
+ metricGroup.histogram("remoteIndexLookupLatency",
remoteIndexLookupLatency);
+ metricGroup.histogram("localLookupKeysNum", localLookupKeysNum);
+ metricGroup.histogram("remoteLookupKeysNum", remoteLookupKeysNum);
+ }
+
+ public void startLocalIndexLookup() {
+ startTimer(LOCAL_INDEX_LOOKUP_KEY);
+ }
+
+ public void endLocalIndexLookup() {
+ localIndexLookupLatency.update(stopTimer(LOCAL_INDEX_LOOKUP_KEY));
+ }
+
+ public void updateLocalLookupKeysCount(long n) {
+ localLookupKeysNum.update(n);
+ }
+
+ public void startRemoteIndexLookup() {
+ startTimer(REMOTE_INDEX_LOOKUP_KEY);
+ }
+
+ public void endRemoteIndexLookup() {
+ remoteIndexLookupLatency.update(stopTimer(REMOTE_INDEX_LOOKUP_KEY));
+ }
+
+ public void updateRemoteLookupKeysCount(long n) {
+ remoteLookupKeysNum.update(n);
+ }
+
+ @VisibleForTesting
+ public long getLocalIndexLookupCount() {
+ return localIndexLookupLatency.getCount();
+ }
+
+ @VisibleForTesting
+ public long getRemoteIndexLookupCount() {
+ return remoteIndexLookupLatency.getCount();
+ }
+
+ @VisibleForTesting
+ public long getLocalLookupKeysSampleCount() {
+ return localLookupKeysNum.getCount();
+ }
+
+ @VisibleForTesting
+ public long getRemoteLookupKeysSampleCount() {
+ return remoteLookupKeysNum.getCount();
+ }
+}
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssignFunction.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssignFunction.java
index 7faff523e689..6caacbbc3843 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssignFunction.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssignFunction.java
@@ -30,6 +30,7 @@ import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.configuration.HadoopConfigurations;
import org.apache.hudi.configuration.OptionsResolver;
import org.apache.hudi.hadoop.fs.HadoopFSUtils;
+import org.apache.hudi.metrics.FlinkBucketAssignMetrics;
import org.apache.hudi.sink.event.Correspondent;
import org.apache.hudi.sink.partitioner.index.GlobalIndexBackend;
import org.apache.hudi.sink.partitioner.index.IndexBackendFactory;
@@ -87,6 +88,9 @@ public class BucketAssignFunction
@Getter
private transient GlobalIndexBackend indexBackend;
+ @Getter
+ protected transient FlinkBucketAssignMetrics metrics;
+
/**
* Bucket assigner to assign new bucket IDs or reuse existing ones.
*/
@@ -139,6 +143,8 @@ public class BucketAssignFunction
context,
writeConfig);
this.recordProcessor = initRecordProcessor();
+ this.metrics = new
FlinkBucketAssignMetrics(getRuntimeContext().getMetricGroup());
+ this.metrics.registerMetrics();
}
@Override
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/MinibatchBucketAssignFunction.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/MinibatchBucketAssignFunction.java
index cdf30c932eb7..c29d538d4e6f 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/MinibatchBucketAssignFunction.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/MinibatchBucketAssignFunction.java
@@ -24,6 +24,7 @@ import org.apache.hudi.common.model.HoodieRecordGlobalLocation;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.util.VisibleForTesting;
import org.apache.hudi.configuration.FlinkOptions;
+import org.apache.hudi.metrics.FlinkBucketAssignMetrics;
import org.apache.hudi.sink.event.Correspondent;
import org.apache.hudi.sink.partitioner.index.MinibatchIndexBackend;
@@ -120,6 +121,10 @@ public class MinibatchBucketAssignFunction
// handle index records immediately, do not need buffering
delegateFunction.processIndexRecord(record, record.getRecordKey());
} else {
+ // Start buffering timer when first record enters an empty buffer
+ if (recordBuffer.isEmpty()) {
+ delegateFunction.getMetrics().startRecordBuffering();
+ }
// Add data records to the buffer
recordBuffer.add(record);
// Process the buffer if it reaches the configured size
@@ -136,6 +141,9 @@ public class MinibatchBucketAssignFunction
if (recordBuffer.isEmpty()) {
return;
}
+
+ // Record how long the oldest record in the batch was buffered
+ delegateFunction.getMetrics().endRecordBuffering();
// process batch of records.
minibatchProcessor.process(recordBuffer, out);
// Clear the buffer after processing
@@ -199,6 +207,11 @@ public class MinibatchBucketAssignFunction
this.delegateFunction.setCorrespondent(correspondent);
}
+ @VisibleForTesting
+ public FlinkBucketAssignMetrics getDelegateMetrics() {
+ return delegateFunction.getMetrics();
+ }
+
@Override
public void notifyCheckpointComplete(long checkpointId) throws Exception {
// Refresh the table state when there are new commits.
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/index/GlobalRecordLevelIndexBackend.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/index/GlobalRecordLevelIndexBackend.java
index a95e18e863a1..628e20e86b81 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/index/GlobalRecordLevelIndexBackend.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/index/GlobalRecordLevelIndexBackend.java
@@ -18,6 +18,8 @@
package org.apache.hudi.sink.partitioner.index;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
import org.apache.hudi.client.common.HoodieFlinkEngineContext;
import org.apache.hudi.common.data.HoodieListData;
import org.apache.hudi.common.data.HoodiePairData;
@@ -28,6 +30,7 @@ import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.metadata.HoodieTableMetadata;
+import org.apache.hudi.metrics.FlinkIndexBackendMetrics;
import org.apache.hudi.sink.event.Correspondent;
import org.apache.hudi.util.StreamerUtil;
@@ -56,6 +59,7 @@ public class GlobalRecordLevelIndexBackend implements MinibatchIndexBackend {
private final Configuration conf;
private final HoodieTableMetaClient metaClient;
private HoodieTableMetadata metadataTable;
+ private FlinkIndexBackendMetrics metrics;
/**
* Creates a global RLI backend with a checkpoint-aware cache.
@@ -67,9 +71,16 @@ public class GlobalRecordLevelIndexBackend implements MinibatchIndexBackend {
this.metaClient = StreamerUtil.createMetaClient(conf);
this.conf = conf;
this.recordIndexCache = new RecordIndexCache(conf, initCheckpointId);
+ registerMetrics(new UnregisteredMetricsGroup());
reloadMetadataTable();
}
+ @Override
+ public void registerMetrics(MetricGroup metricGroup) {
+ this.metrics = new FlinkIndexBackendMetrics(metricGroup);
+ this.metrics.registerMetrics();
+ }
+
@Override
public HoodieRecordGlobalLocation get(String recordKey) throws IOException {
throw new UnsupportedOperationException(this.getClass().getSimpleName() +
" doesn't support lookup with a single key.");
@@ -84,7 +95,9 @@ public class GlobalRecordLevelIndexBackend implements MinibatchIndexBackend {
public Map<String, HoodieRecordGlobalLocation> get(List<String> recordKeys)
throws IOException {
Map<String, HoodieRecordGlobalLocation> keysAndLocations = new HashMap<>();
List<String> missedKeys = new ArrayList<>();
- for (String key: recordKeys) {
+
+ metrics.startLocalIndexLookup();
+ for (String key : recordKeys) {
HoodieRecordGlobalLocation location = recordIndexCache.get(key);
if (location == null) {
missedKeys.add(key);
@@ -92,13 +105,21 @@ public class GlobalRecordLevelIndexBackend implements MinibatchIndexBackend {
keysAndLocations.put(key, location);
}
}
+
+ metrics.endLocalIndexLookup();
+ metrics.updateLocalLookupKeysCount(recordKeys.size() - missedKeys.size());
+
if (!missedKeys.isEmpty()) {
+ metrics.startRemoteIndexLookup();
HoodiePairData<String, HoodieRecordGlobalLocation> recordIndexData =
metadataTable.readRecordIndexLocationsWithKeys(HoodieListData.eager(missedKeys));
recordIndexData.forEach(keyAndLocation -> {
recordIndexCache.update(keyAndLocation.getKey(),
keyAndLocation.getValue());
keysAndLocations.put(keyAndLocation.getKey(),
keyAndLocation.getValue());
});
+
+ metrics.endRemoteIndexLookup();
+ metrics.updateRemoteLookupKeysCount(missedKeys.size());
}
return keysAndLocations;
}
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/index/IndexBackendFactory.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/index/IndexBackendFactory.java
index 01a2e04224a5..9c61dbe0acb6 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/index/IndexBackendFactory.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/index/IndexBackendFactory.java
@@ -53,7 +53,10 @@ public class IndexBackendFactory {
* @param runtimeContext Flink runtime context for job and attempt metadata
* @return global index backend for record-key lookups
*/
- public static GlobalIndexBackend create(Configuration conf,
FunctionInitializationContext context, RuntimeContext runtimeContext) throws
Exception {
+ public static GlobalIndexBackend create(
+ Configuration conf,
+ FunctionInitializationContext context,
+ RuntimeContext runtimeContext) throws Exception {
HoodieIndex.IndexType indexType = OptionsResolver.getIndexType(conf);
switch (indexType) {
case FLINK_STATE:
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/metrics/TestFlinkBucketAssignMetrics.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/metrics/TestFlinkBucketAssignMetrics.java
new file mode 100644
index 000000000000..e4ff46e31474
--- /dev/null
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/metrics/TestFlinkBucketAssignMetrics.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.metrics;
+
+import org.apache.flink.metrics.Histogram;
+import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Tests for {@link FlinkBucketAssignMetrics}: histogram registration and record-buffering timer updates.
+ */
+class TestFlinkBucketAssignMetrics {
+
+ private CapturingMetricGroup metricGroup;
+ private FlinkBucketAssignMetrics metrics;
+
+ @BeforeEach
+ void setUp() {
+ metricGroup = new CapturingMetricGroup();
+ metrics = new FlinkBucketAssignMetrics(metricGroup);
+ metrics.registerMetrics();
+ }
+
+ @Test
+ void testRegisterMetricsRegistersHistograms() {
+ assertNotNull(metricGroup.getHistogram("recordBufferingTime"));
+ }
+
+ @Test
+ void testRecordBufferingUpdatesHistogramCount() {
+ Histogram hist = metricGroup.getHistogram("recordBufferingTime");
+ assertEquals(0, hist.getCount());
+
+ metrics.startRecordBuffering();
+ metrics.endRecordBuffering();
+ assertEquals(1, hist.getCount());
+ }
+
+ @Test
+ void testRecordBufferingTimeIsNonNegative() {
+ metrics.startRecordBuffering();
+ metrics.endRecordBuffering();
+
+ Histogram hist = metricGroup.getHistogram("recordBufferingTime");
+ assertTrue(hist.getStatistics().getMin() >= 0);
+ }
+
+ @Test
+ void testEndRecordBufferingWithoutStartRecordsZero() {
+ metrics.endRecordBuffering();
+
+ Histogram hist = metricGroup.getHistogram("recordBufferingTime");
+ assertEquals(1, hist.getCount());
+ assertEquals(0, hist.getStatistics().getMax()); // no timer was started; the test expects a 0 reading
+ }
+
+ @Test
+ void testVisibleForTestingGetters() {
+ assertEquals(0, metrics.getRecordBufferingCount());
+
+ metrics.startRecordBuffering();
+ metrics.endRecordBuffering();
+ assertEquals(1, metrics.getRecordBufferingCount());
+ }
+
+ @Test
+ void testMultipleConsecutiveBufferingCycles() {
+ for (int i = 0; i < 5; i++) {
+ metrics.startRecordBuffering();
+ metrics.endRecordBuffering();
+ }
+ assertEquals(5, metrics.getRecordBufferingCount());
+ }
+
+ @Test
+ void testTimerRestartBeforeStop() {
+ // Calling startRecordBuffering twice before stopping should override the first start.
+ // The end call still records exactly one sample.
+ metrics.startRecordBuffering();
+ metrics.startRecordBuffering();
+ metrics.endRecordBuffering();
+ assertEquals(1, metrics.getRecordBufferingCount());
+ }
+
+ @Test
+ void testSlidingWindowCapAtHundred() {
+ // The reservoir keeps only the 100 most recent samples, but getCount() still counts all updates.
+ for (int i = 0; i < 110; i++) {
+ metrics.startRecordBuffering();
+ metrics.endRecordBuffering();
+ }
+ assertEquals(110, metrics.getRecordBufferingCount());
+ }
+
+ private static class CapturingMetricGroup extends UnregisteredMetricsGroup {
+ private final Map<String, Histogram> histograms = new HashMap<>();
+
+ @Override
+ public <H extends Histogram> H histogram(String name, H histogram) {
+ histograms.put(name, histogram); // remember the histogram so tests can look it up by name
+ return histogram;
+ }
+
+ Histogram getHistogram(String name) {
+ return histograms.get(name);
+ }
+ }
+}
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/metrics/TestFlinkIndexBackendMetrics.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/metrics/TestFlinkIndexBackendMetrics.java
new file mode 100644
index 000000000000..3a11348e2431
--- /dev/null
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/metrics/TestFlinkIndexBackendMetrics.java
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.metrics;
+
+import org.apache.flink.metrics.Histogram;
+import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Tests for {@link FlinkIndexBackendMetrics}: histogram registration, lookup timers and key-count updates.
+ */
+class TestFlinkIndexBackendMetrics {
+
+ private CapturingMetricGroup metricGroup;
+ private FlinkIndexBackendMetrics metrics;
+
+ @BeforeEach
+ void setUp() {
+ metricGroup = new CapturingMetricGroup();
+ metrics = new FlinkIndexBackendMetrics(metricGroup);
+ metrics.registerMetrics();
+ }
+
+ @Test
+ void testRegisterMetricsRegistersHistograms() {
+ assertNotNull(metricGroup.getHistogram("localIndexLookupLatency"));
+ assertNotNull(metricGroup.getHistogram("remoteIndexLookupLatency"));
+ assertNotNull(metricGroup.getHistogram("localLookupKeysNum"));
+ assertNotNull(metricGroup.getHistogram("remoteLookupKeysNum"));
+ }
+
+ @Test
+ void testLocalIndexLookupUpdatesHistogramCount() {
+ Histogram hist = metricGroup.getHistogram("localIndexLookupLatency");
+ assertEquals(0, hist.getCount());
+
+ metrics.startLocalIndexLookup();
+ metrics.endLocalIndexLookup();
+ assertEquals(1, hist.getCount());
+
+ metrics.startLocalIndexLookup();
+ metrics.endLocalIndexLookup();
+ assertEquals(2, hist.getCount());
+ }
+
+ @Test
+ void testRemoteIndexLookupUpdatesHistogramCount() {
+ Histogram hist = metricGroup.getHistogram("remoteIndexLookupLatency");
+ assertEquals(0, hist.getCount());
+
+ metrics.startRemoteIndexLookup();
+ metrics.endRemoteIndexLookup();
+ assertEquals(1, hist.getCount());
+ }
+
+ @Test
+ void testLocalLookupKeysNumUpdatesHistogram() {
+ Histogram hist = metricGroup.getHistogram("localLookupKeysNum");
+ assertEquals(0, hist.getCount());
+
+ metrics.updateLocalLookupKeysCount(5);
+ assertEquals(1, hist.getCount());
+ assertEquals(5, hist.getStatistics().getMax());
+
+ metrics.updateLocalLookupKeysCount(3);
+ assertEquals(2, hist.getCount());
+ }
+
+ @Test
+ void testRemoteLookupKeysNumUpdatesHistogram() {
+ Histogram hist = metricGroup.getHistogram("remoteLookupKeysNum");
+ assertEquals(0, hist.getCount());
+
+ metrics.updateRemoteLookupKeysCount(10);
+ assertEquals(1, hist.getCount());
+ assertEquals(10, hist.getStatistics().getMax());
+ }
+
+ @Test
+ void testLocalIndexLookupLatencyIsNonNegative() {
+ metrics.startLocalIndexLookup();
+ metrics.endLocalIndexLookup();
+
+ Histogram hist = metricGroup.getHistogram("localIndexLookupLatency");
+ assertTrue(hist.getStatistics().getMin() >= 0);
+ }
+
+ @Test
+ void testRemoteIndexLookupLatencyIsNonNegative() {
+ metrics.startRemoteIndexLookup();
+ metrics.endRemoteIndexLookup();
+
+ Histogram hist = metricGroup.getHistogram("remoteIndexLookupLatency");
+ assertTrue(hist.getStatistics().getMin() >= 0);
+ }
+
+ @Test
+ void testEndLocalIndexLookupWithoutStartRecordsZero() {
+ metrics.endLocalIndexLookup();
+
+ Histogram hist = metricGroup.getHistogram("localIndexLookupLatency");
+ assertEquals(1, hist.getCount());
+ assertEquals(0, hist.getStatistics().getMax()); // no timer was started; the test expects a 0 reading
+ }
+
+ @Test
+ void testEndRemoteIndexLookupWithoutStartRecordsZero() {
+ metrics.endRemoteIndexLookup();
+
+ Histogram hist = metricGroup.getHistogram("remoteIndexLookupLatency");
+ assertEquals(1, hist.getCount());
+ assertEquals(0, hist.getStatistics().getMax()); // no timer was started; the test expects a 0 reading
+ }
+
+ @Test
+ void testVisibleForTestingGetters() {
+ assertEquals(0, metrics.getLocalIndexLookupCount());
+ assertEquals(0, metrics.getRemoteIndexLookupCount());
+ assertEquals(0, metrics.getLocalLookupKeysSampleCount());
+ assertEquals(0, metrics.getRemoteLookupKeysSampleCount());
+
+ metrics.startLocalIndexLookup();
+ metrics.endLocalIndexLookup();
+ assertEquals(1, metrics.getLocalIndexLookupCount());
+
+ metrics.startRemoteIndexLookup();
+ metrics.endRemoteIndexLookup();
+ assertEquals(1, metrics.getRemoteIndexLookupCount());
+
+ metrics.updateLocalLookupKeysCount(4);
+ assertEquals(1, metrics.getLocalLookupKeysSampleCount());
+
+ metrics.updateRemoteLookupKeysCount(2);
+ assertEquals(1, metrics.getRemoteLookupKeysSampleCount());
+ }
+
+ @Test
+ void testCombinedLocalAndRemoteLookupInOneRound() {
+ metrics.startLocalIndexLookup();
+ metrics.endLocalIndexLookup();
+ metrics.updateLocalLookupKeysCount(3);
+ metrics.startRemoteIndexLookup();
+ metrics.endRemoteIndexLookup();
+ metrics.updateRemoteLookupKeysCount(2);
+
+ assertEquals(1, metrics.getLocalIndexLookupCount());
+ assertEquals(1, metrics.getRemoteIndexLookupCount());
+ assertEquals(1, metrics.getLocalLookupKeysSampleCount());
+ assertEquals(1, metrics.getRemoteLookupKeysSampleCount());
+ }
+
+ @Test
+ void testUpdateLookupKeyCountsWithZero() {
+ metrics.updateLocalLookupKeysCount(0);
+ metrics.updateRemoteLookupKeysCount(0);
+
+ Histogram localHist = metricGroup.getHistogram("localLookupKeysNum");
+ Histogram remoteHist = metricGroup.getHistogram("remoteLookupKeysNum");
+ assertEquals(1, localHist.getCount());
+ assertEquals(0, localHist.getStatistics().getMax());
+ assertEquals(1, remoteHist.getCount());
+ assertEquals(0, remoteHist.getStatistics().getMax());
+ }
+
+ private static class CapturingMetricGroup extends UnregisteredMetricsGroup {
+ private final Map<String, Histogram> histograms = new HashMap<>();
+
+ @Override
+ public <H extends Histogram> H histogram(String name, H histogram) {
+ histograms.put(name, histogram); // remember the histogram so tests can look it up by name
+ return histogram;
+ }
+
+ Histogram getHistogram(String name) {
+ return histograms.get(name);
+ }
+ }
+}
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/partitioner/TestMinibatchBucketAssignFunction.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/partitioner/TestMinibatchBucketAssignFunction.java
index a98a95529fc1..0676aa89222d 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/partitioner/TestMinibatchBucketAssignFunction.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/partitioner/TestMinibatchBucketAssignFunction.java
@@ -20,8 +20,10 @@ package org.apache.hudi.sink.partitioner;
import org.apache.hudi.client.model.HoodieFlinkInternalRow;
import org.apache.hudi.common.config.HoodieMetadataConfig;
+import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.index.HoodieIndex;
+import org.apache.hudi.metrics.FlinkBucketAssignMetrics;
import org.apache.hudi.utils.TestConfigurations;
import org.apache.hudi.utils.TestData;
@@ -41,6 +43,7 @@ import java.util.List;
import static org.apache.hudi.utils.TestData.insertRow;
import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
/**
@@ -48,6 +51,7 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
*/
public class TestMinibatchBucketAssignFunction {
private OneInputStreamOperatorTestHarness<HoodieFlinkInternalRow,
HoodieFlinkInternalRow> testHarness;
+ private MinibatchBucketAssignFunction function;
private static Configuration conf;
@TempDir
@@ -65,7 +69,7 @@ public class TestMinibatchBucketAssignFunction {
@BeforeEach
public void beforeEach() throws Exception {
// Create the MinibatchBucketAssignFunction
- MinibatchBucketAssignFunction function = new
MinibatchBucketAssignFunction(conf);
+ function = new MinibatchBucketAssignFunction(conf);
// Set up test harness
testHarness = new OneInputStreamOperatorTestHarness<>(new
MiniBatchBucketAssignOperator(function, new OperatorID()), 1, 1, 0);
testHarness.open();
@@ -240,8 +244,128 @@ public class TestMinibatchBucketAssignFunction {
HoodieFlinkInternalRow record = new HoodieFlinkInternalRow("id1", "par1",
"I",
insertRow(StringData.fromString("id1"),
StringData.fromString("Danny"), 23, TimestampData.fromEpochMillis(1),
StringData.fromString("par1")));
testHarness.processElement(new StreamRecord<>(record));
-
+
// Close should not throw any exceptions
testHarness.close();
}
+
+ @Test
+ public void testInsertOperationFlushedByEndInput() throws Exception {
+ // With OPERATION=INSERT, endInput() should flush the partial buffer.
+ Configuration insertConf = Configuration.fromMap(conf.toMap());
+ insertConf.set(FlinkOptions.OPERATION, WriteOperationType.INSERT.value());
+
+ MinibatchBucketAssignFunction insertFunction = new
MinibatchBucketAssignFunction(insertConf);
+ OneInputStreamOperatorTestHarness<HoodieFlinkInternalRow,
HoodieFlinkInternalRow> insertHarness =
+ new OneInputStreamOperatorTestHarness<>(new
MiniBatchBucketAssignOperator(insertFunction, new OperatorID()), 1, 1, 0);
+ insertHarness.open();
+ try {
+ for (int i = 0; i < 3; i++) {
+ String key = "ins_key_" + i;
+ HoodieFlinkInternalRow record = new HoodieFlinkInternalRow(key,
"par9", "I",
+ insertRow(StringData.fromString(key),
StringData.fromString("Name"), 30,
+ TimestampData.fromEpochMillis(1),
StringData.fromString("par9")));
+ insertHarness.processElement(new StreamRecord<>(record));
+ }
+
+ // Buffer holds 3 records, no flush yet.
+ assertEquals(0, insertHarness.extractOutputValues().size(), "Records
should still be buffered");
+
+ insertHarness.endInput();
+
+ List<HoodieFlinkInternalRow> output =
insertHarness.extractOutputValues();
+ assertEquals(3, output.size(), "endInput should flush all buffered
records");
+ for (HoodieFlinkInternalRow row : output) {
+ assertTrue(row.getFileId() != null && !row.getFileId().isEmpty(),
"File ID should be assigned");
+ assertEquals("I", row.getInstantTime(), "All records should be INSERT
bucket assignments");
+ }
+ } finally {
+ insertHarness.close();
+ }
+ }
+
+ @Test
+ public void testEmptyBufferPrepareSnapshotPreBarrierEmitsNothing() throws
Exception {
+ // prepareSnapshotPreBarrier on an empty buffer should not throw and
should emit no records.
+ testHarness.prepareSnapshotPreBarrier(1L);
+
+ List<HoodieFlinkInternalRow> output = testHarness.extractOutputValues();
+ assertEquals(0, output.size(), "Empty buffer should produce no output
during snapshot");
+ }
+
+ @Test
+ public void testMultipleCheckpointFlushes() throws Exception {
+ // Each prepareSnapshotPreBarrier call should flush only the records
buffered since the last flush.
+ HoodieFlinkInternalRow record1 = new HoodieFlinkInternalRow("id1", "par1",
"I",
+ insertRow(StringData.fromString("id1"),
StringData.fromString("Danny"), 23,
+ TimestampData.fromEpochMillis(1), StringData.fromString("par1")));
+ HoodieFlinkInternalRow record2 = new HoodieFlinkInternalRow("id2", "par1",
"I",
+ insertRow(StringData.fromString("id2"),
StringData.fromString("Stephen"), 33,
+ TimestampData.fromEpochMillis(2), StringData.fromString("par1")));
+
+ // First checkpoint cycle.
+ testHarness.processElement(new StreamRecord<>(record1));
+ testHarness.prepareSnapshotPreBarrier(1L);
+ assertEquals(1, testHarness.extractOutputValues().size(), "First
checkpoint should flush 1 record");
+
+ // Second checkpoint cycle.
+ testHarness.processElement(new StreamRecord<>(record2));
+ testHarness.prepareSnapshotPreBarrier(2L);
+
+ List<HoodieFlinkInternalRow> output = testHarness.extractOutputValues();
+ assertEquals(2, output.size(), "Second checkpoint should have flushed 1
more record (2 cumulative)");
+ for (HoodieFlinkInternalRow row : output) {
+ assertTrue(row.getFileId() != null && !row.getFileId().isEmpty(), "File
ID should be assigned");
+ assertEquals("U", row.getInstantTime(), "Previously written records
should be updates");
+ }
+ }
+
+ @Test
+ public void testCustomMinibatchSizeAboveDefault() {
+ // When the configured size exceeds the default minimum, it should be used
as-is.
+ Configuration customConf = Configuration.fromMap(conf.toMap());
+ customConf.set(FlinkOptions.INDEX_RLI_LOOKUP_MINIBATCH_SIZE, 2000);
+ MinibatchBucketAssignFunction customFunction = new
MinibatchBucketAssignFunction(customConf);
+ assertEquals(2000, customFunction.getMiniBatchSize(),
+ "Should use the configured size when it is above the default minimum");
+ }
+
+ @Test
+ public void testEndInputOnEmptyBufferDoesNotThrow() throws Exception {
+ // endInput with an empty buffer must not throw and should emit no records.
+ testHarness.endInput();
+ assertEquals(0, testHarness.extractOutputValues().size(), "No output
expected when buffer was already empty");
+ }
+
+ @Test
+ public void testDelegateMetricsNonNullAfterOpen() {
+ FlinkBucketAssignMetrics delegateMetrics = function.getDelegateMetrics();
+ assertNotNull(delegateMetrics, "Delegate metrics should be initialized
after open");
+ }
+
+ @Test
+ public void testBufferingMetricIncrementedAfterFullBatchFlush() throws
Exception {
+ // Processing exactly miniBatchSize records triggers one automatic flush,
recording one buffering sample.
+ for (int i = 0; i <
FlinkOptions.INDEX_RLI_LOOKUP_MINIBATCH_SIZE.defaultValue(); i++) {
+ String key = "batch_key_" + i;
+ HoodieFlinkInternalRow record = new HoodieFlinkInternalRow(key, "par5",
"I",
+ insertRow(StringData.fromString(key), StringData.fromString("Name"),
25,
+ TimestampData.fromEpochMillis(1),
StringData.fromString("par5")));
+ testHarness.processElement(new StreamRecord<>(record));
+ }
+ assertEquals(1, function.getDelegateMetrics().getRecordBufferingCount(),
+ "One buffering cycle should be recorded after a full batch flush");
+ }
+
+ @Test
+ public void testBufferingMetricIncrementedAfterCheckpointFlush() throws
Exception {
+ // A partial buffer flushed by a checkpoint barrier records one buffering
sample.
+ HoodieFlinkInternalRow record = new HoodieFlinkInternalRow("id1", "par1",
"I",
+ insertRow(StringData.fromString("id1"),
StringData.fromString("Danny"), 23,
+ TimestampData.fromEpochMillis(1), StringData.fromString("par1")));
+ testHarness.processElement(new StreamRecord<>(record));
+ testHarness.prepareSnapshotPreBarrier(1L);
+ assertEquals(1, function.getDelegateMetrics().getRecordBufferingCount(),
+ "One buffering cycle should be recorded after a checkpoint flush");
+ }
}
diff --git
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/partitioner/index/TestGlobalRecordLevelIndexBackend.java
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/partitioner/index/TestGlobalRecordLevelIndexBackend.java
index 3b3f72a9fcb5..3f1a3b28af1b 100644
---
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/partitioner/index/TestGlobalRecordLevelIndexBackend.java
+++
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/partitioner/index/TestGlobalRecordLevelIndexBackend.java
@@ -18,6 +18,9 @@
package org.apache.hudi.sink.partitioner.index;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.metrics.NoOpMetricRegistry;
+import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup;
import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.model.HoodieRecordGlobalLocation;
import org.apache.hudi.configuration.FlinkOptions;
@@ -56,6 +59,7 @@ import static org.mockito.Mockito.when;
public class TestGlobalRecordLevelIndexBackend {
private Configuration conf;
+ private NoOpMetricRegistry registry = new NoOpMetricRegistry();
@TempDir
File tempFile;
@@ -75,6 +79,7 @@ public class TestGlobalRecordLevelIndexBackend {
String firstCommitTime =
TestUtils.getLastCompleteInstant(tempFile.toURI().toString());
try (GlobalRecordLevelIndexBackend globalRecordLevelIndexBackend = new
GlobalRecordLevelIndexBackend(conf, -1)) {
+
globalRecordLevelIndexBackend.registerMetrics(TaskManagerMetricGroup.createTaskManagerMetricGroup(registry,
"localhost", ResourceID.generate()));
// get record location
HoodieRecordGlobalLocation location =
globalRecordLevelIndexBackend.get(Collections.singletonList("id1")).get("id1");
assertNotNull(location);
@@ -190,4 +195,31 @@ public class TestGlobalRecordLevelIndexBackend {
assertEquals("par1",
globalRecordLevelIndexBackend.get(Collections.singletonList("id4_0")).get("id4_0").getPartitionPath());
}
}
+
+ @Test
+ void testLookupWithoutMetricsRegistrationIsNullSafe() throws Exception {
+ // Verifies that the if (metrics != null) guards in get(List) don't throw
even when
+ // registerMetrics was never called.
+ try (GlobalRecordLevelIndexBackend backend = new
GlobalRecordLevelIndexBackend(conf, -1)) {
+ HoodieRecordGlobalLocation location = new
HoodieRecordGlobalLocation("par1", "000000001", "file-id-1");
+ backend.update("null_metrics_key", location);
+ Map<String, HoodieRecordGlobalLocation> result =
backend.get(Collections.singletonList("null_metrics_key"));
+ assertEquals(location, result.get("null_metrics_key"));
+ }
+ }
+
+ @Test
+ void testRegisterMetricsIsIdempotent() throws Exception {
+ // The second registerMetrics call must be a no-op and must not throw.
+ try (GlobalRecordLevelIndexBackend backend = new
GlobalRecordLevelIndexBackend(conf, -1)) {
+ TaskManagerMetricGroup group =
TaskManagerMetricGroup.createTaskManagerMetricGroup(
+ registry, "localhost", ResourceID.generate());
+ backend.registerMetrics(group);
+ backend.registerMetrics(group);
+
+ HoodieRecordGlobalLocation location = new
HoodieRecordGlobalLocation("par2", "000000002", "file-id-2");
+ backend.update("idempotent_key", location);
+ assertEquals(location,
backend.get(Collections.singletonList("idempotent_key")).get("idempotent_key"));
+ }
+ }
}