This is an automated email from the ASF dual-hosted git repository.

roryqi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/gravitino.git


The following commit(s) were added to refs/heads/main by this push:
     new 474de4a098 [#7987] feat(core): Add Lance Storage for partition 
statistics (#8228)
474de4a098 is described below

commit 474de4a098184f6105cd4cad13624287310585b3
Author: roryqi <[email protected]>
AuthorDate: Wed Aug 27 16:15:19 2025 +0800

    [#7987] feat(core): Add Lance Storage for partition statistics (#8228)
    
    ### What changes were proposed in this pull request?
    
    Add Lance Storage for partition statistics
    
    ### Why are the changes needed?
    
    Fix: #7987
    
    ### Does this PR introduce _any_ user-facing change?
    
    I will add the documentation later.
    
    ### How was this patch tested?
    
    Added UT.
---
 .../workflows/backend-integration-test-action.yml  |   5 +
 .github/workflows/build.yml                        |   5 +
 core/build.gradle.kts                              |   1 +
 .../main/java/org/apache/gravitino/Configs.java    |   5 +-
 .../apache/gravitino/stats/StatisticManager.java   |   2 +-
 .../storage/LancePartitionStatisticStorage.java    | 429 +++++++++++++++++++++
 .../LancePartitionStatisticStorageFactory.java     |  28 ++
 .../TestLancePartitionStatisticStorage.java        | 243 ++++++++++++
 gradle/libs.versions.toml                          |   2 +
 9 files changed, 716 insertions(+), 4 deletions(-)

diff --git a/.github/workflows/backend-integration-test-action.yml 
b/.github/workflows/backend-integration-test-action.yml
index 9d69a2914e..413369eb43 100644
--- a/.github/workflows/backend-integration-test-action.yml
+++ b/.github/workflows/backend-integration-test-action.yml
@@ -53,6 +53,11 @@ jobs:
         run: |
           dev/ci/util_free_space.sh
 
+      - name: Install dependencies
+        run: |
+          wget 
https://nz2.archive.ubuntu.com/ubuntu/pool/main/o/openssl/libssl1.1_1.1.1f-1ubuntu2_amd64.deb
+          sudo dpkg -i libssl1.1_1.1.1f-1ubuntu2_amd64.deb  
+
       - name: Backend Integration Test (JDK${{ inputs.java-version }}-${{ 
inputs.test-mode }}-${{ inputs.backend }})
         id: integrationTest
         run: >
diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml
index c04e90f5c3..787acdfb90 100644
--- a/.github/workflows/build.yml
+++ b/.github/workflows/build.yml
@@ -127,6 +127,11 @@ jobs:
         run: |
           dev/ci/util_free_space.sh
 
+      - name: Install dependencies
+        run: |
+          wget 
https://nz2.archive.ubuntu.com/ubuntu/pool/main/o/openssl/libssl1.1_1.1.1f-1ubuntu2_amd64.deb
+          sudo dpkg -i libssl1.1_1.1.1f-1ubuntu2_amd64.deb  
+
       - name: Build with Gradle
         run: ./gradlew build -PskipITs -PjdkVersion=${{ matrix.java-version }} 
-PskipDockerTests=false -x :clients:client-python:build
 
diff --git a/core/build.gradle.kts b/core/build.gradle.kts
index 44b3319964..00663ea6ab 100644
--- a/core/build.gradle.kts
+++ b/core/build.gradle.kts
@@ -41,6 +41,7 @@ dependencies {
   implementation(libs.concurrent.trees)
   implementation(libs.guava)
   implementation(libs.h2db)
+  implementation(libs.lance)
   implementation(libs.mybatis)
 
   annotationProcessor(libs.lombok)
diff --git a/core/src/main/java/org/apache/gravitino/Configs.java 
b/core/src/main/java/org/apache/gravitino/Configs.java
index 63681fec1d..479aa0c701 100644
--- a/core/src/main/java/org/apache/gravitino/Configs.java
+++ b/core/src/main/java/org/apache/gravitino/Configs.java
@@ -28,7 +28,7 @@ import org.apache.gravitino.audit.v2.SimpleFormatterV2;
 import org.apache.gravitino.config.ConfigBuilder;
 import org.apache.gravitino.config.ConfigConstants;
 import org.apache.gravitino.config.ConfigEntry;
-import org.apache.gravitino.stats.storage.MemoryPartitionStatsStorageFactory;
+import 
org.apache.gravitino.stats.storage.LancePartitionStatisticStorageFactory;
 
 public class Configs {
 
@@ -447,11 +447,10 @@ public class Configs {
           .checkValue(value -> value > 0, 
ConfigConstants.POSITIVE_NUMBER_ERROR_MSG)
           .createWithDefault(5 * 60 * 1000L); // Default is 5 minutes
 
-  // TODO: Change default value to a Lance partition statistics storage 
factory class
   public static final ConfigEntry<String> 
PARTITION_STATS_STORAGE_FACTORY_CLASS =
       new ConfigBuilder("gravitino.stats.partition.storageFactoryClass")
           .doc("The partition stats storage factory class.")
           .version(ConfigConstants.VERSION_1_0_0)
           .stringConf()
-          
.createWithDefault(MemoryPartitionStatsStorageFactory.class.getCanonicalName());
+          
.createWithDefault(LancePartitionStatisticStorageFactory.class.getCanonicalName());
 }
diff --git 
a/core/src/main/java/org/apache/gravitino/stats/StatisticManager.java 
b/core/src/main/java/org/apache/gravitino/stats/StatisticManager.java
index 44d1a80b76..1741bc20dd 100644
--- a/core/src/main/java/org/apache/gravitino/stats/StatisticManager.java
+++ b/core/src/main/java/org/apache/gravitino/stats/StatisticManager.java
@@ -58,7 +58,7 @@ import org.slf4j.LoggerFactory;
 
 public class StatisticManager implements Closeable {
 
-  private static final String OPTIONS_PREFIX = 
"gravitino.stats.partition.option.";
+  private static final String OPTIONS_PREFIX = 
"gravitino.stats.partition.storageOption.";
   private static final Logger LOG = 
LoggerFactory.getLogger(StatisticManager.class);
 
   private final EntityStore store;
diff --git 
a/core/src/main/java/org/apache/gravitino/stats/storage/LancePartitionStatisticStorage.java
 
b/core/src/main/java/org/apache/gravitino/stats/storage/LancePartitionStatisticStorage.java
new file mode 100644
index 0000000000..416c183f45
--- /dev/null
+++ 
b/core/src/main/java/org/apache/gravitino/stats/storage/LancePartitionStatisticStorage.java
@@ -0,0 +1,429 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.stats.storage;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.lancedb.lance.Dataset;
+import com.lancedb.lance.Fragment;
+import com.lancedb.lance.FragmentMetadata;
+import com.lancedb.lance.FragmentOperation;
+import com.lancedb.lance.WriteParams;
+import com.lancedb.lance.ipc.LanceScanner;
+import com.lancedb.lance.ipc.ScanOptions;
+import java.io.File;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.time.Instant;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.stream.Collectors;
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.memory.RootAllocator;
+import org.apache.arrow.vector.FieldVector;
+import org.apache.arrow.vector.LargeVarCharVector;
+import org.apache.arrow.vector.UInt8Vector;
+import org.apache.arrow.vector.VarCharVector;
+import org.apache.arrow.vector.VectorSchemaRoot;
+import org.apache.arrow.vector.ipc.ArrowReader;
+import org.apache.arrow.vector.types.pojo.ArrowType;
+import org.apache.arrow.vector.types.pojo.Field;
+import org.apache.arrow.vector.types.pojo.Schema;
+import org.apache.gravitino.Entity;
+import org.apache.gravitino.EntityStore;
+import org.apache.gravitino.GravitinoEnv;
+import org.apache.gravitino.MetadataObject;
+import org.apache.gravitino.NameIdentifier;
+import org.apache.gravitino.json.JsonUtils;
+import org.apache.gravitino.meta.AuditInfo;
+import org.apache.gravitino.meta.TableEntity;
+import org.apache.gravitino.stats.PartitionRange;
+import org.apache.gravitino.stats.PartitionStatisticsDrop;
+import org.apache.gravitino.stats.PartitionStatisticsModification;
+import org.apache.gravitino.stats.PartitionStatisticsUpdate;
+import org.apache.gravitino.stats.StatisticValue;
+import org.apache.gravitino.utils.MetadataObjectUtil;
+import org.apache.gravitino.utils.PrincipalUtils;
+
+/** LancePartitionStatisticStorage is based on Lance format files. */
+public class LancePartitionStatisticStorage implements 
PartitionStatisticStorage {
+
+  private static final String LOCATION = "location";
+  private static final String DEFAULT_LOCATION =
+      String.join(File.separator, System.getenv("GRAVITINO_HOME"), "data", 
"lance");
+  private static final String MAX_ROWS_PER_FILE = "maxRowsPerFile";
+  private static final int DEFAULT_MAX_ROWS_PER_FILE = 1000000; // 1M
+  private static final String MAX_BYTES_PER_FILE = "maxBytesPerFile";
+  private static final int DEFAULT_MAX_BYTES_PER_FILE = 100 * 1024 * 1024; // 
100 MB
+  private static final String MAX_ROWS_PER_GROUP = "maxRowsPerGroup";
+  private static final int DEFAULT_MAX_ROWS_PER_GROUP = 1000000; // 1M
+  private static final String READ_BATCH_SIZE = "readBatchSize";
+  private static final int DEFAULT_READ_BATCH_SIZE = 10000; // 10K
+  // The schema is `table_id`, `partition_name`,  `statistic_name`, 
`statistic_value`, `audit_info`
+  private static final String TABLE_ID_COLUMN = "table_id";
+  private static final String PARTITION_NAME_COLUMN = "partition_name";
+  private static final String STATISTIC_NAME_COLUMN = "statistic_name";
+  private static final String STATISTIC_VALUE_COLUMN = "statistic_value";
+  private static final String AUDIT_INFO_COLUMN = "audit_info";
+
+  private static final Schema SCHEMA =
+      new Schema(
+          Arrays.asList(
+              Field.notNullable(TABLE_ID_COLUMN, new ArrowType.Int(64, false)),
+              Field.notNullable(PARTITION_NAME_COLUMN, new ArrowType.Utf8()),
+              Field.notNullable(STATISTIC_NAME_COLUMN, new ArrowType.Utf8()),
+              Field.notNullable(STATISTIC_VALUE_COLUMN, new 
ArrowType.LargeUtf8()),
+              Field.notNullable(AUDIT_INFO_COLUMN, new ArrowType.Utf8())));
+
+  private final Map<String, String> properties;
+  private final String location;
+  private final BufferAllocator allocator;
+  private final int maxRowsPerFile;
+  private final int maxBytesPerFile;
+  private final int maxRowsPerGroup;
+  private final int readBatchSize;
+
+  private final EntityStore entityStore = 
GravitinoEnv.getInstance().entityStore();
+
+  public LancePartitionStatisticStorage(Map<String, String> properties) {
+    this.allocator = new RootAllocator();
+    this.location = properties.getOrDefault(LOCATION, DEFAULT_LOCATION);
+    this.maxRowsPerFile =
+        Integer.parseInt(
+            properties.getOrDefault(MAX_ROWS_PER_FILE, 
String.valueOf(DEFAULT_MAX_ROWS_PER_FILE)));
+    Preconditions.checkArgument(
+        maxRowsPerFile > 0, "Lance partition statistics storage maxRowsPerFile 
must be positive");
+
+    this.maxBytesPerFile =
+        Integer.parseInt(
+            properties.getOrDefault(
+                MAX_BYTES_PER_FILE, 
String.valueOf(DEFAULT_MAX_BYTES_PER_FILE)));
+    Preconditions.checkArgument(
+        maxBytesPerFile > 0, "Lance partition statistics storage 
maxBytesPerFile must be positive");
+
+    this.maxRowsPerGroup =
+        Integer.parseInt(
+            properties.getOrDefault(
+                MAX_ROWS_PER_GROUP, 
String.valueOf(DEFAULT_MAX_ROWS_PER_GROUP)));
+    Preconditions.checkArgument(
+        maxRowsPerGroup > 0, "Lance partition statistics storage 
maxRowsPerGroup must be positive");
+
+    this.readBatchSize =
+        Integer.parseInt(
+            properties.getOrDefault(READ_BATCH_SIZE, 
String.valueOf(DEFAULT_READ_BATCH_SIZE)));
+    Preconditions.checkArgument(
+        readBatchSize > 0, "Lance partition statistics storage readBatchSize 
must be positive");
+    this.properties = properties;
+  }
+
+  @Override
+  public List<PersistedPartitionStatistics> listStatistics(
+      String metalake, MetadataObject metadataObject, PartitionRange 
partitionRange)
+      throws IOException {
+    NameIdentifier identifier = MetadataObjectUtil.toEntityIdent(metalake, 
metadataObject);
+    Entity.EntityType type = MetadataObjectUtil.toEntityType(metadataObject);
+
+    Long tableId = entityStore.get(identifier, type, TableEntity.class).id();
+
+    return listStatisticsImpl(tableId, getPartitionFilter(partitionRange));
+  }
+
+  @Override
+  public int dropStatistics(
+      String metalake, List<MetadataObjectStatisticsDrop> 
partitionStatisticsToDrop)
+      throws IOException {
+    for (MetadataObjectStatisticsDrop objectDrop : partitionStatisticsToDrop) {
+      NameIdentifier identifier =
+          MetadataObjectUtil.toEntityIdent(metalake, 
objectDrop.metadataObject());
+      Entity.EntityType type = 
MetadataObjectUtil.toEntityType(objectDrop.metadataObject());
+
+      Long tableId = entityStore.get(identifier, type, TableEntity.class).id();
+      dropStatisticsImpl(tableId, objectDrop.drops());
+    }
+
+    // Lance storage can't get the number of dropped statistics, so we return 
1 as a placeholder.
+    return 1;
+  }
+
+  @Override
+  public void updateStatistics(
+      String metalake, List<MetadataObjectStatisticsUpdate> 
statisticsToUpdate) throws IOException {
+    try {
+      // TODO: The small updates and deletions may cause performance issues. The storage
+      // needs to add compaction operations.
+      for (MetadataObjectStatisticsUpdate objectUpdate : statisticsToUpdate) {
+        NameIdentifier identifier =
+            MetadataObjectUtil.toEntityIdent(metalake, 
objectUpdate.metadataObject());
+        Entity.EntityType type = 
MetadataObjectUtil.toEntityType(objectUpdate.metadataObject());
+
+        Long tableId = entityStore.get(identifier, type, 
TableEntity.class).id();
+        List<PartitionStatisticsDrop> partitionDrops =
+            objectUpdate.partitionUpdates().stream()
+                .map(
+                    partitionStatisticsUpdate ->
+                        PartitionStatisticsModification.drop(
+                            partitionStatisticsUpdate.partitionName(),
+                            
Lists.newArrayList(partitionStatisticsUpdate.statistics().keySet())))
+                .collect(Collectors.toList());
+
+        // TODO: Lance Java API doesn't support the upsert operations although 
Python API has
+        // already supported it. We should push Lance community to support it, 
otherwise  we can't
+        // accomplish update operation in one transaction.
+        dropStatisticsImpl(tableId, partitionDrops);
+        appendStatisticsImpl(tableId, objectUpdate.partitionUpdates());
+      }
+    } catch (IOException ioe) {
+      throw new RuntimeException(ioe);
+    }
+  }
+
+  private void appendStatisticsImpl(Long tableId, 
List<PartitionStatisticsUpdate> updates) {
+    String fileName = getFilePath(tableId);
+    try (Dataset datasetRead = open(fileName)) {
+      List<FragmentMetadata> fragmentMetas;
+      int count = 0;
+      try (VectorSchemaRoot root = VectorSchemaRoot.create(SCHEMA, allocator)) 
{
+        for (PartitionStatisticsUpdate update : updates) {
+          count += update.statistics().size();
+        }
+
+        for (FieldVector vector : root.getFieldVectors()) {
+          vector.setInitialCapacity(count);
+        }
+        root.allocateNew();
+        int index = 0;
+
+        for (PartitionStatisticsUpdate updatePartitionStatistic : updates) {
+          String partitionName = updatePartitionStatistic.partitionName();
+          for (Map.Entry<String, StatisticValue<?>> statistic :
+              updatePartitionStatistic.statistics().entrySet()) {
+            String statisticName = statistic.getKey();
+            String statisticValue =
+                
JsonUtils.anyFieldMapper().writeValueAsString(statistic.getValue());
+
+            UInt8Vector tableIdVector = (UInt8Vector) 
root.getVector(TABLE_ID_COLUMN);
+            VarCharVector partitionNameVector =
+                (VarCharVector) root.getVector(PARTITION_NAME_COLUMN);
+            VarCharVector statisticNameVector =
+                (VarCharVector) root.getVector(STATISTIC_NAME_COLUMN);
+            LargeVarCharVector statisticValueVector =
+                (LargeVarCharVector) root.getVector(STATISTIC_VALUE_COLUMN);
+            VarCharVector auditInfoVector = (VarCharVector) 
root.getVector(AUDIT_INFO_COLUMN);
+
+            tableIdVector.set(index, tableId);
+            partitionNameVector.setSafe(index, 
partitionName.getBytes(StandardCharsets.UTF_8));
+            statisticNameVector.setSafe(index, 
statisticName.getBytes(StandardCharsets.UTF_8));
+            statisticValueVector.setSafe(index, 
statisticValue.getBytes(StandardCharsets.UTF_8));
+            AuditInfo auditInfo =
+                AuditInfo.builder()
+                    .withCreator(PrincipalUtils.getCurrentUserName())
+                    .withCreateTime(Instant.now())
+                    .withLastModifier(PrincipalUtils.getCurrentUserName())
+                    .withLastModifiedTime(Instant.now())
+                    .build();
+            auditInfoVector.setSafe(
+                index,
+                JsonUtils.anyFieldMapper()
+                    .writeValueAsString(auditInfo)
+                    .getBytes(StandardCharsets.UTF_8));
+
+            index++;
+          }
+        }
+
+        root.setRowCount(index);
+
+        fragmentMetas =
+            Fragment.create(
+                getFilePath(tableId),
+                allocator,
+                root,
+                new WriteParams.Builder()
+                    .withMaxRowsPerFile(maxRowsPerFile)
+                    .withMaxBytesPerFile(maxBytesPerFile)
+                    .withMaxRowsPerGroup(maxRowsPerGroup)
+                    .withStorageOptions(properties)
+                    .build());
+        FragmentOperation.Append appendOp = new 
FragmentOperation.Append(fragmentMetas);
+        Dataset.commit(
+                allocator,
+                getFilePath(tableId),
+                appendOp,
+                Optional.of(datasetRead.version()),
+                properties)
+            .close();
+      }
+
+    } catch (JsonProcessingException e) {
+      throw new RuntimeException("Failed to serialize statistic value", e);
+    }
+  }
+
+  private String getFilePath(Long tableId) {
+    return location + "/" + tableId + ".lance";
+  }
+
+  private void dropStatisticsImpl(Long tableId, List<PartitionStatisticsDrop> 
drops) {
+    String fileName = getFilePath(tableId);
+    try (Dataset dataset = open(fileName)) {
+      List<String> partitionSQLs = Lists.newArrayList();
+      for (PartitionStatisticsDrop drop : drops) {
+        List<String> statistics = drop.statisticNames();
+        String partition = drop.partitionName();
+        partitionSQLs.add(
+            "table_id = "
+                + tableId
+                + " AND partition_name = '"
+                + partition
+                + "' AND statistic_name IN ("
+                + statistics.stream().map(str -> "'" + str + 
"'").collect(Collectors.joining(", "))
+                + ")");
+      }
+
+      if (partitionSQLs.size() == 1) {
+        dataset.delete(partitionSQLs.get(0));
+      } else if (partitionSQLs.size() > 1) {
+        String filterSQL =
+            partitionSQLs.stream().map(str -> "(" + str + 
")").collect(Collectors.joining(" OR "));
+        dataset.delete(filterSQL);
+      }
+    }
+  }
+
+  @Override
+  public void close() throws IOException {
+    if (allocator != null) {
+      allocator.close();
+    }
+  }
+
+  private static String getPartitionFilter(PartitionRange range) {
+    String fromPartitionNameFilter =
+        range
+            .lowerPartitionName()
+            .flatMap(
+                name ->
+                    range
+                        .lowerBoundType()
+                        .map(
+                            type ->
+                                "AND partition_name "
+                                    + (type == PartitionRange.BoundType.CLOSED 
? ">= " : "> ")
+                                    + "'"
+                                    + name
+                                    + "'"))
+            .orElse("");
+    String toPartitionNameFilter =
+        range
+            .upperPartitionName()
+            .flatMap(
+                name ->
+                    range
+                        .upperBoundType()
+                        .map(
+                            type ->
+                                "AND partition_name "
+                                    + (type == PartitionRange.BoundType.CLOSED 
? "<= " : "< ")
+                                    + "'"
+                                    + name
+                                    + "'"))
+            .orElse("");
+
+    return fromPartitionNameFilter + toPartitionNameFilter;
+  }
+
+  private List<PersistedPartitionStatistics> listStatisticsImpl(
+      Long tableId, String partitionFilter) {
+    String fileName = getFilePath(tableId);
+
+    try (Dataset dataset = open(fileName)) {
+
+      String filter = "table_id = " + tableId + partitionFilter;
+
+      try (LanceScanner scanner =
+          dataset.newScan(
+              new ScanOptions.Builder()
+                  .columns(
+                      Arrays.asList(
+                          TABLE_ID_COLUMN,
+                          PARTITION_NAME_COLUMN,
+                          STATISTIC_NAME_COLUMN,
+                          STATISTIC_VALUE_COLUMN,
+                          AUDIT_INFO_COLUMN))
+                  .withRowId(true)
+                  .batchSize(readBatchSize)
+                  .filter(filter)
+                  .build())) {
+        Map<String, List<PersistedStatistic>> partitionStatistics = 
Maps.newConcurrentMap();
+        try (ArrowReader reader = scanner.scanBatches()) {
+          while (reader.loadNextBatch()) {
+            VectorSchemaRoot root = reader.getVectorSchemaRoot();
+            List<FieldVector> fieldVectors = root.getFieldVectors();
+            VarCharVector partitionNameVector = (VarCharVector) 
fieldVectors.get(1);
+            VarCharVector statisticNameVector = (VarCharVector) 
fieldVectors.get(2);
+            LargeVarCharVector statisticValueVector = (LargeVarCharVector) 
fieldVectors.get(3);
+            VarCharVector auditInfoNameVector = (VarCharVector) 
fieldVectors.get(4);
+
+            for (int i = 0; i < root.getRowCount(); i++) {
+              String partitionName = new String(partitionNameVector.get(i), 
StandardCharsets.UTF_8);
+              String statisticName = new String(statisticNameVector.get(i), 
StandardCharsets.UTF_8);
+              String statisticValueStr =
+                  new String(statisticValueVector.get(i), 
StandardCharsets.UTF_8);
+              String auditInoStr = new String(auditInfoNameVector.get(i), 
StandardCharsets.UTF_8);
+
+              StatisticValue<?> statisticValue =
+                  JsonUtils.anyFieldMapper().readValue(statisticValueStr, 
StatisticValue.class);
+              AuditInfo auditInfo =
+                  JsonUtils.anyFieldMapper().readValue(auditInoStr, 
AuditInfo.class);
+
+              PersistedStatistic persistedStatistic =
+                  PersistedStatistic.of(statisticName, statisticValue, 
auditInfo);
+
+              partitionStatistics
+                  .computeIfAbsent(partitionName, k -> Lists.newArrayList())
+                  .add(persistedStatistic);
+            }
+          }
+
+          return partitionStatistics.entrySet().stream()
+              .map(entry -> PersistedPartitionStatistics.of(entry.getKey(), 
entry.getValue()))
+              .collect(Collectors.toList());
+        }
+      } catch (Exception e) {
+        throw new RuntimeException(e);
+      }
+    }
+  }
+
+  private Dataset open(String fileName) {
+    try {
+      return Dataset.open(fileName, allocator);
+    } catch (IllegalArgumentException illegalArgumentException) {
+      if (illegalArgumentException.getMessage().contains("was not found")) {
+        return Dataset.create(allocator, fileName, SCHEMA, new 
WriteParams.Builder().build());
+      } else {
+        throw illegalArgumentException;
+      }
+    }
+  }
+}
diff --git 
a/core/src/main/java/org/apache/gravitino/stats/storage/LancePartitionStatisticStorageFactory.java
 
b/core/src/main/java/org/apache/gravitino/stats/storage/LancePartitionStatisticStorageFactory.java
new file mode 100644
index 0000000000..1113772e6b
--- /dev/null
+++ 
b/core/src/main/java/org/apache/gravitino/stats/storage/LancePartitionStatisticStorageFactory.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.stats.storage;
+
+import java.util.Map;
+
+public class LancePartitionStatisticStorageFactory implements 
PartitionStatisticStorageFactory {
+  @Override
+  public PartitionStatisticStorage create(Map<String, String> properties) {
+    return new LancePartitionStatisticStorage(properties);
+  }
+}
diff --git 
a/core/src/test/java/org/apache/gravitino/stats/storage/TestLancePartitionStatisticStorage.java
 
b/core/src/test/java/org/apache/gravitino/stats/storage/TestLancePartitionStatisticStorage.java
new file mode 100644
index 0000000000..3744b1dc61
--- /dev/null
+++ 
b/core/src/test/java/org/apache/gravitino/stats/storage/TestLancePartitionStatisticStorage.java
@@ -0,0 +1,243 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.stats.storage;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import java.io.File;
+import java.util.List;
+import java.util.Map;
+import org.apache.commons.lang3.reflect.FieldUtils;
+import org.apache.gravitino.EntityStore;
+import org.apache.gravitino.GravitinoEnv;
+import org.apache.gravitino.MetadataObject;
+import org.apache.gravitino.MetadataObjects;
+import org.apache.gravitino.meta.TableEntity;
+import org.apache.gravitino.stats.PartitionRange;
+import org.apache.gravitino.stats.PartitionStatisticsModification;
+import org.apache.gravitino.stats.PartitionStatisticsUpdate;
+import org.apache.gravitino.stats.StatisticValue;
+import org.apache.gravitino.stats.StatisticValues;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.testcontainers.shaded.org.apache.commons.io.FileUtils;
+
+/**
+ * Unit test for the Lance-backed partition statistic storage. It mocks the {@link EntityStore}
+ * (injected into {@link GravitinoEnv} via reflection) so the storage resolves the table to a
+ * fixed id (1L), writes statistics into a Lance dataset under a local path, then exercises
+ * list and drop operations against that dataset.
+ */
+public class TestLancePartitionStatisticStorage {
+
+  /**
+   * End-to-end flow: update 100 statistics spread round-robin over 10 partitions, list a
+   * one-partition range, drop single statistics, and verify the remaining values still match
+   * the originally generated data. Cleans up the on-disk dataset at the end.
+   */
+  @Test
+  public void testLancePartitionStatisticStorage() throws Exception {
+    PartitionStatisticStorageFactory factory = new 
LancePartitionStatisticStorageFactory();
+
+    // Prepare table entity
+    String metalakeName = "metalake";
+    String catalogName = "catalog";
+    String schemaName = "schema";
+    String tableName = "table";
+
+    MetadataObject metadataObject =
+        MetadataObjects.of(
+            Lists.newArrayList(catalogName, schemaName, tableName), 
MetadataObject.Type.TABLE);
+
+    // Mock the entity store so the storage maps the metadata object to table id 1L,
+    // which determines the on-disk dataset name ("1.lance") deleted at the end.
+    EntityStore entityStore = mock(EntityStore.class);
+    TableEntity tableEntity = mock(TableEntity.class);
+    when(entityStore.get(any(), any(), any())).thenReturn(tableEntity);
+    when(tableEntity.id()).thenReturn(1L);
+    FieldUtils.writeField(GravitinoEnv.getInstance(), "entityStore", 
entityStore, true);
+
+    // NOTE(review): hard-coded path; a JUnit @TempDir would avoid collisions between runs.
+    String location = "/tmp/test";
+    Map<String, String> properties = Maps.newHashMap();
+    properties.put("lance.location", location);
+
+    PartitionStatisticStorage storage = factory.create(properties);
+
+    // 100 statistics round-robin over 10 partitions => 10 statistics per partition,
+    // partition names zero-padded to "partition00".."partition09".
+    int count = 100;
+    int partitions = 10;
+    Map<MetadataObject, Map<String, Map<String, StatisticValue<?>>>> 
originData =
+        generateData(metadataObject, count, partitions);
+    Map<MetadataObject, List<PartitionStatisticsUpdate>> statisticsToUpdate =
+        convertData(originData);
+
+    List<MetadataObjectStatisticsUpdate> objectUpdates = Lists.newArrayList();
+    for (Map.Entry<MetadataObject, List<PartitionStatisticsUpdate>> entry :
+        statisticsToUpdate.entrySet()) {
+      MetadataObject metadata = entry.getKey();
+      List<PartitionStatisticsUpdate> updates = entry.getValue();
+      objectUpdates.add(MetadataObjectStatisticsUpdate.of(metadata, updates));
+    }
+    storage.updateStatistics(metalakeName, objectUpdates);
+
+    // Half-open range [partition00, partition01) — covers exactly one partition.
+    String fromPartitionName =
+        "partition" + String.format("%0" + String.valueOf(partitions).length() 
+ "d", 0);
+    String toPartitionName =
+        "partition" + String.format("%0" + String.valueOf(partitions).length() 
+ "d", 1);
+
+    List<PersistedPartitionStatistics> listedStats =
+        storage.listStatistics(
+            metalakeName,
+            metadataObject,
+            PartitionRange.between(
+                fromPartitionName,
+                PartitionRange.BoundType.CLOSED,
+                toPartitionName,
+                PartitionRange.BoundType.OPEN));
+    Assertions.assertEquals(1, listedStats.size());
+
+    // Verify every persisted statistic of partition00 round-trips with the generated data.
+    String targetPartitionName = "partition00";
+    for (PersistedPartitionStatistics persistStat : listedStats) {
+      String partitionName = persistStat.partitionName();
+      List<PersistedStatistic> stats = persistStat.statistics();
+      Assertions.assertEquals(targetPartitionName, partitionName);
+      Assertions.assertEquals(10, stats.size());
+
+      for (PersistedStatistic statistic : stats) {
+        String statisticName = statistic.name();
+        StatisticValue<?> statisticValue = statistic.value();
+
+        Assertions.assertTrue(
+            
originData.get(metadataObject).get(targetPartitionName).containsKey(statisticName));
+        Assertions.assertEquals(
+            
originData.get(metadataObject).get(targetPartitionName).get(statisticName).value(),
+            statisticValue.value());
+        Assertions.assertNotNull(statistic.auditInfo());
+      }
+    }
+
+    // Drop one statistic from partition00
+    List<MetadataObjectStatisticsDrop> tableStatisticsToDrop =
+        Lists.newArrayList(
+            MetadataObjectStatisticsDrop.of(
+                metadataObject,
+                Lists.newArrayList(
+                    PartitionStatisticsModification.drop(
+                        targetPartitionName, 
Lists.newArrayList("statistic0")))));
+
+    storage.dropStatistics(metalakeName, tableStatisticsToDrop);
+
+    // Re-list the same range: partition00 should now hold 9 statistics.
+    listedStats =
+        storage.listStatistics(
+            metalakeName,
+            metadataObject,
+            PartitionRange.between(
+                fromPartitionName,
+                PartitionRange.BoundType.CLOSED,
+                toPartitionName,
+                PartitionRange.BoundType.OPEN));
+    Assertions.assertEquals(1, listedStats.size());
+
+    for (PersistedPartitionStatistics partitionStat : listedStats) {
+      String partitionName = partitionStat.partitionName();
+      List<PersistedStatistic> stats = partitionStat.statistics();
+      Assertions.assertEquals(targetPartitionName, partitionName);
+      Assertions.assertEquals(9, stats.size());
+
+      for (PersistedStatistic statistic : stats) {
+        String statisticName = statistic.name();
+        StatisticValue<?> statisticValue = statistic.value();
+
+        Assertions.assertTrue(
+            
originData.get(metadataObject).get(targetPartitionName).containsKey(statisticName));
+        Assertions.assertEquals(
+            
originData.get(metadataObject).get(targetPartitionName).get(statisticName).value(),
+            statisticValue.value());
+        Assertions.assertNotNull(statistic.auditInfo());
+      }
+
+      // Drop one statistic each from partition01 and partition02
+      tableStatisticsToDrop =
+          Lists.newArrayList(
+              MetadataObjectStatisticsDrop.of(
+                  metadataObject,
+                  Lists.newArrayList(
+                      PartitionStatisticsModification.drop(
+                          "partition01", Lists.newArrayList("statistic1")),
+                      PartitionStatisticsModification.drop(
+                          "partition02", Lists.newArrayList("statistic2")))));
+      storage.dropStatistics(metalakeName, tableStatisticsToDrop);
+
+      // Range [partition00, partition03) now yields 3 partitions, each with 9 statistics
+      // (one statistic was dropped from each of partition00/01/02 above).
+      listedStats =
+          storage.listStatistics(
+              metalakeName,
+              metadataObject,
+              PartitionRange.between(
+                  fromPartitionName,
+                  PartitionRange.BoundType.CLOSED,
+                  "partition03",
+                  PartitionRange.BoundType.OPEN));
+      Assertions.assertEquals(3, listedStats.size());
+      for (PersistedPartitionStatistics persistPartStat : listedStats) {
+        stats = persistPartStat.statistics();
+        Assertions.assertEquals(9, stats.size());
+        for (PersistedStatistic statistic : stats) {
+          partitionName = persistPartStat.partitionName();
+          String statisticName = statistic.name();
+          StatisticValue<?> statisticValue = statistic.value();
+
+          Assertions.assertTrue(
+              
originData.get(metadataObject).get(partitionName).containsKey(statisticName));
+          Assertions.assertEquals(
+              
originData.get(metadataObject).get(partitionName).get(statisticName).value(),
+              statisticValue.value());
+          Assertions.assertNotNull(statistic.auditInfo());
+        }
+      }
+    }
+
+    // Remove the Lance dataset created for table id 1 ("1.lance") and close the storage.
+    FileUtils.deleteDirectory(new File(location + "/" + tableEntity.id() + 
".lance"));
+    storage.close();
+  }
+
+  /**
+   * Builds {@code count} string-valued statistics distributed round-robin across
+   * {@code partitions} zero-padded partition names. Statistic names ("statistic0".."statistic99")
+   * are globally unique, so each partition ends up with {@code count / partitions} entries.
+   *
+   * @param metadataObject the single table the statistics belong to
+   * @param count total number of statistics to generate
+   * @param partitions number of partitions to spread them over
+   * @return nested map: metadata object -> partition name -> statistic name -> value
+   */
+  private Map<MetadataObject, Map<String, Map<String, StatisticValue<?>>>> 
generateData(
+      MetadataObject metadataObject, int count, int partitions) {
+    Map<MetadataObject, Map<String, Map<String, StatisticValue<?>>>> 
statisticsToUpdate =
+        Maps.newHashMap();
+    for (int index = 0; index < count; index++) {
+      // Zero-pad to the width of the partition count so names sort lexicographically
+      // in numeric order (e.g. "partition00" < "partition01" < ... for 10 partitions).
+      String partitionName =
+          "partition"
+              + String.format("%0" + String.valueOf(partitions).length() + 
"d", index % partitions);
+      statisticsToUpdate
+          .computeIfAbsent(metadataObject, k -> Maps.newHashMap())
+          .computeIfAbsent(partitionName, kp -> Maps.newHashMap())
+          .put("statistic" + index, StatisticValues.stringValue("value" + 
index));
+    }
+    return statisticsToUpdate;
+  }
+
+  /**
+   * Converts the nested test-data map produced by {@code generateData} into the
+   * per-object list of {@link PartitionStatisticsUpdate}s expected by
+   * {@code PartitionStatisticStorage#updateStatistics}.
+   *
+   * @param statisticsToUpdate metadata object -> partition name -> statistic name -> value
+   * @return metadata object -> list of partition statistic updates
+   */
+  private static Map<MetadataObject, List<PartitionStatisticsUpdate>> 
convertData(
+      Map<MetadataObject, Map<String, Map<String, StatisticValue<?>>>> 
statisticsToUpdate) {
+    Map<MetadataObject, List<PartitionStatisticsUpdate>> newData = 
Maps.newHashMap();
+    for (Map.Entry<MetadataObject, Map<String, Map<String, 
StatisticValue<?>>>> tableStatistic :
+        statisticsToUpdate.entrySet()) {
+      List<PartitionStatisticsUpdate> list = Lists.newArrayList();
+      newData.put(tableStatistic.getKey(), list);
+      for (Map.Entry<String, Map<String, StatisticValue<?>>> 
partitionStatistic :
+          tableStatistic.getValue().entrySet()) {
+        String partitionName = partitionStatistic.getKey();
+        Map<String, StatisticValue<?>> stats = partitionStatistic.getValue();
+        PartitionStatisticsUpdate update =
+            PartitionStatisticsModification.update(partitionName, stats);
+        list.add(update);
+      }
+    }
+    return newData;
+  }
+}
diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml
index 45f3bcac5a..04d25e82fc 100644
--- a/gradle/libs.versions.toml
+++ b/gradle/libs.versions.toml
@@ -29,6 +29,7 @@ guava = "32.1.3-jre"
 lombok = "1.18.20"
 slf4j = "2.0.9"
 log4j = "2.24.3"
+lance = "0.31.0"
 jetty = "9.4.51.v20230217"
 jersey = "2.41"
 mockito = "4.11.0"
@@ -159,6 +160,7 @@ log4j-api = { group = "org.apache.logging.log4j", name = 
"log4j-api", version.re
 log4j-core = { group = "org.apache.logging.log4j", name = "log4j-core", 
version.ref = "log4j" }
 log4j-12-api = { group = "org.apache.logging.log4j", name = "log4j-1.2-api", 
version.ref = "log4j" }
 log4j-layout-template-json = { group = "org.apache.logging.log4j", name = 
"log4j-layout-template-json", version.ref = "log4j" }
+lance = { group = "com.lancedb", name = "lance-core", version.ref = "lance" }
 jakarta-validation-api = { group = "jakarta.validation", name = 
"jakarta.validation-api", version.ref = "jakarta-validation" }
 jetty-server = { group = "org.eclipse.jetty", name = "jetty-server", 
version.ref = "jetty" }
 jetty-servlet = { group = "org.eclipse.jetty", name = "jetty-servlet", 
version.ref = "jetty" }


Reply via email to