This is an automated email from the ASF dual-hosted git repository.

roryqi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/gravitino.git


The following commit(s) were added to refs/heads/main by this push:
     new e7849f284d [#7825] feat(core, server): Add partition statistic manager 
and partition REST API (#7876)
e7849f284d is described below

commit e7849f284d54135368247d9897c3efb35e18c478
Author: roryqi <[email protected]>
AuthorDate: Wed Aug 20 14:05:30 2025 +0800

    [#7825] feat(core, server): Add partition statistic manager and partition 
REST API (#7876)
    
    ### What changes were proposed in this pull request?
    
    1. Extract PartitionStatisticsUpdate and PartitionStatisticsDrop
    interfaces
    2. Add partition statistic manager
    3. Add partition REST API
    
    ### Why are the changes needed?
    
    Fix: #7825
    
    ### Does this PR introduce _any_ user-facing change?
    
    I will add the document later.
    
    ### How was this patch tested?
    
    Add UT.
---
 .../exceptions/IllegalStatisticNameException.java  |   6 +-
 .../gravitino/stats/PartitionStatistics.java       |   2 +-
 .../gravitino/stats/PartitionStatisticsDrop.java   |  31 +-
 .../stats/PartitionStatisticsModification.java     | 122 ++++++++
 .../gravitino/stats/PartitionStatisticsUpdate.java |  31 +-
 .../requests/PartitionStatisticsDropRequest.java   |  66 ++++
 .../requests/PartitionStatisticsUpdateRequest.java |  66 ++++
 .../responses/PartitionStatisticsListResponse.java |  69 +++++
 .../dto/stats/PartitionStatisticsDTO.java          |  85 ++++++
 .../dto/stats/PartitionStatisticsDropDTO.java      |  89 ++++++
 .../dto/stats/PartitionStatisticsUpdateDTO.java    |  91 ++++++
 .../dto/stats/TestPartitionStatisticsDTO.java      |  57 ++++
 .../main/java/org/apache/gravitino/Configs.java    |   9 +
 .../java/org/apache/gravitino/GravitinoEnv.java    |  10 +-
 .../apache/gravitino/stats/StatisticManager.java   | 154 +++++++++-
 .../MemoryPartitionStatsStorageFactory.java        |  71 ++++-
 .../stats/storage/PartitionStatisticStorage.java   |  17 +-
 .../storage/PersistedPartitionStatistics.java      |  25 +-
 .../stats/storage/PersistedStatistic.java          |  82 +++++
 .../gravitino/stats/TestStatisticManager.java      | 108 ++++++-
 .../storage/TestMemoryPartitionStatsStorage.java   |  76 ++++-
 .../server/web/rest/ExceptionHandlers.java         |  34 +++
 .../server/web/rest/StatisticOperations.java       | 256 ++++++++++++++++
 .../server/web/rest/TestStatisticOperations.java   | 340 ++++++++++++++++++++-
 24 files changed, 1786 insertions(+), 111 deletions(-)

diff --git 
a/api/src/main/java/org/apache/gravitino/exceptions/IllegalStatisticNameException.java
 
b/api/src/main/java/org/apache/gravitino/exceptions/IllegalStatisticNameException.java
index ee98b84036..84cbf590f8 100644
--- 
a/api/src/main/java/org/apache/gravitino/exceptions/IllegalStatisticNameException.java
+++ 
b/api/src/main/java/org/apache/gravitino/exceptions/IllegalStatisticNameException.java
@@ -22,7 +22,7 @@ import com.google.errorprone.annotations.FormatMethod;
 import com.google.errorprone.annotations.FormatString;
 
 /** An exception thrown when statistic has an illegal name */
-public class IllegalStatisticNameException extends GravitinoRuntimeException {
+public class IllegalStatisticNameException extends IllegalArgumentException {
   /**
    * Constructs a new exception with the specified detail message.
    *
@@ -31,7 +31,7 @@ public class IllegalStatisticNameException extends 
GravitinoRuntimeException {
    */
   @FormatMethod
   public IllegalStatisticNameException(@FormatString String message, Object... 
args) {
-    super(message, args);
+    super(String.format(message, args));
   }
 
   /**
@@ -44,6 +44,6 @@ public class IllegalStatisticNameException extends 
GravitinoRuntimeException {
   @FormatMethod
   public IllegalStatisticNameException(
       Throwable cause, @FormatString String message, Object... args) {
-    super(cause, message, args);
+    super(String.format(message, args), cause);
   }
 }
diff --git 
a/api/src/main/java/org/apache/gravitino/stats/PartitionStatistics.java 
b/api/src/main/java/org/apache/gravitino/stats/PartitionStatistics.java
index f44be6c953..356d9e5b4a 100644
--- a/api/src/main/java/org/apache/gravitino/stats/PartitionStatistics.java
+++ b/api/src/main/java/org/apache/gravitino/stats/PartitionStatistics.java
@@ -26,7 +26,7 @@ public interface PartitionStatistics {
    *
    * @return the name of the partition
    */
-  String name();
+  String partitionName();
 
   /**
    * Returns the statistics for the partition.
diff --git 
a/api/src/main/java/org/apache/gravitino/stats/PartitionStatisticsDrop.java 
b/api/src/main/java/org/apache/gravitino/stats/PartitionStatisticsDrop.java
index 4e4cc1b170..c9acfe35db 100644
--- a/api/src/main/java/org/apache/gravitino/stats/PartitionStatisticsDrop.java
+++ b/api/src/main/java/org/apache/gravitino/stats/PartitionStatisticsDrop.java
@@ -25,44 +25,19 @@ import java.util.List;
  * source. It is used to manage and track the statistics that are relevant 
when partitions are
  * dropped.
  */
-public class PartitionStatisticsDrop {
-
-  private final String partitionName;
-  private final List<String> statisticNames;
-
-  /**
-   * Creates a PartitionDropStatistics instance with the specified partition 
name and statistic
-   * names.
-   *
-   * @param partitionName the name of the partition.
-   * @param statisticNames a list of statistic names that are relevant to the 
partition being
-   *     dropped.
-   * @return a PartitionDropStatistics instance
-   */
-  public static PartitionStatisticsDrop of(String partitionName, List<String> 
statisticNames) {
-    return new PartitionStatisticsDrop(partitionName, statisticNames);
-  }
-
-  private PartitionStatisticsDrop(String partitionName, List<String> 
statisticNames) {
-    this.partitionName = partitionName;
-    this.statisticNames = statisticNames;
-  }
+public interface PartitionStatisticsDrop {
 
   /**
    * Returns the name of the partition for which these statistics are 
applicable.
    *
    * @return the name of the partition
    */
-  public String partitionName() {
-    return partitionName;
-  }
+  String partitionName();
 
   /**
    * Returns the names of the statistics that are relevant to the partition 
being dropped.
    *
    * @return a list of statistic names
    */
-  public List<String> statisticNames() {
-    return statisticNames;
-  }
+  List<String> statisticNames();
 }
diff --git 
a/api/src/main/java/org/apache/gravitino/stats/PartitionStatisticsModification.java
 
b/api/src/main/java/org/apache/gravitino/stats/PartitionStatisticsModification.java
new file mode 100644
index 0000000000..ed63374c70
--- /dev/null
+++ 
b/api/src/main/java/org/apache/gravitino/stats/PartitionStatisticsModification.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.stats;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * PartitionStatisticsModification provides methods to create instances of 
PartitionStatisticsDrop
+ * and PartitionStatisticsUpdate, which are used to manage statistics related 
to partitions in a
+ * data source.
+ */
+public class PartitionStatisticsModification {
+
+  /**
+   * Creates a PartitionDropStatistics instance with the specified partition 
name and statistic
+   * names.
+   *
+   * @param partitionName the name of the partition.
+   * @param statisticNames a list of statistic names that are relevant to the 
partition being
+   *     dropped.
+   * @return a PartitionDropStatistics instance
+   */
+  public static PartitionStatisticsDrop drop(String partitionName, 
List<String> statisticNames) {
+    return new PartitionStatisticsDropImpl(partitionName, statisticNames);
+  }
+
+  /**
+   * Creates a PartitionUpdateStatistics instance with the specified partition 
name and statistics.
+   *
+   * @param partitionName the name of the partition
+   * @param statistics a map of statistic names to their values to be updated
+   * @return a PartitionUpdateStatistics instance
+   */
+  public static PartitionStatisticsUpdate update(
+      String partitionName, Map<String, StatisticValue<?>> statistics) {
+    return new PartitionStatisticsUpdateImpl(partitionName, statistics);
+  }
+
+  private static class PartitionStatisticsDropImpl implements 
PartitionStatisticsDrop {
+
+    private final String partitionName;
+    private final List<String> statisticNames;
+
+    private PartitionStatisticsDropImpl(String partitionName, List<String> 
statisticNames) {
+      this.partitionName = partitionName;
+      this.statisticNames = statisticNames;
+    }
+
+    /**
+     * Returns the name of the partition for which these statistics are 
applicable.
+     *
+     * @return the name of the partition
+     */
+    public String partitionName() {
+      return partitionName;
+    }
+
+    /**
+     * Returns the names of the statistics that are relevant to the partition 
being dropped.
+     *
+     * @return a list of statistic names
+     */
+    public List<String> statisticNames() {
+      return statisticNames;
+    }
+  }
+
+  private static class PartitionStatisticsUpdateImpl implements 
PartitionStatisticsUpdate {
+
+    private final String partitionName;
+    private final Map<String, StatisticValue<?>> statistics;
+
+    /**
+     * Creates a PartitionUpdateStatistics instance with the specified 
partition name and
+     * statistics.
+     *
+     * @param partitionName the name of the partition
+     * @param statistics a map of statistic names to their values to be updated
+     * @return a PartitionUpdateStatistics instance
+     */
+    private PartitionStatisticsUpdateImpl(
+        String partitionName, Map<String, StatisticValue<?>> statistics) {
+      this.partitionName = partitionName;
+      this.statistics = statistics;
+    }
+
+    /**
+     * Returns the name of the partition for which these statistics are 
applicable.
+     *
+     * @return the name of the partition.
+     */
+    public String partitionName() {
+      return partitionName;
+    }
+
+    /**
+     * Returns the statistics to be updated for the partition.
+     *
+     * @return a map where the key is the statistic name and the value is the 
statistic value.
+     */
+    public Map<String, StatisticValue<?>> statistics() {
+      return statistics;
+    }
+  }
+}
diff --git 
a/api/src/main/java/org/apache/gravitino/stats/PartitionStatisticsUpdate.java 
b/api/src/main/java/org/apache/gravitino/stats/PartitionStatisticsUpdate.java
index 7a8f493506..df47af2f5b 100644
--- 
a/api/src/main/java/org/apache/gravitino/stats/PartitionStatisticsUpdate.java
+++ 
b/api/src/main/java/org/apache/gravitino/stats/PartitionStatisticsUpdate.java
@@ -24,44 +24,19 @@ import java.util.Map;
  * PartitionUpdateStatistics represents the statistics for a specific 
partition that can be updated.
  * It contains the partition name and a map of statistic names to their values.
  */
-public class PartitionStatisticsUpdate {
-
-  private final String partitionName;
-  private final Map<String, StatisticValue<?>> statistics;
-
-  /**
-   * Creates a PartitionUpdateStatistics instance with the specified partition 
name and statistics.
-   *
-   * @param partitionName the name of the partition
-   * @param statistics a map of statistic names to their values to be updated
-   * @return a PartitionUpdateStatistics instance
-   */
-  public static PartitionStatisticsUpdate of(
-      String partitionName, Map<String, StatisticValue<?>> statistics) {
-    return new PartitionStatisticsUpdate(partitionName, statistics);
-  }
-
-  private PartitionStatisticsUpdate(
-      String partitionName, Map<String, StatisticValue<?>> statistics) {
-    this.partitionName = partitionName;
-    this.statistics = statistics;
-  }
+public interface PartitionStatisticsUpdate {
 
   /**
    * Returns the name of the partition for which these statistics are 
applicable.
    *
    * @return the name of the partition.
    */
-  public String partitionName() {
-    return partitionName;
-  }
+  String partitionName();
 
   /**
    * Returns the statistics to be updated for the partition.
    *
    * @return a map where the key is the statistic name and the value is the 
statistic value.
    */
-  public Map<String, StatisticValue<?>> statistics() {
-    return statistics;
-  }
+  Map<String, StatisticValue<?>> statistics();
 }
diff --git 
a/common/src/main/java/org/apache/gravitino/dto/requests/PartitionStatisticsDropRequest.java
 
b/common/src/main/java/org/apache/gravitino/dto/requests/PartitionStatisticsDropRequest.java
new file mode 100644
index 0000000000..3a571242e7
--- /dev/null
+++ 
b/common/src/main/java/org/apache/gravitino/dto/requests/PartitionStatisticsDropRequest.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.dto.requests;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.base.Preconditions;
+import java.util.List;
+import lombok.EqualsAndHashCode;
+import lombok.Getter;
+import lombok.ToString;
+import org.apache.gravitino.dto.stats.PartitionStatisticsDropDTO;
+import org.apache.gravitino.rest.RESTRequest;
+
+/** Request to drop partition statistics. */
+@Getter
+@EqualsAndHashCode
+@ToString
+public class PartitionStatisticsDropRequest implements RESTRequest {
+
+  @JsonProperty("drops")
+  private List<PartitionStatisticsDropDTO> drops;
+
+  /**
+   * Creates a new PartitionStatsDropRequest.
+   *
+   * @param drops The partition statistics to drop.
+   */
+  public PartitionStatisticsDropRequest(List<PartitionStatisticsDropDTO> 
drops) {
+    this.drops = drops;
+  }
+
+  /** This is the constructor that is used by Jackson deserializer */
+  public PartitionStatisticsDropRequest() {
+    this(null);
+  }
+
+  /**
+   * Validates the request.
+   *
+   * @throws IllegalArgumentException If the request is invalid, this 
exception is thrown.
+   */
+  @Override
+  public void validate() throws IllegalArgumentException {
+    Preconditions.checkArgument(
+        drops != null && !drops.isEmpty(), "\"drops\" must not be null or 
empty.");
+    for (PartitionStatisticsDropDTO drop : drops) {
+      drop.validate();
+    }
+  }
+}
diff --git 
a/common/src/main/java/org/apache/gravitino/dto/requests/PartitionStatisticsUpdateRequest.java
 
b/common/src/main/java/org/apache/gravitino/dto/requests/PartitionStatisticsUpdateRequest.java
new file mode 100644
index 0000000000..5ef9d92f4d
--- /dev/null
+++ 
b/common/src/main/java/org/apache/gravitino/dto/requests/PartitionStatisticsUpdateRequest.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.dto.requests;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.base.Preconditions;
+import java.util.List;
+import lombok.EqualsAndHashCode;
+import lombok.Getter;
+import lombok.ToString;
+import org.apache.gravitino.dto.stats.PartitionStatisticsUpdateDTO;
+import org.apache.gravitino.rest.RESTRequest;
+
+/** Represents a request to update partition statistics. */
+@Getter
+@EqualsAndHashCode
+@ToString
+public class PartitionStatisticsUpdateRequest implements RESTRequest {
+
+  @JsonProperty("updates")
+  private final List<PartitionStatisticsUpdateDTO> updates;
+
+  /**
+   * Creates a new PartitionStatsUpdateRequest.
+   *
+   * @param updates The updates to apply to the partition statistics.
+   */
+  public PartitionStatisticsUpdateRequest(List<PartitionStatisticsUpdateDTO> 
updates) {
+    this.updates = updates;
+  }
+
+  /** Default constructor for PartitionStatsUpdateRequest. (Used for Jackson 
deserialization.) */
+  public PartitionStatisticsUpdateRequest() {
+    this(null);
+  }
+
+  /**
+   * Validates the request.
+   *
+   * @throws IllegalArgumentException If the request is invalid, this 
exception is thrown.
+   */
+  @Override
+  public void validate() throws IllegalArgumentException {
+    Preconditions.checkArgument(
+        updates != null && !updates.isEmpty(), "\"updates\" must not be null 
or empty.");
+    for (PartitionStatisticsUpdateDTO update : updates) {
+      update.validate();
+    }
+  }
+}
diff --git 
a/common/src/main/java/org/apache/gravitino/dto/responses/PartitionStatisticsListResponse.java
 
b/common/src/main/java/org/apache/gravitino/dto/responses/PartitionStatisticsListResponse.java
new file mode 100644
index 0000000000..d22240bf0c
--- /dev/null
+++ 
b/common/src/main/java/org/apache/gravitino/dto/responses/PartitionStatisticsListResponse.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.dto.responses;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.base.Preconditions;
+import lombok.EqualsAndHashCode;
+import lombok.Getter;
+import lombok.ToString;
+import org.apache.gravitino.dto.stats.PartitionStatisticsDTO;
+
+/**
+ * PartitionStatisticsListResponse is a response object that contains an array 
of
+ * PartitionStatisticsDTO.
+ */
+@Getter
+@ToString
+@EqualsAndHashCode(callSuper = true)
+public class PartitionStatisticsListResponse extends BaseResponse {
+
+  @JsonProperty("partitionStatistics")
+  private PartitionStatisticsDTO[] partitionStatistics;
+
+  /**
+   * Creates a new PartitionStatsListResponse.
+   *
+   * @param partitionStatistics The updated statistics for the partition.
+   */
+  public PartitionStatisticsListResponse(PartitionStatisticsDTO[] 
partitionStatistics) {
+    super(0);
+    this.partitionStatistics = partitionStatistics;
+  }
+
+  /** This is the constructor that is used by Jackson deserializer */
+  public PartitionStatisticsListResponse() {
+    this(null);
+  }
+
+  /**
+   * Validates the response.
+   *
+   * @throws IllegalArgumentException If the response is invalid, this 
exception is thrown.
+   */
+  @Override
+  public void validate() throws IllegalArgumentException {
+    super.validate();
+    Preconditions.checkArgument(
+        partitionStatistics != null, "\"partitionStatistics\" must not be 
null");
+    for (PartitionStatisticsDTO partitionStat : partitionStatistics) {
+      partitionStat.validate();
+    }
+  }
+}
diff --git 
a/common/src/main/java/org/apache/gravitino/dto/stats/PartitionStatisticsDTO.java
 
b/common/src/main/java/org/apache/gravitino/dto/stats/PartitionStatisticsDTO.java
new file mode 100644
index 0000000000..3b48f3df3b
--- /dev/null
+++ 
b/common/src/main/java/org/apache/gravitino/dto/stats/PartitionStatisticsDTO.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.dto.stats;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.base.Preconditions;
+import lombok.EqualsAndHashCode;
+import lombok.ToString;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.gravitino.stats.PartitionStatistics;
+import org.apache.gravitino.stats.Statistic;
+
+/**
+ * PartitionStatisticsDTO is a Data Transfer Object (DTO) that represents the 
statistics for a
+ * specific partition in a data source.
+ */
+@EqualsAndHashCode
+@ToString
+public class PartitionStatisticsDTO implements PartitionStatistics {
+
+  @JsonProperty("partitionName")
+  private String partitionName;
+
+  @JsonProperty("statistics")
+  private StatisticDTO[] statistics;
+
+  /** Default constructor for Jackson. */
+  protected PartitionStatisticsDTO() {
+    this(null, null);
+  }
+
+  private PartitionStatisticsDTO(String partitionName, StatisticDTO[] 
statistics) {
+    this.partitionName = partitionName;
+    this.statistics = statistics;
+  }
+
+  @Override
+  public String partitionName() {
+    return partitionName;
+  }
+
+  @Override
+  public Statistic[] statistics() {
+    return statistics;
+  }
+
+  /** Validates the PartitionStatisticsDTO instance. */
+  public void validate() {
+    Preconditions.checkArgument(
+        StringUtils.isNotBlank(partitionName), "\"partitionName\" must not be 
null or empty");
+    Preconditions.checkArgument(statistics != null, "\"statistics\" must not 
be null");
+    for (StatisticDTO statistic : statistics) {
+      statistic.validate();
+    }
+  }
+
+  /**
+   * Creates a new instance of PartitionStatisticsDTO.
+   *
+   * @param partitionName the name of the partition for which these statistics 
are applicable
+   * @param statistics the statistics applicable to the partition
+   * @return a new instance of PartitionStatisticsDTO
+   */
+  public static PartitionStatisticsDTO of(String partitionName, StatisticDTO[] 
statistics) {
+    PartitionStatisticsDTO dto = new PartitionStatisticsDTO(partitionName, 
statistics);
+    dto.validate();
+    return dto;
+  }
+}
diff --git 
a/common/src/main/java/org/apache/gravitino/dto/stats/PartitionStatisticsDropDTO.java
 
b/common/src/main/java/org/apache/gravitino/dto/stats/PartitionStatisticsDropDTO.java
new file mode 100644
index 0000000000..8004d8d7c4
--- /dev/null
+++ 
b/common/src/main/java/org/apache/gravitino/dto/stats/PartitionStatisticsDropDTO.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.dto.stats;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.base.Preconditions;
+import java.util.List;
+import lombok.EqualsAndHashCode;
+import lombok.ToString;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.gravitino.stats.PartitionStatisticsDrop;
+
+/**
+ * PartitionStatisticsDropDTO is a Data Transfer Object (DTO) that represents 
the request to drop
+ * statistics for a specific partition in a data source.
+ */
+@EqualsAndHashCode
+@ToString
+public class PartitionStatisticsDropDTO implements PartitionStatisticsDrop {
+
+  @JsonProperty("partitionName")
+  private final String partitionName;
+
+  @JsonProperty("statisticNames")
+  private final List<String> statisticNames;
+
+  /** Default constructor for Jackson. */
+  protected PartitionStatisticsDropDTO() {
+    this(null, null);
+  }
+
+  private PartitionStatisticsDropDTO(String partitionName, List<String> 
statisticNames) {
+    this.partitionName = partitionName;
+    this.statisticNames = statisticNames;
+  }
+
+  @Override
+  public String partitionName() {
+    return partitionName;
+  }
+
+  @Override
+  public List<String> statisticNames() {
+    return statisticNames;
+  }
+
+  /** Validates the PartitionStatisticsDropDTO instance. */
+  public void validate() {
+    Preconditions.checkArgument(
+        StringUtils.isNotBlank(partitionName), "\"partitionName\" must not be 
null or empty");
+    Preconditions.checkArgument(
+        statisticNames != null && !statisticNames.isEmpty(),
+        "\"statisticNames\" must not be null or empty");
+    for (String statisticName : statisticNames) {
+      Preconditions.checkArgument(
+          StringUtils.isNotBlank(statisticName),
+          "Each statistic \"name\" in \"statisticNames\" must not be null or 
empty");
+    }
+  }
+
+  /**
+   * Creates a new instance of PartitionStatisticsDropDTO.
+   *
+   * @param partitionName the name of the partition for which these statistics 
are applicable
+   * @param statisticNames the names of the statistics to drop for the 
partition
+   * @return a new instance of PartitionStatisticsDropDTO
+   */
+  public static PartitionStatisticsDropDTO of(String partitionName, 
List<String> statisticNames) {
+    PartitionStatisticsDropDTO dto = new 
PartitionStatisticsDropDTO(partitionName, statisticNames);
+    dto.validate();
+    return dto;
+  }
+}
diff --git 
a/common/src/main/java/org/apache/gravitino/dto/stats/PartitionStatisticsUpdateDTO.java
 
b/common/src/main/java/org/apache/gravitino/dto/stats/PartitionStatisticsUpdateDTO.java
new file mode 100644
index 0000000000..edaf035e28
--- /dev/null
+++ 
b/common/src/main/java/org/apache/gravitino/dto/stats/PartitionStatisticsUpdateDTO.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.dto.stats;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
+import com.fasterxml.jackson.databind.annotation.JsonSerialize;
+import com.google.common.base.Preconditions;
+import java.util.Map;
+import lombok.EqualsAndHashCode;
+import lombok.ToString;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.gravitino.json.JsonUtils;
+import org.apache.gravitino.stats.PartitionStatisticsUpdate;
+import org.apache.gravitino.stats.StatisticValue;
+
+/**
+ * PartitionStatisticsUpdateDTO is a Data Transfer Object (DTO) that 
represents the request to
+ * update statistics for a specific partition in a data source.
+ */
+@EqualsAndHashCode
+@ToString
+public class PartitionStatisticsUpdateDTO implements PartitionStatisticsUpdate 
{
+
+  @JsonProperty("partitionName")
+  private final String partitionName;
+
+  @JsonProperty("statistics")
+  @JsonSerialize(contentUsing = JsonUtils.StatisticValueSerializer.class)
+  @JsonDeserialize(contentUsing = JsonUtils.StatisticValueDeserializer.class)
+  private final Map<String, StatisticValue<?>> statistics;
+
+  /** Default constructor for Jackson. */
+  protected PartitionStatisticsUpdateDTO() {
+    this(null, null);
+  }
+
+  private PartitionStatisticsUpdateDTO(
+      String partitionName, Map<String, StatisticValue<?>> statistics) {
+    this.partitionName = partitionName;
+    this.statistics = statistics;
+  }
+
+  @Override
+  public String partitionName() {
+    return partitionName;
+  }
+
+  @Override
+  public Map<String, StatisticValue<?>> statistics() {
+    return statistics;
+  }
+
+  /** Validates the PartitionStatisticsUpdateDTO instance. */
+  public void validate() {
+    Preconditions.checkArgument(
+        StringUtils.isNotBlank(partitionName), "\"partitionName\" must not be 
null or empty");
+    Preconditions.checkArgument(
+        statistics != null && !statistics.isEmpty(), "\"statistics\" must not 
be null or empty");
+  }
+
+  /**
+   * Creates a new instance of PartitionStatisticsUpdateDTO.
+   *
+   * @param partitionName the name of the partition for which these statistics 
are applicable
+   * @param statistics the statistics applicable to the partition
+   * @return a new instance of PartitionStatisticsUpdateDTO
+   */
+  public static PartitionStatisticsUpdateDTO of(
+      String partitionName, Map<String, StatisticValue<?>> statistics) {
+    PartitionStatisticsUpdateDTO dto = new 
PartitionStatisticsUpdateDTO(partitionName, statistics);
+    dto.validate();
+    return dto;
+  }
+}
diff --git 
a/common/src/test/java/org/apache/gravitino/dto/stats/TestPartitionStatisticsDTO.java
 
b/common/src/test/java/org/apache/gravitino/dto/stats/TestPartitionStatisticsDTO.java
new file mode 100644
index 0000000000..ef770cb640
--- /dev/null
+++ 
b/common/src/test/java/org/apache/gravitino/dto/stats/TestPartitionStatisticsDTO.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.dto.stats;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import java.time.Instant;
+import java.util.Optional;
+import org.apache.gravitino.dto.AuditDTO;
+import org.apache.gravitino.json.JsonUtils;
+import org.apache.gravitino.stats.StatisticValues;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+public class TestPartitionStatisticsDTO {
+
+  @Test
+  public void testPartitionStatisticsDTO() throws JsonProcessingException {
+    StatisticDTO statisticDTO =
+        StatisticDTO.builder()
+            .withName("statistic_test")
+            .withValue(Optional.of(StatisticValues.longValue(100L)))
+            .withReserved(false)
+            .withModifiable(false)
+            .withAudit(
+                AuditDTO.builder()
+                    .withCreator("test_user")
+                    .withCreateTime(Instant.now())
+                    .withLastModifier("test_user")
+                    .withLastModifiedTime(Instant.now())
+                    .build())
+            .build();
+
+    PartitionStatisticsDTO partitionStatisticsDTO =
+        PartitionStatisticsDTO.of("test_partition", new StatisticDTO[] 
{statisticDTO});
+
+    String serJson = 
JsonUtils.objectMapper().writeValueAsString(partitionStatisticsDTO);
+    PartitionStatisticsDTO deserPartitionStatisticsDTO =
+        JsonUtils.objectMapper().readValue(serJson, 
PartitionStatisticsDTO.class);
+    Assertions.assertEquals(partitionStatisticsDTO, 
deserPartitionStatisticsDTO);
+  }
+}
diff --git a/core/src/main/java/org/apache/gravitino/Configs.java 
b/core/src/main/java/org/apache/gravitino/Configs.java
index 829f6330bf..63681fec1d 100644
--- a/core/src/main/java/org/apache/gravitino/Configs.java
+++ b/core/src/main/java/org/apache/gravitino/Configs.java
@@ -28,6 +28,7 @@ import org.apache.gravitino.audit.v2.SimpleFormatterV2;
 import org.apache.gravitino.config.ConfigBuilder;
 import org.apache.gravitino.config.ConfigConstants;
 import org.apache.gravitino.config.ConfigEntry;
+import org.apache.gravitino.stats.storage.MemoryPartitionStatsStorageFactory;
 
 public class Configs {
 
@@ -445,4 +446,12 @@ public class Configs {
           .longConf()
           .checkValue(value -> value > 0, 
ConfigConstants.POSITIVE_NUMBER_ERROR_MSG)
           .createWithDefault(5 * 60 * 1000L); // Default is 5 minutes
+
+  // TODO: Change default value to a Lance partition statistics storage 
factory class
+  public static final ConfigEntry<String> 
PARTITION_STATS_STORAGE_FACTORY_CLASS =
+      new ConfigBuilder("gravitino.stats.partition.storageFactoryClass")
+          .doc("The partition stats storage factory class.")
+          .version(ConfigConstants.VERSION_1_0_0)
+          .stringConf()
+          
.createWithDefault(MemoryPartitionStatsStorageFactory.class.getCanonicalName());
 }
diff --git a/core/src/main/java/org/apache/gravitino/GravitinoEnv.java 
b/core/src/main/java/org/apache/gravitino/GravitinoEnv.java
index 591542a36d..7b5929d5e5 100644
--- a/core/src/main/java/org/apache/gravitino/GravitinoEnv.java
+++ b/core/src/main/java/org/apache/gravitino/GravitinoEnv.java
@@ -472,6 +472,14 @@ public class GravitinoEnv {
       }
     }
 
+    if (statisticManager != null) {
+      try {
+        statisticManager.close();
+      } catch (Exception e) {
+        LOG.warn("Failed to close StatisticManager", e);
+      }
+    }
+
     LOG.info("Gravitino Environment is shut down.");
   }
 
@@ -563,7 +571,7 @@ public class GravitinoEnv {
     ModelNormalizeDispatcher modelNormalizeDispatcher =
         new ModelNormalizeDispatcher(modelHookDispatcher, catalogManager);
     this.modelDispatcher = new ModelEventDispatcher(eventBus, 
modelNormalizeDispatcher);
-    this.statisticManager = new StatisticManager(entityStore, idGenerator);
+    this.statisticManager = new StatisticManager(entityStore, idGenerator, 
config);
 
     // Create and initialize access control related modules
     boolean enableAuthorization = config.get(Configs.ENABLE_AUTHORIZATION);
diff --git 
a/core/src/main/java/org/apache/gravitino/stats/StatisticManager.java 
b/core/src/main/java/org/apache/gravitino/stats/StatisticManager.java
index b62527dca6..44d1a80b76 100644
--- a/core/src/main/java/org/apache/gravitino/stats/StatisticManager.java
+++ b/core/src/main/java/org/apache/gravitino/stats/StatisticManager.java
@@ -20,6 +20,7 @@ package org.apache.gravitino.stats;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Lists;
+import java.io.Closeable;
 import java.io.IOException;
 import java.time.Instant;
 import java.util.List;
@@ -28,6 +29,8 @@ import java.util.Optional;
 import java.util.stream.Collectors;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.gravitino.Audit;
+import org.apache.gravitino.Config;
+import org.apache.gravitino.Configs;
 import org.apache.gravitino.Entity;
 import org.apache.gravitino.EntityStore;
 import org.apache.gravitino.MetadataObject;
@@ -40,6 +43,11 @@ import org.apache.gravitino.lock.LockType;
 import org.apache.gravitino.lock.TreeLockUtils;
 import org.apache.gravitino.meta.AuditInfo;
 import org.apache.gravitino.meta.StatisticEntity;
+import org.apache.gravitino.stats.storage.MetadataObjectStatisticsDrop;
+import org.apache.gravitino.stats.storage.MetadataObjectStatisticsUpdate;
+import org.apache.gravitino.stats.storage.PartitionStatisticStorage;
+import org.apache.gravitino.stats.storage.PartitionStatisticStorageFactory;
+import org.apache.gravitino.stats.storage.PersistedPartitionStatistics;
 import org.apache.gravitino.storage.IdGenerator;
 import org.apache.gravitino.utils.Executable;
 import org.apache.gravitino.utils.MetadataObjectUtil;
@@ -48,17 +56,34 @@ import org.apache.gravitino.utils.PrincipalUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class StatisticManager {
+public class StatisticManager implements Closeable {
 
+  private static final String OPTIONS_PREFIX = 
"gravitino.stats.partition.option.";
   private static final Logger LOG = 
LoggerFactory.getLogger(StatisticManager.class);
 
   private final EntityStore store;
 
   private final IdGenerator idGenerator;
+  private final PartitionStatisticStorage partitionStorage;
 
-  public StatisticManager(EntityStore store, IdGenerator idGenerator) {
+  public StatisticManager(EntityStore store, IdGenerator idGenerator, Config 
config) {
     this.store = store;
     this.idGenerator = idGenerator;
+    String className = 
config.get(Configs.PARTITION_STATS_STORAGE_FACTORY_CLASS);
+    Map<String, String> options = config.getConfigsWithPrefix(OPTIONS_PREFIX);
+    try {
+      PartitionStatisticStorageFactory factory =
+          (PartitionStatisticStorageFactory)
+              Class.forName(className).getDeclaredConstructor().newInstance();
+      this.partitionStorage = factory.create(options);
+    } catch (Exception e) {
+      LOG.error(
+          "Failed to create and initialize partition statistics storage 
factory by name {}.",
+          className,
+          e);
+      throw new RuntimeException(
+          "Failed to create and initialize partition statistics storage 
factory: " + className, e);
+    }
   }
 
   public List<Statistic> listStatistics(String metalake, MetadataObject 
metadataObject) {
@@ -182,9 +207,110 @@ public class StatisticManager {
     }
   }
 
+  public boolean dropPartitionStatistics(
+      String metalake,
+      MetadataObject metadataObject,
+      List<PartitionStatisticsDrop> partitionStatistics) {
+    try {
+      List<MetadataObjectStatisticsDrop> partitionStatisticsToDrop =
+          Lists.newArrayList(MetadataObjectStatisticsDrop.of(metadataObject, 
partitionStatistics));
+      NameIdentifier identifier = MetadataObjectUtil.toEntityIdent(metalake, 
metadataObject);
+
+      return TreeLockUtils.doWithTreeLock(
+              identifier,
+              LockType.WRITE,
+              () -> partitionStorage.dropStatistics(metalake, 
partitionStatisticsToDrop))
+          != 0;
+    } catch (IOException ioe) {
+      LOG.error(
+          "Failed to drop partition statistics for {} in metalake {}.",
+          metadataObject,
+          metalake,
+          ioe);
+      throw new RuntimeException(ioe);
+    }
+  }
+
+  public void updatePartitionStatistics(
+      String metalake,
+      MetadataObject metadataObject,
+      List<PartitionStatisticsUpdate> partitionStatistics) {
+    try {
+      NameIdentifier identifier = MetadataObjectUtil.toEntityIdent(metalake, 
metadataObject);
+
+      List<MetadataObjectStatisticsUpdate> statisticsToUpdate = 
Lists.newArrayList();
+      statisticsToUpdate.add(
+          MetadataObjectStatisticsUpdate.of(metadataObject, 
partitionStatistics));
+      TreeLockUtils.doWithTreeLock(
+          identifier,
+          LockType.WRITE,
+          (Executable<Void, IOException>)
+              () -> {
+                partitionStorage.updateStatistics(metalake, 
statisticsToUpdate);
+                return null;
+              });
+    } catch (IOException ioe) {
+      LOG.error(
+          "Failed to update partition statistics for {} in metalake {}.",
+          metadataObject,
+          metalake,
+          ioe);
+      throw new RuntimeException(ioe);
+    }
+  }
+
+  public List<PartitionStatistics> listPartitionStatistics(
+      String metalake, MetadataObject metadataObject, PartitionRange range) {
+    try {
+      NameIdentifier identifier = MetadataObjectUtil.toEntityIdent(metalake, 
metadataObject);
+
+      List<PersistedPartitionStatistics> partitionStats =
+          TreeLockUtils.doWithTreeLock(
+              identifier,
+              LockType.READ,
+              () -> partitionStorage.listStatistics(metalake, metadataObject, 
range));
+
+      List<PartitionStatistics> listedStats = Lists.newArrayList();
+
+      if (partitionStats == null || partitionStats.isEmpty()) {
+        return listedStats;
+      }
+
+      return partitionStats.stream()
+          .map(
+              partitionStat -> {
+                String partitionName = partitionStat.partitionName();
+                Statistic[] statistics =
+                    partitionStat.statistics().stream()
+                        .map(
+                            entry -> {
+                              String statName = entry.name();
+                              StatisticValue<?> value = entry.value();
+                              AuditInfo auditInfo = entry.auditInfo();
+                              return new CustomStatistic(statName, value, 
auditInfo);
+                            })
+                        .toArray(Statistic[]::new);
+
+                return new CustomPartitionStatistic(partitionName, statistics);
+              })
+          .collect(Collectors.toList());
+    } catch (IOException ioe) {
+      LOG.error(
+          "Failed to list partition statistics for {} in metalake {}.",
+          metadataObject,
+          metalake,
+          ioe);
+      throw new RuntimeException(ioe);
+    }
+  }
+
+  @Override
+  public void close() throws IOException {
+    partitionStorage.close();
+  }
+
   @VisibleForTesting
   public static class CustomStatistic implements Statistic {
-
     private final String name;
     private final StatisticValue<?> value;
     private final Audit auditInfo;
@@ -220,4 +346,26 @@ public class StatisticManager {
       return auditInfo;
     }
   }
+
+  @VisibleForTesting
+  public static class CustomPartitionStatistic implements PartitionStatistics {
+
+    private final String partitionName;
+    private final Statistic[] statistics;
+
+    public CustomPartitionStatistic(String partitionName, Statistic[] 
statistics) {
+      this.partitionName = partitionName;
+      this.statistics = statistics;
+    }
+
+    @Override
+    public String partitionName() {
+      return partitionName;
+    }
+
+    @Override
+    public Statistic[] statistics() {
+      return statistics;
+    }
+  }
 }
diff --git 
a/core/src/main/java/org/apache/gravitino/stats/storage/MemoryPartitionStatsStorageFactory.java
 
b/core/src/main/java/org/apache/gravitino/stats/storage/MemoryPartitionStatsStorageFactory.java
index 97f1297385..b3684118f4 100644
--- 
a/core/src/main/java/org/apache/gravitino/stats/storage/MemoryPartitionStatsStorageFactory.java
+++ 
b/core/src/main/java/org/apache/gravitino/stats/storage/MemoryPartitionStatsStorageFactory.java
@@ -20,18 +20,22 @@ package org.apache.gravitino.stats.storage;
 
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
 import java.io.IOException;
-import java.util.HashMap;
+import java.time.Instant;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Optional;
+import java.util.Set;
 import java.util.stream.Collectors;
 import org.apache.gravitino.MetadataObject;
+import org.apache.gravitino.meta.AuditInfo;
 import org.apache.gravitino.stats.PartitionRange;
 import org.apache.gravitino.stats.PartitionStatisticsDrop;
 import org.apache.gravitino.stats.PartitionStatisticsUpdate;
 import org.apache.gravitino.stats.StatisticValue;
+import org.apache.gravitino.utils.PrincipalUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -63,7 +67,7 @@ public class MemoryPartitionStatsStorageFactory implements 
PartitionStatisticSto
         return Lists.newArrayList();
       }
 
-      Map<String, Map<String, StatisticValue<?>>> resultStats = 
Maps.newHashMap();
+      Map<String, List<PersistedStatistic>> resultStats = Maps.newHashMap();
       for (PersistedPartitionStatistics partitionStat : 
tableStats.partitionStatistics().values()) {
         String partitionName = partitionStat.partitionName();
         boolean lowerBoundSatisfied =
@@ -81,7 +85,7 @@ public class MemoryPartitionStatsStorageFactory implements 
PartitionStatisticSto
                 BoundDirection.UPPER);
 
         if (lowerBoundSatisfied && upperBoundSatisfied) {
-          resultStats.put(partitionName, 
Maps.newHashMap(partitionStat.statistics()));
+          resultStats.put(partitionName, 
Lists.newArrayList(partitionStat.statistics()));
         }
       }
       return resultStats.entrySet().stream()
@@ -121,11 +125,45 @@ public class MemoryPartitionStatsStorageFactory 
implements PartitionStatisticSto
                   .partitionStatistics()
                   .computeIfAbsent(
                       partitionName,
-                      k -> PersistedPartitionStatistics.of(partitionName, new 
HashMap<>()));
+                      k -> PersistedPartitionStatistics.of(partitionName, 
Lists.newArrayList()));
+
+          Set<String> existedStats = Sets.newHashSet();
+          // Update existed stats
+          existedPartitionStats
+              .statistics()
+              .replaceAll(
+                  stat -> {
+                    String statName = stat.name();
+                    if (partitionStats.containsKey(statName)) {
+                      existedStats.add(statName);
+                      StatisticValue<?> newValue = 
partitionStats.get(statName);
+                      AuditInfo auditInfo =
+                          AuditInfo.builder()
+                              .withCreator(stat.auditInfo().creator())
+                              .withCreateTime(stat.auditInfo().createTime())
+                              .withLastModifiedTime(Instant.now())
+                              
.withLastModifier(PrincipalUtils.getCurrentUserName())
+                              .build();
+                      return PersistedStatistic.of(statName, newValue, 
auditInfo);
+                    }
+                    return stat;
+                  });
+
+          // Add new stats
           for (Map.Entry<String, StatisticValue<?>> statEntry : 
partitionStats.entrySet()) {
             String statName = statEntry.getKey();
-            StatisticValue<?> statValue = statEntry.getValue();
-            existedPartitionStats.statistics().put(statName, statValue);
+            if (!existedStats.contains(statName)) {
+              AuditInfo auditInfo =
+                  AuditInfo.builder()
+                      .withCreator(PrincipalUtils.getCurrentUserName())
+                      .withCreateTime(Instant.now())
+                      .withLastModifiedTime(Instant.now())
+                      .withLastModifier(PrincipalUtils.getCurrentUserName())
+                      .build();
+              PersistedStatistic newStat =
+                  PersistedStatistic.of(statName, statEntry.getValue(), 
auditInfo);
+              existedPartitionStats.statistics().add(newStat);
+            }
           }
         }
       }
@@ -153,14 +191,15 @@ public class MemoryPartitionStatsStorageFactory 
implements PartitionStatisticSto
           if 
(tableStats.partitionStatistics().containsKey(partStats.partitionName())) {
             PersistedPartitionStatistics persistedPartitionStatistics =
                 
tableStats.partitionStatistics().get(partStats.partitionName());
-            for (String statName : partStats.statisticNames()) {
-              Map<String, StatisticValue<?>> statisticValueMap =
-                  persistedPartitionStatistics.statistics();
-              if (statisticValueMap.containsKey(statName)) {
-                statisticValueMap.remove(statName);
-                deleteCount++;
-              }
-            }
+            Set<String> statsNamesToDelete = 
Sets.newHashSet(partStats.statisticNames());
+            int originCount = persistedPartitionStatistics.statistics().size();
+            persistedPartitionStatistics
+                .statistics()
+                .removeIf(
+                    persistedStatistic -> 
statsNamesToDelete.contains(persistedStatistic.name()));
+
+            deleteCount =
+                deleteCount + (originCount - 
persistedPartitionStatistics.statistics().size());
             if (persistedPartitionStatistics.statistics().isEmpty()) {
               
tableStats.partitionStatistics().remove(partStats.partitionName());
             }
@@ -223,7 +262,7 @@ public class MemoryPartitionStatsStorageFactory implements 
PartitionStatisticSto
         boolean compare(
             String targetPartitionName, String partitionName, 
PartitionRange.BoundType type) {
           int result = targetPartitionName.compareTo(partitionName);
-          return type == PartitionRange.BoundType.OPEN ? result > 0 : result 
>= 0;
+          return type == PartitionRange.BoundType.OPEN ? result < 0 : result 
<= 0;
         }
       },
       UPPER {
@@ -231,7 +270,7 @@ public class MemoryPartitionStatsStorageFactory implements 
PartitionStatisticSto
         boolean compare(
             String targetPartitionName, String partitionName, 
PartitionRange.BoundType type) {
           int result = targetPartitionName.compareTo(partitionName);
-          return type == PartitionRange.BoundType.OPEN ? result < 0 : result 
<= 0;
+          return type == PartitionRange.BoundType.OPEN ? result > 0 : result 
>= 0;
         }
       };
 
diff --git 
a/core/src/main/java/org/apache/gravitino/stats/storage/PartitionStatisticStorage.java
 
b/core/src/main/java/org/apache/gravitino/stats/storage/PartitionStatisticStorage.java
index ca64fa48c7..64acf8c2fe 100644
--- 
a/core/src/main/java/org/apache/gravitino/stats/storage/PartitionStatisticStorage.java
+++ 
b/core/src/main/java/org/apache/gravitino/stats/storage/PartitionStatisticStorage.java
@@ -19,6 +19,7 @@
 package org.apache.gravitino.stats.storage;
 
 import java.io.Closeable;
+import java.io.IOException;
 import java.util.List;
 import org.apache.gravitino.MetadataObject;
 import org.apache.gravitino.stats.PartitionRange;
@@ -39,7 +40,8 @@ public interface PartitionStatisticStorage extends Closeable {
    *     name
    */
   List<PersistedPartitionStatistics> listStatistics(
-      String metalake, MetadataObject metadataObject, PartitionRange 
partitionRange);
+      String metalake, MetadataObject metadataObject, PartitionRange 
partitionRange)
+      throws IOException;
 
   /**
    * Lists statistics for a given metadata object and specific partition 
names. This interface may
@@ -55,7 +57,8 @@ public interface PartitionStatisticStorage extends Closeable {
    *     name
    */
   default List<PersistedPartitionStatistics> listStatistics(
-      String metalake, MetadataObject metadataObject, List<String> 
partitionNames) {
+      String metalake, MetadataObject metadataObject, List<String> 
partitionNames)
+      throws IOException {
     throw new UnsupportedOperationException(
         "Don't support listStatistics with partition names yet.");
   }
@@ -73,19 +76,21 @@ public interface PartitionStatisticStorage extends 
Closeable {
    * @return the number of statistics dropped, which may be less than the size 
of the input list if
    *     some statistics do not exist or cannot be dropped.
    */
-  int dropStatistics(String metalake, List<MetadataObjectStatisticsDrop> 
partitionStatisticsToDrop);
+  int dropStatistics(String metalake, List<MetadataObjectStatisticsDrop> 
partitionStatisticsToDrop)
+      throws IOException;
 
   /**
    * Updates statistics for a given metadata object. If the statistic exists, 
it will be updated; If
    * the statistic doesn't exist, it will be created. Locking guarantee: The 
upper layer will
    * acquire a write lock at the metadata object level. For example, if the 
metadata object is a
-   * table, the write lock of the table level will be held. The concrete 
implementation * may
-   * perform partial drops, meaning that the underlying storage system may not 
support transactional
+   * table, the write lock of the table level will be held. The concrete 
implementation may perform
+   * partial updates, meaning that the underlying storage system may not 
support transactional
    * update.
    *
    * @param metalake the name of the metalake
    * @param statisticsToUpdate a list of {@link 
MetadataObjectStatisticsUpdate} objects, each
    *     containing the metadata object and its associated statistics updates.
    */
-  void updateStatistics(String metalake, List<MetadataObjectStatisticsUpdate> 
statisticsToUpdate);
+  void updateStatistics(String metalake, List<MetadataObjectStatisticsUpdate> 
statisticsToUpdate)
+      throws IOException;
 }
diff --git 
a/core/src/main/java/org/apache/gravitino/stats/storage/PersistedPartitionStatistics.java
 
b/core/src/main/java/org/apache/gravitino/stats/storage/PersistedPartitionStatistics.java
index 0e3b647ae6..241ec6084e 100644
--- 
a/core/src/main/java/org/apache/gravitino/stats/storage/PersistedPartitionStatistics.java
+++ 
b/core/src/main/java/org/apache/gravitino/stats/storage/PersistedPartitionStatistics.java
@@ -18,25 +18,24 @@
  */
 package org.apache.gravitino.stats.storage;
 
-import java.util.Map;
-import org.apache.gravitino.stats.StatisticValue;
+import java.util.List;
 
 /** Represents a collection of statistics for a specific partition in a 
metadata object. */
 public class PersistedPartitionStatistics {
 
   private final String partitionName;
-  private final Map<String, StatisticValue<?>> statistics;
+  private final List<PersistedStatistic> statistics;
 
   /**
    * Creates an instance of {@link PersistedPartitionStatistics}.
    *
    * @param partitionName the name of the partition for which these statistics 
are applicable
-   * @param statistics a map of statistics applicable to the partition, where 
the key is the
-   *     statistic name
+   * @param statistics a list of statistics applicable to the partition, where 
each statistic is
+   *     represented by a {@link PersistedStatistic} object
    * @return a new instance of {@link PersistedPartitionStatistics}
    */
   public static PersistedPartitionStatistics of(
-      String partitionName, Map<String, StatisticValue<?>> statistics) {
+      String partitionName, List<PersistedStatistic> statistics) {
     return new PersistedPartitionStatistics(partitionName, statistics);
   }
 
@@ -44,11 +43,10 @@ public class PersistedPartitionStatistics {
    * Private constructor for {@link PersistedPartitionStatistics}.
    *
    * @param partitionName the name of the partition for which these statistics 
are applicable
-   * @param statistics a map of statistics applicable to the partition, where 
the key is the
-   *     statistic name
+   * @param statistics a list of statistics applicable to the partition, where 
each statistic is
+   *     represented by a {@link PersistedStatistic} object
    */
-  private PersistedPartitionStatistics(
-      String partitionName, Map<String, StatisticValue<?>> statistics) {
+  private PersistedPartitionStatistics(String partitionName, 
List<PersistedStatistic> statistics) {
     this.partitionName = partitionName;
     this.statistics = statistics;
   }
@@ -63,11 +61,12 @@ public class PersistedPartitionStatistics {
   }
 
   /**
-   * Returns the statistics for the partition.
+   * Returns the statistics applicable to the partition.
    *
-   * @return a map of statistics applicable to the partition, where the key is 
the statistic name
+   * @return a list of {@link PersistedStatistic} objects, each representing a 
statistic for the
+   *     partition
    */
-  public Map<String, StatisticValue<?>> statistics() {
+  public List<PersistedStatistic> statistics() {
     return statistics;
   }
 }
diff --git 
a/core/src/main/java/org/apache/gravitino/stats/storage/PersistedStatistic.java 
b/core/src/main/java/org/apache/gravitino/stats/storage/PersistedStatistic.java
new file mode 100644
index 0000000000..2c87dc3bcd
--- /dev/null
+++ 
b/core/src/main/java/org/apache/gravitino/stats/storage/PersistedStatistic.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.stats.storage;
+
+import org.apache.gravitino.meta.AuditInfo;
+import org.apache.gravitino.stats.StatisticValue;
+
+/** Represents a persisted statistic with its name, value, and associated 
audit information. */
+public class PersistedStatistic {
+
+  private final String name;
+  private final StatisticValue<?> value;
+  private final AuditInfo auditInfo;
+
+  /**
+   * Creates an instance of {@link PersistedStatistic}.
+   *
+   * @param name the name of the statistic
+   * @param value the value of the statistic
+   * @param auditInfo the audit information associated with the statistic
+   * @return a new instance of {@link PersistedStatistic}
+   */
+  public static PersistedStatistic of(String name, StatisticValue<?> value, 
AuditInfo auditInfo) {
+    return new PersistedStatistic(name, value, auditInfo);
+  }
+
+  /**
+   * Private constructor for {@link PersistedStatistic}.
+   *
+   * @param name the name of the statistic
+   * @param value the value of the statistic
+   * @param auditInfo the audit information associated with the statistic
+   */
+  private PersistedStatistic(String name, StatisticValue<?> value, AuditInfo 
auditInfo) {
+    this.name = name;
+    this.value = value;
+    this.auditInfo = auditInfo;
+  }
+
+  /**
+   * Returns the name of the statistic.
+   *
+   * @return the name of the statistic
+   */
+  public String name() {
+    return name;
+  }
+
+  /**
+   * Returns the value of the statistic.
+   *
+   * @return the value of the statistic
+   */
+  public StatisticValue<?> value() {
+    return value;
+  }
+
+  /**
+   * Returns the audit information associated with the statistic.
+   *
+   * @return the audit information
+   */
+  public AuditInfo auditInfo() {
+    return auditInfo;
+  }
+}
diff --git 
a/core/src/test/java/org/apache/gravitino/stats/TestStatisticManager.java 
b/core/src/test/java/org/apache/gravitino/stats/TestStatisticManager.java
index e430d9486c..03cb28245d 100644
--- a/core/src/test/java/org/apache/gravitino/stats/TestStatisticManager.java
+++ b/core/src/test/java/org/apache/gravitino/stats/TestStatisticManager.java
@@ -63,6 +63,7 @@ import org.apache.gravitino.meta.SchemaEntity;
 import org.apache.gravitino.meta.SchemaVersion;
 import org.apache.gravitino.meta.TableEntity;
 import org.apache.gravitino.rel.types.Types;
+import org.apache.gravitino.stats.storage.MemoryPartitionStatsStorageFactory;
 import org.apache.gravitino.storage.IdGenerator;
 import org.apache.gravitino.storage.RandomIdGenerator;
 import org.junit.jupiter.api.AfterAll;
@@ -115,6 +116,8 @@ public class TestStatisticManager {
     Mockito.when(config.get(Configs.CACHE_WEIGHER_ENABLED)).thenReturn(true);
     Mockito.when(config.get(Configs.CACHE_STATS_ENABLED)).thenReturn(false);
     
Mockito.when(config.get(Configs.CACHE_IMPLEMENTATION)).thenReturn("caffeine");
+    Mockito.when(config.get(Configs.PARTITION_STATS_STORAGE_FACTORY_CLASS))
+        
.thenReturn(MemoryPartitionStatsStorageFactory.class.getCanonicalName());
 
     Mockito.doReturn(100000L).when(config).get(TREE_LOCK_MAX_NODE_IN_MEMORY);
     Mockito.doReturn(1000L).when(config).get(TREE_LOCK_MIN_NODE_IN_MEMORY);
@@ -193,7 +196,7 @@ public class TestStatisticManager {
 
   @Test
   public void testStatisticLifeCycle() {
-    StatisticManager statisticManager = new StatisticManager(entityStore, 
idGenerator);
+    StatisticManager statisticManager = new StatisticManager(entityStore, 
idGenerator, config);
 
     MetadataObject tableObject =
         MetadataObjects.of(Lists.newArrayList(CATALOG, SCHEMA, TABLE), 
MetadataObject.Type.TABLE);
@@ -284,4 +287,107 @@ public class TestStatisticManager {
         NoSuchMetadataObjectException.class,
         () -> statisticManager.dropStatistics(METALAKE, notExistObject, 
statNames));
   }
+
+  @Test
+  public void testPartitionStatisticLifeCycle() {
+    StatisticManager statisticManager = new StatisticManager(entityStore, 
idGenerator, config);
+
+    MetadataObject tableObject =
+        MetadataObjects.of(Lists.newArrayList(CATALOG, SCHEMA, TABLE), 
MetadataObject.Type.TABLE);
+    Map<String, StatisticValue<?>> stats = Maps.newHashMap();
+    // Update statistics
+    stats.put("a", StatisticValues.stringValue("1"));
+    stats.put("b", StatisticValues.longValue(1L));
+    stats.put("c", StatisticValues.doubleValue(1.0));
+    stats.put("d", StatisticValues.booleanValue(true));
+    stats.put(
+        "e",
+        StatisticValues.listValue(
+            Lists.newArrayList(
+                StatisticValues.stringValue("1"), 
StatisticValues.stringValue("2"))));
+
+    Map<String, StatisticValue<?>> map = Maps.newHashMap();
+    map.put("x", StatisticValues.stringValue("1"));
+    map.put("y", StatisticValues.longValue(2L));
+    stats.put("f", StatisticValues.objectValue(map));
+    List<PartitionStatisticsUpdate> partitionStatistics = Lists.newArrayList();
+    partitionStatistics.add(PartitionStatisticsModification.update("p0", 
stats));
+    statisticManager.updatePartitionStatistics(METALAKE, tableObject, 
partitionStatistics);
+
+    List<PartitionStatistics> statistics =
+        statisticManager.listPartitionStatistics(
+            METALAKE,
+            tableObject,
+            PartitionRange.between(
+                "p0", PartitionRange.BoundType.CLOSED, "p1", 
PartitionRange.BoundType.OPEN));
+    Assertions.assertEquals(1, statistics.size());
+    PartitionStatistics listedPartitionStats = statistics.get(0);
+    Assertions.assertEquals(6, listedPartitionStats.statistics().length);
+    for (Statistic statistic : listedPartitionStats.statistics()) {
+      Assertions.assertTrue(
+          stats.containsKey(statistic.name()),
+          "Statistic name should be in the updated stats: " + 
statistic.name());
+      StatisticValue<?> value = stats.get(statistic.name());
+      Assertions.assertEquals(
+          value, statistic.value().get(), "Statistic value type mismatch: " + 
statistic.name());
+    }
+
+    // Update partial statistics
+    Map<String, StatisticValue<?>> expectStats = Maps.newHashMap();
+    expectStats.putAll(stats);
+    stats.clear();
+    stats.put("f", StatisticValues.longValue(2L));
+    stats.put("x", StatisticValues.longValue(2L));
+    partitionStatistics.clear();
+    partitionStatistics.add(PartitionStatisticsModification.update("p0", 
stats));
+
+    expectStats.put("f", StatisticValues.longValue(2L));
+    expectStats.put("x", StatisticValues.longValue(2));
+
+    statisticManager.updatePartitionStatistics(METALAKE, tableObject, 
partitionStatistics);
+    statistics =
+        statisticManager.listPartitionStatistics(
+            METALAKE,
+            tableObject,
+            PartitionRange.between(
+                "p0", PartitionRange.BoundType.CLOSED, "p1", 
PartitionRange.BoundType.OPEN));
+    Assertions.assertEquals(1, statistics.size());
+    listedPartitionStats = statistics.get(0);
+    Assertions.assertEquals(7, listedPartitionStats.statistics().length);
+    for (Statistic statistic : listedPartitionStats.statistics()) {
+      Assertions.assertTrue(
+          expectStats.containsKey(statistic.name()),
+          "Statistic name should be in the updated stats: " + 
statistic.name());
+      StatisticValue<?> value = expectStats.get(statistic.name());
+      Assertions.assertEquals(
+          value, statistic.value().get(), "Statistic value type mismatch: " + 
statistic.name());
+    }
+
+    // Drop statistics
+    expectStats.remove("a");
+    expectStats.remove("b");
+    expectStats.remove("c");
+    List<String> statNames = Lists.newArrayList("a", "b", "c");
+    List<PartitionStatisticsDrop> partitionStatisticsToDrop = 
Lists.newArrayList();
+    partitionStatisticsToDrop.add(PartitionStatisticsModification.drop("p0", 
statNames));
+    statisticManager.dropPartitionStatistics(METALAKE, tableObject, 
partitionStatisticsToDrop);
+    statistics =
+        statisticManager.listPartitionStatistics(
+            METALAKE,
+            tableObject,
+            PartitionRange.between(
+                "p0", PartitionRange.BoundType.CLOSED, "p1", 
PartitionRange.BoundType.OPEN));
+    Assertions.assertEquals(1, statistics.size());
+    listedPartitionStats = statistics.get(0);
+    Assertions.assertEquals(4, listedPartitionStats.statistics().length);
+
+    for (Statistic statistic : listedPartitionStats.statistics()) {
+      Assertions.assertTrue(
+          expectStats.containsKey(statistic.name()),
+          "Statistic name should be in the updated stats: " + 
statistic.name());
+      StatisticValue<?> value = expectStats.get(statistic.name());
+      Assertions.assertEquals(
+          value, statistic.value().get(), "Statistic value type mismatch: " + 
statistic.name());
+    }
+  }
 }
diff --git 
a/core/src/test/java/org/apache/gravitino/stats/storage/TestMemoryPartitionStatsStorage.java
 
b/core/src/test/java/org/apache/gravitino/stats/storage/TestMemoryPartitionStatsStorage.java
index 4e60ff4dec..980b7f22ca 100644
--- 
a/core/src/test/java/org/apache/gravitino/stats/storage/TestMemoryPartitionStatsStorage.java
+++ 
b/core/src/test/java/org/apache/gravitino/stats/storage/TestMemoryPartitionStatsStorage.java
@@ -27,6 +27,7 @@ import org.apache.gravitino.MetadataObject;
 import org.apache.gravitino.MetadataObjects;
 import org.apache.gravitino.stats.PartitionRange;
 import org.apache.gravitino.stats.PartitionStatisticsDrop;
+import org.apache.gravitino.stats.PartitionStatisticsModification;
 import org.apache.gravitino.stats.PartitionStatisticsUpdate;
 import org.apache.gravitino.stats.StatisticValue;
 import org.apache.gravitino.stats.StatisticValues;
@@ -52,25 +53,86 @@ public class TestMemoryPartitionStatsStorage {
 
       Map<String, StatisticValue<?>> statistics = Maps.newHashMap();
       statistics.put("k1", StatisticValues.stringValue("v1"));
-      PartitionStatisticsUpdate update = PartitionStatisticsUpdate.of("p0", 
statistics);
+      PartitionStatisticsUpdate updateP0 = 
PartitionStatisticsModification.update("p0", statistics);
+      PartitionStatisticsUpdate updateP1 = 
PartitionStatisticsModification.update("p1", statistics);
       MetadataObjectStatisticsUpdate metadataObjectStatisticsUpdate =
-          MetadataObjectStatisticsUpdate.of(metadataObject, 
Lists.newArrayList(update));
+          MetadataObjectStatisticsUpdate.of(metadataObject, 
Lists.newArrayList(updateP0, updateP1));
       storage.updateStatistics("metalake", 
Lists.newArrayList(metadataObjectStatisticsUpdate));
 
+      // case 1: closed lower bound
+      stats =
+          storage.listStatistics(
+              "metalake",
+              metadataObject,
+              PartitionRange.downTo("p0", PartitionRange.BoundType.CLOSED));
+      Assertions.assertEquals(2, stats.size());
+
+      // case 2: open upper bound
+      stats =
+          storage.listStatistics(
+              "metalake", metadataObject, PartitionRange.upTo("p0", 
PartitionRange.BoundType.OPEN));
+      Assertions.assertEquals(0, stats.size());
+
+      // case 3: open lower bound
+      stats =
+          storage.listStatistics(
+              "metalake",
+              metadataObject,
+              PartitionRange.downTo("p0", PartitionRange.BoundType.OPEN));
+      Assertions.assertEquals(1, stats.size());
+
+      // case 4: closed upper bound
       stats =
           storage.listStatistics(
               "metalake",
               metadataObject,
               PartitionRange.upTo("p0", PartitionRange.BoundType.CLOSED));
       Assertions.assertEquals(1, stats.size());
-      Assertions.assertEquals(stats.get(0).partitionName(), "p0");
-      Map<String, StatisticValue<?>> partitionStats = 
stats.get(0).statistics();
+      List<PersistedStatistic> partitionStats = stats.get(0).statistics();
+
       Assertions.assertEquals(1, partitionStats.size());
-      Assertions.assertTrue(partitionStats.containsKey("k1"));
-      StatisticValue<?> value = partitionStats.get("k1");
+      Assertions.assertEquals(partitionStats.get(0).name(), "k1");
+      StatisticValue<?> value = partitionStats.get(0).value();
       Assertions.assertEquals(StatisticValues.stringValue("v1"), value);
 
-      PartitionStatisticsDrop drop = PartitionStatisticsDrop.of("p0", 
Lists.newArrayList("k1"));
+      // case 5: between p0 and p1 with [closed, open)
+      stats =
+          storage.listStatistics(
+              "metalake",
+              metadataObject,
+              PartitionRange.between(
+                  "p0", PartitionRange.BoundType.CLOSED, "p1", 
PartitionRange.BoundType.OPEN));
+      Assertions.assertEquals(1, stats.size());
+
+      // case 6: between p0 and p1 with (open, closed]
+      stats =
+          storage.listStatistics(
+              "metalake",
+              metadataObject,
+              PartitionRange.between(
+                  "p0", PartitionRange.BoundType.OPEN, "p1", 
PartitionRange.BoundType.CLOSED));
+      Assertions.assertEquals(1, stats.size());
+
+      // case 7: between p0 and p1 with (open, open)
+      stats =
+          storage.listStatistics(
+              "metalake",
+              metadataObject,
+              PartitionRange.between(
+                  "p0", PartitionRange.BoundType.OPEN, "p1", 
PartitionRange.BoundType.OPEN));
+      Assertions.assertEquals(0, stats.size());
+
+      // case 8: between p0 and p1 with [closed, closed]
+      stats =
+          storage.listStatistics(
+              "metalake",
+              metadataObject,
+              PartitionRange.between(
+                  "p0", PartitionRange.BoundType.CLOSED, "p1", 
PartitionRange.BoundType.CLOSED));
+      Assertions.assertEquals(2, stats.size());
+
+      PartitionStatisticsDrop drop =
+          PartitionStatisticsModification.drop("p0", Lists.newArrayList("k1"));
       List<PartitionStatisticsDrop> drops = Lists.newArrayList(drop);
 
       List<MetadataObjectStatisticsDrop> partitionStatisticsToDrop =
diff --git 
a/server/src/main/java/org/apache/gravitino/server/web/rest/ExceptionHandlers.java
 
b/server/src/main/java/org/apache/gravitino/server/web/rest/ExceptionHandlers.java
index 7b5ed17336..a9dc5ff630 100644
--- 
a/server/src/main/java/org/apache/gravitino/server/web/rest/ExceptionHandlers.java
+++ 
b/server/src/main/java/org/apache/gravitino/server/web/rest/ExceptionHandlers.java
@@ -193,6 +193,11 @@ public class ExceptionHandlers {
     return StatisticExceptionHandler.INSTANCE.handle(type, name, parent, e);
   }
 
+  public static Response handlePartitionStatsException(
+      OperationType type, String name, String parent, Exception e) {
+    return PartitionStatsExceptionHandler.INSTANCE.handle(type, name, parent, 
e);
+  }
+
   private static class PartitionExceptionHandler extends BaseExceptionHandler {
 
     private static final ExceptionHandler INSTANCE = new 
PartitionExceptionHandler();
@@ -940,6 +945,35 @@ public class ExceptionHandlers {
     }
   }
 
+  private static class PartitionStatsExceptionHandler extends 
BaseExceptionHandler {
+
+    private static final ExceptionHandler INSTANCE = new 
PartitionStatsExceptionHandler();
+
+    private static String getPartitionStatsErrorMsg(
+        String partition, String operation, String table, String reason) {
+      return String.format(
+          "Failed to operate partition stats%s operation [%s] of table [%s], 
reason [%s]",
+          partition, operation, table, reason);
+    }
+
+    @Override
+    public Response handle(OperationType op, String partition, String table, 
Exception e) {
+      String formatted = StringUtil.isBlank(partition) ? "" : " [" + partition 
+ "]";
+      String errorMsg = getPartitionStatsErrorMsg(formatted, op.name(), table, 
getErrorMsg(e));
+      LOG.warn(errorMsg, e);
+
+      if (e instanceof IllegalArgumentException) {
+        return Utils.illegalArguments(errorMsg, e);
+
+      } else if (e instanceof NotFoundException) {
+        return Utils.notFound(errorMsg, e);
+
+      } else {
+        return super.handle(op, partition, table, e);
+      }
+    }
+  }
+
   @VisibleForTesting
   static class BaseExceptionHandler extends ExceptionHandler {
 
diff --git 
a/server/src/main/java/org/apache/gravitino/server/web/rest/StatisticOperations.java
 
b/server/src/main/java/org/apache/gravitino/server/web/rest/StatisticOperations.java
index 2ddcfc2e50..a7ef188bfb 100644
--- 
a/server/src/main/java/org/apache/gravitino/server/web/rest/StatisticOperations.java
+++ 
b/server/src/main/java/org/apache/gravitino/server/web/rest/StatisticOperations.java
@@ -20,35 +20,48 @@ package org.apache.gravitino.server.web.rest;
 
 import com.codahale.metrics.annotation.ResponseMetered;
 import com.codahale.metrics.annotation.Timed;
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import java.util.List;
 import java.util.Locale;
 import java.util.Map;
+import java.util.stream.Collectors;
 import javax.inject.Inject;
 import javax.servlet.http.HttpServletRequest;
 import javax.ws.rs.Consumes;
+import javax.ws.rs.DefaultValue;
 import javax.ws.rs.GET;
 import javax.ws.rs.POST;
 import javax.ws.rs.PUT;
 import javax.ws.rs.Path;
 import javax.ws.rs.PathParam;
 import javax.ws.rs.Produces;
+import javax.ws.rs.QueryParam;
 import javax.ws.rs.core.Context;
 import javax.ws.rs.core.MediaType;
 import javax.ws.rs.core.Response;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.gravitino.MetadataObject;
 import org.apache.gravitino.MetadataObjects;
+import org.apache.gravitino.dto.requests.PartitionStatisticsDropRequest;
+import org.apache.gravitino.dto.requests.PartitionStatisticsUpdateRequest;
 import org.apache.gravitino.dto.requests.StatisticsDropRequest;
 import org.apache.gravitino.dto.requests.StatisticsUpdateRequest;
 import org.apache.gravitino.dto.responses.BaseResponse;
 import org.apache.gravitino.dto.responses.DropResponse;
+import org.apache.gravitino.dto.responses.PartitionStatisticsListResponse;
 import org.apache.gravitino.dto.responses.StatisticListResponse;
+import org.apache.gravitino.dto.stats.PartitionStatisticsDTO;
+import org.apache.gravitino.dto.stats.PartitionStatisticsDropDTO;
+import org.apache.gravitino.dto.stats.PartitionStatisticsUpdateDTO;
 import org.apache.gravitino.dto.util.DTOConverters;
 import org.apache.gravitino.exceptions.IllegalStatisticNameException;
 import org.apache.gravitino.metrics.MetricNames;
 import org.apache.gravitino.server.web.Utils;
+import org.apache.gravitino.stats.PartitionRange;
+import org.apache.gravitino.stats.PartitionStatistics;
+import org.apache.gravitino.stats.PartitionStatisticsModification;
 import org.apache.gravitino.stats.Statistic;
 import org.apache.gravitino.stats.StatisticManager;
 import org.apache.gravitino.stats.StatisticValue;
@@ -200,4 +213,247 @@ public class StatisticOperations {
           OperationType.DROP, StringUtils.join(request.getNames(), ","), 
fullName, e);
     }
   }
+
+  @GET
+  @Path("/partitions")
+  @Produces("application/vnd.gravitino.v1+json")
+  @Timed(name = "list-partition-stats." + MetricNames.HTTP_PROCESS_DURATION, 
absolute = true)
+  @ResponseMetered(name = "list-partition-stats", absolute = true)
+  public Response listPartitionStatistics(
+      @PathParam("metalake") String metalake,
+      @PathParam("type") String type,
+      @PathParam("fullName") String fullName,
+      @QueryParam("from") String fromPartitionName,
+      @QueryParam("to") String toPartitionName,
+      @QueryParam("fromInclusive") @DefaultValue("true") boolean fromInclusive,
+      @QueryParam("toInclusive") @DefaultValue("false") boolean toInclusive) {
+
+    String formattedFromPartitionName =
+        getFormattedFromPartitionName(fromPartitionName, fromInclusive);
+    String formattedToPartitionName = 
getFormattedToPartitionName(toPartitionName, toInclusive);
+
+    LOG.info(
+        "Listing partition statistics for table: {} in the metalake {} from {} 
to {}",
+        fullName,
+        metalake,
+        formattedFromPartitionName,
+        formattedToPartitionName);
+    try {
+      return Utils.doAs(
+          httpRequest,
+          () -> {
+            MetadataObject object =
+                MetadataObjects.parse(
+                    fullName, 
MetadataObject.Type.valueOf(type.toUpperCase(Locale.ROOT)));
+            if (object.type() != MetadataObject.Type.TABLE) {
+              throw new UnsupportedOperationException(
+                  "Listing partition statistics is only supported for tables 
now.");
+            }
+
+            if (fromPartitionName == null && toPartitionName == null) {
+              throw new IllegalArgumentException(
+                  "Both 'from' and 'to' parameters cannot be null at the same 
time.");
+            }
+
+            MetadataObjectUtil.checkMetadataObject(metalake, object);
+
+            PartitionRange range;
+            PartitionRange.BoundType fromBoundType = 
getFromBoundType(fromInclusive);
+            PartitionRange.BoundType toBoundType = 
getFromBoundType(toInclusive);
+            if (fromPartitionName != null && toPartitionName != null) {
+              range =
+                  PartitionRange.between(
+                      fromPartitionName, fromBoundType, toPartitionName, 
toBoundType);
+            } else if (fromPartitionName != null) {
+              range = PartitionRange.downTo(fromPartitionName, fromBoundType);
+            } else {
+              range = PartitionRange.upTo(toPartitionName, toBoundType);
+            }
+
+            List<PartitionStatistics> statistics =
+                statisticManager.listPartitionStatistics(metalake, object, 
range);
+
+            PartitionStatisticsDTO[] partitionStatistics =
+                statistics.stream()
+                    .map(
+                        partitionStatistic ->
+                            PartitionStatisticsDTO.of(
+                                partitionStatistic.partitionName(),
+                                
DTOConverters.toDTOs(partitionStatistic.statistics())))
+                    .toArray(PartitionStatisticsDTO[]::new);
+
+            return Utils.ok(new 
PartitionStatisticsListResponse(partitionStatistics));
+          });
+    } catch (Exception e) {
+      LOG.error(
+          "Error listing {},{} partition statistics for table: {} in the 
metalake {}.",
+          formattedFromPartitionName,
+          formattedToPartitionName,
+          fullName,
+          metalake,
+          e);
+      return ExceptionHandlers.handlePartitionStatsException(
+          OperationType.LIST,
+          formattedFromPartitionName + "," + formattedToPartitionName,
+          fullName,
+          e);
+    }
+  }
+
+  @PUT
+  @Path("/partitions")
+  @Produces("application/vnd.gravitino.v1+json")
+  @Timed(name = "update-partitions-stats." + 
MetricNames.HTTP_PROCESS_DURATION, absolute = true)
+  @ResponseMetered(name = "update-partitions-stats", absolute = true)
+  public Response updatePartitionStatistics(
+      @PathParam("metalake") String metalake,
+      @PathParam("type") String type,
+      @PathParam("fullName") String fullName,
+      PartitionStatisticsUpdateRequest request) {
+    LOG.info("Updating partition statistics for table: {} in the metalake {}", 
fullName, metalake);
+    try {
+      return Utils.doAs(
+          httpRequest,
+          () -> {
+            request.validate();
+
+            MetadataObject object =
+                MetadataObjects.parse(
+                    fullName, 
MetadataObject.Type.valueOf(type.toUpperCase(Locale.ROOT)));
+            if (object.type() != MetadataObject.Type.TABLE) {
+              throw new UnsupportedOperationException(
+                  "Updating partition statistics is only supported for tables 
now.");
+            }
+
+            List<PartitionStatisticsUpdateDTO> updates = request.getUpdates();
+            for (PartitionStatisticsUpdateDTO update : updates) {
+              update
+                  .statistics()
+                  .keySet()
+                  .forEach(
+                      statistic -> {
+                        if (!statistic.startsWith(Statistic.CUSTOM_PREFIX)) {
+                          // Current we only support custom statistics
+                          throw new IllegalStatisticNameException(
+                              "Statistic name must start with %s, but got: %s",
+                              Statistic.CUSTOM_PREFIX, statistic);
+                        }
+                      });
+            }
+
+            MetadataObjectUtil.checkMetadataObject(metalake, object);
+
+            statisticManager.updatePartitionStatistics(
+                metalake,
+                object,
+                updates.stream()
+                    .map(
+                        update ->
+                            PartitionStatisticsModification.update(
+                                update.partitionName(), update.statistics()))
+                    .collect(Collectors.toList()));
+
+            return Utils.ok(new BaseResponse(0));
+          });
+    } catch (Exception e) {
+      LOG.error(
+          "Error updating partition statistics for table: {} in the metalake 
{}",
+          fullName,
+          metalake,
+          e);
+      String partitions =
+          StringUtils.joinWith(
+              ",",
+              request.getUpdates().stream()
+                  .map(PartitionStatisticsUpdateDTO::partitionName)
+                  .collect(Collectors.toList()));
+      return ExceptionHandlers.handlePartitionStatsException(
+          OperationType.UPDATE, partitions, fullName, e);
+    }
+  }
+
+  @POST
+  @Path("/partitions")
+  @Produces("application/vnd.gravitino.v1+json")
+  @Timed(name = "drop-partitions-stats." + MetricNames.HTTP_PROCESS_DURATION, 
absolute = true)
+  @ResponseMetered(name = "drop-partitions-stats", absolute = true)
+  public Response dropPartitionStatistics(
+      @PathParam("metalake") String metalake,
+      @PathParam("type") String type,
+      @PathParam("fullName") String fullName,
+      PartitionStatisticsDropRequest request) {
+
+    try {
+      return Utils.doAs(
+          httpRequest,
+          () -> {
+            request.validate();
+            MetadataObject object =
+                MetadataObjects.parse(
+                    fullName, 
MetadataObject.Type.valueOf(type.toUpperCase(Locale.ROOT)));
+            if (object.type() != MetadataObject.Type.TABLE) {
+              throw new UnsupportedOperationException(
+                  "Dropping partition statistics is only supported for tables 
now.");
+            }
+
+            MetadataObjectUtil.checkMetadataObject(metalake, object);
+
+            return Utils.ok(
+                new DropResponse(
+                    statisticManager.dropPartitionStatistics(
+                        metalake,
+                        object,
+                        request.getDrops().stream()
+                            .map(
+                                drop ->
+                                    PartitionStatisticsModification.drop(
+                                        drop.partitionName(), 
drop.statisticNames()))
+                            .collect(Collectors.toList()))));
+          });
+    } catch (Exception e) {
+      LOG.error(
+          "Error dropping partition statistics for table: {} in the metalake 
{}.",
+          fullName,
+          metalake,
+          e);
+      String partitions =
+          StringUtils.joinWith(
+              ",",
+              request.getDrops().stream()
+                  .map(PartitionStatisticsDropDTO::partitionName)
+                  .collect(Collectors.toList()));
+      return ExceptionHandlers.handlePartitionStatsException(
+          OperationType.DROP, partitions, fullName, e);
+    }
+  }
+
+  @VisibleForTesting
+  static PartitionRange.BoundType getFromBoundType(boolean inclusive) {
+    return inclusive ? PartitionRange.BoundType.CLOSED : 
PartitionRange.BoundType.OPEN;
+  }
+
+  private static String getFormattedFromPartitionName(
+      String fromPartitionName, boolean fromInclusive) {
+    if (fromPartitionName == null) {
+      return "(-INF";
+    } else {
+      if (fromInclusive) {
+        return "[" + fromPartitionName;
+      } else {
+        return "(" + fromPartitionName;
+      }
+    }
+  }
+
+  private static String getFormattedToPartitionName(String toPartitionName, 
boolean toInclusive) {
+    if (toPartitionName == null) {
+      return "INF)";
+    } else {
+      if (toInclusive) {
+        return toPartitionName + "]";
+      } else {
+        return toPartitionName + ")";
+      }
+    }
+  }
 }
diff --git 
a/server/src/test/java/org/apache/gravitino/server/web/rest/TestStatisticOperations.java
 
b/server/src/test/java/org/apache/gravitino/server/web/rest/TestStatisticOperations.java
index 1a55b10282..3a4c20e68f 100644
--- 
a/server/src/test/java/org/apache/gravitino/server/web/rest/TestStatisticOperations.java
+++ 
b/server/src/test/java/org/apache/gravitino/server/web/rest/TestStatisticOperations.java
@@ -31,7 +31,9 @@ import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import java.io.IOException;
 import java.time.Instant;
+import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import javax.servlet.http.HttpServletRequest;
 import javax.ws.rs.core.Application;
 import javax.ws.rs.core.MediaType;
@@ -42,19 +44,27 @@ import org.apache.gravitino.GravitinoEnv;
 import org.apache.gravitino.MetadataObject;
 import org.apache.gravitino.MetadataObjects;
 import org.apache.gravitino.catalog.TableDispatcher;
+import org.apache.gravitino.dto.requests.PartitionStatisticsDropRequest;
+import org.apache.gravitino.dto.requests.PartitionStatisticsUpdateRequest;
 import org.apache.gravitino.dto.requests.StatisticsDropRequest;
 import org.apache.gravitino.dto.requests.StatisticsUpdateRequest;
 import org.apache.gravitino.dto.responses.BaseResponse;
 import org.apache.gravitino.dto.responses.DropResponse;
 import org.apache.gravitino.dto.responses.ErrorConstants;
 import org.apache.gravitino.dto.responses.ErrorResponse;
+import org.apache.gravitino.dto.responses.PartitionStatisticsListResponse;
 import org.apache.gravitino.dto.responses.StatisticListResponse;
+import org.apache.gravitino.dto.stats.PartitionStatisticsDropDTO;
+import org.apache.gravitino.dto.stats.PartitionStatisticsUpdateDTO;
 import org.apache.gravitino.dto.stats.StatisticDTO;
 import org.apache.gravitino.dto.util.DTOConverters;
+import org.apache.gravitino.exceptions.IllegalStatisticNameException;
 import org.apache.gravitino.exceptions.NoSuchMetadataObjectException;
 import org.apache.gravitino.lock.LockManager;
 import org.apache.gravitino.meta.AuditInfo;
 import org.apache.gravitino.rest.RESTUtils;
+import org.apache.gravitino.stats.PartitionRange;
+import org.apache.gravitino.stats.PartitionStatistics;
 import org.apache.gravitino.stats.Statistic;
 import org.apache.gravitino.stats.StatisticManager;
 import org.apache.gravitino.stats.StatisticValue;
@@ -321,13 +331,13 @@ public class TestStatisticOperations extends JerseyTest {
             .accept("application/vnd.gravitino.v1+json")
             .put(entity(req, MediaType.APPLICATION_JSON_TYPE));
 
-    Assertions.assertEquals(
-        Response.Status.INTERNAL_SERVER_ERROR.getStatusCode(), 
resp3.getStatus());
+    Assertions.assertEquals(Response.Status.BAD_REQUEST.getStatusCode(), 
resp3.getStatus());
     Assertions.assertEquals(MediaType.APPLICATION_JSON_TYPE, 
resp3.getMediaType());
 
     ErrorResponse errorResp3 = resp3.readEntity(ErrorResponse.class);
-    Assertions.assertEquals(ErrorConstants.INTERNAL_ERROR_CODE, 
errorResp3.getCode());
-    Assertions.assertEquals(RuntimeException.class.getSimpleName(), 
errorResp3.getType());
+    Assertions.assertEquals(ErrorConstants.ILLEGAL_ARGUMENTS_CODE, 
errorResp3.getCode());
+    Assertions.assertEquals(
+        IllegalStatisticNameException.class.getSimpleName(), 
errorResp3.getType());
   }
 
   @Test
@@ -406,4 +416,326 @@ public class TestStatisticOperations extends JerseyTest {
     Assertions.assertEquals(ErrorConstants.INTERNAL_ERROR_CODE, 
errorResp2.getCode());
     Assertions.assertEquals(RuntimeException.class.getSimpleName(), 
errorResp2.getType());
   }
+
+  @Test
+  public void testListPartitionStatistics() {
+    AuditInfo auditInfo =
+        AuditInfo.builder()
+            .withCreateTime(Instant.now())
+            .withCreator("test")
+            .withLastModifiedTime(Instant.now())
+            .withLastModifier("test")
+            .build();
+
+    StatisticDTO stat1 =
+        StatisticDTO.builder()
+            .withName("test1")
+            .withValue(Optional.of(StatisticValues.stringValue("test")))
+            .withReserved(true)
+            .withModifiable(false)
+            .withAudit(DTOConverters.toDTO(auditInfo))
+            .build();
+    StatisticDTO stat2 =
+        StatisticDTO.builder()
+            .withName("test1")
+            .withValue(Optional.of(StatisticValues.longValue(1L)))
+            .withReserved(true)
+            .withModifiable(false)
+            .withAudit(DTOConverters.toDTO(auditInfo))
+            .build();
+    PartitionStatistics partitionStatistics =
+        new StatisticManager.CustomPartitionStatistic("partition1", new 
Statistic[] {stat1, stat2});
+    MetadataObject tableObject =
+        MetadataObjects.parse(
+            String.format("%s.%s.%s", catalog, schema, table), 
MetadataObject.Type.TABLE);
+
+    when(manager.listPartitionStatistics(any(), any(), any()))
+        .thenReturn(Lists.newArrayList(partitionStatistics));
+    when(tableDispatcher.tableExists(any())).thenReturn(true);
+
+    Response resp =
+        target(
+                "/metalakes/"
+                    + metalake
+                    + "/objects/"
+                    + tableObject.type()
+                    + "/"
+                    + tableObject.fullName()
+                    + "/statistics/partitions")
+            .queryParam("from", "p0")
+            .request(MediaType.APPLICATION_JSON_TYPE)
+            .accept("application/vnd.gravitino.v1+json")
+            .get();
+
+    Assertions.assertEquals(Response.Status.OK.getStatusCode(), 
resp.getStatus());
+    Assertions.assertEquals(MediaType.APPLICATION_JSON_TYPE, 
resp.getMediaType());
+
+    PartitionStatisticsListResponse listResp =
+        resp.readEntity(PartitionStatisticsListResponse.class);
+    Assertions.assertEquals(0, listResp.getCode());
+
+    Statistic[] statisticDTOS = 
listResp.getPartitionStatistics()[0].statistics();
+    Assertions.assertEquals(2, statisticDTOS.length);
+    Assertions.assertEquals(stat1.name(), statisticDTOS[0].name());
+    Assertions.assertEquals(stat1.value().get(), 
statisticDTOS[0].value().get());
+    Assertions.assertEquals(stat2.name(), statisticDTOS[1].name());
+    Assertions.assertEquals(stat2.value().get(), 
statisticDTOS[1].value().get());
+
+    // Test throw NoSuchMetadataObjectException
+    when(tableDispatcher.tableExists(any())).thenReturn(false);
+    Response resp1 =
+        target(
+                "/metalakes/"
+                    + metalake
+                    + "/objects/"
+                    + tableObject.type()
+                    + "/"
+                    + tableObject.fullName()
+                    + "/statistics/partitions")
+            .queryParam("from", "p0")
+            .request(MediaType.APPLICATION_JSON_TYPE)
+            .accept("application/vnd.gravitino.v1+json")
+            .get();
+
+    Assertions.assertEquals(Response.Status.NOT_FOUND.getStatusCode(), 
resp1.getStatus());
+    Assertions.assertEquals(MediaType.APPLICATION_JSON_TYPE, 
resp1.getMediaType());
+
+    ErrorResponse errorResp = resp1.readEntity(ErrorResponse.class);
+    Assertions.assertEquals(ErrorConstants.NOT_FOUND_CODE, 
errorResp.getCode());
+    Assertions.assertEquals(
+        NoSuchMetadataObjectException.class.getSimpleName(), 
errorResp.getType());
+
+    // Test throw RuntimeException
+    when(tableDispatcher.tableExists(any())).thenReturn(true);
+    doThrow(new RuntimeException("mock error"))
+        .when(manager)
+        .listPartitionStatistics(any(), any(), any());
+    Response resp2 =
+        target(
+                "/metalakes/"
+                    + metalake
+                    + "/objects/"
+                    + tableObject.type()
+                    + "/"
+                    + tableObject.fullName()
+                    + "/statistics/partitions")
+            .queryParam("from", "p0")
+            .request(MediaType.APPLICATION_JSON_TYPE)
+            .accept("application/vnd.gravitino.v1+json")
+            .get();
+
+    Assertions.assertEquals(
+        Response.Status.INTERNAL_SERVER_ERROR.getStatusCode(), 
resp2.getStatus());
+    Assertions.assertEquals(MediaType.APPLICATION_JSON_TYPE, 
resp2.getMediaType());
+
+    ErrorResponse errorResp2 = resp2.readEntity(ErrorResponse.class);
+    Assertions.assertEquals(ErrorConstants.INTERNAL_ERROR_CODE, 
errorResp2.getCode());
+    Assertions.assertEquals(RuntimeException.class.getSimpleName(), 
errorResp2.getType());
+  }
+
+  @Test
+  public void testUpdatePartitionStatistics() {
+    Map<String, StatisticValue<?>> statsMap = Maps.newHashMap();
+    statsMap.put(Statistic.CUSTOM_PREFIX + "test1", 
StatisticValues.stringValue("test"));
+    statsMap.put(Statistic.CUSTOM_PREFIX + "test2", 
StatisticValues.longValue(1L));
+    List<PartitionStatisticsUpdateDTO> partitionStatsList = 
Lists.newArrayList();
+    partitionStatsList.add(PartitionStatisticsUpdateDTO.of("partition1", 
statsMap));
+    MetadataObject tableObject =
+        MetadataObjects.parse(
+            String.format("%s.%s.%s", catalog, schema, table), 
MetadataObject.Type.TABLE);
+
+    PartitionStatisticsUpdateRequest req = new 
PartitionStatisticsUpdateRequest(partitionStatsList);
+
+    when(tableDispatcher.tableExists(any())).thenReturn(true);
+
+    Response resp =
+        target(
+                "/metalakes/"
+                    + metalake
+                    + "/objects/"
+                    + tableObject.type()
+                    + "/"
+                    + tableObject.fullName()
+                    + "/statistics/partitions")
+            .request(MediaType.APPLICATION_JSON_TYPE)
+            .accept("application/vnd.gravitino.v1+json")
+            .put(entity(req, MediaType.APPLICATION_JSON_TYPE));
+
+    Assertions.assertEquals(Response.Status.OK.getStatusCode(), 
resp.getStatus());
+    Assertions.assertEquals(MediaType.APPLICATION_JSON_TYPE, 
resp.getMediaType());
+
+    Assertions.assertEquals(Response.Status.OK.getStatusCode(), 
resp.getStatus());
+    Assertions.assertEquals(MediaType.APPLICATION_JSON_TYPE, 
resp.getMediaType());
+
+    BaseResponse updateResp = resp.readEntity(BaseResponse.class);
+    Assertions.assertEquals(0, updateResp.getCode());
+
+    // Test throw NoSuchMetadataObjectException
+    when(tableDispatcher.tableExists(any())).thenReturn(false);
+
+    Response resp1 =
+        target(
+                "/metalakes/"
+                    + metalake
+                    + "/objects/"
+                    + tableObject.type()
+                    + "/"
+                    + tableObject.fullName()
+                    + "/statistics/partitions")
+            .request(MediaType.APPLICATION_JSON_TYPE)
+            .accept("application/vnd.gravitino.v1+json")
+            .put(entity(req, MediaType.APPLICATION_JSON_TYPE));
+
+    Assertions.assertEquals(Response.Status.NOT_FOUND.getStatusCode(), 
resp1.getStatus());
+    Assertions.assertEquals(MediaType.APPLICATION_JSON_TYPE, 
resp1.getMediaType());
+
+    ErrorResponse errorResp = resp1.readEntity(ErrorResponse.class);
+    Assertions.assertEquals(ErrorConstants.NOT_FOUND_CODE, 
errorResp.getCode());
+    Assertions.assertEquals(
+        NoSuchMetadataObjectException.class.getSimpleName(), 
errorResp.getType());
+
+    when(tableDispatcher.tableExists(any())).thenReturn(true);
+
+    // Test throw RuntimeException
+    doThrow(new RuntimeException("mock error"))
+        .when(manager)
+        .updatePartitionStatistics(any(), any(), any());
+    Response resp2 =
+        target(
+                "/metalakes/"
+                    + metalake
+                    + "/objects/"
+                    + tableObject.type()
+                    + "/"
+                    + tableObject.fullName()
+                    + "/statistics/partitions")
+            .request(MediaType.APPLICATION_JSON_TYPE)
+            .accept("application/vnd.gravitino.v1+json")
+            .put(entity(req, MediaType.APPLICATION_JSON_TYPE));
+
+    Assertions.assertEquals(
+        Response.Status.INTERNAL_SERVER_ERROR.getStatusCode(), 
resp2.getStatus());
+    Assertions.assertEquals(MediaType.APPLICATION_JSON_TYPE, 
resp2.getMediaType());
+
+    ErrorResponse errorResp2 = resp2.readEntity(ErrorResponse.class);
+    Assertions.assertEquals(ErrorConstants.INTERNAL_ERROR_CODE, 
errorResp2.getCode());
+    Assertions.assertEquals(RuntimeException.class.getSimpleName(), 
errorResp2.getType());
+
+    // Test throw IllegalStatisticNameException
+    statsMap.put("test1", StatisticValues.longValue(1L));
+
+    req = new PartitionStatisticsUpdateRequest(partitionStatsList);
+    Response resp3 =
+        target(
+                "/metalakes/"
+                    + metalake
+                    + "/objects/"
+                    + tableObject.type()
+                    + "/"
+                    + tableObject.fullName()
+                    + "/statistics/partitions")
+            .request(MediaType.APPLICATION_JSON_TYPE)
+            .accept("application/vnd.gravitino.v1+json")
+            .put(entity(req, MediaType.APPLICATION_JSON_TYPE));
+
+    Assertions.assertEquals(Response.Status.BAD_REQUEST.getStatusCode(), 
resp3.getStatus());
+    Assertions.assertEquals(MediaType.APPLICATION_JSON_TYPE, 
resp3.getMediaType());
+
+    ErrorResponse errorResp3 = resp3.readEntity(ErrorResponse.class);
+    Assertions.assertEquals(ErrorConstants.ILLEGAL_ARGUMENTS_CODE, 
errorResp3.getCode());
+    Assertions.assertEquals(
+        IllegalStatisticNameException.class.getSimpleName(), 
errorResp3.getType());
+  }
+
+  @Test
+  public void testDropPartitionStatistics() {
+    List<PartitionStatisticsDropDTO> partitionStatistics = 
Lists.newArrayList();
+    partitionStatistics.add(
+        PartitionStatisticsDropDTO.of("partition1", 
Lists.newArrayList("stat1", "stat2")));
+    PartitionStatisticsDropRequest req = new 
PartitionStatisticsDropRequest(partitionStatistics);
+    when(manager.dropPartitionStatistics(any(), any(), 
any())).thenReturn(true);
+    when(tableDispatcher.tableExists(any())).thenReturn(true);
+
+    MetadataObject tableObject =
+        MetadataObjects.parse(
+            String.format("%s.%s.%s", catalog, schema, table), 
MetadataObject.Type.TABLE);
+
+    Response resp =
+        target(
+                "/metalakes/"
+                    + metalake
+                    + "/objects/"
+                    + tableObject.type()
+                    + "/"
+                    + tableObject.fullName()
+                    + "/statistics/partitions")
+            .request(MediaType.APPLICATION_JSON_TYPE)
+            .accept("application/vnd.gravitino.v1+json")
+            .post(entity(req, MediaType.APPLICATION_JSON_TYPE));
+
+    Assertions.assertEquals(Response.Status.OK.getStatusCode(), 
resp.getStatus());
+    Assertions.assertEquals(MediaType.APPLICATION_JSON_TYPE, 
resp.getMediaType());
+
+    DropResponse dropResp = resp.readEntity(DropResponse.class);
+    Assertions.assertEquals(0, dropResp.getCode());
+    Assertions.assertTrue(dropResp.dropped());
+
+    // Test throw NoSuchMetadataObjectExcep
+    when(tableDispatcher.tableExists(any())).thenReturn(false);
+
+    Response resp1 =
+        target(
+                "/metalakes/"
+                    + metalake
+                    + "/objects/"
+                    + tableObject.type()
+                    + "/"
+                    + tableObject.fullName()
+                    + "/statistics/partitions")
+            .request(MediaType.APPLICATION_JSON_TYPE)
+            .accept("application/vnd.gravitino.v1+json")
+            .post(entity(req, MediaType.APPLICATION_JSON_TYPE));
+
+    Assertions.assertEquals(Response.Status.NOT_FOUND.getStatusCode(), 
resp1.getStatus());
+    Assertions.assertEquals(MediaType.APPLICATION_JSON_TYPE, 
resp1.getMediaType());
+
+    ErrorResponse errorResp = resp1.readEntity(ErrorResponse.class);
+    Assertions.assertEquals(ErrorConstants.NOT_FOUND_CODE, 
errorResp.getCode());
+    Assertions.assertEquals(
+        NoSuchMetadataObjectException.class.getSimpleName(), 
errorResp.getType());
+
+    // Test throw RuntimeException
+    when(tableDispatcher.tableExists(any())).thenReturn(true);
+    doThrow(new RuntimeException("mock error"))
+        .when(manager)
+        .dropPartitionStatistics(any(), any(), any());
+    Response resp2 =
+        target(
+                "/metalakes/"
+                    + metalake
+                    + "/objects/"
+                    + tableObject.type()
+                    + "/"
+                    + tableObject.fullName()
+                    + "/statistics/partitions")
+            .request(MediaType.APPLICATION_JSON_TYPE)
+            .accept("application/vnd.gravitino.v1+json")
+            .post(entity(req, MediaType.APPLICATION_JSON_TYPE));
+
+    Assertions.assertEquals(
+        Response.Status.INTERNAL_SERVER_ERROR.getStatusCode(), 
resp2.getStatus());
+    Assertions.assertEquals(MediaType.APPLICATION_JSON_TYPE, 
resp2.getMediaType());
+
+    ErrorResponse errorResp2 = resp2.readEntity(ErrorResponse.class);
+    Assertions.assertEquals(ErrorConstants.INTERNAL_ERROR_CODE, 
errorResp2.getCode());
+    Assertions.assertEquals(RuntimeException.class.getSimpleName(), 
errorResp2.getType());
+  }
+
+  @Test
+  public void testGetBoundType() {
+    Assertions.assertEquals(
+        PartitionRange.BoundType.CLOSED, 
StatisticOperations.getFromBoundType(true));
+    Assertions.assertEquals(
+        PartitionRange.BoundType.OPEN, 
StatisticOperations.getFromBoundType(false));
+  }
 }

Reply via email to