This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 6ac6c6f21f0e feat: Introduce inflight record index cache for bucket 
assigning (#17802)
6ac6c6f21f0e is described below

commit 6ac6c6f21f0e12b7e76154e71c9ae27f4169fa8b
Author: Shuo Cheng <[email protected]>
AuthorDate: Fri Jan 9 17:30:40 2026 +0800

    feat: Introduce inflight record index cache for bucket assigning (#17802)
---
 .../apache/hudi/configuration/FlinkOptions.java    |   8 +
 .../sink/partitioner/index/RecordIndexCache.java   | 135 ++++++++++
 .../partitioner/index/TestRecordIndexCache.java    | 283 +++++++++++++++++++++
 3 files changed, 426 insertions(+)

diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
index 7b22f1d7ef7d..5d6d7052a655 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
@@ -274,6 +274,14 @@ public class FlinkOptions extends HoodieConfig {
       .defaultValue(".*")
       .withDescription("Whether to load partitions in state if partition path 
matching, default `*`");
 
  @AdvancedConfig
  public static final ConfigOption<Long> INDEX_RLI_CACHE_SIZE = ConfigOptions
      .key("index.rli.cache.size")
      .longType()
      .defaultValue(100L) // default in-memory cache capacity: 100 MB
      .withDescription("Maximum memory in MB for the inflight record index cache during one checkpoint interval.\n"
          + "When record level index is used to assign bucket, record locations will first be cached before the record index is committed.");
   // ------------------------------------------------------------------------
   //  Read Options
   // ------------------------------------------------------------------------
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/index/RecordIndexCache.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/index/RecordIndexCache.java
new file mode 100644
index 000000000000..6ee3084f5460
--- /dev/null
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/index/RecordIndexCache.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.sink.partitioner.index;
+
+import org.apache.hudi.common.config.HoodieCommonConfig;
+import org.apache.hudi.common.model.HoodieRecordGlobalLocation;
+import org.apache.hudi.common.serialization.DefaultSerializer;
+import org.apache.hudi.common.util.DefaultSizeEstimator;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.common.util.VisibleForTesting;
+import org.apache.hudi.common.util.collection.ExternalSpillableMap;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.configuration.FlinkOptions;
+import org.apache.hudi.util.FlinkWriteClients;
+
+import lombok.Getter;
+import org.apache.flink.configuration.Configuration;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.Comparator;
+import java.util.NavigableMap;
+import java.util.TreeMap;
+
+/**
+ * Cache to hold the in-flight record level index entries which are not 
committed to metadata table yet.
+ * <p>
+ * todo: use map backed by flink managed memory.
+ */
+public class RecordIndexCache implements Closeable {
+  @VisibleForTesting
+  @Getter
+  private final TreeMap<Long, ExternalSpillableMap<String, 
HoodieRecordGlobalLocation>> caches;
+  private final HoodieWriteConfig writeConfig;
+  private final long maxCacheSizeInBytes;
+
+  public RecordIndexCache(Configuration conf, long initCheckpointId) {
+    this.caches = new TreeMap<>(Comparator.reverseOrder());
+    this.writeConfig = FlinkWriteClients.getHoodieClientConfig(conf, false, 
false);
+    this.maxCacheSizeInBytes = conf.get(FlinkOptions.INDEX_RLI_CACHE_SIZE) * 
1024 * 1024;
+    addCheckpointCache(initCheckpointId);
+  }
+
+  /**
+   * Add a new checkpoint cache for the given checkpoint ID.
+   *
+   * @param checkpointId the checkpoint ID
+   */
+  public void addCheckpointCache(long checkpointId) {
+    try {
+      // Create a new ExternalSpillableMap for this checkpoint
+      ExternalSpillableMap<String, HoodieRecordGlobalLocation> newCache =
+          new ExternalSpillableMap<>(
+              maxCacheSizeInBytes,
+              writeConfig.getSpillableMapBasePath(),
+              new DefaultSizeEstimator<>(),
+              new DefaultSizeEstimator<>(),
+              writeConfig.getCommonConfig().getSpillableDiskMapType(),
+              new DefaultSerializer<>(),
+              
writeConfig.getBoolean(HoodieCommonConfig.DISK_MAP_BITCASK_COMPRESSION_ENABLED),
+              "RecordIndexCache-" + checkpointId);
+      caches.put(checkpointId, newCache);
+    } catch (IOException e) {
+      throw new RuntimeException("Failed to create checkpoint cache for 
checkpoint ID: " + checkpointId, e);
+    }
+  }
+
+  /**
+   * Search the record location from caches with larger checkpoint id to that 
with smaller checkpoint id,
+   * return early if the record location is found for the record key, return 
null otherwise.
+   *
+   * @param recordKey the record key for querying the location.
+   * @return the record location.
+   */
+  public HoodieRecordGlobalLocation get(String recordKey) {
+    // Iterate through the caches in descending order of checkpoint ID (larger 
to smaller)
+    for (ExternalSpillableMap<String, HoodieRecordGlobalLocation> cache : 
caches.values()) {
+      HoodieRecordGlobalLocation location = cache.get(recordKey);
+      if (location != null) {
+        return location;
+      }
+    }
+    return null;
+  }
+
+  /**
+   * Put the updated record location to the sub cache with the largest 
checkpoint id.
+   *
+   * @param recordKey the record key for querying the location.
+   * @param recordGlobalLocation the record location.
+   */
+  public void update(String recordKey, HoodieRecordGlobalLocation 
recordGlobalLocation) {
+    ValidationUtils.checkArgument(!caches.isEmpty(), "record index cache 
should not be empty.");
+    // Get the sub cache with the largest checkpoint ID (first entry in the 
reverse-ordered TreeMap)
+    caches.firstEntry().getValue().put(recordKey, recordGlobalLocation);
+  }
+
+  /**
+   * Clean all the cache entries for checkpoint whose id is less than the 
given checkpoint id.
+   *
+   * @param checkpointId the id of checkpoint
+   */
+  public void clean(long checkpointId) {
+    // Get all entries that are less than or equal to the given checkpointId
+    NavigableMap<Long, ExternalSpillableMap<String, 
HoodieRecordGlobalLocation>> subMap = caches.tailMap(checkpointId, false);
+    // Close all the ExternalSpillableMap instances before removing them
+    subMap.values().forEach(ExternalSpillableMap::close);
+    // Remove all the entries from the main cache
+    subMap.clear();
+  }
+
+  @Override
+  public void close() throws IOException {
+    // Close all the ExternalSpillableMap instances before removing them
+    caches.values().forEach(ExternalSpillableMap::close);
+    // Close all ExternalSpillableMap instances before clearing the cache
+    caches.clear();
+  }
+}
diff --git 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/partitioner/index/TestRecordIndexCache.java
 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/partitioner/index/TestRecordIndexCache.java
new file mode 100644
index 000000000000..0836f97d2926
--- /dev/null
+++ 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/partitioner/index/TestRecordIndexCache.java
@@ -0,0 +1,283 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.sink.partitioner.index;
+
+import org.apache.hudi.common.model.HoodieRecordGlobalLocation;
+import org.apache.hudi.common.util.collection.ExternalSpillableMap;
+import org.apache.hudi.configuration.FlinkOptions;
+import org.apache.hudi.utils.TestConfigurations;
+
+import org.apache.flink.configuration.Configuration;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Test cases for {@link RecordIndexCache}.
+ */
+public class TestRecordIndexCache {
+  @TempDir
+  File tempDir;
+  private RecordIndexCache cache;
+
+  @BeforeEach
+  void setUp() {
+    Configuration conf = 
TestConfigurations.getDefaultConf(tempDir.getAbsolutePath());
+    conf.set(FlinkOptions.INDEX_RLI_CACHE_SIZE, 100L); // 100MB cache size
+    this.cache = new RecordIndexCache(conf, 1L);
+  }
+
+  @AfterEach
+  void clean() throws IOException {
+    this.cache.close();
+  }
+
+  @Test
+  void testConstructor() {
+    // Test constructor with initial checkpoint ID
+    assertNotNull(cache.getCaches());
+    assertEquals(1, cache.getCaches().size());
+    assertTrue(cache.getCaches().containsKey(1L));
+  }
+
+  @Test
+  void testAddCheckpointCache() {
+    // Add another checkpoint cache
+    cache.addCheckpointCache(2L);
+    
+    assertEquals(2, cache.getCaches().size());
+    assertTrue(cache.getCaches().containsKey(1L));
+    assertTrue(cache.getCaches().containsKey(2L));
+    
+    // Check that checkpoints are stored in reverse order (2L should be first 
in reverse-ordered TreeMap)
+    assertEquals(Long.valueOf(2L), cache.getCaches().firstKey());
+    assertEquals(Long.valueOf(1L), cache.getCaches().lastKey());
+  }
+
+  @Test
+  void testUpdateAndGet() {
+    String recordKey = "key1";
+    HoodieRecordGlobalLocation location = new 
HoodieRecordGlobalLocation("partition1", "1001", "file_id1");
+    
+    // Initially should return null
+    assertNull(cache.get(recordKey));
+    
+    // Update the cache with a record location
+    cache.update(recordKey, location);
+    
+    // Should now return the location
+    HoodieRecordGlobalLocation retrievedLocation = cache.get(recordKey);
+    assertEquals(location, retrievedLocation);
+  }
+
+  @Test
+  void testUpdateWithMultipleCheckpoints() {
+    cache.addCheckpointCache(2L);
+
+    String recordKey = "key1";
+    HoodieRecordGlobalLocation location1 = new 
HoodieRecordGlobalLocation("partition1", "1001", "file_id1");
+    HoodieRecordGlobalLocation location2 = new 
HoodieRecordGlobalLocation("partition2", "1002", "file_id2");
+    
+    // Update in checkpoint 2
+    cache.update(recordKey, location1);
+    
+    // Check that it's in the highest checkpoint cache (2L)
+    HoodieRecordGlobalLocation retrieved = cache.get(recordKey);
+    assertEquals(location1, retrieved);
+    
+    // Add to checkpoint 3 and update again
+    cache.addCheckpointCache(3L);
+    cache.update(recordKey, location2);
+    
+    // Should now return the updated location
+    retrieved = cache.get(recordKey);
+    assertEquals(location2, retrieved);
+  }
+
+  @Test
+  void testGetFromMultipleCheckpoints() {
+    cache.addCheckpointCache(2L);
+    
+    String recordKey1 = "key1";
+    String recordKey2 = "key2";
+    HoodieRecordGlobalLocation location1 = new 
HoodieRecordGlobalLocation("partition1", "1001", "file_id1");
+    HoodieRecordGlobalLocation location2 = new 
HoodieRecordGlobalLocation("partition2", "1002", "file_id2");
+    
+    // Add to checkpoint 1
+    cache.getCaches().get(1L).put(recordKey1, location1);
+
+    cache.getCaches().get(1L).put(recordKey2, location1);
+
+    // Add to checkpoint 2 (higher checkpoint should take precedence)
+    cache.getCaches().get(2L).put(recordKey1, location2);
+    
+    // Should return from higher checkpoint (2L) first
+    HoodieRecordGlobalLocation retrieved = cache.get(recordKey1);
+    assertEquals(location2, retrieved);
+
+    // Should return from previous checkpoint (1L)
+    retrieved = cache.get(recordKey2);
+    assertEquals(location1, retrieved);
+  }
+
+  @Test
+  void testClean() {
+    cache.addCheckpointCache(2L);
+    cache.addCheckpointCache(3L);
+    cache.addCheckpointCache(4L);
+    
+    String recordKey1 = "key1";
+    String recordKey2 = "key2";
+    HoodieRecordGlobalLocation location1 = new 
HoodieRecordGlobalLocation("partition1", "1001", "file_id1");
+    HoodieRecordGlobalLocation location2 = new 
HoodieRecordGlobalLocation("partition2", "1002", "file_id2");
+    
+    // Add records to different checkpoints
+    cache.getCaches().get(1L).put(recordKey1, location1);
+    cache.getCaches().get(2L).put(recordKey2, location2);
+    
+    // Verify records exist before cleaning
+    assertNotNull(cache.get(recordKey1));
+    assertNotNull(cache.get(recordKey2));
+    
+    // Clean checkpoints up to and including 2
+    cache.clean(3L);
+    
+    // Check that checkpoints 1 and 2 are removed
+    assertEquals(2, cache.getCaches().size()); // Should have checkpoints 3 
and 4
+    assertFalse(cache.getCaches().containsKey(1L));
+    assertFalse(cache.getCaches().containsKey(2L));
+    assertTrue(cache.getCaches().containsKey(3L));
+    assertTrue(cache.getCaches().containsKey(4L));
+    
+    // Records from cleaned checkpoints should no longer be accessible
+    assertNull(cache.get(recordKey1));
+    assertNull(cache.get(recordKey2));
+  }
+
+  @Test
+  void testClose() throws IOException {
+    cache.addCheckpointCache(2L);
+    
+    String recordKey = "key1";
+    HoodieRecordGlobalLocation location = new 
HoodieRecordGlobalLocation("partition1", "1001", "file_id1");
+    cache.update(recordKey, location);
+    
+    // Verify cache has entries before closing
+    assertEquals(2, cache.getCaches().size());
+    assertNotNull(cache.get(recordKey));
+    
+    // Close the cache
+    cache.close();
+    
+    // After closing, the cache should be empty
+    assertEquals(0, cache.getCaches().size());
+  }
+
+  @Test
+  void testUpdateWithEmptyCache() {
+    // Clear the cache to test error condition
+    cache.getCaches().clear();
+
+    String recordKey = "key1";
+    HoodieRecordGlobalLocation location = new 
HoodieRecordGlobalLocation("partition1", "1001", "file_id");
+
+    // Should throw an exception when trying to update an empty cache
+    assertThrows(IllegalArgumentException.class, () -> {
+      cache.update(recordKey, location);
+    });
+  }
+
+  @Test
+  void testSpillToDisk() throws IOException {
+    // Create a new cache with a very small size to force spilling
+    Configuration conf = 
TestConfigurations.getDefaultConf(tempDir.getAbsolutePath());
+    conf.set(FlinkOptions.INDEX_RLI_CACHE_SIZE, 1L); // 1MB cache size to 
force spilling
+
+    try (RecordIndexCache smallCache = new RecordIndexCache(conf, 1L)) {
+      String recordKeyPrefix = "key";
+      List<HoodieRecordGlobalLocation> locations = new ArrayList<>();
+      for (int i = 0; i < 5000; i++) {
+        HoodieRecordGlobalLocation location = new 
HoodieRecordGlobalLocation("partition1", "1001", "file_id1", i);
+        locations.add(location);
+        // Update the cache with a record location
+        smallCache.update(recordKeyPrefix + i, location);
+      }
+
+      // Verify that the data has been spilled to disk by checking the 
underlying ExternalSpillableMap
+      ExternalSpillableMap<String, HoodieRecordGlobalLocation> spillableMap = 
smallCache.getCaches().get(1L);
+      assertTrue(spillableMap.getDiskBasedMapNumEntries() > 0, "Data should be 
spilled to disk");
+
+      for (int i = 0; i < 5000; i++) {
+        HoodieRecordGlobalLocation retrievedLocation = 
smallCache.get(recordKeyPrefix + i);
+        assertEquals(locations.get(i), retrievedLocation);
+      }
+    }
+  }
+
+  @Test
+  void testSpillWithMultipleCheckpoints() throws IOException {
+    // Create a new cache with a very small size to force spilling
+    Configuration conf = 
TestConfigurations.getDefaultConf(tempDir.getAbsolutePath());
+    conf.set(FlinkOptions.INDEX_RLI_CACHE_SIZE, 1L); // 1MB cache size to 
force spilling
+
+    try (RecordIndexCache smallCache = new RecordIndexCache(conf, 1L)) {
+      // Add records to multiple checkpoints
+      String recordKeyPrefix = "key";
+      for (int i = 0; i < 5000; i++) {
+        HoodieRecordGlobalLocation location = new 
HoodieRecordGlobalLocation("partition1", "1001", "file_id1", i);
+        // Update the cache with a record location
+        smallCache.update(recordKeyPrefix + i, location);
+      }
+      // Verify that the data has been spilled to disk by checking the 
underlying ExternalSpillableMap
+      ExternalSpillableMap<String, HoodieRecordGlobalLocation> spillableMap = 
smallCache.getCaches().get(1L);
+      assertTrue(spillableMap.getDiskBasedMapNumEntries() > 0, "Data should be 
spilled to disk");
+
+      // Add another checkpoint
+      smallCache.addCheckpointCache(2L);
+      List<HoodieRecordGlobalLocation> locations = new ArrayList<>();
+      for (int i = 0; i < 5000; i++) {
+        HoodieRecordGlobalLocation location = new 
HoodieRecordGlobalLocation("partition1", "1001", "file_id2", i);
+        locations.add(location);
+        // Update the cache with a record location
+        smallCache.update(recordKeyPrefix + i, location);
+      }
+      // Verify that the data has been spilled to disk by checking the 
underlying ExternalSpillableMap
+      spillableMap = smallCache.getCaches().get(1L);
+      assertTrue(spillableMap.getDiskBasedMapNumEntries() > 0, "Data should be 
spilled to disk");
+
+      for (int i = 0; i < 5000; i++) {
+        HoodieRecordGlobalLocation retrievedLocation = 
smallCache.get(recordKeyPrefix + i);
+        assertEquals(locations.get(i), retrievedLocation);
+      }
+    }
+  }
+}
\ No newline at end of file

Reply via email to