This is an automated email from the ASF dual-hosted git repository.
roryqi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/gravitino.git
The following commit(s) were added to refs/heads/main by this push:
new 474de4a098 [#7987] feat(core): Add Lance Storage for partition
statistics (#8228)
474de4a098 is described below
commit 474de4a098184f6105cd4cad13624287310585b3
Author: roryqi <[email protected]>
AuthorDate: Wed Aug 27 16:15:19 2025 +0800
[#7987] feat(core): Add Lance Storage for partition statistics (#8228)
### What changes were proposed in this pull request?
Add Lance Storage for partition statistics
### Why are the changes needed?
Fix: #7987
### Does this PR introduce _any_ user-facing change?
I will add the documentation later.
### How was this patch tested?
Added UT.
---
.../workflows/backend-integration-test-action.yml | 5 +
.github/workflows/build.yml | 5 +
core/build.gradle.kts | 1 +
.../main/java/org/apache/gravitino/Configs.java | 5 +-
.../apache/gravitino/stats/StatisticManager.java | 2 +-
.../storage/LancePartitionStatisticStorage.java | 429 +++++++++++++++++++++
.../LancePartitionStatisticStorageFactory.java | 28 ++
.../TestLancePartitionStatisticStorage.java | 243 ++++++++++++
gradle/libs.versions.toml | 2 +
9 files changed, 716 insertions(+), 4 deletions(-)
diff --git a/.github/workflows/backend-integration-test-action.yml
b/.github/workflows/backend-integration-test-action.yml
index 9d69a2914e..413369eb43 100644
--- a/.github/workflows/backend-integration-test-action.yml
+++ b/.github/workflows/backend-integration-test-action.yml
@@ -53,6 +53,11 @@ jobs:
run: |
dev/ci/util_free_space.sh
+ - name: Install dependencies
+ run: |
+ wget
https://nz2.archive.ubuntu.com/ubuntu/pool/main/o/openssl/libssl1.1_1.1.1f-1ubuntu2_amd64.deb
+ sudo dpkg -i libssl1.1_1.1.1f-1ubuntu2_amd64.deb
+
- name: Backend Integration Test (JDK${{ inputs.java-version }}-${{
inputs.test-mode }}-${{ inputs.backend }})
id: integrationTest
run: >
diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml
index c04e90f5c3..787acdfb90 100644
--- a/.github/workflows/build.yml
+++ b/.github/workflows/build.yml
@@ -127,6 +127,11 @@ jobs:
run: |
dev/ci/util_free_space.sh
+ - name: Install dependencies
+ run: |
+ wget
https://nz2.archive.ubuntu.com/ubuntu/pool/main/o/openssl/libssl1.1_1.1.1f-1ubuntu2_amd64.deb
+ sudo dpkg -i libssl1.1_1.1.1f-1ubuntu2_amd64.deb
+
- name: Build with Gradle
run: ./gradlew build -PskipITs -PjdkVersion=${{ matrix.java-version }}
-PskipDockerTests=false -x :clients:client-python:build
diff --git a/core/build.gradle.kts b/core/build.gradle.kts
index 44b3319964..00663ea6ab 100644
--- a/core/build.gradle.kts
+++ b/core/build.gradle.kts
@@ -41,6 +41,7 @@ dependencies {
implementation(libs.concurrent.trees)
implementation(libs.guava)
implementation(libs.h2db)
+ implementation(libs.lance)
implementation(libs.mybatis)
annotationProcessor(libs.lombok)
diff --git a/core/src/main/java/org/apache/gravitino/Configs.java
b/core/src/main/java/org/apache/gravitino/Configs.java
index 63681fec1d..479aa0c701 100644
--- a/core/src/main/java/org/apache/gravitino/Configs.java
+++ b/core/src/main/java/org/apache/gravitino/Configs.java
@@ -28,7 +28,7 @@ import org.apache.gravitino.audit.v2.SimpleFormatterV2;
import org.apache.gravitino.config.ConfigBuilder;
import org.apache.gravitino.config.ConfigConstants;
import org.apache.gravitino.config.ConfigEntry;
-import org.apache.gravitino.stats.storage.MemoryPartitionStatsStorageFactory;
+import
org.apache.gravitino.stats.storage.LancePartitionStatisticStorageFactory;
public class Configs {
@@ -447,11 +447,10 @@ public class Configs {
.checkValue(value -> value > 0,
ConfigConstants.POSITIVE_NUMBER_ERROR_MSG)
.createWithDefault(5 * 60 * 1000L); // Default is 5 minutes
- // TODO: Change default value to a Lance partition statistics storage
factory class
public static final ConfigEntry<String>
PARTITION_STATS_STORAGE_FACTORY_CLASS =
new ConfigBuilder("gravitino.stats.partition.storageFactoryClass")
.doc("The partition stats storage factory class.")
.version(ConfigConstants.VERSION_1_0_0)
.stringConf()
-
.createWithDefault(MemoryPartitionStatsStorageFactory.class.getCanonicalName());
+
.createWithDefault(LancePartitionStatisticStorageFactory.class.getCanonicalName());
}
diff --git
a/core/src/main/java/org/apache/gravitino/stats/StatisticManager.java
b/core/src/main/java/org/apache/gravitino/stats/StatisticManager.java
index 44d1a80b76..1741bc20dd 100644
--- a/core/src/main/java/org/apache/gravitino/stats/StatisticManager.java
+++ b/core/src/main/java/org/apache/gravitino/stats/StatisticManager.java
@@ -58,7 +58,7 @@ import org.slf4j.LoggerFactory;
public class StatisticManager implements Closeable {
- private static final String OPTIONS_PREFIX =
"gravitino.stats.partition.option.";
+ private static final String OPTIONS_PREFIX =
"gravitino.stats.partition.storageOption.";
private static final Logger LOG =
LoggerFactory.getLogger(StatisticManager.class);
private final EntityStore store;
diff --git
a/core/src/main/java/org/apache/gravitino/stats/storage/LancePartitionStatisticStorage.java
b/core/src/main/java/org/apache/gravitino/stats/storage/LancePartitionStatisticStorage.java
new file mode 100644
index 0000000000..416c183f45
--- /dev/null
+++
b/core/src/main/java/org/apache/gravitino/stats/storage/LancePartitionStatisticStorage.java
@@ -0,0 +1,429 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.stats.storage;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.lancedb.lance.Dataset;
+import com.lancedb.lance.Fragment;
+import com.lancedb.lance.FragmentMetadata;
+import com.lancedb.lance.FragmentOperation;
+import com.lancedb.lance.WriteParams;
+import com.lancedb.lance.ipc.LanceScanner;
+import com.lancedb.lance.ipc.ScanOptions;
+import java.io.File;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.time.Instant;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.stream.Collectors;
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.memory.RootAllocator;
+import org.apache.arrow.vector.FieldVector;
+import org.apache.arrow.vector.LargeVarCharVector;
+import org.apache.arrow.vector.UInt8Vector;
+import org.apache.arrow.vector.VarCharVector;
+import org.apache.arrow.vector.VectorSchemaRoot;
+import org.apache.arrow.vector.ipc.ArrowReader;
+import org.apache.arrow.vector.types.pojo.ArrowType;
+import org.apache.arrow.vector.types.pojo.Field;
+import org.apache.arrow.vector.types.pojo.Schema;
+import org.apache.gravitino.Entity;
+import org.apache.gravitino.EntityStore;
+import org.apache.gravitino.GravitinoEnv;
+import org.apache.gravitino.MetadataObject;
+import org.apache.gravitino.NameIdentifier;
+import org.apache.gravitino.json.JsonUtils;
+import org.apache.gravitino.meta.AuditInfo;
+import org.apache.gravitino.meta.TableEntity;
+import org.apache.gravitino.stats.PartitionRange;
+import org.apache.gravitino.stats.PartitionStatisticsDrop;
+import org.apache.gravitino.stats.PartitionStatisticsModification;
+import org.apache.gravitino.stats.PartitionStatisticsUpdate;
+import org.apache.gravitino.stats.StatisticValue;
+import org.apache.gravitino.utils.MetadataObjectUtil;
+import org.apache.gravitino.utils.PrincipalUtils;
+
+/** LancePartitionStatisticStorage is based on Lance format files. */
+public class LancePartitionStatisticStorage implements
PartitionStatisticStorage {
+
+ private static final String LOCATION = "location";
+ private static final String DEFAULT_LOCATION =
+ String.join(File.separator, System.getenv("GRAVITINO_HOME"), "data",
"lance");
+ private static final String MAX_ROWS_PER_FILE = "maxRowsPerFile";
+ private static final int DEFAULT_MAX_ROWS_PER_FILE = 1000000; // 10M
+ private static final String MAX_BYTES_PER_FILE = "maxBytesPerFile";
+ private static final int DEFAULT_MAX_BYTES_PER_FILE = 100 * 1024 * 1024; //
100 MB
+ private static final String MAX_ROWS_PER_GROUP = "maxRowsPerGroup";
+ private static final int DEFAULT_MAX_ROWS_PER_GROUP = 1000000; // 1M
+ private static final String READ_BATCH_SIZE = "readBatchSize";
+ private static final int DEFAULT_READ_BATCH_SIZE = 10000; // 10K
+ // The schema is `table_id`, `partition_name`, `statistic_name`,
`statistic_value`, `audit_info`
+ private static final String TABLE_ID_COLUMN = "table_id";
+ private static final String PARTITION_NAME_COLUMN = "partition_name";
+ private static final String STATISTIC_NAME_COLUMN = "statistic_name";
+ private static final String STATISTIC_VALUE_COLUMN = "statistic_value";
+ private static final String AUDIT_INFO_COLUMN = "audit_info";
+
+ private static final Schema SCHEMA =
+ new Schema(
+ Arrays.asList(
+ Field.notNullable(TABLE_ID_COLUMN, new ArrowType.Int(64, false)),
+ Field.notNullable(PARTITION_NAME_COLUMN, new ArrowType.Utf8()),
+ Field.notNullable(STATISTIC_NAME_COLUMN, new ArrowType.Utf8()),
+ Field.notNullable(STATISTIC_VALUE_COLUMN, new
ArrowType.LargeUtf8()),
+ Field.notNullable(AUDIT_INFO_COLUMN, new ArrowType.Utf8())));
+
+ private final Map<String, String> properties;
+ private final String location;
+ private final BufferAllocator allocator;
+ private final int maxRowsPerFile;
+ private final int maxBytesPerFile;
+ private final int maxRowsPerGroup;
+ private final int readBatchSize;
+
+ private final EntityStore entityStore =
GravitinoEnv.getInstance().entityStore();
+
+ public LancePartitionStatisticStorage(Map<String, String> properties) {
+ this.allocator = new RootAllocator();
+ this.location = properties.getOrDefault(LOCATION, DEFAULT_LOCATION);
+ this.maxRowsPerFile =
+ Integer.parseInt(
+ properties.getOrDefault(MAX_ROWS_PER_FILE,
String.valueOf(DEFAULT_MAX_ROWS_PER_FILE)));
+ Preconditions.checkArgument(
+ maxRowsPerFile > 0, "Lance partition statistics storage maxRowsPerFile
must be positive");
+
+ this.maxBytesPerFile =
+ Integer.parseInt(
+ properties.getOrDefault(
+ MAX_BYTES_PER_FILE,
String.valueOf(DEFAULT_MAX_BYTES_PER_FILE)));
+ Preconditions.checkArgument(
+ maxBytesPerFile > 0, "Lance partition statistics storage
maxBytesPerFile must be positive");
+
+ this.maxRowsPerGroup =
+ Integer.parseInt(
+ properties.getOrDefault(
+ MAX_ROWS_PER_GROUP,
String.valueOf(DEFAULT_MAX_ROWS_PER_GROUP)));
+ Preconditions.checkArgument(
+ maxRowsPerGroup > 0, "Lance partition statistics storage
maxRowsPerGroup must be positive");
+
+ this.readBatchSize =
+ Integer.parseInt(
+ properties.getOrDefault(READ_BATCH_SIZE,
String.valueOf(DEFAULT_READ_BATCH_SIZE)));
+ Preconditions.checkArgument(
+ readBatchSize > 0, "Lance partition statistics storage readBatchSize
must be positive");
+ this.properties = properties;
+ }
+
+ @Override
+ public List<PersistedPartitionStatistics> listStatistics(
+ String metalake, MetadataObject metadataObject, PartitionRange
partitionRange)
+ throws IOException {
+ NameIdentifier identifier = MetadataObjectUtil.toEntityIdent(metalake,
metadataObject);
+ Entity.EntityType type = MetadataObjectUtil.toEntityType(metadataObject);
+
+ Long tableId = entityStore.get(identifier, type, TableEntity.class).id();
+
+ return listStatisticsImpl(tableId, getPartitionFilter(partitionRange));
+ }
+
+ @Override
+ public int dropStatistics(
+ String metalake, List<MetadataObjectStatisticsDrop>
partitionStatisticsToDrop)
+ throws IOException {
+ for (MetadataObjectStatisticsDrop objectDrop : partitionStatisticsToDrop) {
+ NameIdentifier identifier =
+ MetadataObjectUtil.toEntityIdent(metalake,
objectDrop.metadataObject());
+ Entity.EntityType type =
MetadataObjectUtil.toEntityType(objectDrop.metadataObject());
+
+ Long tableId = entityStore.get(identifier, type, TableEntity.class).id();
+ dropStatisticsImpl(tableId, objectDrop.drops());
+ }
+
+ // Lance storage can't get the number of dropped statistics, so we return
1 as a placeholder.
+ return 1;
+ }
+
+ @Override
+ public void updateStatistics(
+ String metalake, List<MetadataObjectStatisticsUpdate>
statisticsToUpdate) throws IOException {
+ try {
+ // TODO: The small updates and deletion may cause performance issues.
The storage need to add
+ // compaction operations.
+ for (MetadataObjectStatisticsUpdate objectUpdate : statisticsToUpdate) {
+ NameIdentifier identifier =
+ MetadataObjectUtil.toEntityIdent(metalake,
objectUpdate.metadataObject());
+ Entity.EntityType type =
MetadataObjectUtil.toEntityType(objectUpdate.metadataObject());
+
+ Long tableId = entityStore.get(identifier, type,
TableEntity.class).id();
+ List<PartitionStatisticsDrop> partitionDrops =
+ objectUpdate.partitionUpdates().stream()
+ .map(
+ partitionStatisticsUpdate ->
+ PartitionStatisticsModification.drop(
+ partitionStatisticsUpdate.partitionName(),
+
Lists.newArrayList(partitionStatisticsUpdate.statistics().keySet())))
+ .collect(Collectors.toList());
+
+ // TODO: Lance Java API doesn't support the upsert operations although
Python API has
+ // already supported it. We should push Lance community to support it,
otherwise we can't
+ // accomplish update operation in one transaction.
+ dropStatisticsImpl(tableId, partitionDrops);
+ appendStatisticsImpl(tableId, objectUpdate.partitionUpdates());
+ }
+ } catch (IOException ioe) {
+ throw new RuntimeException(ioe);
+ }
+ }
+
+ private void appendStatisticsImpl(Long tableId,
List<PartitionStatisticsUpdate> updates) {
+ String fileName = getFilePath(tableId);
+ try (Dataset datasetRead = open(fileName)) {
+ List<FragmentMetadata> fragmentMetas;
+ int count = 0;
+ try (VectorSchemaRoot root = VectorSchemaRoot.create(SCHEMA, allocator))
{
+ for (PartitionStatisticsUpdate update : updates) {
+ count += update.statistics().size();
+ }
+
+ for (FieldVector vector : root.getFieldVectors()) {
+ vector.setInitialCapacity(count);
+ }
+ root.allocateNew();
+ int index = 0;
+
+ for (PartitionStatisticsUpdate updatePartitionStatistic : updates) {
+ String partitionName = updatePartitionStatistic.partitionName();
+ for (Map.Entry<String, StatisticValue<?>> statistic :
+ updatePartitionStatistic.statistics().entrySet()) {
+ String statisticName = statistic.getKey();
+ String statisticValue =
+
JsonUtils.anyFieldMapper().writeValueAsString(statistic.getValue());
+
+ UInt8Vector tableIdVector = (UInt8Vector)
root.getVector(TABLE_ID_COLUMN);
+ VarCharVector partitionNameVector =
+ (VarCharVector) root.getVector(PARTITION_NAME_COLUMN);
+ VarCharVector statisticNameVector =
+ (VarCharVector) root.getVector(STATISTIC_NAME_COLUMN);
+ LargeVarCharVector statisticValueVector =
+ (LargeVarCharVector) root.getVector(STATISTIC_VALUE_COLUMN);
+ VarCharVector auditInfoVector = (VarCharVector)
root.getVector(AUDIT_INFO_COLUMN);
+
+ tableIdVector.set(index, tableId);
+ partitionNameVector.setSafe(index,
partitionName.getBytes(StandardCharsets.UTF_8));
+ statisticNameVector.setSafe(index,
statisticName.getBytes(StandardCharsets.UTF_8));
+ statisticValueVector.setSafe(index,
statisticValue.getBytes(StandardCharsets.UTF_8));
+ AuditInfo auditInfo =
+ AuditInfo.builder()
+ .withCreator(PrincipalUtils.getCurrentUserName())
+ .withCreateTime(Instant.now())
+ .withLastModifier(PrincipalUtils.getCurrentUserName())
+ .withLastModifiedTime(Instant.now())
+ .build();
+ auditInfoVector.setSafe(
+ index,
+ JsonUtils.anyFieldMapper()
+ .writeValueAsString(auditInfo)
+ .getBytes(StandardCharsets.UTF_8));
+
+ index++;
+ }
+ }
+
+ root.setRowCount(index);
+
+ fragmentMetas =
+ Fragment.create(
+ getFilePath(tableId),
+ allocator,
+ root,
+ new WriteParams.Builder()
+ .withMaxRowsPerFile(maxRowsPerFile)
+ .withMaxBytesPerFile(maxBytesPerFile)
+ .withMaxRowsPerGroup(maxRowsPerGroup)
+ .withStorageOptions(properties)
+ .build());
+ FragmentOperation.Append appendOp = new
FragmentOperation.Append(fragmentMetas);
+ Dataset.commit(
+ allocator,
+ getFilePath(tableId),
+ appendOp,
+ Optional.of(datasetRead.version()),
+ properties)
+ .close();
+ }
+
+ } catch (JsonProcessingException e) {
+ throw new RuntimeException("Failed to serialize statistic value", e);
+ }
+ }
+
+ private String getFilePath(Long tableId) {
+ return location + "/" + tableId + ".lance";
+ }
+
+ private void dropStatisticsImpl(Long tableId, List<PartitionStatisticsDrop>
drops) {
+ String fileName = getFilePath(tableId);
+ try (Dataset dataset = open(fileName)) {
+ List<String> partitionSQLs = Lists.newArrayList();
+ for (PartitionStatisticsDrop drop : drops) {
+ List<String> statistics = drop.statisticNames();
+ String partition = drop.partitionName();
+ partitionSQLs.add(
+ "table_id = "
+ + tableId
+ + " AND partition_name = '"
+ + partition
+ + "' AND statistic_name IN ("
+ + statistics.stream().map(str -> "'" + str +
"'").collect(Collectors.joining(", "))
+ + ")");
+ }
+
+ if (partitionSQLs.size() == 1) {
+ dataset.delete(partitionSQLs.get(0));
+ } else if (partitionSQLs.size() > 1) {
+ String filterSQL =
+ partitionSQLs.stream().map(str -> "(" + str +
")").collect(Collectors.joining(" OR "));
+ dataset.delete(filterSQL);
+ }
+ }
+ }
+
+ @Override
+ public void close() throws IOException {
+ if (allocator != null) {
+ allocator.close();
+ }
+ }
+
+ private static String getPartitionFilter(PartitionRange range) {
+ String fromPartitionNameFilter =
+ range
+ .lowerPartitionName()
+ .flatMap(
+ name ->
+ range
+ .lowerBoundType()
+ .map(
+ type ->
+ "AND partition_name "
+ + (type == PartitionRange.BoundType.CLOSED
? ">= " : "> ")
+ + "'"
+ + name
+ + "'"))
+ .orElse("");
+ String toPartitionNameFilter =
+ range
+ .upperPartitionName()
+ .flatMap(
+ name ->
+ range
+ .upperBoundType()
+ .map(
+ type ->
+ "AND partition_name "
+ + (type == PartitionRange.BoundType.CLOSED
? "<= " : "< ")
+ + "'"
+ + name
+ + "'"))
+ .orElse("");
+
+ return fromPartitionNameFilter + toPartitionNameFilter;
+ }
+
+ private List<PersistedPartitionStatistics> listStatisticsImpl(
+ Long tableId, String partitionFilter) {
+ String fileName = getFilePath(tableId);
+
+ try (Dataset dataset = open(fileName)) {
+
+ String filter = "table_id = " + tableId + partitionFilter;
+
+ try (LanceScanner scanner =
+ dataset.newScan(
+ new ScanOptions.Builder()
+ .columns(
+ Arrays.asList(
+ TABLE_ID_COLUMN,
+ PARTITION_NAME_COLUMN,
+ STATISTIC_NAME_COLUMN,
+ STATISTIC_VALUE_COLUMN,
+ AUDIT_INFO_COLUMN))
+ .withRowId(true)
+ .batchSize(readBatchSize)
+ .filter(filter)
+ .build())) {
+ Map<String, List<PersistedStatistic>> partitionStatistics =
Maps.newConcurrentMap();
+ try (ArrowReader reader = scanner.scanBatches()) {
+ while (reader.loadNextBatch()) {
+ VectorSchemaRoot root = reader.getVectorSchemaRoot();
+ List<FieldVector> fieldVectors = root.getFieldVectors();
+ VarCharVector partitionNameVector = (VarCharVector)
fieldVectors.get(1);
+ VarCharVector statisticNameVector = (VarCharVector)
fieldVectors.get(2);
+ LargeVarCharVector statisticValueVector = (LargeVarCharVector)
fieldVectors.get(3);
+ VarCharVector auditInfoNameVector = (VarCharVector)
fieldVectors.get(4);
+
+ for (int i = 0; i < root.getRowCount(); i++) {
+ String partitionName = new String(partitionNameVector.get(i),
StandardCharsets.UTF_8);
+ String statisticName = new String(statisticNameVector.get(i),
StandardCharsets.UTF_8);
+ String statisticValueStr =
+ new String(statisticValueVector.get(i),
StandardCharsets.UTF_8);
+ String auditInoStr = new String(auditInfoNameVector.get(i),
StandardCharsets.UTF_8);
+
+ StatisticValue<?> statisticValue =
+ JsonUtils.anyFieldMapper().readValue(statisticValueStr,
StatisticValue.class);
+ AuditInfo auditInfo =
+ JsonUtils.anyFieldMapper().readValue(auditInoStr,
AuditInfo.class);
+
+ PersistedStatistic persistedStatistic =
+ PersistedStatistic.of(statisticName, statisticValue,
auditInfo);
+
+ partitionStatistics
+ .computeIfAbsent(partitionName, k -> Lists.newArrayList())
+ .add(persistedStatistic);
+ }
+ }
+
+ return partitionStatistics.entrySet().stream()
+ .map(entry -> PersistedPartitionStatistics.of(entry.getKey(),
entry.getValue()))
+ .collect(Collectors.toList());
+ }
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+
+ private Dataset open(String fileName) {
+ try {
+ return Dataset.open(fileName, allocator);
+ } catch (IllegalArgumentException illegalArgumentException) {
+ if (illegalArgumentException.getMessage().contains("was not found")) {
+ return Dataset.create(allocator, fileName, SCHEMA, new
WriteParams.Builder().build());
+ } else {
+ throw illegalArgumentException;
+ }
+ }
+ }
+}
diff --git
a/core/src/main/java/org/apache/gravitino/stats/storage/LancePartitionStatisticStorageFactory.java
b/core/src/main/java/org/apache/gravitino/stats/storage/LancePartitionStatisticStorageFactory.java
new file mode 100644
index 0000000000..1113772e6b
--- /dev/null
+++
b/core/src/main/java/org/apache/gravitino/stats/storage/LancePartitionStatisticStorageFactory.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.stats.storage;
+
+import java.util.Map;
+
/**
 * A {@link PartitionStatisticStorageFactory} that creates {@link LancePartitionStatisticStorage}
 * instances backed by Lance-format datasets.
 */
public class LancePartitionStatisticStorageFactory implements PartitionStatisticStorageFactory {
  @Override
  public PartitionStatisticStorage create(Map<String, String> properties) {
    // properties are the storage options (e.g. "location", "maxRowsPerFile") — presumably already
    // stripped of the "gravitino.stats.partition.storageOption." prefix by StatisticManager;
    // verify against the caller.
    return new LancePartitionStatisticStorage(properties);
  }
}
diff --git
a/core/src/test/java/org/apache/gravitino/stats/storage/TestLancePartitionStatisticStorage.java
b/core/src/test/java/org/apache/gravitino/stats/storage/TestLancePartitionStatisticStorage.java
new file mode 100644
index 0000000000..3744b1dc61
--- /dev/null
+++
b/core/src/test/java/org/apache/gravitino/stats/storage/TestLancePartitionStatisticStorage.java
@@ -0,0 +1,243 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.stats.storage;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import java.io.File;
+import java.util.List;
+import java.util.Map;
+import org.apache.commons.lang3.reflect.FieldUtils;
+import org.apache.gravitino.EntityStore;
+import org.apache.gravitino.GravitinoEnv;
+import org.apache.gravitino.MetadataObject;
+import org.apache.gravitino.MetadataObjects;
+import org.apache.gravitino.meta.TableEntity;
+import org.apache.gravitino.stats.PartitionRange;
+import org.apache.gravitino.stats.PartitionStatisticsModification;
+import org.apache.gravitino.stats.PartitionStatisticsUpdate;
+import org.apache.gravitino.stats.StatisticValue;
+import org.apache.gravitino.stats.StatisticValues;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.testcontainers.shaded.org.apache.commons.io.FileUtils;
+
/**
 * Unit test for {@link LancePartitionStatisticStorage}: writes statistics for several partitions
 * into a local Lance dataset (with a mocked entity store resolving every table to id 1), then
 * exercises range listing and statistic drops.
 */
public class TestLancePartitionStatisticStorage {

  @Test
  public void testLancePartitionStatisticStorage() throws Exception {
    PartitionStatisticStorageFactory factory = new LancePartitionStatisticStorageFactory();

    // Prepare table entity
    String metalakeName = "metalake";
    String catalogName = "catalog";
    String schemaName = "schema";
    String tableName = "table";

    MetadataObject metadataObject =
        MetadataObjects.of(
            Lists.newArrayList(catalogName, schemaName, tableName), MetadataObject.Type.TABLE);

    // Mock the entity store so any identifier lookup resolves to a table with id 1, and inject it
    // into the GravitinoEnv singleton read by the storage.
    EntityStore entityStore = mock(EntityStore.class);
    TableEntity tableEntity = mock(TableEntity.class);
    when(entityStore.get(any(), any(), any())).thenReturn(tableEntity);
    when(tableEntity.id()).thenReturn(1L);
    FieldUtils.writeField(GravitinoEnv.getInstance(), "entityStore", entityStore, true);

    String location = "/tmp/test";
    Map<String, String> properties = Maps.newHashMap();
    // NOTE(review): the storage reads the "location" key (see LancePartitionStatisticStorage), so
    // "lance.location" appears to be ignored and the default location used instead — confirm the
    // intended property key.
    properties.put("lance.location", location);

    PartitionStatisticStorage storage = factory.create(properties);

    // 100 statistics spread round-robin over 10 partitions => 10 statistics per partition.
    int count = 100;
    int partitions = 10;
    Map<MetadataObject, Map<String, Map<String, StatisticValue<?>>>> originData =
        generateData(metadataObject, count, partitions);
    Map<MetadataObject, List<PartitionStatisticsUpdate>> statisticsToUpdate =
        convertData(originData);

    List<MetadataObjectStatisticsUpdate> objectUpdates = Lists.newArrayList();
    for (Map.Entry<MetadataObject, List<PartitionStatisticsUpdate>> entry :
        statisticsToUpdate.entrySet()) {
      MetadataObject metadata = entry.getKey();
      List<PartitionStatisticsUpdate> updates = entry.getValue();
      objectUpdates.add(MetadataObjectStatisticsUpdate.of(metadata, updates));
    }
    storage.updateStatistics(metalakeName, objectUpdates);

    // Partition names are zero-padded to the width of the partition count, e.g. "partition00".
    String fromPartitionName =
        "partition" + String.format("%0" + String.valueOf(partitions).length() + "d", 0);
    String toPartitionName =
        "partition" + String.format("%0" + String.valueOf(partitions).length() + "d", 1);

    // The half-open range [partition00, partition01) should match exactly one partition.
    List<PersistedPartitionStatistics> listedStats =
        storage.listStatistics(
            metalakeName,
            metadataObject,
            PartitionRange.between(
                fromPartitionName,
                PartitionRange.BoundType.CLOSED,
                toPartitionName,
                PartitionRange.BoundType.OPEN));
    Assertions.assertEquals(1, listedStats.size());

    String targetPartitionName = "partition00";
    for (PersistedPartitionStatistics persistStat : listedStats) {
      String partitionName = persistStat.partitionName();
      List<PersistedStatistic> stats = persistStat.statistics();
      Assertions.assertEquals(targetPartitionName, partitionName);
      Assertions.assertEquals(10, stats.size());

      // Every listed statistic must round-trip back to the generated value and carry audit info.
      for (PersistedStatistic statistic : stats) {
        String statisticName = statistic.name();
        StatisticValue<?> statisticValue = statistic.value();

        Assertions.assertTrue(
            originData.get(metadataObject).get(targetPartitionName).containsKey(statisticName));
        Assertions.assertEquals(
            originData.get(metadataObject).get(targetPartitionName).get(statisticName).value(),
            statisticValue.value());
        Assertions.assertNotNull(statistic.auditInfo());
      }
    }

    // Drop one statistic from partition00
    List<MetadataObjectStatisticsDrop> tableStatisticsToDrop =
        Lists.newArrayList(
            MetadataObjectStatisticsDrop.of(
                metadataObject,
                Lists.newArrayList(
                    PartitionStatisticsModification.drop(
                        targetPartitionName, Lists.newArrayList("statistic0")))));

    storage.dropStatistics(metalakeName, tableStatisticsToDrop);

    listedStats =
        storage.listStatistics(
            metalakeName,
            metadataObject,
            PartitionRange.between(
                fromPartitionName,
                PartitionRange.BoundType.CLOSED,
                toPartitionName,
                PartitionRange.BoundType.OPEN));
    Assertions.assertEquals(1, listedStats.size());

    for (PersistedPartitionStatistics partitionStat : listedStats) {
      String partitionName = partitionStat.partitionName();
      List<PersistedStatistic> stats = partitionStat.statistics();
      Assertions.assertEquals(targetPartitionName, partitionName);
      // One statistic was dropped from partition00, so 9 remain.
      Assertions.assertEquals(9, stats.size());

      for (PersistedStatistic statistic : stats) {
        String statisticName = statistic.name();
        StatisticValue<?> statisticValue = statistic.value();

        Assertions.assertTrue(
            originData.get(metadataObject).get(targetPartitionName).containsKey(statisticName));
        Assertions.assertEquals(
            originData.get(metadataObject).get(targetPartitionName).get(statisticName).value(),
            statisticValue.value());
        Assertions.assertNotNull(statistic.auditInfo());
      }

      // Drop one statistics from partition01 and partition02
      // NOTE(review): this block is nested inside the loop above; it executes exactly once since
      // listedStats has one element, but it would read more clearly at method level.
      tableStatisticsToDrop =
          Lists.newArrayList(
              MetadataObjectStatisticsDrop.of(
                  metadataObject,
                  Lists.newArrayList(
                      PartitionStatisticsModification.drop(
                          "partition01", Lists.newArrayList("statistic1")),
                      PartitionStatisticsModification.drop(
                          "partition02", Lists.newArrayList("statistic2")))));
      storage.dropStatistics(metalakeName, tableStatisticsToDrop);

      // [partition00, partition03) now contains three partitions, each with 9 statistics left.
      listedStats =
          storage.listStatistics(
              metalakeName,
              metadataObject,
              PartitionRange.between(
                  fromPartitionName,
                  PartitionRange.BoundType.CLOSED,
                  "partition03",
                  PartitionRange.BoundType.OPEN));
      Assertions.assertEquals(3, listedStats.size());
      for (PersistedPartitionStatistics persistPartStat : listedStats) {
        stats = persistPartStat.statistics();
        Assertions.assertEquals(9, stats.size());
        for (PersistedStatistic statistic : stats) {
          partitionName = persistPartStat.partitionName();
          String statisticName = statistic.name();
          StatisticValue<?> statisticValue = statistic.value();

          Assertions.assertTrue(
              originData.get(metadataObject).get(partitionName).containsKey(statisticName));
          Assertions.assertEquals(
              originData.get(metadataObject).get(partitionName).get(statisticName).value(),
              statisticValue.value());
          Assertions.assertNotNull(statistic.auditInfo());
        }
      }
    }

    // NOTE(review): cleanup is not in a finally block, so a failed assertion leaves the dataset on
    // disk; also, if the "lance.location" key above is ignored (see note), this path may not be
    // where the data was actually written — verify.
    FileUtils.deleteDirectory(new File(location + "/" + tableEntity.id() + ".lance"));
    storage.close();
  }

  /**
   * Generates {@code count} string-valued statistics ("statisticN" -> "valueN") distributed
   * round-robin over zero-padded partition names, keyed by metadata object and partition.
   */
  private Map<MetadataObject, Map<String, Map<String, StatisticValue<?>>>> generateData(
      MetadataObject metadataObject, int count, int partitions) {
    Map<MetadataObject, Map<String, Map<String, StatisticValue<?>>>> statisticsToUpdate =
        Maps.newHashMap();
    for (int index = 0; index < count; index++) {
      String partitionName =
          "partition"
              + String.format("%0" + String.valueOf(partitions).length() + "d", index % partitions);
      statisticsToUpdate
          .computeIfAbsent(metadataObject, k -> Maps.newHashMap())
          .computeIfAbsent(partitionName, kp -> Maps.newHashMap())
          .put("statistic" + index, StatisticValues.stringValue("value" + index));
    }
    return statisticsToUpdate;
  }

  /** Converts the nested map fixture into per-object lists of partition statistic updates. */
  private static Map<MetadataObject, List<PartitionStatisticsUpdate>> convertData(
      Map<MetadataObject, Map<String, Map<String, StatisticValue<?>>>> statisticsToUpdate) {
    Map<MetadataObject, List<PartitionStatisticsUpdate>> newData = Maps.newHashMap();
    for (Map.Entry<MetadataObject, Map<String, Map<String, StatisticValue<?>>>> tableStatistic :
        statisticsToUpdate.entrySet()) {
      List<PartitionStatisticsUpdate> list = Lists.newArrayList();
      newData.put(tableStatistic.getKey(), list);
      for (Map.Entry<String, Map<String, StatisticValue<?>>> partitionStatistic :
          tableStatistic.getValue().entrySet()) {
        String partitionName = partitionStatistic.getKey();
        Map<String, StatisticValue<?>> stats = partitionStatistic.getValue();
        PartitionStatisticsUpdate update =
            PartitionStatisticsModification.update(partitionName, stats);
        list.add(update);
      }
    }
    return newData;
  }
}
diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml
index 45f3bcac5a..04d25e82fc 100644
--- a/gradle/libs.versions.toml
+++ b/gradle/libs.versions.toml
@@ -29,6 +29,7 @@ guava = "32.1.3-jre"
lombok = "1.18.20"
slf4j = "2.0.9"
log4j = "2.24.3"
+lance = "0.31.0"
jetty = "9.4.51.v20230217"
jersey = "2.41"
mockito = "4.11.0"
@@ -159,6 +160,7 @@ log4j-api = { group = "org.apache.logging.log4j", name =
"log4j-api", version.re
log4j-core = { group = "org.apache.logging.log4j", name = "log4j-core",
version.ref = "log4j" }
log4j-12-api = { group = "org.apache.logging.log4j", name = "log4j-1.2-api",
version.ref = "log4j" }
log4j-layout-template-json = { group = "org.apache.logging.log4j", name =
"log4j-layout-template-json", version.ref = "log4j" }
+lance = { group = "com.lancedb", name = "lance-core", version.ref = "lance" }
jakarta-validation-api = { group = "jakarta.validation", name =
"jakarta.validation-api", version.ref = "jakarta-validation" }
jetty-server = { group = "org.eclipse.jetty", name = "jetty-server",
version.ref = "jetty" }
jetty-servlet = { group = "org.eclipse.jetty", name = "jetty-servlet",
version.ref = "jetty" }