This is an automated email from the ASF dual-hosted git repository.
roryqi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/gravitino.git
The following commit(s) were added to refs/heads/main by this push:
new 03a152a7e6 [#8371] improvement(core): Add cache for Lance (#8450)
03a152a7e6 is described below
commit 03a152a7e63a7c62c9f8f4efab8faf23326a5c37
Author: roryqi <[email protected]>
AuthorDate: Fri Sep 12 15:28:39 2025 +0800
[#8371] improvement(core): Add cache for Lance (#8450)
### What changes were proposed in this pull request?
Add cache for Lance
### Why are the changes needed?
Fix: #8371
### Does this PR introduce _any_ user-facing change?
Yes, I added the documentation.
### How was this patch tested?
Add UT.
---
.../storage/LancePartitionStatisticStorage.java | 405 ++++++++++++++-------
.../TestLancePartitionStatisticStorage.java | 175 ++++++++-
docs/manage-statistics-in-gravitino.md | 19 +-
gradle/libs.versions.toml | 2 +-
4 files changed, 452 insertions(+), 149 deletions(-)
diff --git
a/core/src/main/java/org/apache/gravitino/stats/storage/LancePartitionStatisticStorage.java
b/core/src/main/java/org/apache/gravitino/stats/storage/LancePartitionStatisticStorage.java
index 416c183f45..9160021d0b 100644
---
a/core/src/main/java/org/apache/gravitino/stats/storage/LancePartitionStatisticStorage.java
+++
b/core/src/main/java/org/apache/gravitino/stats/storage/LancePartitionStatisticStorage.java
@@ -19,24 +19,36 @@
package org.apache.gravitino.stats.storage;
import com.fasterxml.jackson.core.JsonProcessingException;
+import com.github.benmanes.caffeine.cache.Cache;
+import com.github.benmanes.caffeine.cache.Caffeine;
+import com.github.benmanes.caffeine.cache.RemovalListener;
+import com.github.benmanes.caffeine.cache.Scheduler;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.lancedb.lance.Dataset;
import com.lancedb.lance.Fragment;
import com.lancedb.lance.FragmentMetadata;
-import com.lancedb.lance.FragmentOperation;
+import com.lancedb.lance.ReadOptions;
+import com.lancedb.lance.Transaction;
import com.lancedb.lance.WriteParams;
import com.lancedb.lance.ipc.LanceScanner;
import com.lancedb.lance.ipc.ScanOptions;
+import com.lancedb.lance.operation.Append;
import java.io.File;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.time.Instant;
import java.util.Arrays;
+import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
@@ -79,6 +91,12 @@ public class LancePartitionStatisticStorage implements
PartitionStatisticStorage
private static final int DEFAULT_MAX_ROWS_PER_GROUP = 1000000; // 1M
private static final String READ_BATCH_SIZE = "readBatchSize";
private static final int DEFAULT_READ_BATCH_SIZE = 10000; // 10K
+ private static final String DATASET_CACHE_SIZE = "datasetCacheSize";
+ private static final int DEFAULT_DATASET_CACHE_SIZE = 0;
+ private static final String METADATA_FILE_CACHE_SIZE =
"metadataFileCacheSizeBytes";
+ private static final long DEFAULT_METADATA_FILE_CACHE_SIZE = 100L * 1024; //
100KB
+ private static final String INDEX_CACHE_SIZE = "indexCacheSizeBytes";
+ private static final long DEFAULT_INDEX_CACHE_SIZE = 100L * 1024; // 100KB
// The schema is `table_id`, `partition_name`, `statistic_name`,
`statistic_value`, `audit_info`
private static final String TABLE_ID_COLUMN = "table_id";
private static final String PARTITION_NAME_COLUMN = "partition_name";
@@ -86,6 +104,8 @@ public class LancePartitionStatisticStorage implements
PartitionStatisticStorage
private static final String STATISTIC_VALUE_COLUMN = "statistic_value";
private static final String AUDIT_INFO_COLUMN = "audit_info";
+ private final Optional<Cache<Long, Dataset>> datasetCache;
+
private static final Schema SCHEMA =
new Schema(
Arrays.asList(
@@ -102,6 +122,8 @@ public class LancePartitionStatisticStorage implements
PartitionStatisticStorage
private final int maxBytesPerFile;
private final int maxRowsPerGroup;
private final int readBatchSize;
+ private final long metadataFileCacheSize;
+ private final long indexCacheSize;
private final EntityStore entityStore =
GravitinoEnv.getInstance().entityStore();
@@ -133,7 +155,50 @@ public class LancePartitionStatisticStorage implements
PartitionStatisticStorage
properties.getOrDefault(READ_BATCH_SIZE,
String.valueOf(DEFAULT_READ_BATCH_SIZE)));
Preconditions.checkArgument(
readBatchSize > 0, "Lance partition statistics storage readBatchSize
must be positive");
+ int datasetCacheSize =
+ Integer.parseInt(
+ properties.getOrDefault(
+ DATASET_CACHE_SIZE,
String.valueOf(DEFAULT_DATASET_CACHE_SIZE)));
+ Preconditions.checkArgument(
+ datasetCacheSize >= 0,
+ "Lance partition statistics storage datasetCacheSize must be greater
than or equal to 0");
+ this.metadataFileCacheSize =
+ Long.parseLong(
+ properties.getOrDefault(
+ METADATA_FILE_CACHE_SIZE,
String.valueOf(DEFAULT_METADATA_FILE_CACHE_SIZE)));
+ Preconditions.checkArgument(
+ metadataFileCacheSize > 0,
+ "Lance partition statistics storage metadataFileCacheSizeBytes must be
positive");
+ this.indexCacheSize =
+ Long.parseLong(
+ properties.getOrDefault(INDEX_CACHE_SIZE,
String.valueOf(DEFAULT_INDEX_CACHE_SIZE)));
+ Preconditions.checkArgument(
+ indexCacheSize > 0,
+ "Lance partition statistics storage indexCacheSizeBytes must be
positive");
+
this.properties = properties;
+ if (datasetCacheSize != 0) {
+ this.datasetCache =
+ Optional.of(
+ Caffeine.newBuilder()
+ .maximumSize(datasetCacheSize)
+ .scheduler(
+ Scheduler.forScheduledExecutorService(
+ new ScheduledThreadPoolExecutor(
+ 1,
+ newDaemonThreadFactory(
+
"lance-partition-statistic-storage-cache-cleaner"))))
+ .evictionListener(
+ (RemovalListener<Long, Dataset>)
+ (key, value, cause) -> {
+ if (value != null) {
+ value.close();
+ }
+ })
+ .build());
+ } else {
+ datasetCache = Optional.empty();
+ }
}
@Override
@@ -197,95 +262,39 @@ public class LancePartitionStatisticStorage implements
PartitionStatisticStorage
}
}
- private void appendStatisticsImpl(Long tableId,
List<PartitionStatisticsUpdate> updates) {
- String fileName = getFilePath(tableId);
- try (Dataset datasetRead = open(fileName)) {
- List<FragmentMetadata> fragmentMetas;
- int count = 0;
- try (VectorSchemaRoot root = VectorSchemaRoot.create(SCHEMA, allocator))
{
- for (PartitionStatisticsUpdate update : updates) {
- count += update.statistics().size();
+ private void appendStatisticsImpl(Long tableId,
List<PartitionStatisticsUpdate> updates)
+ throws JsonProcessingException {
+ Dataset datasetRead = null;
+ Dataset newDataset = null;
+ try {
+ datasetRead = getDataset(tableId);
+ List<FragmentMetadata> fragmentMetas = createFragmentMetadata(tableId,
updates);
+
+ Transaction appendTxn =
+ datasetRead
+ .newTransactionBuilder()
+ .operation(Append.builder().fragments(fragmentMetas).build())
+ .transactionProperties(Collections.emptyMap())
+ .build();
+ newDataset = appendTxn.commit();
+
+ Dataset finalNewDataset = newDataset;
+ datasetCache.ifPresent(cache -> cache.put(tableId, finalNewDataset));
+ } finally {
+ if (!datasetCache.isPresent()) {
+ if (datasetRead != null) {
+ datasetRead.close();
}
-
- for (FieldVector vector : root.getFieldVectors()) {
- vector.setInitialCapacity(count);
+ if (newDataset != null) {
+ newDataset.close();
}
- root.allocateNew();
- int index = 0;
-
- for (PartitionStatisticsUpdate updatePartitionStatistic : updates) {
- String partitionName = updatePartitionStatistic.partitionName();
- for (Map.Entry<String, StatisticValue<?>> statistic :
- updatePartitionStatistic.statistics().entrySet()) {
- String statisticName = statistic.getKey();
- String statisticValue =
-
JsonUtils.anyFieldMapper().writeValueAsString(statistic.getValue());
-
- UInt8Vector tableIdVector = (UInt8Vector)
root.getVector(TABLE_ID_COLUMN);
- VarCharVector partitionNameVector =
- (VarCharVector) root.getVector(PARTITION_NAME_COLUMN);
- VarCharVector statisticNameVector =
- (VarCharVector) root.getVector(STATISTIC_NAME_COLUMN);
- LargeVarCharVector statisticValueVector =
- (LargeVarCharVector) root.getVector(STATISTIC_VALUE_COLUMN);
- VarCharVector auditInfoVector = (VarCharVector)
root.getVector(AUDIT_INFO_COLUMN);
-
- tableIdVector.set(index, tableId);
- partitionNameVector.setSafe(index,
partitionName.getBytes(StandardCharsets.UTF_8));
- statisticNameVector.setSafe(index,
statisticName.getBytes(StandardCharsets.UTF_8));
- statisticValueVector.setSafe(index,
statisticValue.getBytes(StandardCharsets.UTF_8));
- AuditInfo auditInfo =
- AuditInfo.builder()
- .withCreator(PrincipalUtils.getCurrentUserName())
- .withCreateTime(Instant.now())
- .withLastModifier(PrincipalUtils.getCurrentUserName())
- .withLastModifiedTime(Instant.now())
- .build();
- auditInfoVector.setSafe(
- index,
- JsonUtils.anyFieldMapper()
- .writeValueAsString(auditInfo)
- .getBytes(StandardCharsets.UTF_8));
-
- index++;
- }
- }
-
- root.setRowCount(index);
-
- fragmentMetas =
- Fragment.create(
- getFilePath(tableId),
- allocator,
- root,
- new WriteParams.Builder()
- .withMaxRowsPerFile(maxRowsPerFile)
- .withMaxBytesPerFile(maxBytesPerFile)
- .withMaxRowsPerGroup(maxRowsPerGroup)
- .withStorageOptions(properties)
- .build());
- FragmentOperation.Append appendOp = new
FragmentOperation.Append(fragmentMetas);
- Dataset.commit(
- allocator,
- getFilePath(tableId),
- appendOp,
- Optional.of(datasetRead.version()),
- properties)
- .close();
}
-
- } catch (JsonProcessingException e) {
- throw new RuntimeException("Failed to serialize statistic value", e);
}
}
- private String getFilePath(Long tableId) {
- return location + "/" + tableId + ".lance";
- }
-
private void dropStatisticsImpl(Long tableId, List<PartitionStatisticsDrop>
drops) {
- String fileName = getFilePath(tableId);
- try (Dataset dataset = open(fileName)) {
+ Dataset dataset = getDataset(tableId);
+ try {
List<String> partitionSQLs = Lists.newArrayList();
for (PartitionStatisticsDrop drop : drops) {
List<String> statistics = drop.statisticNames();
@@ -307,6 +316,10 @@ public class LancePartitionStatisticStorage implements
PartitionStatisticStorage
partitionSQLs.stream().map(str -> "(" + str +
")").collect(Collectors.joining(" OR "));
dataset.delete(filterSQL);
}
+ } finally {
+ if (!datasetCache.isPresent() && dataset != null) {
+ dataset.close();
+ }
}
}
@@ -315,6 +328,85 @@ public class LancePartitionStatisticStorage implements
PartitionStatisticStorage
if (allocator != null) {
allocator.close();
}
+
+ datasetCache.ifPresent(Cache::invalidateAll);
+ }
+
+ @VisibleForTesting
+ Cache<Long, Dataset> getDatasetCache() {
+ return datasetCache.orElse(null);
+ }
+
+ private String getFilePath(Long tableId) {
+ return location + "/" + tableId + ".lance";
+ }
+
+ private List<FragmentMetadata> createFragmentMetadata(
+ Long tableId, List<PartitionStatisticsUpdate> updates) throws
JsonProcessingException {
+ List<FragmentMetadata> fragmentMetas;
+ int count = 0;
+ try (VectorSchemaRoot root = VectorSchemaRoot.create(SCHEMA, allocator)) {
+ for (PartitionStatisticsUpdate update : updates) {
+ count += update.statistics().size();
+ }
+
+ for (FieldVector vector : root.getFieldVectors()) {
+ vector.setInitialCapacity(count);
+ }
+ root.allocateNew();
+ int index = 0;
+
+ UInt8Vector tableIdVector = (UInt8Vector)
root.getVector(TABLE_ID_COLUMN);
+ VarCharVector partitionNameVector = (VarCharVector)
root.getVector(PARTITION_NAME_COLUMN);
+ VarCharVector statisticNameVector = (VarCharVector)
root.getVector(STATISTIC_NAME_COLUMN);
+ LargeVarCharVector statisticValueVector =
+ (LargeVarCharVector) root.getVector(STATISTIC_VALUE_COLUMN);
+ VarCharVector auditInfoVector = (VarCharVector)
root.getVector(AUDIT_INFO_COLUMN);
+
+ for (PartitionStatisticsUpdate updatePartitionStatistic : updates) {
+ String partitionName = updatePartitionStatistic.partitionName();
+ for (Map.Entry<String, StatisticValue<?>> statistic :
+ updatePartitionStatistic.statistics().entrySet()) {
+ String statisticName = statistic.getKey();
+ String statisticValue =
+
JsonUtils.anyFieldMapper().writeValueAsString(statistic.getValue());
+
+ tableIdVector.set(index, tableId);
+ partitionNameVector.setSafe(index,
partitionName.getBytes(StandardCharsets.UTF_8));
+ statisticNameVector.setSafe(index,
statisticName.getBytes(StandardCharsets.UTF_8));
+ statisticValueVector.setSafe(index,
statisticValue.getBytes(StandardCharsets.UTF_8));
+ AuditInfo auditInfo =
+ AuditInfo.builder()
+ .withCreator(PrincipalUtils.getCurrentUserName())
+ .withCreateTime(Instant.now())
+ .withLastModifier(PrincipalUtils.getCurrentUserName())
+ .withLastModifiedTime(Instant.now())
+ .build();
+ auditInfoVector.setSafe(
+ index,
+ JsonUtils.anyFieldMapper()
+ .writeValueAsString(auditInfo)
+ .getBytes(StandardCharsets.UTF_8));
+
+ index++;
+ }
+ }
+
+ root.setRowCount(index);
+
+ fragmentMetas =
+ Fragment.create(
+ getFilePath(tableId),
+ allocator,
+ root,
+ new WriteParams.Builder()
+ .withMaxRowsPerFile(maxRowsPerFile)
+ .withMaxBytesPerFile(maxBytesPerFile)
+ .withMaxRowsPerGroup(maxRowsPerGroup)
+ .withStorageOptions(properties)
+ .build());
+ return fragmentMetas;
+ }
}
private static String getPartitionFilter(PartitionRange range) {
@@ -354,70 +446,101 @@ public class LancePartitionStatisticStorage implements
PartitionStatisticStorage
private List<PersistedPartitionStatistics> listStatisticsImpl(
Long tableId, String partitionFilter) {
- String fileName = getFilePath(tableId);
-
- try (Dataset dataset = open(fileName)) {
-
- String filter = "table_id = " + tableId + partitionFilter;
-
- try (LanceScanner scanner =
- dataset.newScan(
- new ScanOptions.Builder()
- .columns(
- Arrays.asList(
- TABLE_ID_COLUMN,
- PARTITION_NAME_COLUMN,
- STATISTIC_NAME_COLUMN,
- STATISTIC_VALUE_COLUMN,
- AUDIT_INFO_COLUMN))
- .withRowId(true)
- .batchSize(readBatchSize)
- .filter(filter)
- .build())) {
- Map<String, List<PersistedStatistic>> partitionStatistics =
Maps.newConcurrentMap();
- try (ArrowReader reader = scanner.scanBatches()) {
- while (reader.loadNextBatch()) {
- VectorSchemaRoot root = reader.getVectorSchemaRoot();
- List<FieldVector> fieldVectors = root.getFieldVectors();
- VarCharVector partitionNameVector = (VarCharVector)
fieldVectors.get(1);
- VarCharVector statisticNameVector = (VarCharVector)
fieldVectors.get(2);
- LargeVarCharVector statisticValueVector = (LargeVarCharVector)
fieldVectors.get(3);
- VarCharVector auditInfoNameVector = (VarCharVector)
fieldVectors.get(4);
-
- for (int i = 0; i < root.getRowCount(); i++) {
- String partitionName = new String(partitionNameVector.get(i),
StandardCharsets.UTF_8);
- String statisticName = new String(statisticNameVector.get(i),
StandardCharsets.UTF_8);
- String statisticValueStr =
- new String(statisticValueVector.get(i),
StandardCharsets.UTF_8);
- String auditInoStr = new String(auditInfoNameVector.get(i),
StandardCharsets.UTF_8);
-
- StatisticValue<?> statisticValue =
- JsonUtils.anyFieldMapper().readValue(statisticValueStr,
StatisticValue.class);
- AuditInfo auditInfo =
- JsonUtils.anyFieldMapper().readValue(auditInoStr,
AuditInfo.class);
-
- PersistedStatistic persistedStatistic =
- PersistedStatistic.of(statisticName, statisticValue,
auditInfo);
-
- partitionStatistics
- .computeIfAbsent(partitionName, k -> Lists.newArrayList())
- .add(persistedStatistic);
- }
- }
- return partitionStatistics.entrySet().stream()
- .map(entry -> PersistedPartitionStatistics.of(entry.getKey(),
entry.getValue()))
- .collect(Collectors.toList());
+ Dataset dataset = getDataset(tableId);
+
+ String filter = "table_id = " + tableId + partitionFilter;
+
+ try (LanceScanner scanner =
+ dataset.newScan(
+ new ScanOptions.Builder()
+ .columns(
+ Arrays.asList(
+ TABLE_ID_COLUMN,
+ PARTITION_NAME_COLUMN,
+ STATISTIC_NAME_COLUMN,
+ STATISTIC_VALUE_COLUMN,
+ AUDIT_INFO_COLUMN))
+ .withRowId(true)
+ .batchSize(readBatchSize)
+ .filter(filter)
+ .build())) {
+ Map<String, List<PersistedStatistic>> partitionStatistics =
Maps.newConcurrentMap();
+ try (ArrowReader reader = scanner.scanBatches()) {
+ while (reader.loadNextBatch()) {
+ VectorSchemaRoot root = reader.getVectorSchemaRoot();
+ List<FieldVector> fieldVectors = root.getFieldVectors();
+ VarCharVector partitionNameVector = (VarCharVector)
fieldVectors.get(1);
+ VarCharVector statisticNameVector = (VarCharVector)
fieldVectors.get(2);
+ LargeVarCharVector statisticValueVector = (LargeVarCharVector)
fieldVectors.get(3);
+ VarCharVector auditInfoNameVector = (VarCharVector)
fieldVectors.get(4);
+
+ for (int i = 0; i < root.getRowCount(); i++) {
+ String partitionName = new String(partitionNameVector.get(i),
StandardCharsets.UTF_8);
+ String statisticName = new String(statisticNameVector.get(i),
StandardCharsets.UTF_8);
+ String statisticValueStr =
+ new String(statisticValueVector.get(i),
StandardCharsets.UTF_8);
+ String auditInoStr = new String(auditInfoNameVector.get(i),
StandardCharsets.UTF_8);
+
+ StatisticValue<?> statisticValue =
+ JsonUtils.anyFieldMapper().readValue(statisticValueStr,
StatisticValue.class);
+ AuditInfo auditInfo =
+ JsonUtils.anyFieldMapper().readValue(auditInoStr,
AuditInfo.class);
+
+ PersistedStatistic persistedStatistic =
+ PersistedStatistic.of(statisticName, statisticValue,
auditInfo);
+
+ partitionStatistics
+ .computeIfAbsent(partitionName, k -> Lists.newArrayList())
+ .add(persistedStatistic);
+ }
}
- } catch (Exception e) {
- throw new RuntimeException(e);
+
+ return partitionStatistics.entrySet().stream()
+ .map(entry -> PersistedPartitionStatistics.of(entry.getKey(),
entry.getValue()))
+ .collect(Collectors.toList());
+ }
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ } finally {
+ if (!datasetCache.isPresent() && dataset != null) {
+ dataset.close();
}
}
}
+ private Dataset getDataset(Long tableId) {
+ AtomicBoolean newlyCreated = new AtomicBoolean(false);
+ return datasetCache
+ .map(
+ cache -> {
+ Dataset cachedDataset =
+ cache.get(
+ tableId,
+ id -> {
+ newlyCreated.set(true);
+ return open(getFilePath(id));
+ });
+
+ // Ensure dataset uses the latest version
+ if (!newlyCreated.get()) {
+ cachedDataset.checkoutLatest();
+ }
+
+ return cachedDataset;
+ })
+ .orElse(open(getFilePath(tableId)));
+ }
+
private Dataset open(String fileName) {
try {
- return Dataset.open(fileName, allocator);
+ return Dataset.open(
+ allocator,
+ fileName,
+ new ReadOptions.Builder()
+ .setMetadataCacheSizeBytes(metadataFileCacheSize)
+ .setIndexCacheSizeBytes(indexCacheSize)
+ .build());
} catch (IllegalArgumentException illegalArgumentException) {
if (illegalArgumentException.getMessage().contains("was not found")) {
return Dataset.create(allocator, fileName, SCHEMA, new
WriteParams.Builder().build());
@@ -426,4 +549,8 @@ public class LancePartitionStatisticStorage implements
PartitionStatisticStorage
}
}
}
+
+ private ThreadFactory newDaemonThreadFactory(String name) {
+ return new ThreadFactoryBuilder().setDaemon(true).setNameFormat(name +
"-%d").build();
+ }
}
diff --git
a/core/src/test/java/org/apache/gravitino/stats/storage/TestLancePartitionStatisticStorage.java
b/core/src/test/java/org/apache/gravitino/stats/storage/TestLancePartitionStatisticStorage.java
index 3744b1dc61..3ab553c32e 100644
---
a/core/src/test/java/org/apache/gravitino/stats/storage/TestLancePartitionStatisticStorage.java
+++
b/core/src/test/java/org/apache/gravitino/stats/storage/TestLancePartitionStatisticStorage.java
@@ -66,9 +66,10 @@ public class TestLancePartitionStatisticStorage {
String location = "/tmp/test";
Map<String, String> properties = Maps.newHashMap();
- properties.put("lance.location", location);
+ properties.put("location", location);
- PartitionStatisticStorage storage = factory.create(properties);
+ LancePartitionStatisticStorage storage =
+ (LancePartitionStatisticStorage) factory.create(properties);
int count = 100;
int partitions = 10;
@@ -206,6 +207,176 @@ public class TestLancePartitionStatisticStorage {
storage.close();
}
+ @Test
+ public void testLancePartitionStatisticStorageWithCache() throws Exception {
+ PartitionStatisticStorageFactory factory = new
LancePartitionStatisticStorageFactory();
+
+ // Prepare table entity
+ String metalakeName = "metalake";
+ String catalogName = "catalog";
+ String schemaName = "schema";
+ String tableName = "table";
+
+ MetadataObject metadataObject =
+ MetadataObjects.of(
+ Lists.newArrayList(catalogName, schemaName, tableName),
MetadataObject.Type.TABLE);
+
+ EntityStore entityStore = mock(EntityStore.class);
+ TableEntity tableEntity = mock(TableEntity.class);
+ when(entityStore.get(any(), any(), any())).thenReturn(tableEntity);
+ when(tableEntity.id()).thenReturn(1L);
+ FieldUtils.writeField(GravitinoEnv.getInstance(), "entityStore",
entityStore, true);
+
+ String location = "/tmp/test";
+ Map<String, String> properties = Maps.newHashMap();
+ properties.put("location", location);
+ properties.put("datasetCacheSize", "1000");
+
+ LancePartitionStatisticStorage storage =
+ (LancePartitionStatisticStorage) factory.create(properties);
+
+ int count = 100;
+ int partitions = 10;
+ Map<MetadataObject, Map<String, Map<String, StatisticValue<?>>>>
originData =
+ generateData(metadataObject, count, partitions);
+ Map<MetadataObject, List<PartitionStatisticsUpdate>> statisticsToUpdate =
+ convertData(originData);
+
+ List<MetadataObjectStatisticsUpdate> objectUpdates = Lists.newArrayList();
+ for (Map.Entry<MetadataObject, List<PartitionStatisticsUpdate>> entry :
+ statisticsToUpdate.entrySet()) {
+ MetadataObject metadata = entry.getKey();
+ List<PartitionStatisticsUpdate> updates = entry.getValue();
+ objectUpdates.add(MetadataObjectStatisticsUpdate.of(metadata, updates));
+ }
+ storage.updateStatistics(metalakeName, objectUpdates);
+ Assertions.assertEquals(1, storage.getDatasetCache().estimatedSize());
+
+ String fromPartitionName =
+ "partition" + String.format("%0" + String.valueOf(partitions).length()
+ "d", 0);
+ String toPartitionName =
+ "partition" + String.format("%0" + String.valueOf(partitions).length()
+ "d", 1);
+
+ List<PersistedPartitionStatistics> listedStats =
+ storage.listStatistics(
+ metalakeName,
+ metadataObject,
+ PartitionRange.between(
+ fromPartitionName,
+ PartitionRange.BoundType.CLOSED,
+ toPartitionName,
+ PartitionRange.BoundType.OPEN));
+ Assertions.assertEquals(1, listedStats.size());
+ Assertions.assertEquals(1, storage.getDatasetCache().estimatedSize());
+
+ String targetPartitionName = "partition00";
+ for (PersistedPartitionStatistics persistStat : listedStats) {
+ String partitionName = persistStat.partitionName();
+ List<PersistedStatistic> stats = persistStat.statistics();
+ Assertions.assertEquals(targetPartitionName, partitionName);
+ Assertions.assertEquals(10, stats.size());
+
+ for (PersistedStatistic statistic : stats) {
+ String statisticName = statistic.name();
+ StatisticValue<?> statisticValue = statistic.value();
+
+ Assertions.assertTrue(
+
originData.get(metadataObject).get(targetPartitionName).containsKey(statisticName));
+ Assertions.assertEquals(
+
originData.get(metadataObject).get(targetPartitionName).get(statisticName).value(),
+ statisticValue.value());
+ Assertions.assertNotNull(statistic.auditInfo());
+ }
+ }
+
+ // Drop one statistic from partition00
+ List<MetadataObjectStatisticsDrop> tableStatisticsToDrop =
+ Lists.newArrayList(
+ MetadataObjectStatisticsDrop.of(
+ metadataObject,
+ Lists.newArrayList(
+ PartitionStatisticsModification.drop(
+ targetPartitionName,
Lists.newArrayList("statistic0")))));
+
+ storage.dropStatistics(metalakeName, tableStatisticsToDrop);
+ Assertions.assertEquals(1, storage.getDatasetCache().estimatedSize());
+
+ listedStats =
+ storage.listStatistics(
+ metalakeName,
+ metadataObject,
+ PartitionRange.between(
+ fromPartitionName,
+ PartitionRange.BoundType.CLOSED,
+ toPartitionName,
+ PartitionRange.BoundType.OPEN));
+ Assertions.assertEquals(1, listedStats.size());
+ Assertions.assertEquals(1, storage.getDatasetCache().estimatedSize());
+
+ for (PersistedPartitionStatistics partitionStat : listedStats) {
+ String partitionName = partitionStat.partitionName();
+ List<PersistedStatistic> stats = partitionStat.statistics();
+ Assertions.assertEquals(targetPartitionName, partitionName);
+ Assertions.assertEquals(9, stats.size());
+
+ for (PersistedStatistic statistic : stats) {
+ String statisticName = statistic.name();
+ StatisticValue<?> statisticValue = statistic.value();
+
+ Assertions.assertTrue(
+
originData.get(metadataObject).get(targetPartitionName).containsKey(statisticName));
+ Assertions.assertEquals(
+
originData.get(metadataObject).get(targetPartitionName).get(statisticName).value(),
+ statisticValue.value());
+ Assertions.assertNotNull(statistic.auditInfo());
+ }
+
+ // Drop one statistics from partition01 and partition02
+ tableStatisticsToDrop =
+ Lists.newArrayList(
+ MetadataObjectStatisticsDrop.of(
+ metadataObject,
+ Lists.newArrayList(
+ PartitionStatisticsModification.drop(
+ "partition01", Lists.newArrayList("statistic1")),
+ PartitionStatisticsModification.drop(
+ "partition02", Lists.newArrayList("statistic2")))));
+ storage.dropStatistics(metalakeName, tableStatisticsToDrop);
+ Assertions.assertEquals(1, storage.getDatasetCache().estimatedSize());
+
+ listedStats =
+ storage.listStatistics(
+ metalakeName,
+ metadataObject,
+ PartitionRange.between(
+ fromPartitionName,
+ PartitionRange.BoundType.CLOSED,
+ "partition03",
+ PartitionRange.BoundType.OPEN));
+ Assertions.assertEquals(3, listedStats.size());
+ Assertions.assertEquals(1, storage.getDatasetCache().estimatedSize());
+ for (PersistedPartitionStatistics persistPartStat : listedStats) {
+ stats = persistPartStat.statistics();
+ Assertions.assertEquals(9, stats.size());
+ for (PersistedStatistic statistic : stats) {
+ partitionName = persistPartStat.partitionName();
+ String statisticName = statistic.name();
+ StatisticValue<?> statisticValue = statistic.value();
+
+ Assertions.assertTrue(
+
originData.get(metadataObject).get(partitionName).containsKey(statisticName));
+ Assertions.assertEquals(
+
originData.get(metadataObject).get(partitionName).get(statisticName).value(),
+ statisticValue.value());
+ Assertions.assertNotNull(statistic.auditInfo());
+ }
+ }
+ }
+
+ FileUtils.deleteDirectory(new File(location + "/" + tableEntity.id() +
".lance"));
+ storage.close();
+ }
+
private Map<MetadataObject, Map<String, Map<String, StatisticValue<?>>>>
generateData(
MetadataObject metadataObject, int count, int partitions) {
Map<MetadataObject, Map<String, Map<String, StatisticValue<?>>>>
statisticsToUpdate =
diff --git a/docs/manage-statistics-in-gravitino.md
b/docs/manage-statistics-in-gravitino.md
index 4902726a06..45e80829a4 100644
--- a/docs/manage-statistics-in-gravitino.md
+++ b/docs/manage-statistics-in-gravitino.md
@@ -245,13 +245,18 @@ For example, if you set an extra property `foo` to `bar`
for Lance storage optio
For Lance remote storage, you can refer to the document
[here](https://lancedb.github.io/lance/usage/storage/).
-| Configuration item | Description
| Default value | Required
| Since version |
-|-----------------------------------------------------------|--------------------------------------|--------------------------------|-------------------------------------------------|---------------|
-| `gravitino.stats.partition.storageOption.location` | The location of
Lance files | `${GRAVITINO_HOME}/data/lance` | No
| 1.0.0 |
-| `gravitino.stats.partition.storageOption.maxRowsPerFile` | The maximum rows
per file | `1000000` | No
| 1.0.0 |
-| `gravitino.stats.partition.storageOption.maxBytesPerFile` | The maximum
bytes per file | `104857600` | No
| 1.0.0 |
-| `gravitino.stats.partition.storageOption.maxRowsPerGroup` | The maximum rows
per group | `1000000` | No
| 1.0.0 |
-| `gravitino.stats.partition.storageOption.readBatchSize` | The batch record
number when reading | `10000` | No
| 1.0.0 |
+| Configuration item |
Description | Default value |
Required | Since version |
+|----------------------------------------------------------------------|--------------------------------------|--------------------------------------|----------|---------------|
+| `gravitino.stats.partition.storageOption.location` | The
location of Lance files | `${GRAVITINO_HOME}/data/lance` | No
| 1.0.0 |
+| `gravitino.stats.partition.storageOption.maxRowsPerFile` | The
maximum rows per file | `1000000` | No
| 1.0.0 |
+| `gravitino.stats.partition.storageOption.maxBytesPerFile` | The
maximum bytes per file | `104857600` | No
| 1.0.0 |
+| `gravitino.stats.partition.storageOption.maxRowsPerGroup` | The
maximum rows per group | `1000000` | No
| 1.0.0 |
+| `gravitino.stats.partition.storageOption.readBatchSize` | The
batch record number when reading | `10000` | No
| 1.0.0 |
+| `gravitino.stats.partition.storageOption.datasetCacheSize` | The
size of the dataset cache for Lance | `0`, which means the cache is disabled | No
| 1.0.0 |
+| `gravitino.stats.partition.storageOption.metadataFileCacheSizeBytes` | The
Lance's metadata file cache size | `102400` | No
| 1.0.0 |
+| `gravitino.stats.partition.storageOption.indexCacheSizeBytes` | The
Lance's index cache size | `102400` | No
| 1.0.0 |
+
+If you have many tables with a small number of partitions, you should set a
smaller `metadataFileCacheSizeBytes` and `indexCacheSizeBytes`.
### Implementing a custom partition storage
diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml
index d09d2cb845..0de35e4118 100644
--- a/gradle/libs.versions.toml
+++ b/gradle/libs.versions.toml
@@ -28,7 +28,7 @@ guava = "32.1.3-jre"
lombok = "1.18.20"
slf4j = "2.0.16"
log4j = "2.24.3"
-lance = "0.31.0"
+lance = "0.34.0"
jetty = "9.4.51.v20230217"
jersey = "2.41"
mockito = "4.11.0"