This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 4c819a5dad44 perf: Support lazy clean of the RLI cache during bucket assigning (#18018)
4c819a5dad44 is described below
commit 4c819a5dad445a5ab60b554aeda8838d89fbcb91
Author: Shuo Cheng <[email protected]>
AuthorDate: Thu Feb 5 17:58:56 2026 +0800
perf: Support lazy clean of the RLI cache during bucket assigning (#18018)
* perf: Support lazy clean of the RLI cache during bucket assigning
---------
Co-authored-by: danny0405 <[email protected]>
---
.../apache/hudi/configuration/FlinkOptions.java | 7 +-
.../sink/partitioner/BucketAssignFunction.java | 2 +-
.../hudi/sink/partitioner/index/IndexBackend.java | 5 +-
.../sink/partitioner/index/RecordIndexCache.java | 111 ++++++++++++++++----
.../partitioner/index/RecordLevelIndexBackend.java | 11 +-
.../org/apache/hudi/sink/TestWriteCopyOnWrite.java | 4 +-
.../partitioner/index/TestRecordIndexCache.java | 116 ++++++++++++++++-----
.../index/TestRecordLevelIndexBackend.java | 80 +++++++++++++-
.../org/apache/hudi/sink/utils/TestWriteBase.java | 2 +-
9 files changed, 274 insertions(+), 64 deletions(-)
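In short, the patch replaces the eager cache clean on checkpoint completion with a two-step scheme: a completed checkpoint only raises an eviction watermark, and the actual eviction happens lazily when memory is needed for a new cache or the budget is exceeded. A minimal standalone sketch of the idea (hypothetical class and names, not the code in this diff):

    import java.util.Comparator;
    import java.util.HashMap;
    import java.util.Map;
    import java.util.TreeMap;

    class LazyCheckpointCaches {
      // newest checkpoint first, mirroring the reverse-ordered TreeMap in the patch
      private final TreeMap<Long, Map<String, String>> caches = new TreeMap<>(Comparator.reverseOrder());
      private long minRetainedCheckpointId = Long.MIN_VALUE;

      // called on checkpoint complete: no eviction yet, just move the watermark forward
      void markAsEvictable(long checkpointId) {
        minRetainedCheckpointId = Math.max(minRetainedCheckpointId, checkpointId);
      }

      // lazy clean: drop the oldest evictable caches only when memory is actually needed
      void addCheckpointCache(long checkpointId, long budget, long estimatedNewCacheSize) {
        while (!caches.isEmpty() && caches.lastKey() < minRetainedCheckpointId
            && usedMemory() + estimatedNewCacheSize > budget) {
          caches.pollLastEntry();
        }
        caches.put(checkpointId, new HashMap<>());
      }

      private long usedMemory() {
        return caches.values().stream().mapToLong(Map::size).sum(); // stand-in size estimate
      }
    }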
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
index 2e7c017225ee..5dee4b8b1213 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
@@ -292,9 +292,10 @@ public class FlinkOptions extends HoodieConfig {
public static final ConfigOption<Long> INDEX_RLI_CACHE_SIZE = ConfigOptions
.key("index.rli.cache.size")
.longType()
- .defaultValue(100L) // default 100 MB
- .withDescription("Maximum memory in MB for the inflight record index
cache during one checkpoint interval.\n"
- + "When record level index is used to assign bucket, record
locations will first be cached before the record index is committed.");
+ .defaultValue(256L) // default 256 MB
+ .withDescription("Maximum memory allocated for the record level index
cache per bucket-assign task.\n"
+ + "The memory size of each individual cache within a checkpoint
interval is dynamically calculated based on the \n"
+ + "average memory size of caches for historical checkpoints.");
@AdvancedConfig
public static final ConfigOption<Integer> INDEX_RLI_LOOKUP_MINIBATCH_SIZE = ConfigOptions
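For reference, the option can be overridden like any other Flink option; a minimal sketch (the 512 MB value and the demo class are illustrative only):

    import org.apache.flink.configuration.Configuration;
    import org.apache.hudi.configuration.FlinkOptions;

    class RliCacheConfDemo {
      static Configuration buildConf() {
        Configuration conf = new Configuration();
        // in MB, shared by all checkpoint caches of one bucket-assign task
        conf.set(FlinkOptions.INDEX_RLI_CACHE_SIZE, 512L);
        return conf;
      }
    }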
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssignFunction.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssignFunction.java
index f8ab39a92162..ce72c1268b6e 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssignFunction.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssignFunction.java
@@ -231,7 +231,7 @@ public class BucketAssignFunction
public void notifyCheckpointComplete(long checkpointId) {
// Refresh the table state when there are new commits.
this.bucketAssigner.reload(checkpointId);
- this.indexBackend.onCheckpointComplete(this.correspondent);
+ this.indexBackend.onCheckpointComplete(this.correspondent, checkpointId);
}
@Override
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/index/IndexBackend.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/index/IndexBackend.java
index 74d98c46ab14..de39e44b6af7 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/index/IndexBackend.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/index/IndexBackend.java
@@ -58,9 +58,10 @@ public interface IndexBackend extends Closeable {
/**
* Listener method called when the bucket assign operator receives a notify checkpoint complete event.
*
- * @param correspondent The Correspondent used to get inflight instants from the coordinator.
+ * @param correspondent The Correspondent used to get inflight instants from the coordinator.
+ * @param completedCheckpointId The latest completed checkpoint id
*/
- default void onCheckpointComplete(Correspondent correspondent) {
+ default void onCheckpointComplete(Correspondent correspondent, long completedCheckpointId) {
// do nothing.
}
}
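Since onCheckpointComplete is a default method, existing backends that do not override it keep compiling; only custom overrides need the extra parameter. A standalone sketch of the widened contract, using local stand-in types rather than Hudi's own interfaces:

    import java.util.Map;

    interface DemoCorrespondent {
      Map<Long, String> requestInflightInstants();
    }

    interface DemoIndexBackend {
      // mirrors the new two-argument listener: implementations may use the completed
      // checkpoint id as a fallback watermark when there are no inflight instants
      default void onCheckpointComplete(DemoCorrespondent correspondent, long completedCheckpointId) {
        // do nothing.
      }
    }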
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/index/RecordIndexCache.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/index/RecordIndexCache.java
index 267025ba4519..8f78bd70d34f 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/index/RecordIndexCache.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/index/RecordIndexCache.java
@@ -30,11 +30,11 @@ import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.util.FlinkWriteClients;
import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
import org.apache.flink.configuration.Configuration;
import java.io.Closeable;
import java.io.IOException;
-import java.util.Collections;
import java.util.Comparator;
import java.util.NavigableMap;
import java.util.TreeMap;
@@ -44,17 +44,35 @@ import java.util.TreeMap;
* <p>
* todo: use map backed by flink managed memory.
*/
+@Slf4j
public class RecordIndexCache implements Closeable {
@VisibleForTesting
@Getter
private final TreeMap<Long, ExternalSpillableMap<String, HoodieRecordGlobalLocation>> caches;
private final HoodieWriteConfig writeConfig;
+ @VisibleForTesting
+ @Getter
private final long maxCacheSizeInBytes;
+ // the minimum checkpoint id retained in the cache.
+ private long minRetainedCheckpointId;
+ private long recordCnt = 0;
+
+ /**
+ * Step size to check the total memory size of the cache.
+ */
+ private static final int NUMBER_OF_RECORDS_TO_CHECK_MEMORY_SIZE = 1000;
+
+ /**
+ * Factor for estimating the real size of memory used by a spilled map.
+ */
+ @VisibleForTesting
+ public static final double FACTOR_FOR_MEMORY_SIZE_OF_SPILLED_MAP = 0.8;
public RecordIndexCache(Configuration conf, long initCheckpointId) {
this.caches = new TreeMap<>(Comparator.reverseOrder());
this.writeConfig = FlinkWriteClients.getHoodieClientConfig(conf, false, false);
this.maxCacheSizeInBytes = conf.get(FlinkOptions.INDEX_RLI_CACHE_SIZE) * 1024 * 1024;
+ this.minRetainedCheckpointId = Integer.MIN_VALUE;
addCheckpointCache(initCheckpointId);
}
@@ -65,14 +83,21 @@ public class RecordIndexCache implements Closeable {
*/
public void addCheckpointCache(long checkpointId) {
try {
- // Create a new ExternalSpillableMap for this checkpoint
+ long inferredCacheSize = inferMemorySizeForCache();
+ // clean the caches to ensure enough memory for the new cache
+ cleanIfNecessary(inferredCacheSize);
+ // create a new map cache for this checkpoint
ExternalSpillableMap<String, HoodieRecordGlobalLocation> newCache =
new ExternalSpillableMap<>(
- maxCacheSizeInBytes,
+ inferredCacheSize,
writeConfig.getSpillableMapBasePath(),
new DefaultSizeEstimator<>(),
new DefaultSizeEstimator<>(),
- writeConfig.getCommonConfig().getSpillableDiskMapType(),
+ // always use the ROCKS_DB disk map: the BITCASK disk map incurs an extra memory
+ // cost for each key during spilling (key -> ValueMetadata(filePath, valueSize, position, ts)),
+ // which is redundant here because the stored HoodieRecordGlobalLocation has a
+ // size similar to the ValueMetadata itself.
+ ExternalSpillableMap.DiskMapType.ROCKS_DB,
new DefaultSerializer<>(),
writeConfig.getBoolean(HoodieCommonConfig.DISK_MAP_BITCASK_COMPRESSION_ENABLED),
"RecordIndexCache-" + checkpointId);
@@ -82,6 +107,33 @@ public class RecordIndexCache implements Closeable {
}
}
+ /**
+ * Infer the size of memory for the cache:
+ * - The memory size for the first spillable map is MAX_MEM / 2
+ * - The memory size for each following checkpoint cache is the average used memory of all existing caches.
+ * - Clean caches for committed instants if the remaining memory is not enough for the new cache.
+ */
+ @VisibleForTesting
+ long inferMemorySizeForCache() {
+ if (caches.isEmpty()) {
+ return maxCacheSizeInBytes / 2;
+ }
+
+ long totalUsedMemory = caches.values().stream()
+ .map(
+ // if the cache has spilled, estimate the real used memory by dividing by a factor
+ m -> m.getSizeOfFileOnDiskInBytes() > 0
+ ? (long) (m.getCurrentInMemoryMapSize() / FACTOR_FOR_MEMORY_SIZE_OF_SPILLED_MAP)
+ : m.getCurrentInMemoryMapSize())
+ .reduce(Long::sum).orElse(0L);
+ long avgUsedMemorySize = totalUsedMemory / caches.size();
+
+ if (avgUsedMemorySize <= 0) {
+ avgUsedMemorySize = maxCacheSizeInBytes / 2;
+ }
+ return avgUsedMemorySize;
+ }
+
/**
* Searches the record location from caches with larger checkpoint ids to those with smaller checkpoint ids;
* returns early once the record location is found for the record key, and null otherwise.
@@ -107,37 +159,50 @@ public class RecordIndexCache implements Closeable {
* @param recordGlobalLocation the record location.
*/
public void update(String recordKey, HoodieRecordGlobalLocation recordGlobalLocation) {
- ValidationUtils.checkArgument(!caches.isEmpty(), "record index cache should not be empty.");
- // Get the sub cache with the largest checkpoint ID (first entry in the reverse-ordered TreeMap)
+ ValidationUtils.checkArgument(!caches.isEmpty(), "Record index cache should not be empty.");
+ // get the cache with the largest checkpoint ID (first entry in the reverse-ordered TreeMap).
caches.firstEntry().getValue().put(recordKey, recordGlobalLocation);
+
+ if ((++recordCnt) % NUMBER_OF_RECORDS_TO_CHECK_MEMORY_SIZE == 0) {
+ cleanIfNecessary(0L);
+ recordCnt = 0;
+ }
}
/**
- * Clean all the cache entries for checkpoint whose id is less than the given checkpoint id.
+ * Marks the historical cache entries as evictable.
*
- * @param checkpointId the id of checkpoint
+ * @param checkpointId The minimum retained checkpoint id
*/
- public void clean(long checkpointId) {
- NavigableMap<Long, ExternalSpillableMap<String, HoodieRecordGlobalLocation>> subMap;
- if (checkpointId == Long.MAX_VALUE) {
- // clean all the cache entries for old checkpoint ids, and only keeps the cache for the maximum checkpoint id,
- // which aims to clear memory while also ensuring a certain cache hit rate
- subMap = caches.firstEntry() == null ? Collections.emptyNavigableMap() : caches.tailMap(caches.firstKey(), false);
- } else {
- subMap = caches.tailMap(checkpointId, false);
+ public void markAsEvictable(long checkpointId) {
+ ValidationUtils.checkArgument(checkpointId >= minRetainedCheckpointId,
+ String.format("The checkpoint id for minium inflight instant should be
increased,"
+ + " ckpIdForMinInflightInstant: %s, received checkpointId: %s",
minRetainedCheckpointId, checkpointId));
+ minRetainedCheckpointId = checkpointId;
+ }
+
+ /**
+ * Performs the actual cleaning to release memory for new caches.
+ *
+ * @param nextCacheSize the size for the next new cache
+ */
+ private void cleanIfNecessary(long nextCacheSize) {
+ while (!caches.isEmpty() && caches.lastKey() < minRetainedCheckpointId
+ && getInMemoryMapSize() + nextCacheSize > this.maxCacheSizeInBytes) {
+ NavigableMap.Entry<Long, ExternalSpillableMap<String, HoodieRecordGlobalLocation>> lastEntry = caches.pollLastEntry();
+ lastEntry.getValue().close();
+ log.info("Clean record index cache for checkpoint: {}", lastEntry.getKey());
}
- // Get all entries that are less than or equal to the given checkpointId
- // Close all the ExternalSpillableMap instances before removing them
- subMap.values().forEach(ExternalSpillableMap::close);
- // Remove all the entries from the main cache
- subMap.clear();
+ }
+
+ private long getInMemoryMapSize() {
+ return caches.values().stream().map(ExternalSpillableMap::getCurrentInMemoryMapSize).reduce(Long::sum).orElse(0L);
}
@Override
public void close() throws IOException {
- // Close all the ExternalSpillableMap instances before removing them
+ // Close all the map instances before removing them
caches.values().forEach(ExternalSpillableMap::close);
- // Close all ExternalSpillableMap instances before clearing the cache
caches.clear();
}
}
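A worked example of the sizing rules above, with made-up numbers (256 MB budget, one spilled and one purely in-memory cache):

    public class CacheSizingDemo {
      // mirrors FACTOR_FOR_MEMORY_SIZE_OF_SPILLED_MAP
      static final double FACTOR = 0.8;

      public static void main(String[] args) {
        long maxBytes = 256L * 1024 * 1024;
        System.out.println(maxBytes / 2); // empty caches: the first cache gets 128 MB

        // two existing caches: 80 MB that has spilled to disk and 40 MB purely in memory
        long spilledAdjusted = (long) (80L * 1024 * 1024 / FACTOR); // scaled up to ~100 MB
        long inMemory = 40L * 1024 * 1024;
        System.out.println((spilledAdjusted + inMemory) / 2); // ~70 MB for the next cache
      }
    }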
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/index/RecordLevelIndexBackend.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/index/RecordLevelIndexBackend.java
index 68200bf3ea13..6fdecb0f22c4 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/index/RecordLevelIndexBackend.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/index/RecordLevelIndexBackend.java
@@ -33,6 +33,7 @@ import org.apache.hudi.sink.event.Correspondent;
import org.apache.hudi.util.StreamerUtil;
import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
import org.apache.flink.configuration.Configuration;
import java.io.IOException;
@@ -45,6 +46,7 @@ import java.util.Map;
/**
* An implementation of {@link IndexBackend} based on the record level index in metadata table.
*/
+@Slf4j
public class RecordLevelIndexBackend implements MinibatchIndexBackend {
@VisibleForTesting
@Getter
@@ -106,9 +108,14 @@ public class RecordLevelIndexBackend implements MinibatchIndexBackend {
}
@Override
- public void onCheckpointComplete(Correspondent correspondent) {
+ public void onCheckpointComplete(Correspondent correspondent, long completedCheckpointId) {
Map<Long, String> inflightInstants = correspondent.requestInflightInstants();
- recordIndexCache.clean(inflightInstants.keySet().stream().min(Long::compareTo).orElse(Long.MAX_VALUE));
+ log.info("Inflight instants and the corresponding checkpoints: {}, notified completed checkpoint: {}",
+ inflightInstants, completedCheckpointId);
+ // if there are no inflight instants,
+ // the latest completed checkpoint id is used as the minimum retained checkpoint id,
+ // since the streaming write operator always uses the previous checkpoint id to request a new instant.
+ recordIndexCache.markAsEvictable(inflightInstants.keySet().stream().min(Long::compareTo).orElse(completedCheckpointId));
this.metaClient.reloadActiveTimeline();
reloadMetadataTable();
}
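To make the watermark selection concrete: with inflight instants {5 -> "0005", 6 -> "0006"}, the minimum inflight checkpoint 5 is chosen, so caches for checkpoints below 5 become evictable; with no inflight instants, the completed checkpoint id is used instead. A self-contained sketch with hypothetical data:

    import java.util.HashMap;
    import java.util.Map;

    class WatermarkDemo {
      public static void main(String[] args) {
        Map<Long, String> inflightInstants = new HashMap<>();
        inflightInstants.put(5L, "0005");
        inflightInstants.put(6L, "0006");
        long completedCheckpointId = 7L;
        long watermark = inflightInstants.keySet().stream()
            .min(Long::compareTo).orElse(completedCheckpointId);
        System.out.println(watermark); // 5 -> caches for checkpoints < 5 become evictable
      }
    }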
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java
index 60560b7cb3a6..9bf56a9dc4d4 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java
@@ -924,8 +924,8 @@ public class TestWriteCopyOnWrite extends TestWriteBase {
.assertInflightCachesOfBucketAssigner(2)
.assertNextEvent(4, "par1,par2,par3,par4")
.checkpointComplete(1)
- // clean the first inflight cache, left the latest inflight cache.
- .assertInflightCachesOfBucketAssigner(1)
+ // the first inflight cache will not be cleaned, since the current total memory size does not exceed the limit.
+ .assertInflightCachesOfBucketAssigner(2)
.checkWrittenData(EXPECTED1);
}
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/partitioner/index/TestRecordIndexCache.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/partitioner/index/TestRecordIndexCache.java
index 0836f97d2926..d490a9c6ab96 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/partitioner/index/TestRecordIndexCache.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/partitioner/index/TestRecordIndexCache.java
@@ -28,6 +28,7 @@ import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
+import org.mockito.Mockito;
import java.io.File;
import java.io.IOException;
@@ -40,6 +41,7 @@ import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.when;
/**
* Test cases for {@link RecordIndexCache}.
@@ -57,7 +59,7 @@ public class TestRecordIndexCache {
}
@AfterEach
- void clean() throws IOException {
+ void tearDown() throws IOException {
this.cache.close();
}
@@ -150,37 +152,46 @@ public class TestRecordIndexCache {
}
@Test
- void testClean() {
- cache.addCheckpointCache(2L);
- cache.addCheckpointCache(3L);
- cache.addCheckpointCache(4L);
-
- String recordKey1 = "key1";
- String recordKey2 = "key2";
+ void testMarkAsEvictable() {
+ Configuration conf = TestConfigurations.getDefaultConf(tempDir.getAbsolutePath());
+ conf.set(FlinkOptions.INDEX_RLI_CACHE_SIZE, 1L); // 1 MB cache size
+ cache = new RecordIndexCache(conf, 1L);
+
HoodieRecordGlobalLocation location1 = new HoodieRecordGlobalLocation("partition1", "1001", "file_id1");
HoodieRecordGlobalLocation location2 = new HoodieRecordGlobalLocation("partition2", "1002", "file_id2");
-
// Add records to different checkpoints
- cache.getCaches().get(1L).put(recordKey1, location1);
- cache.getCaches().get(2L).put(recordKey2, location2);
-
+ for (int i = 0; i < 5000; i++) {
+ cache.update("k1_" + i, location1);
+ }
+ cache.addCheckpointCache(2L);
+ for (int i = 0; i < 5000; i++) {
+ cache.update("k2_" + i, location2);
+ }
+
// Verify records exist before cleaning
- assertNotNull(cache.get(recordKey1));
- assertNotNull(cache.get(recordKey2));
-
- // Clean checkpoints up to and including 2
- cache.clean(3L);
-
- // Check that checkpoints 1 and 2 are removed
- assertEquals(2, cache.getCaches().size()); // Should have checkpoints 3 and 4
+ assertNotNull(cache.get("k1_0"));
+ assertNotNull(cache.get("k2_0"));
+
+ cache.addCheckpointCache(3L);
+ for (int i = 0; i < 800; i++) {
+ cache.update("k3_" + i, location2);
+ }
+
+ cache.addCheckpointCache(4L);
+ // mark checkpoints 1 and 2 as evictable
+ cache.markAsEvictable(3L);
+ // write another batch of records to trigger cleaning
+ for (int i = 0; i < 800; i++) {
+ cache.update("k4_" + i, location2);
+ }
+
+ // Check that checkpoint 1 is removed
+ assertEquals(3, cache.getCaches().size()); // Should have checkpoints 2, 3 and 4
+
assertFalse(cache.getCaches().containsKey(1L));
- assertFalse(cache.getCaches().containsKey(2L));
+ assertTrue(cache.getCaches().containsKey(2L));
assertTrue(cache.getCaches().containsKey(3L));
assertTrue(cache.getCaches().containsKey(4L));
-
- // Records from cleaned checkpoints should no longer be accessible
- assertNull(cache.get(recordKey1));
- assertNull(cache.get(recordKey2));
}
@Test
@@ -280,4 +291,57 @@ public class TestRecordIndexCache {
}
}
}
-}
\ No newline at end of file
+
+ @Test
+ void testInferMemorySizeForCacheWithEmptyCaches() {
+ cache.getCaches().clear();
+
+ // infer the cache size for the first checkpoint
+ long inferredSize = cache.inferMemorySizeForCache();
+ assertEquals(cache.getMaxCacheSizeInBytes() / 2, inferredSize);
+
+ // cache for the first checkpoint is empty
+ ExternalSpillableMap<String, HoodieRecordGlobalLocation> map1 = Mockito.mock(ExternalSpillableMap.class);
+ when(map1.getSizeOfFileOnDiskInBytes()).thenReturn(0L);
+ when(map1.getCurrentInMemoryMapSize()).thenReturn(0L);
+ cache.getCaches().put(1L, map1);
+
+ inferredSize = cache.inferMemorySizeForCache();
+ assertEquals(cache.getMaxCacheSizeInBytes() / 2, inferredSize);
+ }
+
+ @Test
+ void testInferMemorySizeForCacheUsesAverageInMemorySize() {
+ cache.getCaches().clear();
+
+ ExternalSpillableMap<String, HoodieRecordGlobalLocation> map1 = Mockito.mock(ExternalSpillableMap.class);
+ ExternalSpillableMap<String, HoodieRecordGlobalLocation> map2 = Mockito.mock(ExternalSpillableMap.class);
+ when(map1.getSizeOfFileOnDiskInBytes()).thenReturn(0L);
+ when(map2.getSizeOfFileOnDiskInBytes()).thenReturn(0L);
+ when(map1.getCurrentInMemoryMapSize()).thenReturn(100L);
+ when(map2.getCurrentInMemoryMapSize()).thenReturn(300L);
+
+ cache.getCaches().put(1L, map1);
+ cache.getCaches().put(2L, map2);
+
+ assertEquals(200L, cache.inferMemorySizeForCache());
+ }
+
+ @Test
+ void testInferMemorySizeForCacheAdjustsForSpilledMaps() {
+ cache.getCaches().clear();
+
+ ExternalSpillableMap<String, HoodieRecordGlobalLocation> spilledMap = Mockito.mock(ExternalSpillableMap.class);
+ ExternalSpillableMap<String, HoodieRecordGlobalLocation> inMemoryMap = Mockito.mock(ExternalSpillableMap.class);
+ when(spilledMap.getSizeOfFileOnDiskInBytes()).thenReturn(200L);
+ when(spilledMap.getCurrentInMemoryMapSize()).thenReturn(800L);
+ when(inMemoryMap.getSizeOfFileOnDiskInBytes()).thenReturn(0L);
+ when(inMemoryMap.getCurrentInMemoryMapSize()).thenReturn(500L);
+
+ cache.getCaches().put(2L, spilledMap);
+ cache.getCaches().put(1L, inMemoryMap);
+
+ long expected = (long) ((800L / RecordIndexCache.FACTOR_FOR_MEMORY_SIZE_OF_SPILLED_MAP) + 500L) / 2;
+ assertEquals(expected, cache.inferMemorySizeForCache());
+ }
+}
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/partitioner/index/TestRecordLevelIndexBackend.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/partitioner/index/TestRecordLevelIndexBackend.java
index 49d4370266e8..70011b514ee7 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/partitioner/index/TestRecordLevelIndexBackend.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/partitioner/index/TestRecordLevelIndexBackend.java
@@ -38,6 +38,7 @@ import java.io.IOException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
+import java.util.UUID;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
@@ -106,13 +107,84 @@ public class TestRecordLevelIndexBackend {
Map<Long, String> inflightInstants = new HashMap<>();
inflightInstants.put(1L, "0001");
when(correspondent.requestInflightInstants()).thenReturn(inflightInstants);
- recordLevelIndexBackend.onCheckpointComplete(correspondent);
- assertEquals(1, recordLevelIndexBackend.getRecordIndexCache().getCaches().size());
- // the cache will only contain 'new_key', others are cleaned.
+ recordLevelIndexBackend.onCheckpointComplete(correspondent, 1);
+ assertEquals(2, recordLevelIndexBackend.getRecordIndexCache().getCaches().size());
+ // the cache contains 'new_key' as well as the old locations
location = recordLevelIndexBackend.getRecordIndexCache().get("new_key");
assertEquals(newLocation, location);
location = recordLevelIndexBackend.getRecordIndexCache().get("id1");
- assertNull(location);
+ assertNotNull(location);
+ }
+ }
+
+ @Test
+ void testRecordLevelIndexCacheClean() throws Exception {
+ // set a small value for the RLI cache
+ conf.set(FlinkOptions.INDEX_RLI_CACHE_SIZE, 1L);
+
+ try (RecordLevelIndexBackend recordLevelIndexBackend = new RecordLevelIndexBackend(conf, -1)) {
+
+ for (int i = 0; i < 1500; i++) {
+ recordLevelIndexBackend.update("id1_" + i,
+ new HoodieRecordGlobalLocation("par1", "000000001",
UUID.randomUUID().toString(), -1));
+ }
+ // new checkpoint
+ recordLevelIndexBackend.onCheckpoint(1);
+ Correspondent correspondent = mock(Correspondent.class);
+ Map<Long, String> inflightInstants = new HashMap<>();
+ inflightInstants.put(1L, "0001");
+ when(correspondent.requestInflightInstants()).thenReturn(inflightInstants);
+ recordLevelIndexBackend.onCheckpointComplete(correspondent, 1);
+
+ for (int i = 0; i < 2000; i++) {
+ recordLevelIndexBackend.update("id2_" + i,
+ new HoodieRecordGlobalLocation("par1", "000000001",
UUID.randomUUID().toString(), -1));
+ }
+
+ // new checkpoint
+ recordLevelIndexBackend.onCheckpoint(2);
+ correspondent = mock(Correspondent.class);
+ inflightInstants = new HashMap<>();
+ inflightInstants.put(2L, "0002");
+ when(correspondent.requestInflightInstants()).thenReturn(inflightInstants);
+ recordLevelIndexBackend.onCheckpointComplete(correspondent, 2);
+
+ for (int i = 0; i < 2000; i++) {
+ recordLevelIndexBackend.update("id3_" + i,
+ new HoodieRecordGlobalLocation("par1", "000000001",
UUID.randomUUID().toString(), -1));
+ }
+
+ // the cache for the first instant is evicted
+ assertEquals(2, recordLevelIndexBackend.getRecordIndexCache().getCaches().size());
+
+ // new checkpoint
+ recordLevelIndexBackend.onCheckpoint(3);
+ correspondent = mock(Correspondent.class);
+ inflightInstants = new HashMap<>();
+ inflightInstants.put(3L, "0003");
+ when(correspondent.requestInflightInstants()).thenReturn(inflightInstants);
+ recordLevelIndexBackend.onCheckpointComplete(correspondent, 3);
+
+ for (int i = 0; i < 500; i++) {
+ recordLevelIndexBackend.update("id4_" + i,
+ new HoodieRecordGlobalLocation("par1", "000000001",
UUID.randomUUID().toString(), -1));
+ }
+
+ assertEquals(3, recordLevelIndexBackend.getRecordIndexCache().getCaches().size());
+
+ // insert another batch of records, which triggers another cleaning of the cache
+ for (int i = 500; i < 1500; i++) {
+ recordLevelIndexBackend.update("id4_" + i,
+ new HoodieRecordGlobalLocation("par1", "000000001",
UUID.randomUUID().toString(), -1));
+ }
+ assertEquals(3, recordLevelIndexBackend.getRecordIndexCache().getCaches().size());
+ // the cache for the oldest checkpoint id has been cleaned
+ assertNull(recordLevelIndexBackend.getRecordIndexCache().getCaches().get(-1L));
+ // caches for the latest 3 checkpoint ids are still retained
+ assertEquals("par1", recordLevelIndexBackend.get("id2_0").getPartitionPath());
+ assertEquals("par1", recordLevelIndexBackend.get("id3_0").getPartitionPath());
+ assertEquals("par1", recordLevelIndexBackend.get("id4_0").getPartitionPath());
}
}
}
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/TestWriteBase.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/TestWriteBase.java
index 4c7b051a5ba2..c7ca2ee1b485 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/TestWriteBase.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/TestWriteBase.java
@@ -657,7 +657,7 @@ public class TestWriteBase {
}
public TestHarness checkLastPendingInstantCompleted() {
- this.pipeline.checkpointComplete(3);
+ this.pipeline.checkpointComplete(4);
checkInstantState(HoodieInstant.State.COMPLETED, this.lastPending);
this.lastComplete = lastPending;
this.lastPending = lastPendingInstant();