This is an automated email from the ASF dual-hosted git repository.

roryqi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/gravitino.git


The following commit(s) were added to refs/heads/main by this push:
     new 03a152a7e6 [#8371] improvement(core): Add cache for Lance (#8450)
03a152a7e6 is described below

commit 03a152a7e63a7c62c9f8f4efab8faf23326a5c37
Author: roryqi <[email protected]>
AuthorDate: Fri Sep 12 15:28:39 2025 +0800

    [#8371] improvement(core): Add cache for Lance (#8450)
    
    ### What changes were proposed in this pull request?
    
    Add cache for Lance
    
    ### Why are the changes needed?
    
    Fix: #8371
    
    ### Does this PR introduce _any_ user-facing change?
    
    Yes, I added the documentation.
    
    ### How was this patch tested?
    
    Add UT.
---
 .../storage/LancePartitionStatisticStorage.java    | 405 ++++++++++++++-------
 .../TestLancePartitionStatisticStorage.java        | 175 ++++++++-
 docs/manage-statistics-in-gravitino.md             |  19 +-
 gradle/libs.versions.toml                          |   2 +-
 4 files changed, 452 insertions(+), 149 deletions(-)

diff --git 
a/core/src/main/java/org/apache/gravitino/stats/storage/LancePartitionStatisticStorage.java
 
b/core/src/main/java/org/apache/gravitino/stats/storage/LancePartitionStatisticStorage.java
index 416c183f45..9160021d0b 100644
--- 
a/core/src/main/java/org/apache/gravitino/stats/storage/LancePartitionStatisticStorage.java
+++ 
b/core/src/main/java/org/apache/gravitino/stats/storage/LancePartitionStatisticStorage.java
@@ -19,24 +19,36 @@
 package org.apache.gravitino.stats.storage;
 
 import com.fasterxml.jackson.core.JsonProcessingException;
+import com.github.benmanes.caffeine.cache.Cache;
+import com.github.benmanes.caffeine.cache.Caffeine;
+import com.github.benmanes.caffeine.cache.RemovalListener;
+import com.github.benmanes.caffeine.cache.Scheduler;
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import com.lancedb.lance.Dataset;
 import com.lancedb.lance.Fragment;
 import com.lancedb.lance.FragmentMetadata;
-import com.lancedb.lance.FragmentOperation;
+import com.lancedb.lance.ReadOptions;
+import com.lancedb.lance.Transaction;
 import com.lancedb.lance.WriteParams;
 import com.lancedb.lance.ipc.LanceScanner;
 import com.lancedb.lance.ipc.ScanOptions;
+import com.lancedb.lance.operation.Append;
 import java.io.File;
 import java.io.IOException;
 import java.nio.charset.StandardCharsets;
 import java.time.Instant;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.stream.Collectors;
 import org.apache.arrow.memory.BufferAllocator;
 import org.apache.arrow.memory.RootAllocator;
@@ -79,6 +91,12 @@ public class LancePartitionStatisticStorage implements 
PartitionStatisticStorage
   private static final int DEFAULT_MAX_ROWS_PER_GROUP = 1000000; // 1M
   private static final String READ_BATCH_SIZE = "readBatchSize";
   private static final int DEFAULT_READ_BATCH_SIZE = 10000; // 10K
+  private static final String DATASET_CACHE_SIZE = "datasetCacheSize";
+  private static final int DEFAULT_DATASET_CACHE_SIZE = 0;
+  private static final String METADATA_FILE_CACHE_SIZE = 
"metadataFileCacheSizeBytes";
+  private static final long DEFAULT_METADATA_FILE_CACHE_SIZE = 100L * 1024; // 
100KB
+  private static final String INDEX_CACHE_SIZE = "indexCacheSizeBytes";
+  private static final long DEFAULT_INDEX_CACHE_SIZE = 100L * 1024; // 100KB
   // The schema is `table_id`, `partition_name`,  `statistic_name`, 
`statistic_value`, `audit_info`
   private static final String TABLE_ID_COLUMN = "table_id";
   private static final String PARTITION_NAME_COLUMN = "partition_name";
@@ -86,6 +104,8 @@ public class LancePartitionStatisticStorage implements 
PartitionStatisticStorage
   private static final String STATISTIC_VALUE_COLUMN = "statistic_value";
   private static final String AUDIT_INFO_COLUMN = "audit_info";
 
+  private final Optional<Cache<Long, Dataset>> datasetCache;
+
   private static final Schema SCHEMA =
       new Schema(
           Arrays.asList(
@@ -102,6 +122,8 @@ public class LancePartitionStatisticStorage implements 
PartitionStatisticStorage
   private final int maxBytesPerFile;
   private final int maxRowsPerGroup;
   private final int readBatchSize;
+  private final long metadataFileCacheSize;
+  private final long indexCacheSize;
 
   private final EntityStore entityStore = 
GravitinoEnv.getInstance().entityStore();
 
@@ -133,7 +155,50 @@ public class LancePartitionStatisticStorage implements 
PartitionStatisticStorage
             properties.getOrDefault(READ_BATCH_SIZE, 
String.valueOf(DEFAULT_READ_BATCH_SIZE)));
     Preconditions.checkArgument(
         readBatchSize > 0, "Lance partition statistics storage readBatchSize 
must be positive");
+    int datasetCacheSize =
+        Integer.parseInt(
+            properties.getOrDefault(
+                DATASET_CACHE_SIZE, 
String.valueOf(DEFAULT_DATASET_CACHE_SIZE)));
+    Preconditions.checkArgument(
+        datasetCacheSize >= 0,
+        "Lance partition statistics storage datasetCacheSize must be greater 
than or equal to 0");
+    this.metadataFileCacheSize =
+        Long.parseLong(
+            properties.getOrDefault(
+                METADATA_FILE_CACHE_SIZE, 
String.valueOf(DEFAULT_METADATA_FILE_CACHE_SIZE)));
+    Preconditions.checkArgument(
+        metadataFileCacheSize > 0,
+        "Lance partition statistics storage metadataFileCacheSizeBytes must be 
positive");
+    this.indexCacheSize =
+        Long.parseLong(
+            properties.getOrDefault(INDEX_CACHE_SIZE, 
String.valueOf(DEFAULT_INDEX_CACHE_SIZE)));
+    Preconditions.checkArgument(
+        indexCacheSize > 0,
+        "Lance partition statistics storage indexCacheSizeBytes must be 
positive");
+
     this.properties = properties;
+    if (datasetCacheSize != 0) {
+      this.datasetCache =
+          Optional.of(
+              Caffeine.newBuilder()
+                  .maximumSize(datasetCacheSize)
+                  .scheduler(
+                      Scheduler.forScheduledExecutorService(
+                          new ScheduledThreadPoolExecutor(
+                              1,
+                              newDaemonThreadFactory(
+                                  
"lance-partition-statistic-storage-cache-cleaner"))))
+                  .evictionListener(
+                      (RemovalListener<Long, Dataset>)
+                          (key, value, cause) -> {
+                            if (value != null) {
+                              value.close();
+                            }
+                          })
+                  .build());
+    } else {
+      datasetCache = Optional.empty();
+    }
   }
 
   @Override
@@ -197,95 +262,39 @@ public class LancePartitionStatisticStorage implements 
PartitionStatisticStorage
     }
   }
 
-  private void appendStatisticsImpl(Long tableId, 
List<PartitionStatisticsUpdate> updates) {
-    String fileName = getFilePath(tableId);
-    try (Dataset datasetRead = open(fileName)) {
-      List<FragmentMetadata> fragmentMetas;
-      int count = 0;
-      try (VectorSchemaRoot root = VectorSchemaRoot.create(SCHEMA, allocator)) 
{
-        for (PartitionStatisticsUpdate update : updates) {
-          count += update.statistics().size();
+  private void appendStatisticsImpl(Long tableId, 
List<PartitionStatisticsUpdate> updates)
+      throws JsonProcessingException {
+    Dataset datasetRead = null;
+    Dataset newDataset = null;
+    try {
+      datasetRead = getDataset(tableId);
+      List<FragmentMetadata> fragmentMetas = createFragmentMetadata(tableId, 
updates);
+
+      Transaction appendTxn =
+          datasetRead
+              .newTransactionBuilder()
+              .operation(Append.builder().fragments(fragmentMetas).build())
+              .transactionProperties(Collections.emptyMap())
+              .build();
+      newDataset = appendTxn.commit();
+
+      Dataset finalNewDataset = newDataset;
+      datasetCache.ifPresent(cache -> cache.put(tableId, finalNewDataset));
+    } finally {
+      if (!datasetCache.isPresent()) {
+        if (datasetRead != null) {
+          datasetRead.close();
         }
-
-        for (FieldVector vector : root.getFieldVectors()) {
-          vector.setInitialCapacity(count);
+        if (newDataset != null) {
+          newDataset.close();
         }
-        root.allocateNew();
-        int index = 0;
-
-        for (PartitionStatisticsUpdate updatePartitionStatistic : updates) {
-          String partitionName = updatePartitionStatistic.partitionName();
-          for (Map.Entry<String, StatisticValue<?>> statistic :
-              updatePartitionStatistic.statistics().entrySet()) {
-            String statisticName = statistic.getKey();
-            String statisticValue =
-                
JsonUtils.anyFieldMapper().writeValueAsString(statistic.getValue());
-
-            UInt8Vector tableIdVector = (UInt8Vector) 
root.getVector(TABLE_ID_COLUMN);
-            VarCharVector partitionNameVector =
-                (VarCharVector) root.getVector(PARTITION_NAME_COLUMN);
-            VarCharVector statisticNameVector =
-                (VarCharVector) root.getVector(STATISTIC_NAME_COLUMN);
-            LargeVarCharVector statisticValueVector =
-                (LargeVarCharVector) root.getVector(STATISTIC_VALUE_COLUMN);
-            VarCharVector auditInfoVector = (VarCharVector) 
root.getVector(AUDIT_INFO_COLUMN);
-
-            tableIdVector.set(index, tableId);
-            partitionNameVector.setSafe(index, 
partitionName.getBytes(StandardCharsets.UTF_8));
-            statisticNameVector.setSafe(index, 
statisticName.getBytes(StandardCharsets.UTF_8));
-            statisticValueVector.setSafe(index, 
statisticValue.getBytes(StandardCharsets.UTF_8));
-            AuditInfo auditInfo =
-                AuditInfo.builder()
-                    .withCreator(PrincipalUtils.getCurrentUserName())
-                    .withCreateTime(Instant.now())
-                    .withLastModifier(PrincipalUtils.getCurrentUserName())
-                    .withLastModifiedTime(Instant.now())
-                    .build();
-            auditInfoVector.setSafe(
-                index,
-                JsonUtils.anyFieldMapper()
-                    .writeValueAsString(auditInfo)
-                    .getBytes(StandardCharsets.UTF_8));
-
-            index++;
-          }
-        }
-
-        root.setRowCount(index);
-
-        fragmentMetas =
-            Fragment.create(
-                getFilePath(tableId),
-                allocator,
-                root,
-                new WriteParams.Builder()
-                    .withMaxRowsPerFile(maxRowsPerFile)
-                    .withMaxBytesPerFile(maxBytesPerFile)
-                    .withMaxRowsPerGroup(maxRowsPerGroup)
-                    .withStorageOptions(properties)
-                    .build());
-        FragmentOperation.Append appendOp = new 
FragmentOperation.Append(fragmentMetas);
-        Dataset.commit(
-                allocator,
-                getFilePath(tableId),
-                appendOp,
-                Optional.of(datasetRead.version()),
-                properties)
-            .close();
       }
-
-    } catch (JsonProcessingException e) {
-      throw new RuntimeException("Failed to serialize statistic value", e);
     }
   }
 
-  private String getFilePath(Long tableId) {
-    return location + "/" + tableId + ".lance";
-  }
-
   private void dropStatisticsImpl(Long tableId, List<PartitionStatisticsDrop> 
drops) {
-    String fileName = getFilePath(tableId);
-    try (Dataset dataset = open(fileName)) {
+    Dataset dataset = getDataset(tableId);
+    try {
       List<String> partitionSQLs = Lists.newArrayList();
       for (PartitionStatisticsDrop drop : drops) {
         List<String> statistics = drop.statisticNames();
@@ -307,6 +316,10 @@ public class LancePartitionStatisticStorage implements 
PartitionStatisticStorage
             partitionSQLs.stream().map(str -> "(" + str + 
")").collect(Collectors.joining(" OR "));
         dataset.delete(filterSQL);
       }
+    } finally {
+      if (!datasetCache.isPresent() && dataset != null) {
+        dataset.close();
+      }
     }
   }
 
@@ -315,6 +328,85 @@ public class LancePartitionStatisticStorage implements 
PartitionStatisticStorage
     if (allocator != null) {
       allocator.close();
     }
+
+    datasetCache.ifPresent(Cache::invalidateAll);
+  }
+
+  @VisibleForTesting
+  Cache<Long, Dataset> getDatasetCache() {
+    return datasetCache.orElse(null);
+  }
+
+  private String getFilePath(Long tableId) {
+    return location + "/" + tableId + ".lance";
+  }
+
+  private List<FragmentMetadata> createFragmentMetadata(
+      Long tableId, List<PartitionStatisticsUpdate> updates) throws 
JsonProcessingException {
+    List<FragmentMetadata> fragmentMetas;
+    int count = 0;
+    try (VectorSchemaRoot root = VectorSchemaRoot.create(SCHEMA, allocator)) {
+      for (PartitionStatisticsUpdate update : updates) {
+        count += update.statistics().size();
+      }
+
+      for (FieldVector vector : root.getFieldVectors()) {
+        vector.setInitialCapacity(count);
+      }
+      root.allocateNew();
+      int index = 0;
+
+      UInt8Vector tableIdVector = (UInt8Vector) 
root.getVector(TABLE_ID_COLUMN);
+      VarCharVector partitionNameVector = (VarCharVector) 
root.getVector(PARTITION_NAME_COLUMN);
+      VarCharVector statisticNameVector = (VarCharVector) 
root.getVector(STATISTIC_NAME_COLUMN);
+      LargeVarCharVector statisticValueVector =
+          (LargeVarCharVector) root.getVector(STATISTIC_VALUE_COLUMN);
+      VarCharVector auditInfoVector = (VarCharVector) 
root.getVector(AUDIT_INFO_COLUMN);
+
+      for (PartitionStatisticsUpdate updatePartitionStatistic : updates) {
+        String partitionName = updatePartitionStatistic.partitionName();
+        for (Map.Entry<String, StatisticValue<?>> statistic :
+            updatePartitionStatistic.statistics().entrySet()) {
+          String statisticName = statistic.getKey();
+          String statisticValue =
+              
JsonUtils.anyFieldMapper().writeValueAsString(statistic.getValue());
+
+          tableIdVector.set(index, tableId);
+          partitionNameVector.setSafe(index, 
partitionName.getBytes(StandardCharsets.UTF_8));
+          statisticNameVector.setSafe(index, 
statisticName.getBytes(StandardCharsets.UTF_8));
+          statisticValueVector.setSafe(index, 
statisticValue.getBytes(StandardCharsets.UTF_8));
+          AuditInfo auditInfo =
+              AuditInfo.builder()
+                  .withCreator(PrincipalUtils.getCurrentUserName())
+                  .withCreateTime(Instant.now())
+                  .withLastModifier(PrincipalUtils.getCurrentUserName())
+                  .withLastModifiedTime(Instant.now())
+                  .build();
+          auditInfoVector.setSafe(
+              index,
+              JsonUtils.anyFieldMapper()
+                  .writeValueAsString(auditInfo)
+                  .getBytes(StandardCharsets.UTF_8));
+
+          index++;
+        }
+      }
+
+      root.setRowCount(index);
+
+      fragmentMetas =
+          Fragment.create(
+              getFilePath(tableId),
+              allocator,
+              root,
+              new WriteParams.Builder()
+                  .withMaxRowsPerFile(maxRowsPerFile)
+                  .withMaxBytesPerFile(maxBytesPerFile)
+                  .withMaxRowsPerGroup(maxRowsPerGroup)
+                  .withStorageOptions(properties)
+                  .build());
+      return fragmentMetas;
+    }
   }
 
   private static String getPartitionFilter(PartitionRange range) {
@@ -354,70 +446,101 @@ public class LancePartitionStatisticStorage implements 
PartitionStatisticStorage
 
   private List<PersistedPartitionStatistics> listStatisticsImpl(
       Long tableId, String partitionFilter) {
-    String fileName = getFilePath(tableId);
-
-    try (Dataset dataset = open(fileName)) {
-
-      String filter = "table_id = " + tableId + partitionFilter;
-
-      try (LanceScanner scanner =
-          dataset.newScan(
-              new ScanOptions.Builder()
-                  .columns(
-                      Arrays.asList(
-                          TABLE_ID_COLUMN,
-                          PARTITION_NAME_COLUMN,
-                          STATISTIC_NAME_COLUMN,
-                          STATISTIC_VALUE_COLUMN,
-                          AUDIT_INFO_COLUMN))
-                  .withRowId(true)
-                  .batchSize(readBatchSize)
-                  .filter(filter)
-                  .build())) {
-        Map<String, List<PersistedStatistic>> partitionStatistics = 
Maps.newConcurrentMap();
-        try (ArrowReader reader = scanner.scanBatches()) {
-          while (reader.loadNextBatch()) {
-            VectorSchemaRoot root = reader.getVectorSchemaRoot();
-            List<FieldVector> fieldVectors = root.getFieldVectors();
-            VarCharVector partitionNameVector = (VarCharVector) 
fieldVectors.get(1);
-            VarCharVector statisticNameVector = (VarCharVector) 
fieldVectors.get(2);
-            LargeVarCharVector statisticValueVector = (LargeVarCharVector) 
fieldVectors.get(3);
-            VarCharVector auditInfoNameVector = (VarCharVector) 
fieldVectors.get(4);
-
-            for (int i = 0; i < root.getRowCount(); i++) {
-              String partitionName = new String(partitionNameVector.get(i), 
StandardCharsets.UTF_8);
-              String statisticName = new String(statisticNameVector.get(i), 
StandardCharsets.UTF_8);
-              String statisticValueStr =
-                  new String(statisticValueVector.get(i), 
StandardCharsets.UTF_8);
-              String auditInoStr = new String(auditInfoNameVector.get(i), 
StandardCharsets.UTF_8);
-
-              StatisticValue<?> statisticValue =
-                  JsonUtils.anyFieldMapper().readValue(statisticValueStr, 
StatisticValue.class);
-              AuditInfo auditInfo =
-                  JsonUtils.anyFieldMapper().readValue(auditInoStr, 
AuditInfo.class);
-
-              PersistedStatistic persistedStatistic =
-                  PersistedStatistic.of(statisticName, statisticValue, 
auditInfo);
-
-              partitionStatistics
-                  .computeIfAbsent(partitionName, k -> Lists.newArrayList())
-                  .add(persistedStatistic);
-            }
-          }
 
-          return partitionStatistics.entrySet().stream()
-              .map(entry -> PersistedPartitionStatistics.of(entry.getKey(), 
entry.getValue()))
-              .collect(Collectors.toList());
+    Dataset dataset = getDataset(tableId);
+
+    String filter = "table_id = " + tableId + partitionFilter;
+
+    try (LanceScanner scanner =
+        dataset.newScan(
+            new ScanOptions.Builder()
+                .columns(
+                    Arrays.asList(
+                        TABLE_ID_COLUMN,
+                        PARTITION_NAME_COLUMN,
+                        STATISTIC_NAME_COLUMN,
+                        STATISTIC_VALUE_COLUMN,
+                        AUDIT_INFO_COLUMN))
+                .withRowId(true)
+                .batchSize(readBatchSize)
+                .filter(filter)
+                .build())) {
+      Map<String, List<PersistedStatistic>> partitionStatistics = 
Maps.newConcurrentMap();
+      try (ArrowReader reader = scanner.scanBatches()) {
+        while (reader.loadNextBatch()) {
+          VectorSchemaRoot root = reader.getVectorSchemaRoot();
+          List<FieldVector> fieldVectors = root.getFieldVectors();
+          VarCharVector partitionNameVector = (VarCharVector) 
fieldVectors.get(1);
+          VarCharVector statisticNameVector = (VarCharVector) 
fieldVectors.get(2);
+          LargeVarCharVector statisticValueVector = (LargeVarCharVector) 
fieldVectors.get(3);
+          VarCharVector auditInfoNameVector = (VarCharVector) 
fieldVectors.get(4);
+
+          for (int i = 0; i < root.getRowCount(); i++) {
+            String partitionName = new String(partitionNameVector.get(i), 
StandardCharsets.UTF_8);
+            String statisticName = new String(statisticNameVector.get(i), 
StandardCharsets.UTF_8);
+            String statisticValueStr =
+                new String(statisticValueVector.get(i), 
StandardCharsets.UTF_8);
+            String auditInoStr = new String(auditInfoNameVector.get(i), 
StandardCharsets.UTF_8);
+
+            StatisticValue<?> statisticValue =
+                JsonUtils.anyFieldMapper().readValue(statisticValueStr, 
StatisticValue.class);
+            AuditInfo auditInfo =
+                JsonUtils.anyFieldMapper().readValue(auditInoStr, 
AuditInfo.class);
+
+            PersistedStatistic persistedStatistic =
+                PersistedStatistic.of(statisticName, statisticValue, 
auditInfo);
+
+            partitionStatistics
+                .computeIfAbsent(partitionName, k -> Lists.newArrayList())
+                .add(persistedStatistic);
+          }
         }
-      } catch (Exception e) {
-        throw new RuntimeException(e);
+
+        return partitionStatistics.entrySet().stream()
+            .map(entry -> PersistedPartitionStatistics.of(entry.getKey(), 
entry.getValue()))
+            .collect(Collectors.toList());
+      }
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    } finally {
+      if (!datasetCache.isPresent() && dataset != null) {
+        dataset.close();
       }
     }
   }
 
+  private Dataset getDataset(Long tableId) {
+    AtomicBoolean newlyCreated = new AtomicBoolean(false);
+    return datasetCache
+        .map(
+            cache -> {
+              Dataset cachedDataset =
+                  cache.get(
+                      tableId,
+                      id -> {
+                        newlyCreated.set(true);
+                        return open(getFilePath(id));
+                      });
+
+              // Ensure dataset uses the latest version
+              if (!newlyCreated.get()) {
+                cachedDataset.checkoutLatest();
+              }
+
+              return cachedDataset;
+            })
+        .orElse(open(getFilePath(tableId)));
+  }
+
   private Dataset open(String fileName) {
     try {
-      return Dataset.open(fileName, allocator);
+      return Dataset.open(
+          allocator,
+          fileName,
+          new ReadOptions.Builder()
+              .setMetadataCacheSizeBytes(metadataFileCacheSize)
+              .setIndexCacheSizeBytes(indexCacheSize)
+              .build());
     } catch (IllegalArgumentException illegalArgumentException) {
       if (illegalArgumentException.getMessage().contains("was not found")) {
         return Dataset.create(allocator, fileName, SCHEMA, new 
WriteParams.Builder().build());
@@ -426,4 +549,8 @@ public class LancePartitionStatisticStorage implements 
PartitionStatisticStorage
       }
     }
   }
+
+  private ThreadFactory newDaemonThreadFactory(String name) {
+    return new ThreadFactoryBuilder().setDaemon(true).setNameFormat(name + 
"-%d").build();
+  }
 }
diff --git 
a/core/src/test/java/org/apache/gravitino/stats/storage/TestLancePartitionStatisticStorage.java
 
b/core/src/test/java/org/apache/gravitino/stats/storage/TestLancePartitionStatisticStorage.java
index 3744b1dc61..3ab553c32e 100644
--- 
a/core/src/test/java/org/apache/gravitino/stats/storage/TestLancePartitionStatisticStorage.java
+++ 
b/core/src/test/java/org/apache/gravitino/stats/storage/TestLancePartitionStatisticStorage.java
@@ -66,9 +66,10 @@ public class TestLancePartitionStatisticStorage {
 
     String location = "/tmp/test";
     Map<String, String> properties = Maps.newHashMap();
-    properties.put("lance.location", location);
+    properties.put("location", location);
 
-    PartitionStatisticStorage storage = factory.create(properties);
+    LancePartitionStatisticStorage storage =
+        (LancePartitionStatisticStorage) factory.create(properties);
 
     int count = 100;
     int partitions = 10;
@@ -206,6 +207,176 @@ public class TestLancePartitionStatisticStorage {
     storage.close();
   }
 
+  @Test
+  public void testLancePartitionStatisticStorageWithCache() throws Exception {
+    PartitionStatisticStorageFactory factory = new 
LancePartitionStatisticStorageFactory();
+
+    // Prepare table entity
+    String metalakeName = "metalake";
+    String catalogName = "catalog";
+    String schemaName = "schema";
+    String tableName = "table";
+
+    MetadataObject metadataObject =
+        MetadataObjects.of(
+            Lists.newArrayList(catalogName, schemaName, tableName), 
MetadataObject.Type.TABLE);
+
+    EntityStore entityStore = mock(EntityStore.class);
+    TableEntity tableEntity = mock(TableEntity.class);
+    when(entityStore.get(any(), any(), any())).thenReturn(tableEntity);
+    when(tableEntity.id()).thenReturn(1L);
+    FieldUtils.writeField(GravitinoEnv.getInstance(), "entityStore", 
entityStore, true);
+
+    String location = "/tmp/test";
+    Map<String, String> properties = Maps.newHashMap();
+    properties.put("location", location);
+    properties.put("datasetCacheSize", "1000");
+
+    LancePartitionStatisticStorage storage =
+        (LancePartitionStatisticStorage) factory.create(properties);
+
+    int count = 100;
+    int partitions = 10;
+    Map<MetadataObject, Map<String, Map<String, StatisticValue<?>>>> 
originData =
+        generateData(metadataObject, count, partitions);
+    Map<MetadataObject, List<PartitionStatisticsUpdate>> statisticsToUpdate =
+        convertData(originData);
+
+    List<MetadataObjectStatisticsUpdate> objectUpdates = Lists.newArrayList();
+    for (Map.Entry<MetadataObject, List<PartitionStatisticsUpdate>> entry :
+        statisticsToUpdate.entrySet()) {
+      MetadataObject metadata = entry.getKey();
+      List<PartitionStatisticsUpdate> updates = entry.getValue();
+      objectUpdates.add(MetadataObjectStatisticsUpdate.of(metadata, updates));
+    }
+    storage.updateStatistics(metalakeName, objectUpdates);
+    Assertions.assertEquals(1, storage.getDatasetCache().estimatedSize());
+
+    String fromPartitionName =
+        "partition" + String.format("%0" + String.valueOf(partitions).length() 
+ "d", 0);
+    String toPartitionName =
+        "partition" + String.format("%0" + String.valueOf(partitions).length() 
+ "d", 1);
+
+    List<PersistedPartitionStatistics> listedStats =
+        storage.listStatistics(
+            metalakeName,
+            metadataObject,
+            PartitionRange.between(
+                fromPartitionName,
+                PartitionRange.BoundType.CLOSED,
+                toPartitionName,
+                PartitionRange.BoundType.OPEN));
+    Assertions.assertEquals(1, listedStats.size());
+    Assertions.assertEquals(1, storage.getDatasetCache().estimatedSize());
+
+    String targetPartitionName = "partition00";
+    for (PersistedPartitionStatistics persistStat : listedStats) {
+      String partitionName = persistStat.partitionName();
+      List<PersistedStatistic> stats = persistStat.statistics();
+      Assertions.assertEquals(targetPartitionName, partitionName);
+      Assertions.assertEquals(10, stats.size());
+
+      for (PersistedStatistic statistic : stats) {
+        String statisticName = statistic.name();
+        StatisticValue<?> statisticValue = statistic.value();
+
+        Assertions.assertTrue(
+            
originData.get(metadataObject).get(targetPartitionName).containsKey(statisticName));
+        Assertions.assertEquals(
+            
originData.get(metadataObject).get(targetPartitionName).get(statisticName).value(),
+            statisticValue.value());
+        Assertions.assertNotNull(statistic.auditInfo());
+      }
+    }
+
+    // Drop one statistic from partition00
+    List<MetadataObjectStatisticsDrop> tableStatisticsToDrop =
+        Lists.newArrayList(
+            MetadataObjectStatisticsDrop.of(
+                metadataObject,
+                Lists.newArrayList(
+                    PartitionStatisticsModification.drop(
+                        targetPartitionName, 
Lists.newArrayList("statistic0")))));
+
+    storage.dropStatistics(metalakeName, tableStatisticsToDrop);
+    Assertions.assertEquals(1, storage.getDatasetCache().estimatedSize());
+
+    listedStats =
+        storage.listStatistics(
+            metalakeName,
+            metadataObject,
+            PartitionRange.between(
+                fromPartitionName,
+                PartitionRange.BoundType.CLOSED,
+                toPartitionName,
+                PartitionRange.BoundType.OPEN));
+    Assertions.assertEquals(1, listedStats.size());
+    Assertions.assertEquals(1, storage.getDatasetCache().estimatedSize());
+
+    for (PersistedPartitionStatistics partitionStat : listedStats) {
+      String partitionName = partitionStat.partitionName();
+      List<PersistedStatistic> stats = partitionStat.statistics();
+      Assertions.assertEquals(targetPartitionName, partitionName);
+      Assertions.assertEquals(9, stats.size());
+
+      for (PersistedStatistic statistic : stats) {
+        String statisticName = statistic.name();
+        StatisticValue<?> statisticValue = statistic.value();
+
+        Assertions.assertTrue(
+            
originData.get(metadataObject).get(targetPartitionName).containsKey(statisticName));
+        Assertions.assertEquals(
+            
originData.get(metadataObject).get(targetPartitionName).get(statisticName).value(),
+            statisticValue.value());
+        Assertions.assertNotNull(statistic.auditInfo());
+      }
+
+      // Drop one statistics from partition01 and partition02
+      tableStatisticsToDrop =
+          Lists.newArrayList(
+              MetadataObjectStatisticsDrop.of(
+                  metadataObject,
+                  Lists.newArrayList(
+                      PartitionStatisticsModification.drop(
+                          "partition01", Lists.newArrayList("statistic1")),
+                      PartitionStatisticsModification.drop(
+                          "partition02", Lists.newArrayList("statistic2")))));
+      storage.dropStatistics(metalakeName, tableStatisticsToDrop);
+      Assertions.assertEquals(1, storage.getDatasetCache().estimatedSize());
+
+      listedStats =
+          storage.listStatistics(
+              metalakeName,
+              metadataObject,
+              PartitionRange.between(
+                  fromPartitionName,
+                  PartitionRange.BoundType.CLOSED,
+                  "partition03",
+                  PartitionRange.BoundType.OPEN));
+      Assertions.assertEquals(3, listedStats.size());
+      Assertions.assertEquals(1, storage.getDatasetCache().estimatedSize());
+      for (PersistedPartitionStatistics persistPartStat : listedStats) {
+        stats = persistPartStat.statistics();
+        Assertions.assertEquals(9, stats.size());
+        for (PersistedStatistic statistic : stats) {
+          partitionName = persistPartStat.partitionName();
+          String statisticName = statistic.name();
+          StatisticValue<?> statisticValue = statistic.value();
+
+          Assertions.assertTrue(
+              
originData.get(metadataObject).get(partitionName).containsKey(statisticName));
+          Assertions.assertEquals(
+              
originData.get(metadataObject).get(partitionName).get(statisticName).value(),
+              statisticValue.value());
+          Assertions.assertNotNull(statistic.auditInfo());
+        }
+      }
+    }
+
+    FileUtils.deleteDirectory(new File(location + "/" + tableEntity.id() + 
".lance"));
+    storage.close();
+  }
+
   private Map<MetadataObject, Map<String, Map<String, StatisticValue<?>>>> 
generateData(
       MetadataObject metadataObject, int count, int partitions) {
     Map<MetadataObject, Map<String, Map<String, StatisticValue<?>>>> 
statisticsToUpdate =
diff --git a/docs/manage-statistics-in-gravitino.md 
b/docs/manage-statistics-in-gravitino.md
index 4902726a06..45e80829a4 100644
--- a/docs/manage-statistics-in-gravitino.md
+++ b/docs/manage-statistics-in-gravitino.md
@@ -245,13 +245,18 @@ For example, if you set an extra property `foo` to `bar` 
for Lance storage optio
 For Lance remote storage, you can refer to the document 
[here](https://lancedb.github.io/lance/usage/storage/).
 
 
-| Configuration item                                        | Description      
                    | Default value                  | Required                 
                       | Since version |
-|-----------------------------------------------------------|--------------------------------------|--------------------------------|-------------------------------------------------|---------------|
-| `gravitino.stats.partition.storageOption.location`        | The location of 
Lance files          | `${GRAVITINO_HOME}/data/lance` | No                      
                        | 1.0.0         |
-| `gravitino.stats.partition.storageOption.maxRowsPerFile`  | The maximum rows 
per file            | `1000000`                      | No                       
                       | 1.0.0         |
-| `gravitino.stats.partition.storageOption.maxBytesPerFile` | The maximum 
bytes per file           | `104857600`                    | No                  
                            | 1.0.0         |
-| `gravitino.stats.partition.storageOption.maxRowsPerGroup` | The maximum rows 
per group           | `1000000`                      | No                       
                       | 1.0.0         |
-| `gravitino.stats.partition.storageOption.readBatchSize`   | The batch record 
number when reading | `10000`                        | No                       
                       | 1.0.0         |
+| Configuration item                                                   | 
Description                          | Default value                        | 
Required | Since version |
+|----------------------------------------------------------------------|--------------------------------------|--------------------------------------|----------|---------------|
+| `gravitino.stats.partition.storageOption.location`                   | The 
location of Lance files          | `${GRAVITINO_HOME}/data/lance`       | No    
   | 1.0.0         |
+| `gravitino.stats.partition.storageOption.maxRowsPerFile`             | The 
maximum rows per file            | `1000000`                            | No    
   | 1.0.0         |
+| `gravitino.stats.partition.storageOption.maxBytesPerFile`            | The 
maximum bytes per file           | `104857600`                          | No    
   | 1.0.0         |
+| `gravitino.stats.partition.storageOption.maxRowsPerGroup`            | The 
maximum rows per group           | `1000000`                            | No    
   | 1.0.0         |
+| `gravitino.stats.partition.storageOption.readBatchSize`              | The 
batch record number when reading | `10000`                              | No    
   | 1.0.0         |
+| `gravitino.stats.partition.storageOption.datasetCacheSize`           | The 
size of the dataset cache for Lance | `0` (the cache is disabled)          | No     
  | 1.0.0         |
+| `gravitino.stats.partition.storageOption.metadataFileCacheSizeBytes` | The 
Lance metadata file cache size   | `102400`                             | No    
   | 1.0.0         |
+| `gravitino.stats.partition.storageOption.indexCacheSizeBytes`        | The 
Lance index cache size           | `102400`                             | No    
   | 1.0.0         |
+
+If you have many tables with a small number of partitions, you should set smaller 
values for `metadataFileCacheSizeBytes` and `indexCacheSizeBytes`.
 
 ### Implementing a custom partition storage
 
diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml
index d09d2cb845..0de35e4118 100644
--- a/gradle/libs.versions.toml
+++ b/gradle/libs.versions.toml
@@ -28,7 +28,7 @@ guava = "32.1.3-jre"
 lombok = "1.18.20"
 slf4j = "2.0.16"
 log4j = "2.24.3"
-lance = "0.31.0"
+lance = "0.34.0"
 jetty = "9.4.51.v20230217"
 jersey = "2.41"
 mockito = "4.11.0"


Reply via email to