This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 4c819a5dad44 perf: Support lazy clean of the RLI cache during bucket assigning (#18018)
4c819a5dad44 is described below

commit 4c819a5dad445a5ab60b554aeda8838d89fbcb91
Author: Shuo Cheng <[email protected]>
AuthorDate: Thu Feb 5 17:58:56 2026 +0800

    perf: Support lazy clean of the RLI cache during bucket assigning (#18018)
    
    * perf: Support lazy clean of the RLI cache during bucket assigning
    
    ---------
    
    Co-authored-by: danny0405 <[email protected]>
---
 .../apache/hudi/configuration/FlinkOptions.java    |   7 +-
 .../sink/partitioner/BucketAssignFunction.java     |   2 +-
 .../hudi/sink/partitioner/index/IndexBackend.java  |   5 +-
 .../sink/partitioner/index/RecordIndexCache.java   | 111 ++++++++++++++++----
 .../partitioner/index/RecordLevelIndexBackend.java |  11 +-
 .../org/apache/hudi/sink/TestWriteCopyOnWrite.java |   4 +-
 .../partitioner/index/TestRecordIndexCache.java    | 116 ++++++++++++++++-----
 .../index/TestRecordLevelIndexBackend.java         |  80 +++++++++++++-
 .../org/apache/hudi/sink/utils/TestWriteBase.java  |   2 +-
 9 files changed, 274 insertions(+), 64 deletions(-)

diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
index 2e7c017225ee..5dee4b8b1213 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
@@ -292,9 +292,10 @@ public class FlinkOptions extends HoodieConfig {
   public static final ConfigOption<Long> INDEX_RLI_CACHE_SIZE = ConfigOptions
       .key("index.rli.cache.size")
       .longType()
-      .defaultValue(100L) // default 100 MB
-      .withDescription("Maximum memory in MB for the inflight record index cache during one checkpoint interval.\n"
-          + "When record level index is used to assign bucket, record locations will first be cached before the record index is committed.");
+      .defaultValue(256L) // default 256 MB
+      .withDescription("Maximum memory allocated for the record level index cache per bucket-assign task.\n"
+          + "The memory size of each individual cache within a checkpoint interval is dynamically calculated based on the \n"
+          + "average memory size of caches for historical checkpoints.");
 
   @AdvancedConfig
  public static final ConfigOption<Integer> INDEX_RLI_LOOKUP_MINIBATCH_SIZE = ConfigOptions
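
For context, the option above can be overridden from the Flink job configuration. A minimal sketch, assuming the configuration is assembled programmatically; the class name and the 512 MB value are only illustrative:

    import org.apache.flink.configuration.Configuration;
    import org.apache.hudi.configuration.FlinkOptions;

    public class RliCacheSizeExample {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        // raise the per-task record level index cache budget from the new 256 MB default to 512 MB
        conf.set(FlinkOptions.INDEX_RLI_CACHE_SIZE, 512L);
        System.out.println("index.rli.cache.size (MB): " + conf.get(FlinkOptions.INDEX_RLI_CACHE_SIZE));
      }
    }

The same key, index.rli.cache.size, can typically also be passed through the WITH clause of the Hudi table DDL.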
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssignFunction.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssignFunction.java
index f8ab39a92162..ce72c1268b6e 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssignFunction.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssignFunction.java
@@ -231,7 +231,7 @@ public class BucketAssignFunction
   public void notifyCheckpointComplete(long checkpointId) {
     // Refresh the table state when there are new commits.
     this.bucketAssigner.reload(checkpointId);
-    this.indexBackend.onCheckpointComplete(this.correspondent);
+    this.indexBackend.onCheckpointComplete(this.correspondent, checkpointId);
   }
 
   @Override
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/index/IndexBackend.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/index/IndexBackend.java
index 74d98c46ab14..de39e44b6af7 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/index/IndexBackend.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/index/IndexBackend.java
@@ -58,9 +58,10 @@ public interface IndexBackend extends Closeable {
   /**
   * Listener method called when the bucket assign operator receives a notify checkpoint complete event.
   *
-   * @param correspondent The Correspondent used to get inflight instants from the coordinator.
+   * @param correspondent         The Correspondent used to get inflight instants from the coordinator.
+   * @param completedCheckpointId The latest completed checkpoint id
    */
-  default void onCheckpointComplete(Correspondent correspondent) {
+  default void onCheckpointComplete(Correspondent correspondent, long completedCheckpointId) {
     // do nothing.
   }
 }
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/index/RecordIndexCache.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/index/RecordIndexCache.java
index 267025ba4519..8f78bd70d34f 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/index/RecordIndexCache.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/index/RecordIndexCache.java
@@ -30,11 +30,11 @@ import org.apache.hudi.configuration.FlinkOptions;
 import org.apache.hudi.util.FlinkWriteClients;
 
 import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
 import org.apache.flink.configuration.Configuration;
 
 import java.io.Closeable;
 import java.io.IOException;
-import java.util.Collections;
 import java.util.Comparator;
 import java.util.NavigableMap;
 import java.util.TreeMap;
@@ -44,17 +44,35 @@ import java.util.TreeMap;
  * <p>
  * todo: use map backed by flink managed memory.
  */
+@Slf4j
 public class RecordIndexCache implements Closeable {
   @VisibleForTesting
   @Getter
  private final TreeMap<Long, ExternalSpillableMap<String, HoodieRecordGlobalLocation>> caches;
   private final HoodieWriteConfig writeConfig;
+  @VisibleForTesting
+  @Getter
   private final long maxCacheSizeInBytes;
+  // the minimum checkpoint id retained in the cache.
+  private long minRetainedCheckpointId;
+  private long recordCnt = 0;
+
+  /**
+   * Step size to check the total memory size of the cache.
+   */
+  private static final int NUMBER_OF_RECORDS_TO_CHECK_MEMORY_SIZE = 1000;
+
+  /**
+   * Factor for estimating the real size of memory used by a spilled map.
+   */
+  @VisibleForTesting
+  public static final double FACTOR_FOR_MEMORY_SIZE_OF_SPILLED_MAP = 0.8;
 
   public RecordIndexCache(Configuration conf, long initCheckpointId) {
     this.caches = new TreeMap<>(Comparator.reverseOrder());
    this.writeConfig = FlinkWriteClients.getHoodieClientConfig(conf, false, false);
    this.maxCacheSizeInBytes = conf.get(FlinkOptions.INDEX_RLI_CACHE_SIZE) * 1024 * 1024;
+    this.minRetainedCheckpointId = Integer.MIN_VALUE;
     addCheckpointCache(initCheckpointId);
   }
 
@@ -65,14 +83,21 @@ public class RecordIndexCache implements Closeable {
    */
   public void addCheckpointCache(long checkpointId) {
     try {
-      // Create a new ExternalSpillableMap for this checkpoint
+      long inferredCacheSize = inferMemorySizeForCache();
+      // clean the caches to ensure enough memory for the new cache
+      cleanIfNecessary(inferredCacheSize);
+      // create a new map cache for this checkpoint
       ExternalSpillableMap<String, HoodieRecordGlobalLocation> newCache =
           new ExternalSpillableMap<>(
-              maxCacheSizeInBytes,
+              inferredCacheSize,
               writeConfig.getSpillableMapBasePath(),
               new DefaultSizeEstimator<>(),
               new DefaultSizeEstimator<>(),
-              writeConfig.getCommonConfig().getSpillableDiskMapType(),
+              // always use the ROCKS_DB disk map: the BITCASK disk map incurs extra memory
+              // cost for each key during spilling: key -> ValueMetadata(filePath, valueSize, position, ts),
+              // so spilling to BITCASK brings little benefit here because the cached value,
+              // HoodieRecordGlobalLocation, has a similar size to ValueMetadata.
+              ExternalSpillableMap.DiskMapType.ROCKS_DB,
              new DefaultSerializer<>(),
              writeConfig.getBoolean(HoodieCommonConfig.DISK_MAP_BITCASK_COMPRESSION_ENABLED),
               "RecordIndexCache-" + checkpointId);
@@ -82,6 +107,33 @@ public class RecordIndexCache implements Closeable {
     }
   }
 
+  /**
+   * Infer the size of memory for the cache:
+   * - The memory size for the first spillable map is MAX_MEM / 2
+   * - The memory size for each following checkpoint cache is the average used memory of all existing caches.
+   * - Caches for committed instants are cleaned if the remaining memory is not enough for the new cache.
+   */
+  @VisibleForTesting
+  long inferMemorySizeForCache() {
+    if (caches.isEmpty()) {
+      return maxCacheSizeInBytes / 2;
+    }
+
+    long totalUsedMemory = caches.values().stream()
+        .map(
+            // if the cache is spilled, adjust the actual used memory by multiplying a factor
+            m -> m.getSizeOfFileOnDiskInBytes() > 0
+                ? (long) (m.getCurrentInMemoryMapSize() / FACTOR_FOR_MEMORY_SIZE_OF_SPILLED_MAP)
+                : m.getCurrentInMemoryMapSize())
+        .reduce(Long::sum).orElse(0L);
+    long avgUsedMemorySize = totalUsedMemory / caches.size();
+
+    if (avgUsedMemorySize <= 0) {
+      avgUsedMemorySize = maxCacheSizeInBytes / 2;
+    }
+    return avgUsedMemorySize;
+  }
+
   /**
   * Search the record location from caches with larger checkpoint id to that with smaller checkpoint id,
   * return early if the record location is found for the record key, return null otherwise.
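
To make the sizing rule above concrete, here is a small worked example with assumed numbers (a 256 MB budget, one purely in-memory cache and one spilled cache); it only mirrors the arithmetic of inferMemorySizeForCache and is not code from this patch:

    public class InferCacheSizeExample {
      public static void main(String[] args) {
        long maxCacheSizeInBytes = 256L * 1024 * 1024;
        // first checkpoint: no historical caches yet, so the budget is half of the maximum
        long firstBudget = maxCacheSizeInBytes / 2; // 128 MB

        // later checkpoints: average the observed usage of the existing caches,
        // dividing spilled caches by the 0.8 factor to approximate their real footprint
        long inMemoryOnly = 120L * 1024 * 1024;           // cache that never spilled, counted as-is
        long spilled = (long) (100L * 1024 * 1024 / 0.8); // spilled cache, adjusted upwards to 125 MB
        long nextBudget = (inMemoryOnly + spilled) / 2;   // (120 MB + 125 MB) / 2 = 122.5 MB
        System.out.println(firstBudget + " " + nextBudget);
      }
    }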
@@ -107,37 +159,50 @@ public class RecordIndexCache implements Closeable {
    * @param recordGlobalLocation the record location.
    */
  public void update(String recordKey, HoodieRecordGlobalLocation recordGlobalLocation) {
-    ValidationUtils.checkArgument(!caches.isEmpty(), "record index cache should not be empty.");
-    // Get the sub cache with the largest checkpoint ID (first entry in the reverse-ordered TreeMap)
+    ValidationUtils.checkArgument(!caches.isEmpty(), "Record index cache should not be empty.");
+    // get the cache with the largest checkpoint ID (first entry in the reverse-ordered TreeMap).
     caches.firstEntry().getValue().put(recordKey, recordGlobalLocation);
+
+    if ((++recordCnt) % NUMBER_OF_RECORDS_TO_CHECK_MEMORY_SIZE == 0) {
+      cleanIfNecessary(0L);
+      recordCnt = 0;
+    }
   }
 
   /**
-   * Clean all the cache entries for checkpoint whose id is less than the given checkpoint id.
+   * Marks the historical cache entries as evictable.
    *
-   * @param checkpointId the id of checkpoint
+   * @param checkpointId The minimum retained checkpoint id
    */
-  public void clean(long checkpointId) {
-    NavigableMap<Long, ExternalSpillableMap<String, HoodieRecordGlobalLocation>> subMap;
-    if (checkpointId == Long.MAX_VALUE) {
-      // clean all the cache entries for old checkpoint ids, and only keeps the cache for the maximum checkpoint id,
-      // which aims to clear memory while also ensuring a certain cache hit rate
-      subMap = caches.firstEntry() == null ? Collections.emptyNavigableMap() : caches.tailMap(caches.firstKey(), false);
-    } else {
-      subMap = caches.tailMap(checkpointId, false);
+  public void markAsEvictable(long checkpointId) {
+    ValidationUtils.checkArgument(checkpointId >= minRetainedCheckpointId,
+        String.format("The checkpoint id for the minimum inflight instant should be increasing,"
+            + " ckpIdForMinInflightInstant: %s, received checkpointId: %s", minRetainedCheckpointId, checkpointId));
+    minRetainedCheckpointId = checkpointId;
+  }
+
+  /**
+   * Performs the actual cleaning to release memory for new caches.
+   *
+   * @param nextCacheSize the size for the next new cache
+   */
+  private void cleanIfNecessary(long nextCacheSize) {
+    while (!caches.isEmpty() && caches.lastKey() < minRetainedCheckpointId
+        && getInMemoryMapSize() + nextCacheSize > this.maxCacheSizeInBytes) {
+      NavigableMap.Entry<Long, ExternalSpillableMap<String, HoodieRecordGlobalLocation>> lastEntry = caches.pollLastEntry();
+      lastEntry.getValue().close();
+      log.info("Clean record index cache for checkpoint: {}", lastEntry.getKey());
     }
-    // Get all entries that are less than or equal to the given checkpointId
-    // Close all the ExternalSpillableMap instances before removing them
-    subMap.values().forEach(ExternalSpillableMap::close);
-    // Remove all the entries from the main cache
-    subMap.clear();
+  }
+
+  private long getInMemoryMapSize() {
+    return caches.values().stream().map(ExternalSpillableMap::getCurrentInMemoryMapSize).reduce(Long::sum).orElse(0L);
   }
 
   @Override
   public void close() throws IOException {
-    // Close all the ExternalSpillableMap instances before removing them
+    // Close all the map instances before removing them
     caches.values().forEach(ExternalSpillableMap::close);
-    // Close all ExternalSpillableMap instances before clearing the cache
     caches.clear();
   }
 }
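
Putting the pieces of this class together: a new spillable map is opened per checkpoint with an inferred budget, a completed instant only marks older maps as evictable, and the actual close happens lazily inside update()/addCheckpointCache() once the memory budget is exceeded. A condensed sketch against the RecordIndexCache API from this patch; the checkpoint ids and record locations are made up, and the Configuration must carry a valid Hudi table setup:

    import org.apache.flink.configuration.Configuration;
    import org.apache.hudi.common.model.HoodieRecordGlobalLocation;
    import org.apache.hudi.sink.partitioner.index.RecordIndexCache;

    class LazyCleanFlowSketch {
      static void run(Configuration conf) throws java.io.IOException {
        try (RecordIndexCache cache = new RecordIndexCache(conf, 1L)) {
          // records written during checkpoint 1 land in the map keyed by checkpoint id 1
          cache.update("key-1", new HoodieRecordGlobalLocation("par1", "0001", "file-1"));
          // checkpoint 2 starts: a new map is created with a budget inferred from cache 1
          cache.addCheckpointCache(2L);
          // the instant of checkpoint 1 is committed, so cache 1 becomes evictable
          cache.markAsEvictable(2L);
          // eviction stays lazy: cache 1 is only closed from update()/addCheckpointCache()
          // once the total in-memory size plus the next budget exceeds index.rli.cache.size
          cache.update("key-2", new HoodieRecordGlobalLocation("par1", "0002", "file-2"));
        }
      }
    }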
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/index/RecordLevelIndexBackend.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/index/RecordLevelIndexBackend.java
index 68200bf3ea13..6fdecb0f22c4 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/index/RecordLevelIndexBackend.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/index/RecordLevelIndexBackend.java
@@ -33,6 +33,7 @@ import org.apache.hudi.sink.event.Correspondent;
 import org.apache.hudi.util.StreamerUtil;
 
 import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
 import org.apache.flink.configuration.Configuration;
 
 import java.io.IOException;
@@ -45,6 +46,7 @@ import java.util.Map;
 /**
  * An implementation of {@link IndexBackend} based on the record level index in metadata table.
  */
+@Slf4j
 public class RecordLevelIndexBackend implements MinibatchIndexBackend {
   @VisibleForTesting
   @Getter
@@ -106,9 +108,14 @@ public class RecordLevelIndexBackend implements MinibatchIndexBackend {
   }
 
   @Override
-  public void onCheckpointComplete(Correspondent correspondent) {
+  public void onCheckpointComplete(Correspondent correspondent, long completedCheckpointID) {
     Map<Long, String> inflightInstants = correspondent.requestInflightInstants();
-    recordIndexCache.clean(inflightInstants.keySet().stream().min(Long::compareTo).orElse(Long.MAX_VALUE));
+    log.info("Inflight instants and the corresponding checkpoints: {}, notified completed checkpoints: {}",
+        inflightInstants, completedCheckpointID);
+    // if there are no inflight instants,
+    // the latest completed checkpoint id is used as the minimum checkpoint id,
+    // since the streaming write operator always uses the previous checkpoint id to request the new instant.
+    recordIndexCache.markAsEvictable(inflightInstants.keySet().stream().min(Long::compareTo).orElse(completedCheckpointID));
     this.metaClient.reloadActiveTimeline();
     reloadMetadataTable();
   }
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java
index 60560b7cb3a6..9bf56a9dc4d4 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java
@@ -924,8 +924,8 @@ public class TestWriteCopyOnWrite extends TestWriteBase {
         .assertInflightCachesOfBucketAssigner(2)
         .assertNextEvent(4, "par1,par2,par3,par4")
         .checkpointComplete(1)
-        // clean the first inflight cache, left the latest inflight cache.
-        .assertInflightCachesOfBucketAssigner(1)
+        // the first inflight cache will not be cleaned, since the current total memory size does not exceed the limit.
+        .assertInflightCachesOfBucketAssigner(2)
         .checkWrittenData(EXPECTED1);
   }
 
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/partitioner/index/TestRecordIndexCache.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/partitioner/index/TestRecordIndexCache.java
index 0836f97d2926..d490a9c6ab96 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/partitioner/index/TestRecordIndexCache.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/partitioner/index/TestRecordIndexCache.java
@@ -28,6 +28,7 @@ import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.io.TempDir;
+import org.mockito.Mockito;
 
 import java.io.File;
 import java.io.IOException;
@@ -40,6 +41,7 @@ import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.when;
 
 /**
  * Test cases for {@link RecordIndexCache}.
@@ -57,7 +59,7 @@ public class TestRecordIndexCache {
   }
 
   @AfterEach
-  void clean() throws IOException {
+  void markAsEvictable() throws IOException {
     this.cache.close();
   }
 
@@ -150,37 +152,46 @@ public class TestRecordIndexCache {
   }
 
   @Test
-  void testClean() {
-    cache.addCheckpointCache(2L);
-    cache.addCheckpointCache(3L);
-    cache.addCheckpointCache(4L);
-    
-    String recordKey1 = "key1";
-    String recordKey2 = "key2";
+  void testMarkAsEvictable() {
+    Configuration conf = TestConfigurations.getDefaultConf(tempDir.getAbsolutePath());
+    conf.set(FlinkOptions.INDEX_RLI_CACHE_SIZE, 1L); // 1MB cache size
+    cache = new RecordIndexCache(conf, 1L);
+
     HoodieRecordGlobalLocation location1 = new HoodieRecordGlobalLocation("partition1", "1001", "file_id1");
     HoodieRecordGlobalLocation location2 = new HoodieRecordGlobalLocation("partition2", "1002", "file_id2");
-    
     // Add records to different checkpoints
-    cache.getCaches().get(1L).put(recordKey1, location1);
-    cache.getCaches().get(2L).put(recordKey2, location2);
-    
+    for (int i = 0; i < 5000; i++) {
+      cache.update("k1_" + i, location1);
+    }
+    cache.addCheckpointCache(2L);
+    for (int i = 0; i < 5000; i++) {
+      cache.update("k2_" + i, location2);
+    }
+
     // Verify records exist before cleaning
-    assertNotNull(cache.get(recordKey1));
-    assertNotNull(cache.get(recordKey2));
-    
-    // Clean checkpoints up to and including 2
-    cache.clean(3L);
-    
-    // Check that checkpoints 1 and 2 are removed
-    assertEquals(2, cache.getCaches().size()); // Should have checkpoints 3 and 4
+    assertNotNull(cache.get("k1_0"));
+    assertNotNull(cache.get("k2_0"));
+
+    cache.addCheckpointCache(3L);
+    for (int i = 0; i < 800; i++) {
+      cache.update("k3_" + i, location2);
+    }
+
+    cache.addCheckpointCache(4L);
+    // mark 1,2 as cleanable
+    cache.markAsEvictable(3L);
+    // write another batch of records to trigger cleaning
+    for (int i = 0; i < 800; i++) {
+      cache.update("k4_" + i, location2);
+    }
+
+    // Check that checkpoint 1 is removed
+    assertEquals(3, cache.getCaches().size()); // Should have checkpoints 2, 3 and 4
+
     assertFalse(cache.getCaches().containsKey(1L));
-    assertFalse(cache.getCaches().containsKey(2L));
+    assertTrue(cache.getCaches().containsKey(2L));
     assertTrue(cache.getCaches().containsKey(3L));
     assertTrue(cache.getCaches().containsKey(4L));
-    
-    // Records from cleaned checkpoints should no longer be accessible
-    assertNull(cache.get(recordKey1));
-    assertNull(cache.get(recordKey2));
   }
 
   @Test
@@ -280,4 +291,57 @@ public class TestRecordIndexCache {
       }
     }
   }
-}
\ No newline at end of file
+
+  @Test
+  void testInferMemorySizeForCacheWithEmptyCaches() {
+    cache.getCaches().clear();
+
+    // infer the cache for the first checkpoint
+    long inferredSize = cache.inferMemorySizeForCache();
+    assertEquals(cache.getMaxCacheSizeInBytes() / 2, inferredSize);
+
+    // cache for the first checkpoint is empty
+    ExternalSpillableMap<String, HoodieRecordGlobalLocation> map1 = Mockito.mock(ExternalSpillableMap.class);
+    when(map1.getSizeOfFileOnDiskInBytes()).thenReturn(0L);
+    when(map1.getCurrentInMemoryMapSize()).thenReturn(0L);
+    cache.getCaches().put(1L, map1);
+
+    inferredSize = cache.inferMemorySizeForCache();
+    assertEquals(cache.getMaxCacheSizeInBytes() / 2, inferredSize);
+  }
+
+  @Test
+  void testInferMemorySizeForCacheUsesAverageInMemorySize() {
+    cache.getCaches().clear();
+
+    ExternalSpillableMap<String, HoodieRecordGlobalLocation> map1 = Mockito.mock(ExternalSpillableMap.class);
+    ExternalSpillableMap<String, HoodieRecordGlobalLocation> map2 = Mockito.mock(ExternalSpillableMap.class);
+    when(map1.getSizeOfFileOnDiskInBytes()).thenReturn(0L);
+    when(map2.getSizeOfFileOnDiskInBytes()).thenReturn(0L);
+    when(map1.getCurrentInMemoryMapSize()).thenReturn(100L);
+    when(map2.getCurrentInMemoryMapSize()).thenReturn(300L);
+
+    cache.getCaches().put(1L, map1);
+    cache.getCaches().put(2L, map2);
+
+    assertEquals(200L, cache.inferMemorySizeForCache());
+  }
+
+  @Test
+  void testInferMemorySizeForCacheAdjustsForSpilledMaps() {
+    cache.getCaches().clear();
+
+    ExternalSpillableMap<String, HoodieRecordGlobalLocation> spilledMap = Mockito.mock(ExternalSpillableMap.class);
+    ExternalSpillableMap<String, HoodieRecordGlobalLocation> inMemoryMap = Mockito.mock(ExternalSpillableMap.class);
+    when(spilledMap.getSizeOfFileOnDiskInBytes()).thenReturn(200L);
+    when(spilledMap.getCurrentInMemoryMapSize()).thenReturn(800L);
+    when(inMemoryMap.getSizeOfFileOnDiskInBytes()).thenReturn(0L);
+    when(inMemoryMap.getCurrentInMemoryMapSize()).thenReturn(500L);
+
+    cache.getCaches().put(2L, spilledMap);
+    cache.getCaches().put(1L, inMemoryMap);
+
+    long expected = (long) ((800L / RecordIndexCache.FACTOR_FOR_MEMORY_SIZE_OF_SPILLED_MAP) + 500L) / 2;
+    assertEquals(expected, cache.inferMemorySizeForCache());
+  }
+}
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/partitioner/index/TestRecordLevelIndexBackend.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/partitioner/index/TestRecordLevelIndexBackend.java
index 49d4370266e8..70011b514ee7 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/partitioner/index/TestRecordLevelIndexBackend.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/partitioner/index/TestRecordLevelIndexBackend.java
@@ -38,6 +38,7 @@ import java.io.IOException;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.UUID;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
@@ -106,13 +107,84 @@ public class TestRecordLevelIndexBackend {
       Map<Long, String> inflightInstants = new HashMap<>();
       inflightInstants.put(1L, "0001");
       when(correspondent.requestInflightInstants()).thenReturn(inflightInstants);
-      recordLevelIndexBackend.onCheckpointComplete(correspondent);
-      assertEquals(1, recordLevelIndexBackend.getRecordIndexCache().getCaches().size());
-      // the cache will only contain 'new_key', others are cleaned.
+      recordLevelIndexBackend.onCheckpointComplete(correspondent, 1);
+      assertEquals(2, recordLevelIndexBackend.getRecordIndexCache().getCaches().size());
+      // the cache contains 'new_key', and other old locations
       location = recordLevelIndexBackend.getRecordIndexCache().get("new_key");
       assertEquals(newLocation, location);
       location = recordLevelIndexBackend.getRecordIndexCache().get("id1");
-      assertNull(location);
+      assertNotNull(location);
+    }
+  }
+
+  @Test
+  void testRecordLevelIndexCacheClean() throws Exception {
+    // set a small value for RLI cache
+    conf.set(FlinkOptions.INDEX_RLI_CACHE_SIZE, 1L);
+
+    try (RecordLevelIndexBackend recordLevelIndexBackend = new RecordLevelIndexBackend(conf, -1)) {
+
+      for (int i = 0; i < 1500; i++) {
+        recordLevelIndexBackend.update("id1_" + i,
+            new HoodieRecordGlobalLocation("par1", "000000001", UUID.randomUUID().toString(), -1));
+      }
+      // new checkpoint
+      recordLevelIndexBackend.onCheckpoint(1);
+      Correspondent correspondent = mock(Correspondent.class);
+      Map<Long, String> inflightInstants = new HashMap<>();
+      inflightInstants.put(1L, "0001");
+      when(correspondent.requestInflightInstants()).thenReturn(inflightInstants);
+      recordLevelIndexBackend.onCheckpointComplete(correspondent, 1);
+
+      for (int i = 0; i < 2000; i++) {
+        recordLevelIndexBackend.update("id2_" + i,
+            new HoodieRecordGlobalLocation("par1", "000000001", UUID.randomUUID().toString(), -1));
+      }
+
+      // new checkpoint
+      recordLevelIndexBackend.onCheckpoint(2);
+      correspondent = mock(Correspondent.class);
+      inflightInstants = new HashMap<>();
+      inflightInstants.put(2L, "0002");
+      when(correspondent.requestInflightInstants()).thenReturn(inflightInstants);
+      recordLevelIndexBackend.onCheckpointComplete(correspondent, 2);
+
+      for (int i = 0; i < 2000; i++) {
+        recordLevelIndexBackend.update("id3_" + i,
+            new HoodieRecordGlobalLocation("par1", "000000001", UUID.randomUUID().toString(), -1));
+      }
+
+      // the cache for the first instant is evicted
+      assertEquals(2, recordLevelIndexBackend.getRecordIndexCache().getCaches().size());
+
+      // new checkpoint
+      recordLevelIndexBackend.onCheckpoint(3);
+      correspondent = mock(Correspondent.class);
+      inflightInstants = new HashMap<>();
+      inflightInstants.put(3L, "0003");
+      when(correspondent.requestInflightInstants()).thenReturn(inflightInstants);
+      recordLevelIndexBackend.onCheckpointComplete(correspondent, 3);
+
+      for (int i = 0; i < 500; i++) {
+        recordLevelIndexBackend.update("id4_" + i,
+            new HoodieRecordGlobalLocation("par1", "000000001", UUID.randomUUID().toString(), -1));
+      }
+
+      assertEquals(3, recordLevelIndexBackend.getRecordIndexCache().getCaches().size());
+
+      // insert another batch of records, which triggers another round of cache cleaning.
+      for (int i = 500; i < 1500; i++) {
+        recordLevelIndexBackend.update("id4_" + i,
+            new HoodieRecordGlobalLocation("par1", "000000001", UUID.randomUUID().toString(), -1));
+      }
+      assertEquals(3, recordLevelIndexBackend.getRecordIndexCache().getCaches().size());
+      // cache for the oldest ckp id will be cleaned
+      assertNull(recordLevelIndexBackend.getRecordIndexCache().getCaches().get(-1L));
+      // caches for the latest 3 ckp ids are still in the cache
+      assertEquals("par1", recordLevelIndexBackend.get("id2_0").getPartitionPath());
+      assertEquals("par1", recordLevelIndexBackend.get("id3_0").getPartitionPath());
+      assertEquals("par1", recordLevelIndexBackend.get("id4_0").getPartitionPath());
     }
   }
 }
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/TestWriteBase.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/TestWriteBase.java
index 4c7b051a5ba2..c7ca2ee1b485 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/TestWriteBase.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/TestWriteBase.java
@@ -657,7 +657,7 @@ public class TestWriteBase {
     }
 
     public TestHarness checkLastPendingInstantCompleted() {
-      this.pipeline.checkpointComplete(3);
+      this.pipeline.checkpointComplete(4);
       checkInstantState(HoodieInstant.State.COMPLETED, this.lastPending);
       this.lastComplete = lastPending;
       this.lastPending = lastPendingInstant();
