This is an automated email from the ASF dual-hosted git repository.

roryqi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/gravitino.git


The following commit(s) were added to refs/heads/main by this push:
     new 0ac8ac2a81 [#7836] feat(storage): Add partition statistics storage 
interface (#7850)
0ac8ac2a81 is described below

commit 0ac8ac2a81c524c1b00e7a0b779ea73cecb9edac
Author: roryqi <[email protected]>
AuthorDate: Wed Aug 13 14:32:29 2025 +0800

    [#7836] feat(storage): Add partition statistics storage interface (#7850)
    
    ### What changes were proposed in this pull request?
    
    Add partition statistics storage interface
    
    ### Why are the changes needed?
    
    Fix: #7836
    
    ### Does this PR introduce _any_ user-facing change?
    
    Developer API, not for users.
    
    ### How was this patch tested?
    
    Just an interface.
---
 .../stats/SupportsPartitionStatistics.java         |   2 +-
 .../MemoryPartitionStatsStorageFactory.java        | 242 +++++++++++++++++++++
 .../storage/MetadataObjectStatisticsDrop.java      |  72 ++++++
 .../storage/MetadataObjectStatisticsUpdate.java    |  71 ++++++
 .../stats/storage/PartitionStatisticStorage.java   |  91 ++++++++
 .../storage/PartitionStatisticStorageFactory.java  |  36 +++
 .../storage/PersistedPartitionStatistics.java      |  73 +++++++
 .../storage/TestMemoryPartitionStatsStorage.java   |  88 ++++++++
 8 files changed, 674 insertions(+), 1 deletion(-)

diff --git 
a/api/src/main/java/org/apache/gravitino/stats/SupportsPartitionStatistics.java 
b/api/src/main/java/org/apache/gravitino/stats/SupportsPartitionStatistics.java
index a63eb9c948..12eac9e4cc 100644
--- 
a/api/src/main/java/org/apache/gravitino/stats/SupportsPartitionStatistics.java
+++ 
b/api/src/main/java/org/apache/gravitino/stats/SupportsPartitionStatistics.java
@@ -24,7 +24,7 @@ import 
org.apache.gravitino.exceptions.UnmodifiableStatisticException;
 
 /** SupportsPartitionStatistics provides methods to list and update statistics 
for partitions. */
 @Unstable
-interface SupportsPartitionStatistics {
+public interface SupportsPartitionStatistics {
 
   /**
    * Lists statistics for partitions from one partition name to another 
partition name.
diff --git 
a/core/src/main/java/org/apache/gravitino/stats/storage/MemoryPartitionStatsStorageFactory.java
 
b/core/src/main/java/org/apache/gravitino/stats/storage/MemoryPartitionStatsStorageFactory.java
new file mode 100644
index 0000000000..97f1297385
--- /dev/null
+++ 
b/core/src/main/java/org/apache/gravitino/stats/storage/MemoryPartitionStatsStorageFactory.java
@@ -0,0 +1,242 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.stats.storage;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.stream.Collectors;
+import org.apache.gravitino.MetadataObject;
+import org.apache.gravitino.stats.PartitionRange;
+import org.apache.gravitino.stats.PartitionStatisticsDrop;
+import org.apache.gravitino.stats.PartitionStatisticsUpdate;
+import org.apache.gravitino.stats.StatisticValue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class MemoryPartitionStatsStorageFactory implements 
PartitionStatisticStorageFactory {
+  public static final Logger LOG =
+      LoggerFactory.getLogger(MemoryPartitionStatsStorageFactory.class);
+
+  @Override
+  public PartitionStatisticStorage create(Map<String, String> properties) {
+    LOG.warn(
+        "The memory partition stats storage is only used for the tests,"
+            + "you shouldn't use it in the production environment.");
+    return new MemoryPartitionStatsStorage();
+  }
+
+  public static class MemoryPartitionStatsStorage implements 
PartitionStatisticStorage {
+    private static final Map<MetadataContainerKey, 
MetadataObjectStatisticsContainer>
+        totalStatistics = Maps.newConcurrentMap();
+
+    private MemoryPartitionStatsStorage() {}
+
+    @Override
+    public List<PersistedPartitionStatistics> listStatistics(
+        String metalake, MetadataObject metadataObject, PartitionRange range) {
+      MetadataObjectStatisticsContainer tableStats =
+          totalStatistics.get(new MetadataContainerKey(metalake, 
metadataObject));
+
+      if (tableStats == null) {
+        return Lists.newArrayList();
+      }
+
+      Map<String, Map<String, StatisticValue<?>>> resultStats = 
Maps.newHashMap();
+      for (PersistedPartitionStatistics partitionStat : 
tableStats.partitionStatistics().values()) {
+        String partitionName = partitionStat.partitionName();
+        boolean lowerBoundSatisfied =
+            isBoundSatisfied(
+                range.lowerPartitionName(),
+                range.lowerBoundType(),
+                partitionName,
+                BoundDirection.LOWER);
+
+        boolean upperBoundSatisfied =
+            isBoundSatisfied(
+                range.upperPartitionName(),
+                range.upperBoundType(),
+                partitionName,
+                BoundDirection.UPPER);
+
+        if (lowerBoundSatisfied && upperBoundSatisfied) {
+          resultStats.put(partitionName, 
Maps.newHashMap(partitionStat.statistics()));
+        }
+      }
+      return resultStats.entrySet().stream()
+          .map(entry -> PersistedPartitionStatistics.of(entry.getKey(), 
entry.getValue()))
+          .collect(Collectors.toList());
+    }
+
+    private static boolean isBoundSatisfied(
+        Optional<String> boundPartitionName,
+        Optional<PartitionRange.BoundType> boundPartitionType,
+        String partitionName,
+        BoundDirection boundDirection) {
+      return boundPartitionName
+          .flatMap(
+              targetPartitionName ->
+                  boundPartitionType.map(
+                      type -> boundDirection.compare(targetPartitionName, 
partitionName, type)))
+          .orElse(true);
+    }
+
+    @Override
+    public void updateStatistics(String metalake, 
List<MetadataObjectStatisticsUpdate> updates) {
+      for (MetadataObjectStatisticsUpdate update : updates) {
+        MetadataObject metadataObject = update.metadataObject();
+        MetadataObjectStatisticsContainer tableStats =
+            totalStatistics.computeIfAbsent(
+                new MetadataContainerKey(metalake, metadataObject),
+                key -> new 
MetadataObjectStatisticsContainer(Maps.newHashMap()));
+
+        List<PartitionStatisticsUpdate> stats = update.partitionUpdates();
+
+        for (PartitionStatisticsUpdate updatePartStat : stats) {
+          String partitionName = updatePartStat.partitionName();
+          Map<String, StatisticValue<?>> partitionStats = 
updatePartStat.statistics();
+          PersistedPartitionStatistics existedPartitionStats =
+              tableStats
+                  .partitionStatistics()
+                  .computeIfAbsent(
+                      partitionName,
+                      k -> PersistedPartitionStatistics.of(partitionName, new 
HashMap<>()));
+          for (Map.Entry<String, StatisticValue<?>> statEntry : 
partitionStats.entrySet()) {
+            String statName = statEntry.getKey();
+            StatisticValue<?> statValue = statEntry.getValue();
+            existedPartitionStats.statistics().put(statName, statValue);
+          }
+        }
+      }
+    }
+
+    @Override
+    public List<PersistedPartitionStatistics> listStatistics(
+        String metalake, MetadataObject metadataObject, List<String> 
partitionNames) {
+      throw new UnsupportedOperationException(
+          "Don't support listing statistics by partition names");
+    }
+
+    @Override
+    public int dropStatistics(String metalake, 
List<MetadataObjectStatisticsDrop> drops) {
+      int deleteCount = 0;
+      for (MetadataObjectStatisticsDrop drop : drops) {
+        MetadataObject metadataObject = drop.metadataObject();
+        List<PartitionStatisticsDrop> partitionsToDrop = drop.drops();
+        MetadataObjectStatisticsContainer tableStats =
+            totalStatistics.computeIfAbsent(
+                new MetadataContainerKey(metalake, metadataObject),
+                key -> new 
MetadataObjectStatisticsContainer(Maps.newHashMap()));
+
+        for (PartitionStatisticsDrop partStats : partitionsToDrop) {
+          if 
(tableStats.partitionStatistics().containsKey(partStats.partitionName())) {
+            PersistedPartitionStatistics persistedPartitionStatistics =
+                
tableStats.partitionStatistics().get(partStats.partitionName());
+            for (String statName : partStats.statisticNames()) {
+              Map<String, StatisticValue<?>> statisticValueMap =
+                  persistedPartitionStatistics.statistics();
+              if (statisticValueMap.containsKey(statName)) {
+                statisticValueMap.remove(statName);
+                deleteCount++;
+              }
+            }
+            if (persistedPartitionStatistics.statistics().isEmpty()) {
+              
tableStats.partitionStatistics().remove(partStats.partitionName());
+            }
+          }
+        }
+
+        if (tableStats.partitionStatistics().isEmpty()) {
+          totalStatistics.remove(new MetadataContainerKey(metalake, 
metadataObject));
+        }
+      }
+      return deleteCount;
+    }
+
+    @Override
+    public void close() throws IOException {
+      totalStatistics.clear();
+    }
+
+    private static class MetadataContainerKey {
+      private final String metalake;
+      private final MetadataObject metadataObject;
+
+      private MetadataContainerKey(String metalake, MetadataObject 
metadataObject) {
+        this.metalake = metalake;
+        this.metadataObject = metadataObject;
+      }
+
+      @Override
+      public boolean equals(Object o) {
+        if (this == o) return true;
+        if (!(o instanceof MetadataContainerKey)) return false;
+        MetadataContainerKey that = (MetadataContainerKey) o;
+        return Objects.equals(metalake, that.metalake)
+            && Objects.equals(metadataObject, that.metadataObject);
+      }
+
+      @Override
+      public int hashCode() {
+        return Objects.hash(metalake, metadataObject);
+      }
+    }
+
+    private static class MetadataObjectStatisticsContainer {
+
+      private final Map<String, PersistedPartitionStatistics> 
partitionStatistics;
+
+      private MetadataObjectStatisticsContainer(
+          Map<String, PersistedPartitionStatistics> partitionStatistics) {
+        this.partitionStatistics = partitionStatistics;
+      }
+
+      public Map<String, PersistedPartitionStatistics> partitionStatistics() {
+        return partitionStatistics;
+      }
+    }
+
+    public enum BoundDirection {
+      LOWER {
+        @Override
+        boolean compare(
+            String targetPartitionName, String partitionName, 
PartitionRange.BoundType type) {
+          int result = targetPartitionName.compareTo(partitionName);
+          return type == PartitionRange.BoundType.OPEN ? result > 0 : result 
>= 0;
+        }
+      },
+      UPPER {
+        @Override
+        boolean compare(
+            String targetPartitionName, String partitionName, 
PartitionRange.BoundType type) {
+          int result = targetPartitionName.compareTo(partitionName);
+          return type == PartitionRange.BoundType.OPEN ? result < 0 : result 
<= 0;
+        }
+      };
+
+      abstract boolean compare(
+          String targetPartitionName, String partitionName, 
PartitionRange.BoundType boundaryType);
+    }
+  }
+}
diff --git 
a/core/src/main/java/org/apache/gravitino/stats/storage/MetadataObjectStatisticsDrop.java
 
b/core/src/main/java/org/apache/gravitino/stats/storage/MetadataObjectStatisticsDrop.java
new file mode 100644
index 0000000000..792025f9ea
--- /dev/null
+++ 
b/core/src/main/java/org/apache/gravitino/stats/storage/MetadataObjectStatisticsDrop.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.stats.storage;
+
+import java.util.List;
+import org.apache.gravitino.MetadataObject;
+import org.apache.gravitino.stats.PartitionStatisticsDrop;
+
+/**
+ * MetadataObjectStatisticsDrop represents a collection of statistics drops 
for a specific
+ * MetadataObject.
+ */
+public class MetadataObjectStatisticsDrop {
+
+  private final MetadataObject metadataObject;
+
+  private final List<PartitionStatisticsDrop> drops;
+
+  /**
+   * Creates a new instance of MetadataObjectStatisticsDrop.
+   *
+   * @param metadataObject the MetadataObject for which these statistics drops 
are applicable
+   * @param drops a list of PartitionStatisticsDrop objects representing the 
statistics drops for
+   *     the
+   * @return a new instance of MetadataObjectStatisticsDrop
+   */
+  public static MetadataObjectStatisticsDrop of(
+      MetadataObject metadataObject, List<PartitionStatisticsDrop> drops) {
+    return new MetadataObjectStatisticsDrop(metadataObject, drops);
+  }
+
+  private MetadataObjectStatisticsDrop(
+      MetadataObject metadataObject, List<PartitionStatisticsDrop> drops) {
+    this.metadataObject = metadataObject;
+    this.drops = drops;
+  }
+
+  /**
+   * Returns the MetadataObject for which these statistics drops are 
applicable.
+   *
+   * @return the MetadataObject
+   */
+  public MetadataObject metadataObject() {
+    return metadataObject;
+  }
+
+  /**
+   * Returns the list of PartitionStatisticsDrop objects representing the 
statistics drops for the
+   * MetadataObject.
+   *
+   * @return a list of PartitionStatisticsDrop objects
+   */
+  public List<PartitionStatisticsDrop> drops() {
+    return drops;
+  }
+}
diff --git 
a/core/src/main/java/org/apache/gravitino/stats/storage/MetadataObjectStatisticsUpdate.java
 
b/core/src/main/java/org/apache/gravitino/stats/storage/MetadataObjectStatisticsUpdate.java
new file mode 100644
index 0000000000..60872b3858
--- /dev/null
+++ 
b/core/src/main/java/org/apache/gravitino/stats/storage/MetadataObjectStatisticsUpdate.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.stats.storage;
+
+import java.util.List;
+import org.apache.gravitino.MetadataObject;
+import org.apache.gravitino.stats.PartitionStatisticsUpdate;
+
+/**
+ * MetadataObjectStatisticsUpdate represents a collection of statistics 
updates for a specific
+ * MetadataObject.
+ */
+public class MetadataObjectStatisticsUpdate {
+
+  private final MetadataObject metadataObject;
+
+  private final List<PartitionStatisticsUpdate> partitionUpdates;
+
+  /**
+   * Creates a new instance of MetadataObjectStatisticsUpdate.
+   *
+   * @param metadataObject the MetadataObject for which these statistics 
updates are applicable
+   * @param partitionUpdates a list of PartitionStatisticsUpdate objects 
representing the statistics
+   *     updates
+   * @return a new instance of MetadataObjectStatisticsUpdate
+   */
+  public static MetadataObjectStatisticsUpdate of(
+      MetadataObject metadataObject, List<PartitionStatisticsUpdate> 
partitionUpdates) {
+    return new MetadataObjectStatisticsUpdate(metadataObject, 
partitionUpdates);
+  }
+
+  private MetadataObjectStatisticsUpdate(
+      MetadataObject metadataObject, List<PartitionStatisticsUpdate> 
partitionUpdates) {
+    this.metadataObject = metadataObject;
+    this.partitionUpdates = partitionUpdates;
+  }
+
+  /**
+   * Returns the MetadataObject for which these statistics updates are 
applicable.
+   *
+   * @return the MetadataObject
+   */
+  public MetadataObject metadataObject() {
+    return metadataObject;
+  }
+
+  /**
+   * Returns the list of PartitionStatisticsUpdate objects representing the 
statistics updates
+   *
+   * @return a list of PartitionStatisticsUpdate objects
+   */
+  public List<PartitionStatisticsUpdate> partitionUpdates() {
+    return partitionUpdates;
+  }
+}
diff --git 
a/core/src/main/java/org/apache/gravitino/stats/storage/PartitionStatisticStorage.java
 
b/core/src/main/java/org/apache/gravitino/stats/storage/PartitionStatisticStorage.java
new file mode 100644
index 0000000000..ca64fa48c7
--- /dev/null
+++ 
b/core/src/main/java/org/apache/gravitino/stats/storage/PartitionStatisticStorage.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.stats.storage;
+
+import java.io.Closeable;
+import java.util.List;
+import org.apache.gravitino.MetadataObject;
+import org.apache.gravitino.stats.PartitionRange;
+import org.apache.gravitino.stats.PartitionStatisticsDrop;
+
+/** Interface for managing partition statistics in a storage system. */
+public interface PartitionStatisticStorage extends Closeable {
+
+  /**
+   * Lists statistics for a given metadata object within a specified range of 
partition names.
+   * Locking guarantee: The upper layer will acquire a read lock at the 
metadata object level. For
+   * example, if the metadata object is a table, the read lock of the table 
level will be held.
+   *
+   * @param metalake the name of the metalake
+   * @param metadataObject the metadata object for which statistics are being 
listed
+   * @param partitionRange the range of partition names for which statistics 
are being listed
+   * @return a list of {@link PersistedPartitionStatistics} objects, each 
containing the partition
+   *     name
+   */
+  List<PersistedPartitionStatistics> listStatistics(
+      String metalake, MetadataObject metadataObject, PartitionRange 
partitionRange);
+
+  /**
+   * Lists statistics for a given metadata object and specific partition 
names. This interface may
+   * be reserved for the future use. The upper logic layer does not currently 
invoke this
+   * implementation. Locking Guarantee: The upper layer will acquire a read 
lock at the metadata
+   * object level. For example, if the metadata object is a table, the read 
lock of the table level
+   * will be held.
+   *
+   * @param metalake the name of the metalake
+   * @param metadataObject the metadata object for which statistics are being 
listed
+   * @param partitionNames a list of partition names for which statistics are 
being listed
+   * @return a list of {@link PersistedPartitionStatistics} objects, each 
containing the partition
+   *     name
+   */
+  default List<PersistedPartitionStatistics> listStatistics(
+      String metalake, MetadataObject metadataObject, List<String> 
partitionNames) {
+    throw new UnsupportedOperationException(
+        "Don't support listStatistics with partition names yet.");
+  }
+
+  /**
+   * Drops statistics for specified partitions of a metadata object. Locking 
guarantee: The upper
+   * layer will acquire a write lock at the metadata object level. For 
example, if the metadata
+   * object is a table, the write lock of the table level will be held. The 
concrete implementation
+   * may perform partial drops, meaning that the underlying storage system may 
not support
+   * transactional delete.
+   *
+   * @param metalake the name of the metalake
+   * @param partitionStatisticsToDrop a map where the key is a {@link 
MetadataObject} and the value
+   *     is a list of {@link PartitionStatisticsDrop}
+   * @return the number of statistics dropped, which may be less than the size 
of the input list if
+   *     some statistics do not exist or cannot be dropped.
+   */
+  int dropStatistics(String metalake, List<MetadataObjectStatisticsDrop> 
partitionStatisticsToDrop);
+
+  /**
+   * Updates statistics for a given metadata object. If the statistic exists, 
it will be updated; If
+   * the statistic doesn't exist, it will be created. Locking guarantee: The 
upper layer will
+   * acquire a write lock at the metadata object level. For example, if the 
metadata object is a
+   * table, the write lock of the table level will be held. The concrete 
implementation * may
+   * perform partial drops, meaning that the underlying storage system may not 
support transactional
+   * update.
+   *
+   * @param metalake the name of the metalake
+   * @param statisticsToUpdate a list of {@link 
MetadataObjectStatisticsUpdate} objects, each
+   *     containing the metadata object and its associated statistics updates.
+   */
+  void updateStatistics(String metalake, List<MetadataObjectStatisticsUpdate> 
statisticsToUpdate);
+}
diff --git 
a/core/src/main/java/org/apache/gravitino/stats/storage/PartitionStatisticStorageFactory.java
 
b/core/src/main/java/org/apache/gravitino/stats/storage/PartitionStatisticStorageFactory.java
new file mode 100644
index 0000000000..42047bf3ff
--- /dev/null
+++ 
b/core/src/main/java/org/apache/gravitino/stats/storage/PartitionStatisticStorageFactory.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.stats.storage;
+
+import java.util.Map;
+
+/**
+ * Factory interface for creating instances of {@link
+ * org.apache.gravitino.stats.storage.PartitionStatisticStorage}.
+ */
+public interface PartitionStatisticStorageFactory {
+
+  /**
+   * Creates an instance of {@link 
org.apache.gravitino.stats.storage.PartitionStatisticStorage}.
+   *
+   * @param properties additional properties for the storage configuration
+   * @return an instance of {@link 
org.apache.gravitino.stats.storage.PartitionStatisticStorage}
+   */
+  PartitionStatisticStorage create(Map<String, String> properties);
+}
diff --git 
a/core/src/main/java/org/apache/gravitino/stats/storage/PersistedPartitionStatistics.java
 
b/core/src/main/java/org/apache/gravitino/stats/storage/PersistedPartitionStatistics.java
new file mode 100644
index 0000000000..0e3b647ae6
--- /dev/null
+++ 
b/core/src/main/java/org/apache/gravitino/stats/storage/PersistedPartitionStatistics.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.stats.storage;
+
+import java.util.Map;
+import org.apache.gravitino.stats.StatisticValue;
+
+/** Represents a collection of statistics for a specific partition in a 
metadata object. */
+public class PersistedPartitionStatistics {
+
+  private final String partitionName;
+  private final Map<String, StatisticValue<?>> statistics;
+
+  /**
+   * Creates an instance of {@link PersistedPartitionStatistics}.
+   *
+   * @param partitionName the name of the partition for which these statistics 
are applicable
+   * @param statistics a map of statistics applicable to the partition, where 
the key is the
+   *     statistic name
+   * @return a new instance of {@link PersistedPartitionStatistics}
+   */
+  public static PersistedPartitionStatistics of(
+      String partitionName, Map<String, StatisticValue<?>> statistics) {
+    return new PersistedPartitionStatistics(partitionName, statistics);
+  }
+
+  /**
+   * Private constructor for {@link PersistedPartitionStatistics}.
+   *
+   * @param partitionName the name of the partition for which these statistics 
are applicable
+   * @param statistics a map of statistics applicable to the partition, where 
the key is the
+   *     statistic name
+   */
+  private PersistedPartitionStatistics(
+      String partitionName, Map<String, StatisticValue<?>> statistics) {
+    this.partitionName = partitionName;
+    this.statistics = statistics;
+  }
+
+  /**
+   * Returns the name of the partition for which these statistics are 
applicable.
+   *
+   * @return the name of the partition
+   */
+  public String partitionName() {
+    return partitionName;
+  }
+
+  /**
+   * Returns the statistics for the partition.
+   *
+   * @return a map of statistics applicable to the partition, where the key is 
the statistic name
+   */
+  public Map<String, StatisticValue<?>> statistics() {
+    return statistics;
+  }
+}
diff --git 
a/core/src/test/java/org/apache/gravitino/stats/storage/TestMemoryPartitionStatsStorage.java
 
b/core/src/test/java/org/apache/gravitino/stats/storage/TestMemoryPartitionStatsStorage.java
new file mode 100644
index 0000000000..4e60ff4dec
--- /dev/null
+++ 
b/core/src/test/java/org/apache/gravitino/stats/storage/TestMemoryPartitionStatsStorage.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.stats.storage;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import org.apache.gravitino.MetadataObject;
+import org.apache.gravitino.MetadataObjects;
+import org.apache.gravitino.stats.PartitionRange;
+import org.apache.gravitino.stats.PartitionStatisticsDrop;
+import org.apache.gravitino.stats.PartitionStatisticsUpdate;
+import org.apache.gravitino.stats.StatisticValue;
+import org.apache.gravitino.stats.StatisticValues;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+public class TestMemoryPartitionStatsStorage {
+
+  @Test
+  public void testMemoryPartitionStatsStorage() throws IOException {
+    MemoryPartitionStatsStorageFactory factory = new 
MemoryPartitionStatsStorageFactory();
+    try (PartitionStatisticStorage storage = 
factory.create(Maps.newHashMap())) {
+      MetadataObject metadataObject =
+          MetadataObjects.of(
+              Lists.newArrayList("catalog", "schema", "table"), 
MetadataObject.Type.TABLE);
+
+      List<PersistedPartitionStatistics> stats =
+          storage.listStatistics(
+              "metalake",
+              metadataObject,
+              PartitionRange.upTo("p0", PartitionRange.BoundType.CLOSED));
+      Assertions.assertEquals(0, stats.size());
+
+      Map<String, StatisticValue<?>> statistics = Maps.newHashMap();
+      statistics.put("k1", StatisticValues.stringValue("v1"));
+      PartitionStatisticsUpdate update = PartitionStatisticsUpdate.of("p0", 
statistics);
+      MetadataObjectStatisticsUpdate metadataObjectStatisticsUpdate =
+          MetadataObjectStatisticsUpdate.of(metadataObject, 
Lists.newArrayList(update));
+      storage.updateStatistics("metalake", 
Lists.newArrayList(metadataObjectStatisticsUpdate));
+
+      stats =
+          storage.listStatistics(
+              "metalake",
+              metadataObject,
+              PartitionRange.upTo("p0", PartitionRange.BoundType.CLOSED));
+      Assertions.assertEquals(1, stats.size());
+      Assertions.assertEquals(stats.get(0).partitionName(), "p0");
+      Map<String, StatisticValue<?>> partitionStats = 
stats.get(0).statistics();
+      Assertions.assertEquals(1, partitionStats.size());
+      Assertions.assertTrue(partitionStats.containsKey("k1"));
+      StatisticValue<?> value = partitionStats.get("k1");
+      Assertions.assertEquals(StatisticValues.stringValue("v1"), value);
+
+      PartitionStatisticsDrop drop = PartitionStatisticsDrop.of("p0", 
Lists.newArrayList("k1"));
+      List<PartitionStatisticsDrop> drops = Lists.newArrayList(drop);
+
+      List<MetadataObjectStatisticsDrop> partitionStatisticsToDrop =
+          Lists.newArrayList(MetadataObjectStatisticsDrop.of(metadataObject, 
drops));
+      storage.dropStatistics("metalake", partitionStatisticsToDrop);
+
+      stats =
+          storage.listStatistics(
+              "metalake",
+              metadataObject,
+              PartitionRange.upTo("p0", PartitionRange.BoundType.CLOSED));
+      Assertions.assertEquals(0, stats.size());
+    }
+  }
+}

Reply via email to