This is an automated email from the ASF dual-hosted git repository.
roryqi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/gravitino.git
The following commit(s) were added to refs/heads/main by this push:
new 0ac8ac2a81 [#7836] feat(storage): Add partition statistics storage
interface (#7850)
0ac8ac2a81 is described below
commit 0ac8ac2a81c524c1b00e7a0b779ea73cecb9edac
Author: roryqi <[email protected]>
AuthorDate: Wed Aug 13 14:32:29 2025 +0800
[#7836] feat(storage): Add partition statistics storage interface (#7850)
### What changes were proposed in this pull request?
Add partition statistics storage interface
### Why are the changes needed?
Fix: #7836
### Does this PR introduce _any_ user-facing change?
Developer API, not for users.
### How was this patch tested?
Just an interface.
---
.../stats/SupportsPartitionStatistics.java | 2 +-
.../MemoryPartitionStatsStorageFactory.java | 242 +++++++++++++++++++++
.../storage/MetadataObjectStatisticsDrop.java | 72 ++++++
.../storage/MetadataObjectStatisticsUpdate.java | 71 ++++++
.../stats/storage/PartitionStatisticStorage.java | 91 ++++++++
.../storage/PartitionStatisticStorageFactory.java | 36 +++
.../storage/PersistedPartitionStatistics.java | 73 +++++++
.../storage/TestMemoryPartitionStatsStorage.java | 88 ++++++++
8 files changed, 674 insertions(+), 1 deletion(-)
diff --git
a/api/src/main/java/org/apache/gravitino/stats/SupportsPartitionStatistics.java
b/api/src/main/java/org/apache/gravitino/stats/SupportsPartitionStatistics.java
index a63eb9c948..12eac9e4cc 100644
---
a/api/src/main/java/org/apache/gravitino/stats/SupportsPartitionStatistics.java
+++
b/api/src/main/java/org/apache/gravitino/stats/SupportsPartitionStatistics.java
@@ -24,7 +24,7 @@ import
org.apache.gravitino.exceptions.UnmodifiableStatisticException;
/** SupportsPartitionStatistics provides methods to list and update statistics
for partitions. */
@Unstable
-interface SupportsPartitionStatistics {
+public interface SupportsPartitionStatistics {
/**
* Lists statistics for partitions from one partition name to another
partition name.
diff --git
a/core/src/main/java/org/apache/gravitino/stats/storage/MemoryPartitionStatsStorageFactory.java
b/core/src/main/java/org/apache/gravitino/stats/storage/MemoryPartitionStatsStorageFactory.java
new file mode 100644
index 0000000000..97f1297385
--- /dev/null
+++
b/core/src/main/java/org/apache/gravitino/stats/storage/MemoryPartitionStatsStorageFactory.java
@@ -0,0 +1,242 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.stats.storage;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.stream.Collectors;
+import org.apache.gravitino.MetadataObject;
+import org.apache.gravitino.stats.PartitionRange;
+import org.apache.gravitino.stats.PartitionStatisticsDrop;
+import org.apache.gravitino.stats.PartitionStatisticsUpdate;
+import org.apache.gravitino.stats.StatisticValue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class MemoryPartitionStatsStorageFactory implements
PartitionStatisticStorageFactory {
+ public static final Logger LOG =
+ LoggerFactory.getLogger(MemoryPartitionStatsStorageFactory.class);
+
+ @Override
+ public PartitionStatisticStorage create(Map<String, String> properties) {
+ LOG.warn(
+ "The memory partition stats storage is only used for the tests,"
+ + "you shouldn't use it in the production environment.");
+ return new MemoryPartitionStatsStorage();
+ }
+
+ public static class MemoryPartitionStatsStorage implements
PartitionStatisticStorage {
+ private static final Map<MetadataContainerKey,
MetadataObjectStatisticsContainer>
+ totalStatistics = Maps.newConcurrentMap();
+
+ private MemoryPartitionStatsStorage() {}
+
+ @Override
+ public List<PersistedPartitionStatistics> listStatistics(
+ String metalake, MetadataObject metadataObject, PartitionRange range) {
+ MetadataObjectStatisticsContainer tableStats =
+ totalStatistics.get(new MetadataContainerKey(metalake,
metadataObject));
+
+ if (tableStats == null) {
+ return Lists.newArrayList();
+ }
+
+ Map<String, Map<String, StatisticValue<?>>> resultStats =
Maps.newHashMap();
+ for (PersistedPartitionStatistics partitionStat :
tableStats.partitionStatistics().values()) {
+ String partitionName = partitionStat.partitionName();
+ boolean lowerBoundSatisfied =
+ isBoundSatisfied(
+ range.lowerPartitionName(),
+ range.lowerBoundType(),
+ partitionName,
+ BoundDirection.LOWER);
+
+ boolean upperBoundSatisfied =
+ isBoundSatisfied(
+ range.upperPartitionName(),
+ range.upperBoundType(),
+ partitionName,
+ BoundDirection.UPPER);
+
+ if (lowerBoundSatisfied && upperBoundSatisfied) {
+ resultStats.put(partitionName,
Maps.newHashMap(partitionStat.statistics()));
+ }
+ }
+ return resultStats.entrySet().stream()
+ .map(entry -> PersistedPartitionStatistics.of(entry.getKey(),
entry.getValue()))
+ .collect(Collectors.toList());
+ }
+
+ private static boolean isBoundSatisfied(
+ Optional<String> boundPartitionName,
+ Optional<PartitionRange.BoundType> boundPartitionType,
+ String partitionName,
+ BoundDirection boundDirection) {
+ return boundPartitionName
+ .flatMap(
+ targetPartitionName ->
+ boundPartitionType.map(
+ type -> boundDirection.compare(targetPartitionName,
partitionName, type)))
+ .orElse(true);
+ }
+
+ @Override
+ public void updateStatistics(String metalake,
List<MetadataObjectStatisticsUpdate> updates) {
+ for (MetadataObjectStatisticsUpdate update : updates) {
+ MetadataObject metadataObject = update.metadataObject();
+ MetadataObjectStatisticsContainer tableStats =
+ totalStatistics.computeIfAbsent(
+ new MetadataContainerKey(metalake, metadataObject),
+ key -> new
MetadataObjectStatisticsContainer(Maps.newHashMap()));
+
+ List<PartitionStatisticsUpdate> stats = update.partitionUpdates();
+
+ for (PartitionStatisticsUpdate updatePartStat : stats) {
+ String partitionName = updatePartStat.partitionName();
+ Map<String, StatisticValue<?>> partitionStats =
updatePartStat.statistics();
+ PersistedPartitionStatistics existedPartitionStats =
+ tableStats
+ .partitionStatistics()
+ .computeIfAbsent(
+ partitionName,
+ k -> PersistedPartitionStatistics.of(partitionName, new
HashMap<>()));
+ for (Map.Entry<String, StatisticValue<?>> statEntry :
partitionStats.entrySet()) {
+ String statName = statEntry.getKey();
+ StatisticValue<?> statValue = statEntry.getValue();
+ existedPartitionStats.statistics().put(statName, statValue);
+ }
+ }
+ }
+ }
+
+ @Override
+ public List<PersistedPartitionStatistics> listStatistics(
+ String metalake, MetadataObject metadataObject, List<String>
partitionNames) {
+ throw new UnsupportedOperationException(
+ "Don't support listing statistics by partition names");
+ }
+
+ @Override
+ public int dropStatistics(String metalake,
List<MetadataObjectStatisticsDrop> drops) {
+ int deleteCount = 0;
+ for (MetadataObjectStatisticsDrop drop : drops) {
+ MetadataObject metadataObject = drop.metadataObject();
+ List<PartitionStatisticsDrop> partitionsToDrop = drop.drops();
+ MetadataObjectStatisticsContainer tableStats =
+ totalStatistics.computeIfAbsent(
+ new MetadataContainerKey(metalake, metadataObject),
+ key -> new
MetadataObjectStatisticsContainer(Maps.newHashMap()));
+
+ for (PartitionStatisticsDrop partStats : partitionsToDrop) {
+ if
(tableStats.partitionStatistics().containsKey(partStats.partitionName())) {
+ PersistedPartitionStatistics persistedPartitionStatistics =
+
tableStats.partitionStatistics().get(partStats.partitionName());
+ for (String statName : partStats.statisticNames()) {
+ Map<String, StatisticValue<?>> statisticValueMap =
+ persistedPartitionStatistics.statistics();
+ if (statisticValueMap.containsKey(statName)) {
+ statisticValueMap.remove(statName);
+ deleteCount++;
+ }
+ }
+ if (persistedPartitionStatistics.statistics().isEmpty()) {
+
tableStats.partitionStatistics().remove(partStats.partitionName());
+ }
+ }
+ }
+
+ if (tableStats.partitionStatistics().isEmpty()) {
+ totalStatistics.remove(new MetadataContainerKey(metalake,
metadataObject));
+ }
+ }
+ return deleteCount;
+ }
+
+ @Override
+ public void close() throws IOException {
+ totalStatistics.clear();
+ }
+
+ private static class MetadataContainerKey {
+ private final String metalake;
+ private final MetadataObject metadataObject;
+
+ private MetadataContainerKey(String metalake, MetadataObject
metadataObject) {
+ this.metalake = metalake;
+ this.metadataObject = metadataObject;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (!(o instanceof MetadataContainerKey)) return false;
+ MetadataContainerKey that = (MetadataContainerKey) o;
+ return Objects.equals(metalake, that.metalake)
+ && Objects.equals(metadataObject, that.metadataObject);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(metalake, metadataObject);
+ }
+ }
+
+ private static class MetadataObjectStatisticsContainer {
+
+ private final Map<String, PersistedPartitionStatistics>
partitionStatistics;
+
+ private MetadataObjectStatisticsContainer(
+ Map<String, PersistedPartitionStatistics> partitionStatistics) {
+ this.partitionStatistics = partitionStatistics;
+ }
+
+ public Map<String, PersistedPartitionStatistics> partitionStatistics() {
+ return partitionStatistics;
+ }
+ }
+
+ public enum BoundDirection {
+ LOWER {
+ @Override
+ boolean compare(
+ String targetPartitionName, String partitionName,
PartitionRange.BoundType type) {
+ int result = targetPartitionName.compareTo(partitionName);
+ return type == PartitionRange.BoundType.OPEN ? result > 0 : result
>= 0;
+ }
+ },
+ UPPER {
+ @Override
+ boolean compare(
+ String targetPartitionName, String partitionName,
PartitionRange.BoundType type) {
+ int result = targetPartitionName.compareTo(partitionName);
+ return type == PartitionRange.BoundType.OPEN ? result < 0 : result
<= 0;
+ }
+ };
+
+ abstract boolean compare(
+ String targetPartitionName, String partitionName,
PartitionRange.BoundType boundaryType);
+ }
+ }
+}
diff --git
a/core/src/main/java/org/apache/gravitino/stats/storage/MetadataObjectStatisticsDrop.java
b/core/src/main/java/org/apache/gravitino/stats/storage/MetadataObjectStatisticsDrop.java
new file mode 100644
index 0000000000..792025f9ea
--- /dev/null
+++
b/core/src/main/java/org/apache/gravitino/stats/storage/MetadataObjectStatisticsDrop.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.stats.storage;
+
+import java.util.List;
+import org.apache.gravitino.MetadataObject;
+import org.apache.gravitino.stats.PartitionStatisticsDrop;
+
+/**
+ * MetadataObjectStatisticsDrop represents a collection of statistics drops
for a specific
+ * MetadataObject.
+ */
+public class MetadataObjectStatisticsDrop {
+
+ private final MetadataObject metadataObject;
+
+ private final List<PartitionStatisticsDrop> drops;
+
+ /**
+ * Creates a new instance of MetadataObjectStatisticsDrop.
+ *
+ * @param metadataObject the MetadataObject for which these statistics drops
are applicable
+ * @param drops a list of PartitionStatisticsDrop objects representing the
statistics drops for
+ * the
+ * @return a new instance of MetadataObjectStatisticsDrop
+ */
+ public static MetadataObjectStatisticsDrop of(
+ MetadataObject metadataObject, List<PartitionStatisticsDrop> drops) {
+ return new MetadataObjectStatisticsDrop(metadataObject, drops);
+ }
+
+ private MetadataObjectStatisticsDrop(
+ MetadataObject metadataObject, List<PartitionStatisticsDrop> drops) {
+ this.metadataObject = metadataObject;
+ this.drops = drops;
+ }
+
+ /**
+ * Returns the MetadataObject for which these statistics drops are
applicable.
+ *
+ * @return the MetadataObject
+ */
+ public MetadataObject metadataObject() {
+ return metadataObject;
+ }
+
+ /**
+ * Returns the list of PartitionStatisticsDrop objects representing the
statistics drops for the
+ * MetadataObject.
+ *
+ * @return a list of PartitionStatisticsDrop objects
+ */
+ public List<PartitionStatisticsDrop> drops() {
+ return drops;
+ }
+}
diff --git
a/core/src/main/java/org/apache/gravitino/stats/storage/MetadataObjectStatisticsUpdate.java
b/core/src/main/java/org/apache/gravitino/stats/storage/MetadataObjectStatisticsUpdate.java
new file mode 100644
index 0000000000..60872b3858
--- /dev/null
+++
b/core/src/main/java/org/apache/gravitino/stats/storage/MetadataObjectStatisticsUpdate.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.stats.storage;
+
+import java.util.List;
+import org.apache.gravitino.MetadataObject;
+import org.apache.gravitino.stats.PartitionStatisticsUpdate;
+
+/**
+ * MetadataObjectStatisticsUpdate represents a collection of statistics
updates for a specific
+ * MetadataObject.
+ */
+public class MetadataObjectStatisticsUpdate {
+
+ private final MetadataObject metadataObject;
+
+ private final List<PartitionStatisticsUpdate> partitionUpdates;
+
+ /**
+ * Creates a new instance of MetadataObjectStatisticsUpdate.
+ *
+ * @param metadataObject the MetadataObject for which these statistics
updates are applicable
+ * @param partitionUpdates a list of PartitionStatisticsUpdate objects
representing the statistics
+ * updates
+ * @return a new instance of MetadataObjectStatisticsUpdate
+ */
+ public static MetadataObjectStatisticsUpdate of(
+ MetadataObject metadataObject, List<PartitionStatisticsUpdate>
partitionUpdates) {
+ return new MetadataObjectStatisticsUpdate(metadataObject,
partitionUpdates);
+ }
+
+ private MetadataObjectStatisticsUpdate(
+ MetadataObject metadataObject, List<PartitionStatisticsUpdate>
partitionUpdates) {
+ this.metadataObject = metadataObject;
+ this.partitionUpdates = partitionUpdates;
+ }
+
+ /**
+ * Returns the MetadataObject for which these statistics updates are
applicable.
+ *
+ * @return the MetadataObject
+ */
+ public MetadataObject metadataObject() {
+ return metadataObject;
+ }
+
+ /**
+ * Returns the list of PartitionStatisticsUpdate objects representing the
statistics updates
+ *
+ * @return a list of PartitionStatisticsUpdate objects
+ */
+ public List<PartitionStatisticsUpdate> partitionUpdates() {
+ return partitionUpdates;
+ }
+}
diff --git
a/core/src/main/java/org/apache/gravitino/stats/storage/PartitionStatisticStorage.java
b/core/src/main/java/org/apache/gravitino/stats/storage/PartitionStatisticStorage.java
new file mode 100644
index 0000000000..ca64fa48c7
--- /dev/null
+++
b/core/src/main/java/org/apache/gravitino/stats/storage/PartitionStatisticStorage.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.stats.storage;
+
+import java.io.Closeable;
+import java.util.List;
+import org.apache.gravitino.MetadataObject;
+import org.apache.gravitino.stats.PartitionRange;
+import org.apache.gravitino.stats.PartitionStatisticsDrop;
+
+/** Interface for managing partition statistics in a storage system. */
+public interface PartitionStatisticStorage extends Closeable {
+
+ /**
+ * Lists statistics for a given metadata object within a specified range of
partition names.
+ * Locking guarantee: The upper layer will acquire a read lock at the
metadata object level. For
+ * example, if the metadata object is a table, the read lock of the table
level will be held.
+ *
+ * @param metalake the name of the metalake
+ * @param metadataObject the metadata object for which statistics are being
listed
+ * @param partitionRange the range of partition names for which statistics
are being listed
+ * @return a list of {@link PersistedPartitionStatistics} objects, each
containing the partition
+ * name
+ */
+ List<PersistedPartitionStatistics> listStatistics(
+ String metalake, MetadataObject metadataObject, PartitionRange
partitionRange);
+
+ /**
+ * Lists statistics for a given metadata object and specific partition
names. This interface may
+ * be reserved for the future use. The upper logic layer does not currently
invoke this
+ * implementation. Locking Guarantee: The upper layer will acquire a read
lock at the metadata
+ * object level. For example, if the metadata object is a table, the read
lock of the table level
+ * will be held.
+ *
+ * @param metalake the name of the metalake
+ * @param metadataObject the metadata object for which statistics are being
listed
+ * @param partitionNames a list of partition names for which statistics are
being listed
+ * @return a list of {@link PersistedPartitionStatistics} objects, each
containing the partition
+ * name
+ */
+ default List<PersistedPartitionStatistics> listStatistics(
+ String metalake, MetadataObject metadataObject, List<String>
partitionNames) {
+ throw new UnsupportedOperationException(
+ "Don't support listStatistics with partition names yet.");
+ }
+
+ /**
+ * Drops statistics for specified partitions of a metadata object. Locking
guarantee: The upper
+ * layer will acquire a write lock at the metadata object level. For
example, if the metadata
+ * object is a table, the write lock of the table level will be held. The
concrete implementation
+ * may perform partial drops, meaning that the underlying storage system may
not support
+ * transactional delete.
+ *
+ * @param metalake the name of the metalake
+ * @param partitionStatisticsToDrop a map where the key is a {@link
MetadataObject} and the value
+ * is a list of {@link PartitionStatisticsDrop}
+ * @return the number of statistics dropped, which may be less than the size
of the input list if
+ * some statistics do not exist or cannot be dropped.
+ */
+ int dropStatistics(String metalake, List<MetadataObjectStatisticsDrop>
partitionStatisticsToDrop);
+
+ /**
+ * Updates statistics for a given metadata object. If the statistic exists,
it will be updated; If
+ * the statistic doesn't exist, it will be created. Locking guarantee: The
upper layer will
+ * acquire a write lock at the metadata object level. For example, if the
metadata object is a
+ * table, the write lock of the table level will be held. The concrete
implementation * may
+ * perform partial drops, meaning that the underlying storage system may not
support transactional
+ * update.
+ *
+ * @param metalake the name of the metalake
+ * @param statisticsToUpdate a list of {@link
MetadataObjectStatisticsUpdate} objects, each
+ * containing the metadata object and its associated statistics updates.
+ */
+ void updateStatistics(String metalake, List<MetadataObjectStatisticsUpdate>
statisticsToUpdate);
+}
diff --git
a/core/src/main/java/org/apache/gravitino/stats/storage/PartitionStatisticStorageFactory.java
b/core/src/main/java/org/apache/gravitino/stats/storage/PartitionStatisticStorageFactory.java
new file mode 100644
index 0000000000..42047bf3ff
--- /dev/null
+++
b/core/src/main/java/org/apache/gravitino/stats/storage/PartitionStatisticStorageFactory.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.stats.storage;
+
+import java.util.Map;
+
+/**
+ * Factory interface for creating instances of {@link
+ * org.apache.gravitino.stats.storage.PartitionStatisticStorage}.
+ */
+public interface PartitionStatisticStorageFactory {
+
+ /**
+ * Creates an instance of {@link
org.apache.gravitino.stats.storage.PartitionStatisticStorage}.
+ *
+ * @param properties additional properties for the storage configuration
+ * @return an instance of {@link
org.apache.gravitino.stats.storage.PartitionStatisticStorage}
+ */
+ PartitionStatisticStorage create(Map<String, String> properties);
+}
diff --git
a/core/src/main/java/org/apache/gravitino/stats/storage/PersistedPartitionStatistics.java
b/core/src/main/java/org/apache/gravitino/stats/storage/PersistedPartitionStatistics.java
new file mode 100644
index 0000000000..0e3b647ae6
--- /dev/null
+++
b/core/src/main/java/org/apache/gravitino/stats/storage/PersistedPartitionStatistics.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.stats.storage;
+
+import java.util.Map;
+import org.apache.gravitino.stats.StatisticValue;
+
+/** Represents a collection of statistics for a specific partition in a
metadata object. */
+public class PersistedPartitionStatistics {
+
+ private final String partitionName;
+ private final Map<String, StatisticValue<?>> statistics;
+
+ /**
+ * Creates an instance of {@link PersistedPartitionStatistics}.
+ *
+ * @param partitionName the name of the partition for which these statistics
are applicable
+ * @param statistics a map of statistics applicable to the partition, where
the key is the
+ * statistic name
+ * @return a new instance of {@link PersistedPartitionStatistics}
+ */
+ public static PersistedPartitionStatistics of(
+ String partitionName, Map<String, StatisticValue<?>> statistics) {
+ return new PersistedPartitionStatistics(partitionName, statistics);
+ }
+
+ /**
+ * Private constructor for {@link PersistedPartitionStatistics}.
+ *
+ * @param partitionName the name of the partition for which these statistics
are applicable
+ * @param statistics a map of statistics applicable to the partition, where
the key is the
+ * statistic name
+ */
+ private PersistedPartitionStatistics(
+ String partitionName, Map<String, StatisticValue<?>> statistics) {
+ this.partitionName = partitionName;
+ this.statistics = statistics;
+ }
+
+ /**
+ * Returns the name of the partition for which these statistics are
applicable.
+ *
+ * @return the name of the partition
+ */
+ public String partitionName() {
+ return partitionName;
+ }
+
+ /**
+ * Returns the statistics for the partition.
+ *
+ * @return a map of statistics applicable to the partition, where the key is
the statistic name
+ */
+ public Map<String, StatisticValue<?>> statistics() {
+ return statistics;
+ }
+}
diff --git
a/core/src/test/java/org/apache/gravitino/stats/storage/TestMemoryPartitionStatsStorage.java
b/core/src/test/java/org/apache/gravitino/stats/storage/TestMemoryPartitionStatsStorage.java
new file mode 100644
index 0000000000..4e60ff4dec
--- /dev/null
+++
b/core/src/test/java/org/apache/gravitino/stats/storage/TestMemoryPartitionStatsStorage.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.stats.storage;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import org.apache.gravitino.MetadataObject;
+import org.apache.gravitino.MetadataObjects;
+import org.apache.gravitino.stats.PartitionRange;
+import org.apache.gravitino.stats.PartitionStatisticsDrop;
+import org.apache.gravitino.stats.PartitionStatisticsUpdate;
+import org.apache.gravitino.stats.StatisticValue;
+import org.apache.gravitino.stats.StatisticValues;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+public class TestMemoryPartitionStatsStorage {
+
+ @Test
+ public void testMemoryPartitionStatsStorage() throws IOException {
+ MemoryPartitionStatsStorageFactory factory = new
MemoryPartitionStatsStorageFactory();
+ try (PartitionStatisticStorage storage =
factory.create(Maps.newHashMap())) {
+ MetadataObject metadataObject =
+ MetadataObjects.of(
+ Lists.newArrayList("catalog", "schema", "table"),
MetadataObject.Type.TABLE);
+
+ List<PersistedPartitionStatistics> stats =
+ storage.listStatistics(
+ "metalake",
+ metadataObject,
+ PartitionRange.upTo("p0", PartitionRange.BoundType.CLOSED));
+ Assertions.assertEquals(0, stats.size());
+
+ Map<String, StatisticValue<?>> statistics = Maps.newHashMap();
+ statistics.put("k1", StatisticValues.stringValue("v1"));
+ PartitionStatisticsUpdate update = PartitionStatisticsUpdate.of("p0",
statistics);
+ MetadataObjectStatisticsUpdate metadataObjectStatisticsUpdate =
+ MetadataObjectStatisticsUpdate.of(metadataObject,
Lists.newArrayList(update));
+ storage.updateStatistics("metalake",
Lists.newArrayList(metadataObjectStatisticsUpdate));
+
+ stats =
+ storage.listStatistics(
+ "metalake",
+ metadataObject,
+ PartitionRange.upTo("p0", PartitionRange.BoundType.CLOSED));
+ Assertions.assertEquals(1, stats.size());
+ Assertions.assertEquals(stats.get(0).partitionName(), "p0");
+ Map<String, StatisticValue<?>> partitionStats =
stats.get(0).statistics();
+ Assertions.assertEquals(1, partitionStats.size());
+ Assertions.assertTrue(partitionStats.containsKey("k1"));
+ StatisticValue<?> value = partitionStats.get("k1");
+ Assertions.assertEquals(StatisticValues.stringValue("v1"), value);
+
+ PartitionStatisticsDrop drop = PartitionStatisticsDrop.of("p0",
Lists.newArrayList("k1"));
+ List<PartitionStatisticsDrop> drops = Lists.newArrayList(drop);
+
+ List<MetadataObjectStatisticsDrop> partitionStatisticsToDrop =
+ Lists.newArrayList(MetadataObjectStatisticsDrop.of(metadataObject,
drops));
+ storage.dropStatistics("metalake", partitionStatisticsToDrop);
+
+ stats =
+ storage.listStatistics(
+ "metalake",
+ metadataObject,
+ PartitionRange.upTo("p0", PartitionRange.BoundType.CLOSED));
+ Assertions.assertEquals(0, stats.size());
+ }
+ }
+}