This is an automated email from the ASF dual-hosted git repository.

roryqi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/gravitino.git


The following commit(s) were added to refs/heads/main by this push:
     new 6c8807c1c3 [#7274] feat(client-java): Client supports statistics and 
partition statistics (#7828)
6c8807c1c3 is described below

commit 6c8807c1c3d6745d512339065ee93ff943d2ef2c
Author: Tianhang <[email protected]>
AuthorDate: Thu Aug 21 16:02:46 2025 +0800

    [#7274] feat(client-java): Client supports statistics and partition 
statistics (#7828)
    
    ### What changes were proposed in this pull request?
    
    1. Renamed the partition statistics interface methods, because it will
    conflict with statistics interface methods
    2. Add the client supports for statistics
    
    ### Why are the changes needed?
    
    Fix: #7274
    
    ### Does this PR introduce any user-facing change?
    
      Yes, I will add the document later.
    
    ### How was this patch tested?
    
      Added UT and IT.
    
    ---------
    
    Co-authored-by: teo <[email protected]>
    Co-authored-by: Rory <[email protected]>
---
 .../exceptions/UnmodifiableStatisticException.java |   6 +-
 .../main/java/org/apache/gravitino/rel/Table.java  |  21 ++
 .../stats/SupportsPartitionStatistics.java         |   6 +-
 .../org/apache/gravitino/client/ErrorHandlers.java |  77 ++++
 ...etadataObjectPartitionStatisticsOperations.java | 145 ++++++++
 .../client/MetadataObjectStatisticsOperations.java | 111 ++++++
 .../apache/gravitino/client/RelationalTable.java   |  69 +++-
 .../client/TestSupportsPartitionStatistics.java    | 289 +++++++++++++++
 .../gravitino/client/TestSupportsStatistics.java   | 261 ++++++++++++++
 .../client/integration/test/StatisticIT.java       | 389 +++++++++++++++++++++
 .../gravitino/dto/responses/ErrorResponse.java     |   4 +-
 .../postgresql/StatisticPostgresSQLProvider.java   |   2 +-
 scripts/postgresql/schema-1.0.0-postgresql.sql     |   2 +-
 .../upgrade-0.9.0-to-1.0.0-postgresql.sql          |   2 +-
 .../server/web/rest/ExceptionHandlers.java         |   6 +
 .../server/web/rest/TestStatisticOperations.java   | 109 ++++++
 16 files changed, 1488 insertions(+), 11 deletions(-)

diff --git 
a/api/src/main/java/org/apache/gravitino/exceptions/UnmodifiableStatisticException.java
 
b/api/src/main/java/org/apache/gravitino/exceptions/UnmodifiableStatisticException.java
index 1130669b1a..a907bece88 100644
--- 
a/api/src/main/java/org/apache/gravitino/exceptions/UnmodifiableStatisticException.java
+++ 
b/api/src/main/java/org/apache/gravitino/exceptions/UnmodifiableStatisticException.java
@@ -22,7 +22,7 @@ import com.google.errorprone.annotations.FormatMethod;
 import com.google.errorprone.annotations.FormatString;
 
 /** An exception thrown when users modify an unmodifiable statistic */
-public class UnmodifiableStatisticException extends GravitinoRuntimeException {
+public class UnmodifiableStatisticException extends 
UnsupportedOperationException {
   /**
    * Constructs a new exception with the specified detail message.
    *
@@ -31,7 +31,7 @@ public class UnmodifiableStatisticException extends 
GravitinoRuntimeException {
    */
   @FormatMethod
   public UnmodifiableStatisticException(@FormatString String message, 
Object... args) {
-    super(message, args);
+    super(String.format(message, args));
   }
 
   /**
@@ -44,6 +44,6 @@ public class UnmodifiableStatisticException extends 
GravitinoRuntimeException {
   @FormatMethod
   public UnmodifiableStatisticException(
       Throwable cause, @FormatString String message, Object... args) {
-    super(cause, message, args);
+    super(String.format(message, args), cause);
   }
 }
diff --git a/api/src/main/java/org/apache/gravitino/rel/Table.java 
b/api/src/main/java/org/apache/gravitino/rel/Table.java
index fcb9a8a3b4..c5b4d06084 100644
--- a/api/src/main/java/org/apache/gravitino/rel/Table.java
+++ b/api/src/main/java/org/apache/gravitino/rel/Table.java
@@ -33,6 +33,8 @@ import 
org.apache.gravitino.rel.expressions.transforms.Transform;
 import org.apache.gravitino.rel.expressions.transforms.Transforms;
 import org.apache.gravitino.rel.indexes.Index;
 import org.apache.gravitino.rel.indexes.Indexes;
+import org.apache.gravitino.stats.SupportsPartitionStatistics;
+import org.apache.gravitino.stats.SupportsStatistics;
 import org.apache.gravitino.tag.SupportsTags;
 
 /**
@@ -121,4 +123,23 @@ public interface Table extends Auditable {
   default SupportsRoles supportsRoles() {
     throw new UnsupportedOperationException("Table does not support role 
operations.");
   }
+
+  /**
+   * Returns the {@link SupportsStatistics} if the table supports statistics 
operations.
+   *
+   * @return The {@link SupportsStatistics} for the table.
+   */
+  default SupportsStatistics supportsStatistics() {
+    throw new UnsupportedOperationException("Table does not support statistics 
operations.");
+  }
+
+  /**
+   * Returns the {@link SupportsPartitionStatistics} if the table supports 
partition statistics
+   *
+   * @return The {@link SupportsPartitionStatistics} for the table.
+   */
+  default SupportsPartitionStatistics supportsPartitionStatistics() {
+    throw new UnsupportedOperationException(
+        "Table does not support partition statistics operations.");
+  }
 }
diff --git 
a/api/src/main/java/org/apache/gravitino/stats/SupportsPartitionStatistics.java 
b/api/src/main/java/org/apache/gravitino/stats/SupportsPartitionStatistics.java
index 8d25aba2c2..9e17bea67b 100644
--- 
a/api/src/main/java/org/apache/gravitino/stats/SupportsPartitionStatistics.java
+++ 
b/api/src/main/java/org/apache/gravitino/stats/SupportsPartitionStatistics.java
@@ -33,7 +33,7 @@ public interface SupportsPartitionStatistics {
    * @return a list of PartitionStatistics, where each PartitionStatistics 
contains the partition
    *     name and a list of statistics applicable to that partition.
    */
-  List<PartitionStatistics> listStatistics(PartitionRange range);
+  List<PartitionStatistics> listPartitionStatistics(PartitionRange range);
 
   /**
    * Updates statistics with the provided values. If the statistic exists, it 
will be updated with
@@ -45,7 +45,7 @@ public interface SupportsPartitionStatistics {
    *     values to be updated.
    * @throws UnmodifiableStatisticException if any of the statistics to be 
updated are unmodifiable
    */
-  void updateStatistics(List<PartitionStatisticsUpdate> statisticsToUpdate)
+  void updatePartitionStatistics(List<PartitionStatisticsUpdate> 
statisticsToUpdate)
       throws UnmodifiableStatisticException;
 
   /**
@@ -56,6 +56,6 @@ public interface SupportsPartitionStatistics {
    * @return true if the statistics were successfully dropped.
    * @throws UnmodifiableStatisticException if any of the statistics to be 
dropped are unmodifiable
    */
-  boolean dropStatistics(List<PartitionStatisticsDrop> statisticsToDrop)
+  boolean dropPartitionStatistics(List<PartitionStatisticsDrop> 
statisticsToDrop)
       throws UnmodifiableStatisticException;
 }
diff --git 
a/clients/client-java/src/main/java/org/apache/gravitino/client/ErrorHandlers.java
 
b/clients/client-java/src/main/java/org/apache/gravitino/client/ErrorHandlers.java
index 9cf57cd15f..11e41aacba 100644
--- 
a/clients/client-java/src/main/java/org/apache/gravitino/client/ErrorHandlers.java
+++ 
b/clients/client-java/src/main/java/org/apache/gravitino/client/ErrorHandlers.java
@@ -37,6 +37,7 @@ import 
org.apache.gravitino.exceptions.GroupAlreadyExistsException;
 import org.apache.gravitino.exceptions.IllegalMetadataObjectException;
 import org.apache.gravitino.exceptions.IllegalPrivilegeException;
 import org.apache.gravitino.exceptions.IllegalRoleException;
+import org.apache.gravitino.exceptions.IllegalStatisticNameException;
 import org.apache.gravitino.exceptions.InUseException;
 import org.apache.gravitino.exceptions.JobTemplateAlreadyExistsException;
 import org.apache.gravitino.exceptions.MetalakeAlreadyExistsException;
@@ -78,6 +79,7 @@ import 
org.apache.gravitino.exceptions.TagAlreadyAssociatedException;
 import org.apache.gravitino.exceptions.TagAlreadyExistsException;
 import org.apache.gravitino.exceptions.TopicAlreadyExistsException;
 import org.apache.gravitino.exceptions.UnauthorizedException;
+import org.apache.gravitino.exceptions.UnmodifiableStatisticException;
 import org.apache.gravitino.exceptions.UserAlreadyExistsException;
 
 /**
@@ -257,6 +259,15 @@ public class ErrorHandlers {
     return JobErrorHandler.INSTANCE;
   }
 
+  /**
+   * Creates an error handler specific to Statistics operations.
+   *
+   * @return A Consumer representing the Statistics error handler.
+   */
+  public static Consumer<ErrorResponse> statisticsErrorHandler() {
+    return StatisticsErrorHandler.INSTANCE;
+  }
+
   private ErrorHandlers() {}
 
   /**
@@ -1190,6 +1201,72 @@ public class ErrorHandlers {
     }
   }
 
+  /** Error handler specific to Statistics operations. */
+  @SuppressWarnings("FormatStringAnnotation")
+  private static class StatisticsErrorHandler extends RestErrorHandler {
+
+    private static final StatisticsErrorHandler INSTANCE = new 
StatisticsErrorHandler();
+
+    @Override
+    public void accept(ErrorResponse errorResponse) {
+      String errorMessage = formatErrorMessage(errorResponse);
+
+      switch (errorResponse.getCode()) {
+        case ErrorConstants.ILLEGAL_ARGUMENTS_CODE:
+          if 
(errorResponse.getType().equals(IllegalStatisticNameException.class.getSimpleName()))
 {
+            throw new IllegalStatisticNameException(errorMessage);
+          } else {
+            throw new IllegalArgumentException(errorMessage);
+          }
+
+        case ErrorConstants.NOT_FOUND_CODE:
+          if 
(errorResponse.getType().equals(NoSuchSchemaException.class.getSimpleName())) {
+            throw new NoSuchSchemaException(errorMessage);
+          } else if 
(errorResponse.getType().equals(NoSuchTableException.class.getSimpleName())) {
+            throw new NoSuchTableException(errorMessage);
+          } else if (errorResponse
+              .getType()
+              .equals(NoSuchMetadataObjectException.class.getSimpleName())) {
+            throw new NoSuchMetadataObjectException(errorMessage);
+          } else {
+            throw new NotFoundException(errorMessage);
+          }
+
+        case ErrorConstants.UNSUPPORTED_OPERATION_CODE:
+          if (errorResponse
+              .getType()
+              .equals(UnmodifiableStatisticException.class.getSimpleName())) {
+            throw new UnmodifiableStatisticException(errorMessage);
+          } else {
+            throw new UnsupportedOperationException(errorMessage);
+          }
+
+        case ErrorConstants.FORBIDDEN_CODE:
+          throw new ForbiddenException(errorMessage);
+
+        case ErrorConstants.INTERNAL_ERROR_CODE:
+          throw new RuntimeException(errorMessage);
+
+        case ErrorConstants.NOT_IN_USE_CODE:
+          if 
(errorResponse.getType().equals(CatalogNotInUseException.class.getSimpleName()))
 {
+            throw new CatalogNotInUseException(errorMessage);
+          } else if (errorResponse
+              .getType()
+              .equals(MetalakeNotInUseException.class.getSimpleName())) {
+            throw new MetalakeNotInUseException(errorMessage);
+          } else {
+            throw new NotInUseException(errorMessage);
+          }
+
+        case ErrorConstants.IN_USE_CODE:
+          throw new InUseException(errorMessage);
+
+        default:
+          super.accept(errorResponse);
+      }
+    }
+  }
+
   /** Generic error handler for REST requests. */
   private static class RestErrorHandler extends ErrorHandler {
     private static final ErrorHandler INSTANCE = new RestErrorHandler();
diff --git 
a/clients/client-java/src/main/java/org/apache/gravitino/client/MetadataObjectPartitionStatisticsOperations.java
 
b/clients/client-java/src/main/java/org/apache/gravitino/client/MetadataObjectPartitionStatisticsOperations.java
new file mode 100644
index 0000000000..b1d9080fe5
--- /dev/null
+++ 
b/clients/client-java/src/main/java/org/apache/gravitino/client/MetadataObjectPartitionStatisticsOperations.java
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.client;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import java.util.Collections;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.stream.Collectors;
+import org.apache.gravitino.MetadataObject;
+import org.apache.gravitino.dto.requests.PartitionStatisticsDropRequest;
+import org.apache.gravitino.dto.requests.PartitionStatisticsUpdateRequest;
+import org.apache.gravitino.dto.responses.BaseResponse;
+import org.apache.gravitino.dto.responses.DropResponse;
+import org.apache.gravitino.dto.responses.PartitionStatisticsListResponse;
+import org.apache.gravitino.dto.stats.PartitionStatisticsDropDTO;
+import org.apache.gravitino.dto.stats.PartitionStatisticsUpdateDTO;
+import org.apache.gravitino.exceptions.UnmodifiableStatisticException;
+import org.apache.gravitino.rest.RESTUtils;
+import org.apache.gravitino.stats.PartitionRange;
+import org.apache.gravitino.stats.PartitionStatistics;
+import org.apache.gravitino.stats.PartitionStatisticsDrop;
+import org.apache.gravitino.stats.PartitionStatisticsUpdate;
+import org.apache.gravitino.stats.SupportsPartitionStatistics;
+
+/**
+ * The implementation of {@link SupportsPartitionStatistics}. This interface 
will be composited into
+ * table to provide partition statistics operations for metadata objects.
+ */
+class MetadataObjectPartitionStatisticsOperations implements 
SupportsPartitionStatistics {
+
+  private static final String FROM = "from";
+  private static final String FROM_INCLUSIVE = "fromInclusive";
+  private static final String TO = "to";
+  private static final String TO_INCLUSIVE = "toInclusive";
+  private final RESTClient restClient;
+  private final String statisticsRequestPath;
+
+  MetadataObjectPartitionStatisticsOperations(
+      String metalakeName, MetadataObject metadataObject, RESTClient 
restClient) {
+    this.restClient = restClient;
+    this.statisticsRequestPath =
+        String.format(
+            "api/metalakes/%s/objects/%s/%s/statistics/partitions",
+            RESTUtils.encodeString(metalakeName),
+            metadataObject.type().name().toLowerCase(Locale.ROOT),
+            RESTUtils.encodeString(metadataObject.fullName()));
+  }
+
+  @Override
+  public List<PartitionStatistics> listPartitionStatistics(PartitionRange 
range) {
+    Map<String, String> queryParams = Maps.newHashMap();
+    range.lowerPartitionName().ifPresent(from -> queryParams.put(FROM, from));
+    range
+        .lowerBoundType()
+        .ifPresent(
+            boundType ->
+                queryParams.put(
+                    FROM_INCLUSIVE, String.valueOf(boundType == 
PartitionRange.BoundType.CLOSED)));
+
+    range.upperPartitionName().ifPresent(to -> queryParams.put(TO, to));
+    range
+        .upperBoundType()
+        .ifPresent(
+            boundType ->
+                queryParams.put(
+                    TO_INCLUSIVE, String.valueOf(boundType == 
PartitionRange.BoundType.CLOSED)));
+
+    PartitionStatisticsListResponse response =
+        restClient.get(
+            statisticsRequestPath,
+            queryParams,
+            PartitionStatisticsListResponse.class,
+            Collections.emptyMap(),
+            ErrorHandlers.statisticsErrorHandler());
+    response.validate();
+
+    return Lists.newArrayList(response.getPartitionStatistics());
+  }
+
+  @Override
+  public void updatePartitionStatistics(List<PartitionStatisticsUpdate> 
statisticsToUpdate)
+      throws UnmodifiableStatisticException {
+    List<PartitionStatisticsUpdateDTO> updates =
+        statisticsToUpdate.stream()
+            .map(
+                update ->
+                    PartitionStatisticsUpdateDTO.of(update.partitionName(), 
update.statistics()))
+            .collect(Collectors.toList());
+    PartitionStatisticsUpdateRequest request = new 
PartitionStatisticsUpdateRequest(updates);
+    request.validate();
+
+    BaseResponse response =
+        restClient.put(
+            statisticsRequestPath,
+            request,
+            BaseResponse.class,
+            Collections.emptyMap(),
+            ErrorHandlers.statisticsErrorHandler());
+    response.validate();
+  }
+
+  @Override
+  public boolean dropPartitionStatistics(List<PartitionStatisticsDrop> 
statisticsToDrop)
+      throws UnmodifiableStatisticException {
+    PartitionStatisticsDropRequest request =
+        new PartitionStatisticsDropRequest(
+            statisticsToDrop.stream()
+                .map(
+                    drop ->
+                        PartitionStatisticsDropDTO.of(
+                            drop.partitionName(), (drop.statisticNames())))
+                .collect(Collectors.toList()));
+    request.validate();
+
+    DropResponse response =
+        restClient.post(
+            statisticsRequestPath,
+            request,
+            DropResponse.class,
+            Collections.emptyMap(),
+            ErrorHandlers.statisticsErrorHandler());
+    response.validate();
+
+    return response.dropped();
+  }
+}
diff --git 
a/clients/client-java/src/main/java/org/apache/gravitino/client/MetadataObjectStatisticsOperations.java
 
b/clients/client-java/src/main/java/org/apache/gravitino/client/MetadataObjectStatisticsOperations.java
new file mode 100644
index 0000000000..415b7e40ab
--- /dev/null
+++ 
b/clients/client-java/src/main/java/org/apache/gravitino/client/MetadataObjectStatisticsOperations.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.client;
+
+import com.google.common.base.Preconditions;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import org.apache.gravitino.MetadataObject;
+import org.apache.gravitino.dto.requests.StatisticsDropRequest;
+import org.apache.gravitino.dto.requests.StatisticsUpdateRequest;
+import org.apache.gravitino.dto.responses.BaseResponse;
+import org.apache.gravitino.dto.responses.DropResponse;
+import org.apache.gravitino.dto.responses.StatisticListResponse;
+import org.apache.gravitino.exceptions.IllegalStatisticNameException;
+import org.apache.gravitino.exceptions.UnmodifiableStatisticException;
+import org.apache.gravitino.rest.RESTUtils;
+import org.apache.gravitino.stats.Statistic;
+import org.apache.gravitino.stats.StatisticValue;
+import org.apache.gravitino.stats.SupportsStatistics;
+
+/**
+ * The implementation of {@link SupportsStatistics}. This interface will be 
composited into table to
+ * provide statistics operations for metadata objects.
+ */
+class MetadataObjectStatisticsOperations implements SupportsStatistics {
+
+  private final RESTClient restClient;
+  private final String statisticsRequestPath;
+
+  MetadataObjectStatisticsOperations(
+      String metalakeName, MetadataObject metadataObject, RESTClient 
restClient) {
+    this.restClient = restClient;
+    this.statisticsRequestPath =
+        String.format(
+            "api/metalakes/%s/objects/%s/%s/statistics",
+            RESTUtils.encodeString(metalakeName),
+            metadataObject.type().name().toLowerCase(Locale.ROOT),
+            RESTUtils.encodeString(metadataObject.fullName()));
+  }
+
+  @Override
+  public List<Statistic> listStatistics() {
+    StatisticListResponse resp =
+        restClient.get(
+            statisticsRequestPath,
+            StatisticListResponse.class,
+            Collections.emptyMap(),
+            ErrorHandlers.statisticsErrorHandler());
+
+    resp.validate();
+
+    return Arrays.asList(resp.getStatistics());
+  }
+
+  @Override
+  public void updateStatistics(Map<String, StatisticValue<?>> statistics)
+      throws UnmodifiableStatisticException, IllegalStatisticNameException {
+    Preconditions.checkArgument(
+        statistics != null && !statistics.isEmpty(), "Statistics map must not 
be null or empty");
+
+    StatisticsUpdateRequest request = 
StatisticsUpdateRequest.builder().updates(statistics).build();
+    request.validate();
+
+    BaseResponse resp =
+        restClient.put(
+            statisticsRequestPath,
+            request,
+            BaseResponse.class,
+            Collections.emptyMap(),
+            ErrorHandlers.statisticsErrorHandler());
+
+    resp.validate();
+  }
+
+  @Override
+  public boolean dropStatistics(List<String> statistics) throws 
UnmodifiableStatisticException {
+    StatisticsDropRequest request = new 
StatisticsDropRequest(statistics.toArray(new String[0]));
+    request.validate();
+
+    DropResponse resp =
+        restClient.post(
+            statisticsRequestPath,
+            request,
+            DropResponse.class,
+            Collections.emptyMap(),
+            ErrorHandlers.statisticsErrorHandler());
+
+    resp.validate();
+
+    return resp.dropped();
+  }
+}
diff --git 
a/clients/client-java/src/main/java/org/apache/gravitino/client/RelationalTable.java
 
b/clients/client-java/src/main/java/org/apache/gravitino/client/RelationalTable.java
index 6a914041cb..88562de570 100644
--- 
a/clients/client-java/src/main/java/org/apache/gravitino/client/RelationalTable.java
+++ 
b/clients/client-java/src/main/java/org/apache/gravitino/client/RelationalTable.java
@@ -26,6 +26,7 @@ import com.google.common.base.Joiner;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import javax.annotation.Nullable;
 import lombok.SneakyThrows;
@@ -41,11 +42,13 @@ import org.apache.gravitino.dto.responses.DropResponse;
 import org.apache.gravitino.dto.responses.PartitionListResponse;
 import org.apache.gravitino.dto.responses.PartitionNameListResponse;
 import org.apache.gravitino.dto.responses.PartitionResponse;
+import org.apache.gravitino.exceptions.IllegalStatisticNameException;
 import org.apache.gravitino.exceptions.NoSuchPartitionException;
 import org.apache.gravitino.exceptions.NoSuchPolicyException;
 import org.apache.gravitino.exceptions.NoSuchTagException;
 import org.apache.gravitino.exceptions.PartitionAlreadyExistsException;
 import org.apache.gravitino.exceptions.PolicyAlreadyAssociatedException;
+import org.apache.gravitino.exceptions.UnmodifiableStatisticException;
 import org.apache.gravitino.policy.Policy;
 import org.apache.gravitino.policy.SupportsPolicies;
 import org.apache.gravitino.rel.Column;
@@ -57,12 +60,26 @@ import 
org.apache.gravitino.rel.expressions.transforms.Transform;
 import org.apache.gravitino.rel.indexes.Index;
 import org.apache.gravitino.rel.partitions.Partition;
 import org.apache.gravitino.rest.RESTUtils;
+import org.apache.gravitino.stats.PartitionRange;
+import org.apache.gravitino.stats.PartitionStatistics;
+import org.apache.gravitino.stats.PartitionStatisticsDrop;
+import org.apache.gravitino.stats.PartitionStatisticsUpdate;
+import org.apache.gravitino.stats.Statistic;
+import org.apache.gravitino.stats.StatisticValue;
+import org.apache.gravitino.stats.SupportsPartitionStatistics;
+import org.apache.gravitino.stats.SupportsStatistics;
 import org.apache.gravitino.tag.SupportsTags;
 import org.apache.gravitino.tag.Tag;
 
 /** Represents a relational table. */
 class RelationalTable
-    implements Table, SupportsPartitions, SupportsTags, SupportsRoles, 
SupportsPolicies {
+    implements Table,
+        SupportsPartitions,
+        SupportsTags,
+        SupportsRoles,
+        SupportsPolicies,
+        SupportsStatistics,
+        SupportsPartitionStatistics {
 
   private static final Joiner DOT_JOINER = Joiner.on(".");
 
@@ -75,6 +92,8 @@ class RelationalTable
   private final MetadataObjectTagOperations objectTagOperations;
   private final MetadataObjectRoleOperations objectRoleOperations;
   private final MetadataObjectPolicyOperations objectPolicyOperations;
+  private final MetadataObjectStatisticsOperations objectStatisticsOperations;
+  private final MetadataObjectPartitionStatisticsOperations 
objectPartitionStatisticsOperations;
 
   /**
    * Creates a new RelationalTable.
@@ -107,6 +126,11 @@ class RelationalTable
         new MetadataObjectRoleOperations(namespace.level(0), tableObject, 
restClient);
     this.objectPolicyOperations =
         new MetadataObjectPolicyOperations(namespace.level(0), tableObject, 
restClient);
+    this.objectStatisticsOperations =
+        new MetadataObjectStatisticsOperations(namespace.level(0), 
tableObject, restClient);
+    this.objectPartitionStatisticsOperations =
+        new MetadataObjectPartitionStatisticsOperations(
+            namespace.level(0), tableObject, restClient);
   }
 
   /**
@@ -317,6 +341,16 @@ class RelationalTable
     return this;
   }
 
+  @Override
+  public SupportsStatistics supportsStatistics() {
+    return this;
+  }
+
+  @Override
+  public SupportsPartitionStatistics supportsPartitionStatistics() {
+    return this;
+  }
+
   private static String tableFullName(Namespace tableNS, String tableName) {
     return DOT_JOINER.join(tableNS.level(1), tableNS.level(2), tableName);
   }
@@ -366,4 +400,37 @@ class RelationalTable
   public String[] listBindingRoleNames() {
     return objectRoleOperations.listBindingRoleNames();
   }
+
+  @Override
+  public List<Statistic> listStatistics() {
+    return objectStatisticsOperations.listStatistics();
+  }
+
+  @Override
+  public void updateStatistics(Map<String, StatisticValue<?>> statistics)
+      throws UnmodifiableStatisticException, IllegalStatisticNameException {
+    objectStatisticsOperations.updateStatistics(statistics);
+  }
+
+  @Override
+  public boolean dropStatistics(List<String> statistics) throws 
UnmodifiableStatisticException {
+    return objectStatisticsOperations.dropStatistics(statistics);
+  }
+
+  @Override
+  public List<PartitionStatistics> listPartitionStatistics(PartitionRange 
range) {
+    return objectPartitionStatisticsOperations.listPartitionStatistics(range);
+  }
+
+  @Override
+  public void updatePartitionStatistics(List<PartitionStatisticsUpdate> 
statisticsToUpdate)
+      throws UnmodifiableStatisticException {
+    
objectPartitionStatisticsOperations.updatePartitionStatistics(statisticsToUpdate);
+  }
+
+  @Override
+  public boolean dropPartitionStatistics(List<PartitionStatisticsDrop> 
statisticsToDrop)
+      throws UnmodifiableStatisticException {
+    return 
objectPartitionStatisticsOperations.dropPartitionStatistics(statisticsToDrop);
+  }
 }
diff --git 
a/clients/client-java/src/test/java/org/apache/gravitino/client/TestSupportsPartitionStatistics.java
 
b/clients/client-java/src/test/java/org/apache/gravitino/client/TestSupportsPartitionStatistics.java
new file mode 100644
index 0000000000..d19aaed1a6
--- /dev/null
+++ 
b/clients/client-java/src/test/java/org/apache/gravitino/client/TestSupportsPartitionStatistics.java
@@ -0,0 +1,289 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.client;
+
+import static javax.servlet.http.HttpServletResponse.SC_BAD_REQUEST;
+import static javax.servlet.http.HttpServletResponse.SC_INTERNAL_SERVER_ERROR;
+import static javax.servlet.http.HttpServletResponse.SC_METHOD_NOT_ALLOWED;
+import static javax.servlet.http.HttpServletResponse.SC_OK;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Maps;
+import java.time.Instant;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import org.apache.commons.compress.utils.Lists;
+import org.apache.gravitino.Namespace;
+import org.apache.gravitino.dto.AuditDTO;
+import org.apache.gravitino.dto.rel.ColumnDTO;
+import org.apache.gravitino.dto.rel.TableDTO;
+import org.apache.gravitino.dto.requests.PartitionStatisticsDropRequest;
+import org.apache.gravitino.dto.requests.PartitionStatisticsUpdateRequest;
+import org.apache.gravitino.dto.responses.BaseResponse;
+import org.apache.gravitino.dto.responses.DropResponse;
+import org.apache.gravitino.dto.responses.ErrorConstants;
+import org.apache.gravitino.dto.responses.ErrorResponse;
+import org.apache.gravitino.dto.responses.PartitionStatisticsListResponse;
+import org.apache.gravitino.dto.stats.PartitionStatisticsDTO;
+import org.apache.gravitino.dto.stats.PartitionStatisticsDropDTO;
+import org.apache.gravitino.dto.stats.PartitionStatisticsUpdateDTO;
+import org.apache.gravitino.dto.stats.StatisticDTO;
+import org.apache.gravitino.exceptions.IllegalStatisticNameException;
+import org.apache.gravitino.exceptions.UnmodifiableStatisticException;
+import org.apache.gravitino.rel.Table;
+import org.apache.gravitino.rel.types.Types;
+import org.apache.gravitino.stats.PartitionRange;
+import org.apache.gravitino.stats.PartitionStatistics;
+import org.apache.gravitino.stats.PartitionStatisticsDrop;
+import org.apache.gravitino.stats.PartitionStatisticsModification;
+import org.apache.gravitino.stats.PartitionStatisticsUpdate;
+import org.apache.gravitino.stats.StatisticValue;
+import org.apache.gravitino.stats.StatisticValues;
+import org.apache.gravitino.stats.SupportsPartitionStatistics;
+import org.apache.hc.core5.http.Method;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+
+public class TestSupportsPartitionStatistics extends TestBase {
+
+  private static final String METALAKE_NAME = "metalake";
+  private static final String CATALOG_NAME = "catalog1";
+  private static final String SCHEMA_NAME = "schema1";
+  private static final String TABLE_NAME = "table1";
+
+  private static Table relationalTable;
+
+  @BeforeAll
+  public static void setUp() throws Exception {
+    TestBase.setUp();
+    TestGravitinoMetalake.createMetalake(client, METALAKE_NAME);
+
+    relationalTable =
+        RelationalTable.from(
+            Namespace.of(METALAKE_NAME, CATALOG_NAME, SCHEMA_NAME),
+            TableDTO.builder()
+                .withName(TABLE_NAME)
+                .withComment("comment1")
+                .withColumns(
+                    new ColumnDTO[] {
+                      ColumnDTO.builder()
+                          .withName("col1")
+                          .withDataType(Types.IntegerType.get())
+                          .build()
+                    })
+                .withProperties(Collections.emptyMap())
+                .withAudit(AuditDTO.builder().withCreator("test").build())
+                .build(),
+            client.restClient());
+  }
+
+  @Test
+  public void testListStatisticsForTable() throws JsonProcessingException {
+    testListStatistics(relationalTable.supportsPartitionStatistics(), 
getTableStatisticsPath());
+  }
+
+  @Test
+  public void testUpdateStatisticsForTable() throws JsonProcessingException {
+    testUpdateStatistics(relationalTable.supportsPartitionStatistics(), 
getTableStatisticsPath());
+  }
+
+  @Test
+  public void testDropStatisticsForTable() throws JsonProcessingException {
+    testDropStatistics(relationalTable.supportsPartitionStatistics(), 
getTableStatisticsPath());
+  }
+
+  private void testListStatistics(SupportsPartitionStatistics 
supportsStatistics, String path)
+      throws JsonProcessingException {
+    // Test successful list
+    AuditDTO audit = 
AuditDTO.builder().withCreator("test").withCreateTime(Instant.now()).build();
+
+    StatisticDTO stat1 =
+        StatisticDTO.builder()
+            .withName("row_count")
+            .withValue(Optional.of(StatisticValues.longValue(100L)))
+            .withReserved(true)
+            .withModifiable(false)
+            .withAudit(audit)
+            .build();
+
+    StatisticDTO stat2 =
+        StatisticDTO.builder()
+            .withName("custom.user_stat")
+            .withValue(Optional.of(StatisticValues.stringValue("test")))
+            .withReserved(false)
+            .withModifiable(true)
+            .withAudit(audit)
+            .build();
+
+    Map<String, String> map = Maps.newHashMap();
+    map.put("from", "p0");
+
+    PartitionStatisticsDTO partitionStatisticsDTO =
+        PartitionStatisticsDTO.of("p0", new StatisticDTO[] {stat1, stat2});
+    PartitionStatisticsListResponse response =
+        new PartitionStatisticsListResponse(new PartitionStatisticsDTO[] 
{partitionStatisticsDTO});
+    buildMockResource(Method.GET, path, map, null, response, SC_OK);
+
+    List<PartitionStatistics> statistics =
+        supportsStatistics.listPartitionStatistics(
+            PartitionRange.downTo("p0", PartitionRange.BoundType.CLOSED));
+    Assertions.assertEquals(1, statistics.size());
+    Assertions.assertEquals("row_count", 
statistics.get(0).statistics()[0].name());
+    Assertions.assertEquals(100L, 
statistics.get(0).statistics()[0].value().get().value());
+    Assertions.assertTrue(statistics.get(0).statistics()[0].reserved());
+    Assertions.assertFalse(statistics.get(0).statistics()[0].modifiable());
+
+    Assertions.assertEquals("custom.user_stat", 
statistics.get(0).statistics()[1].name());
+    Assertions.assertEquals("test", 
statistics.get(0).statistics()[1].value().get().value());
+    Assertions.assertFalse(statistics.get(0).statistics()[1].reserved());
+    Assertions.assertTrue(statistics.get(0).statistics()[1].modifiable());
+
+    // Test error handling
+    ErrorResponse errorResp = ErrorResponse.internalError("Internal error");
+    buildMockResource(
+        Method.GET, path, Collections.emptyMap(), null, errorResp, 
SC_INTERNAL_SERVER_ERROR);
+
+    Assertions.assertThrows(
+        RuntimeException.class,
+        () ->
+            supportsStatistics.listPartitionStatistics(
+                PartitionRange.upTo("p0", PartitionRange.BoundType.OPEN)));
+  }
+
+  private void testUpdateStatistics(SupportsPartitionStatistics 
supportsStatistics, String path)
+      throws JsonProcessingException {
+    // Test successful update
+    Map<String, StatisticValue<?>> statisticsToUpdate = new HashMap<>();
+    statisticsToUpdate.put("row_count", StatisticValues.longValue(200L));
+    statisticsToUpdate.put("custom.user_stat", 
StatisticValues.stringValue("updated"));
+
+    Map<String, StatisticValue<?>> expectedDTOs = new HashMap<>();
+    expectedDTOs.put("row_count", StatisticValues.longValue(200L));
+    expectedDTOs.put("custom.user_stat", 
StatisticValues.stringValue("updated"));
+
+    List<PartitionStatisticsUpdateDTO> updateDTOS = Lists.newArrayList();
+    updateDTOS.add(PartitionStatisticsUpdateDTO.of("p0", expectedDTOs));
+    PartitionStatisticsUpdateRequest expectedRequest =
+        new PartitionStatisticsUpdateRequest(updateDTOS);
+
+    List<PartitionStatisticsUpdate> updates = Lists.newArrayList();
+    updates.add(PartitionStatisticsModification.update("p0", 
statisticsToUpdate));
+    BaseResponse response = new BaseResponse(0);
+    buildMockResource(Method.PUT, path, Collections.emptyMap(), 
expectedRequest, response, SC_OK);
+
+    supportsStatistics.updatePartitionStatistics(updates);
+
+    // Test unmodifiable statistic exception
+    String unmodifiableErrorJson =
+        MAPPER.writeValueAsString(
+            ImmutableMap.of(
+                "code",
+                ErrorConstants.UNSUPPORTED_OPERATION_CODE,
+                "type",
+                UnmodifiableStatisticException.class.getSimpleName(),
+                "message",
+                "Cannot modify reserved statistic",
+                "stack",
+                Collections.emptyList()));
+    ErrorResponse unmodifiableError = MAPPER.readValue(unmodifiableErrorJson, 
ErrorResponse.class);
+    buildMockResource(
+        Method.PUT,
+        path,
+        Collections.emptyMap(),
+        expectedRequest,
+        unmodifiableError,
+        SC_METHOD_NOT_ALLOWED);
+
+    Assertions.assertThrows(
+        UnmodifiableStatisticException.class,
+        () -> supportsStatistics.updatePartitionStatistics(updates));
+
+    // Test illegal statistic name exception
+    ErrorResponse illegalNameError =
+        ErrorResponse.illegalArguments(
+            IllegalStatisticNameException.class.getSimpleName(), "Invalid 
statistic name", null);
+    buildMockResource(
+        Method.PUT,
+        path,
+        Collections.emptyMap(),
+        expectedRequest,
+        illegalNameError,
+        SC_BAD_REQUEST);
+
+    Assertions.assertThrows(
+        IllegalStatisticNameException.class,
+        () -> supportsStatistics.updatePartitionStatistics(updates));
+  }
+
+  private void testDropStatistics(SupportsPartitionStatistics 
supportsStatistics, String path)
+      throws JsonProcessingException {
+    // Test successful drop
+    List<String> statisticsToDrop = Arrays.asList("custom.user_stat1", 
"custom.user_stat2");
+
+    List<PartitionStatisticsDropDTO> requestDrops = Lists.newArrayList();
+    requestDrops.add(PartitionStatisticsDropDTO.of("p0", statisticsToDrop));
+    PartitionStatisticsDropRequest request = new 
PartitionStatisticsDropRequest(requestDrops);
+    DropResponse response = new DropResponse(true);
+    buildMockResource(Method.POST, path, Collections.emptyMap(), request, 
response, SC_OK);
+
+    List<PartitionStatisticsDrop> drops = Lists.newArrayList();
+    drops.add(PartitionStatisticsModification.drop("p0", statisticsToDrop));
+    boolean dropped = supportsStatistics.dropPartitionStatistics(drops);
+    Assertions.assertTrue(dropped);
+
+    // Test drop non-existing statistics
+    response = new DropResponse(false);
+    buildMockResource(Method.POST, path, Collections.emptyMap(), request, 
response, SC_OK);
+
+    dropped = supportsStatistics.dropPartitionStatistics(drops);
+    Assertions.assertFalse(dropped);
+
+    // Test unmodifiable statistic exception
+    String unmodifiableErrorJson =
+        MAPPER.writeValueAsString(
+            ImmutableMap.of(
+                "code",
+                ErrorConstants.UNSUPPORTED_OPERATION_CODE,
+                "type",
+                UnmodifiableStatisticException.class.getSimpleName(),
+                "message",
+                "Cannot drop reserved statistic",
+                "stack",
+                Collections.emptyList()));
+    ErrorResponse unmodifiableError = MAPPER.readValue(unmodifiableErrorJson, 
ErrorResponse.class);
+    buildMockResource(
+        Method.POST, path, Collections.emptyMap(), null, unmodifiableError, 
SC_METHOD_NOT_ALLOWED);
+
+    Assertions.assertThrows(
+        UnmodifiableStatisticException.class,
+        () -> supportsStatistics.dropPartitionStatistics(drops));
+  }
+
+  private String getTableStatisticsPath() {
+    return String.format(
+        "/api/metalakes/%s/objects/table/%s.%s.%s/statistics/partitions",
+        METALAKE_NAME, CATALOG_NAME, SCHEMA_NAME, TABLE_NAME);
+  }
+}
diff --git 
a/clients/client-java/src/test/java/org/apache/gravitino/client/TestSupportsStatistics.java
 
b/clients/client-java/src/test/java/org/apache/gravitino/client/TestSupportsStatistics.java
new file mode 100644
index 0000000000..65420e8198
--- /dev/null
+++ 
b/clients/client-java/src/test/java/org/apache/gravitino/client/TestSupportsStatistics.java
@@ -0,0 +1,261 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.client;
+
+import static org.apache.hc.core5.http.HttpStatus.SC_BAD_REQUEST;
+import static org.apache.hc.core5.http.HttpStatus.SC_INTERNAL_SERVER_ERROR;
+import static org.apache.hc.core5.http.HttpStatus.SC_METHOD_NOT_ALLOWED;
+import static org.apache.hc.core5.http.HttpStatus.SC_OK;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.google.common.collect.ImmutableMap;
+import java.time.Instant;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import org.apache.gravitino.Namespace;
+import org.apache.gravitino.dto.AuditDTO;
+import org.apache.gravitino.dto.rel.ColumnDTO;
+import org.apache.gravitino.dto.rel.TableDTO;
+import org.apache.gravitino.dto.requests.StatisticsDropRequest;
+import org.apache.gravitino.dto.requests.StatisticsUpdateRequest;
+import org.apache.gravitino.dto.responses.BaseResponse;
+import org.apache.gravitino.dto.responses.DropResponse;
+import org.apache.gravitino.dto.responses.ErrorConstants;
+import org.apache.gravitino.dto.responses.ErrorResponse;
+import org.apache.gravitino.dto.responses.StatisticListResponse;
+import org.apache.gravitino.dto.stats.StatisticDTO;
+import org.apache.gravitino.exceptions.IllegalStatisticNameException;
+import org.apache.gravitino.exceptions.UnmodifiableStatisticException;
+import org.apache.gravitino.rel.Table;
+import org.apache.gravitino.rel.types.Types;
+import org.apache.gravitino.stats.Statistic;
+import org.apache.gravitino.stats.StatisticValue;
+import org.apache.gravitino.stats.StatisticValues;
+import org.apache.gravitino.stats.SupportsStatistics;
+import org.apache.hc.core5.http.Method;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+
+public class TestSupportsStatistics extends TestBase {
+
+  private static final String METALAKE_NAME = "metalake";
+  private static final String CATALOG_NAME = "catalog1";
+  private static final String SCHEMA_NAME = "schema1";
+  private static final String TABLE_NAME = "table1";
+
+  private static Table relationalTable;
+
+  @BeforeAll
+  public static void setUp() throws Exception {
+    TestBase.setUp();
+    TestGravitinoMetalake.createMetalake(client, METALAKE_NAME);
+
+    relationalTable =
+        RelationalTable.from(
+            Namespace.of(METALAKE_NAME, CATALOG_NAME, SCHEMA_NAME),
+            TableDTO.builder()
+                .withName(TABLE_NAME)
+                .withComment("comment1")
+                .withColumns(
+                    new ColumnDTO[] {
+                      ColumnDTO.builder()
+                          .withName("col1")
+                          .withDataType(Types.IntegerType.get())
+                          .build()
+                    })
+                .withProperties(Collections.emptyMap())
+                .withAudit(AuditDTO.builder().withCreator("test").build())
+                .build(),
+            client.restClient());
+  }
+
+  @Test
+  public void testListStatisticsForTable() throws JsonProcessingException {
+    testListStatistics(relationalTable.supportsStatistics(), 
getTableStatisticsPath());
+  }
+
+  @Test
+  public void testUpdateStatisticsForTable() throws JsonProcessingException {
+    testUpdateStatistics(relationalTable.supportsStatistics(), 
getTableStatisticsPath());
+  }
+
+  @Test
+  public void testDropStatisticsForTable() throws JsonProcessingException {
+    testDropStatistics(relationalTable.supportsStatistics(), 
getTableStatisticsPath());
+  }
+
+  private void testListStatistics(SupportsStatistics supportsStatistics, 
String path)
+      throws JsonProcessingException {
+    // Test successful list
+    AuditDTO audit = 
AuditDTO.builder().withCreator("test").withCreateTime(Instant.now()).build();
+
+    StatisticDTO stat1 =
+        StatisticDTO.builder()
+            .withName("row_count")
+            .withValue(Optional.of(StatisticValues.longValue(100L)))
+            .withReserved(true)
+            .withModifiable(false)
+            .withAudit(audit)
+            .build();
+
+    StatisticDTO stat2 =
+        StatisticDTO.builder()
+            .withName("custom.user_stat")
+            .withValue(Optional.of(StatisticValues.stringValue("test")))
+            .withReserved(false)
+            .withModifiable(true)
+            .withAudit(audit)
+            .build();
+
+    StatisticListResponse response = new StatisticListResponse(new 
StatisticDTO[] {stat1, stat2});
+    buildMockResource(Method.GET, path, Collections.emptyMap(), null, 
response, SC_OK);
+
+    List<Statistic> statistics = supportsStatistics.listStatistics();
+    Assertions.assertEquals(2, statistics.size());
+    Assertions.assertEquals("row_count", statistics.get(0).name());
+    Assertions.assertEquals(100L, statistics.get(0).value().get().value());
+    Assertions.assertTrue(statistics.get(0).reserved());
+    Assertions.assertFalse(statistics.get(0).modifiable());
+
+    Assertions.assertEquals("custom.user_stat", statistics.get(1).name());
+    Assertions.assertEquals("test", statistics.get(1).value().get().value());
+    Assertions.assertFalse(statistics.get(1).reserved());
+    Assertions.assertTrue(statistics.get(1).modifiable());
+
+    // Test error handling
+    ErrorResponse errorResp = ErrorResponse.internalError("Internal error");
+    buildMockResource(
+        Method.GET, path, Collections.emptyMap(), null, errorResp, 
SC_INTERNAL_SERVER_ERROR);
+
+    Assertions.assertThrows(RuntimeException.class, 
supportsStatistics::listStatistics);
+  }
+
+  private void testUpdateStatistics(SupportsStatistics supportsStatistics, 
String path)
+      throws JsonProcessingException {
+    // Test successful update
+    Map<String, StatisticValue<?>> statisticsToUpdate = new HashMap<>();
+    statisticsToUpdate.put("row_count", StatisticValues.longValue(200L));
+    statisticsToUpdate.put("custom.user_stat", 
StatisticValues.stringValue("updated"));
+
+    Map<String, StatisticValue<?>> expectedDTOs = new HashMap<>();
+    expectedDTOs.put("row_count", StatisticValues.longValue(200L));
+    expectedDTOs.put("custom.user_stat", 
StatisticValues.stringValue("updated"));
+
+    StatisticsUpdateRequest expectedRequest =
+        StatisticsUpdateRequest.builder().updates(expectedDTOs).build();
+
+    BaseResponse response = new BaseResponse(0);
+    buildMockResource(Method.PUT, path, Collections.emptyMap(), 
expectedRequest, response, SC_OK);
+
+    supportsStatistics.updateStatistics(statisticsToUpdate);
+
+    // Test unmodifiable statistic exception
+    String unmodifiableErrorJson =
+        MAPPER.writeValueAsString(
+            ImmutableMap.of(
+                "code",
+                ErrorConstants.UNSUPPORTED_OPERATION_CODE,
+                "type",
+                UnmodifiableStatisticException.class.getSimpleName(),
+                "message",
+                "Cannot modify reserved statistic",
+                "stack",
+                Collections.emptyList()));
+    ErrorResponse unmodifiableError = MAPPER.readValue(unmodifiableErrorJson, 
ErrorResponse.class);
+    buildMockResource(
+        Method.PUT,
+        path,
+        Collections.emptyMap(),
+        expectedRequest,
+        unmodifiableError,
+        SC_METHOD_NOT_ALLOWED);
+
+    Assertions.assertThrows(
+        UnmodifiableStatisticException.class,
+        () -> supportsStatistics.updateStatistics(statisticsToUpdate));
+
+    // Test illegal statistic name exception
+    ErrorResponse illegalNameError =
+        ErrorResponse.illegalArguments(
+            IllegalStatisticNameException.class.getSimpleName(), "Invalid 
statistic name", null);
+    buildMockResource(
+        Method.PUT,
+        path,
+        Collections.emptyMap(),
+        expectedRequest,
+        illegalNameError,
+        SC_BAD_REQUEST);
+
+    Assertions.assertThrows(
+        IllegalStatisticNameException.class,
+        () -> supportsStatistics.updateStatistics(statisticsToUpdate));
+  }
+
+  private void testDropStatistics(SupportsStatistics supportsStatistics, 
String path)
+      throws JsonProcessingException {
+    // Test successful drop
+    List<String> statisticsToDrop = Arrays.asList("custom.user_stat1", 
"custom.user_stat2");
+
+    StatisticsDropRequest request =
+        StatisticsDropRequest.builder().names(statisticsToDrop.toArray(new 
String[0])).build();
+    DropResponse response = new DropResponse(true);
+    buildMockResource(Method.POST, path, Collections.emptyMap(), request, 
response, SC_OK);
+
+    boolean dropped = supportsStatistics.dropStatistics(statisticsToDrop);
+    Assertions.assertTrue(dropped);
+
+    // Test drop non-existing statistics
+    response = new DropResponse(false);
+    buildMockResource(Method.POST, path, Collections.emptyMap(), request, 
response, SC_OK);
+
+    dropped = supportsStatistics.dropStatistics(statisticsToDrop);
+    Assertions.assertFalse(dropped);
+
+    // Test unmodifiable statistic exception
+    String unmodifiableErrorJson =
+        MAPPER.writeValueAsString(
+            ImmutableMap.of(
+                "code",
+                ErrorConstants.UNSUPPORTED_OPERATION_CODE,
+                "type",
+                UnmodifiableStatisticException.class.getSimpleName(),
+                "message",
+                "Cannot drop reserved statistic",
+                "stack",
+                Collections.emptyList()));
+    ErrorResponse unmodifiableError = MAPPER.readValue(unmodifiableErrorJson, 
ErrorResponse.class);
+    buildMockResource(
+        Method.POST, path, Collections.emptyMap(), null, unmodifiableError, 
SC_METHOD_NOT_ALLOWED);
+
+    Assertions.assertThrows(
+        UnmodifiableStatisticException.class,
+        () -> supportsStatistics.dropStatistics(statisticsToDrop));
+  }
+
+  private String getTableStatisticsPath() {
+    return String.format(
+        "/api/metalakes/%s/objects/table/%s.%s.%s/statistics",
+        METALAKE_NAME, CATALOG_NAME, SCHEMA_NAME, TABLE_NAME);
+  }
+}
diff --git 
a/clients/client-java/src/test/java/org/apache/gravitino/client/integration/test/StatisticIT.java
 
b/clients/client-java/src/test/java/org/apache/gravitino/client/integration/test/StatisticIT.java
new file mode 100644
index 0000000000..f182952cea
--- /dev/null
+++ 
b/clients/client-java/src/test/java/org/apache/gravitino/client/integration/test/StatisticIT.java
@@ -0,0 +1,389 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.client.integration.test;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.gravitino.Catalog;
+import org.apache.gravitino.NameIdentifier;
+import org.apache.gravitino.client.GravitinoMetalake;
+import org.apache.gravitino.exceptions.IllegalStatisticNameException;
+import org.apache.gravitino.integration.test.container.ContainerSuite;
+import org.apache.gravitino.integration.test.container.HiveContainer;
+import org.apache.gravitino.integration.test.util.BaseIT;
+import org.apache.gravitino.integration.test.util.GravitinoITUtils;
+import org.apache.gravitino.rel.Column;
+import org.apache.gravitino.rel.Table;
+import org.apache.gravitino.rel.expressions.NamedReference;
+import org.apache.gravitino.rel.expressions.distributions.Distribution;
+import org.apache.gravitino.rel.expressions.distributions.Distributions;
+import org.apache.gravitino.rel.expressions.distributions.Strategy;
+import org.apache.gravitino.rel.expressions.sorts.NullOrdering;
+import org.apache.gravitino.rel.expressions.sorts.SortDirection;
+import org.apache.gravitino.rel.expressions.sorts.SortOrder;
+import org.apache.gravitino.rel.expressions.sorts.SortOrders;
+import org.apache.gravitino.rel.expressions.transforms.Transforms;
+import org.apache.gravitino.rel.types.Types;
+import org.apache.gravitino.stats.PartitionRange;
+import org.apache.gravitino.stats.PartitionStatistics;
+import org.apache.gravitino.stats.PartitionStatisticsDrop;
+import org.apache.gravitino.stats.PartitionStatisticsModification;
+import org.apache.gravitino.stats.PartitionStatisticsUpdate;
+import org.apache.gravitino.stats.Statistic;
+import org.apache.gravitino.stats.StatisticValue;
+import org.apache.gravitino.stats.StatisticValues;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@Tag("gravitino-docker-test")
+public class StatisticIT extends BaseIT {
+
+  private static final Logger LOG = LoggerFactory.getLogger(StatisticIT.class);
+  public static final String metalakeName =
+      GravitinoITUtils.genRandomName("cataloghiveit_metalake");
+  public String catalogName = 
GravitinoITUtils.genRandomName("cataloghiveit_catalog");
+  public String SCHEMA_PREFIX = "cataloghiveit_schema";
+  public String schemaName = GravitinoITUtils.genRandomName(SCHEMA_PREFIX);
+  public String TABLE_PREFIX = "cataloghiveit_table";
+  public String tableName = GravitinoITUtils.genRandomName(TABLE_PREFIX);
+  public static final String TABLE_COMMENT = "table_comment";
+  public static final String HIVE_COL_NAME1 = "hive_col_name1";
+  public static final String HIVE_COL_NAME2 = "hive_col_name2";
+  public static final String HIVE_COL_NAME3 = "hive_col_name3";
+  private String HIVE_METASTORE_URIS;
+  private final String provider = "hive";
+  private final ContainerSuite containerSuite = ContainerSuite.getInstance();
+  private GravitinoMetalake metalake;
+  private Catalog catalog;
+  private Table table;
+
+  @BeforeAll
+  public void startup() {
+    startNecessaryContainer();
+
+    createMetalake();
+    createCatalog();
+    createSchema();
+    createTable();
+  }
+
+  @Test
+  public void testTableStatisticLifeCycle() {
+    // list empty statistics
+    List<Statistic> statistics = table.supportsStatistics().listStatistics();
+    Assertions.assertTrue(statistics.isEmpty());
+
+    // update statistics
+    Map<String, StatisticValue<?>> updateStatistics = Maps.newHashMap();
+    updateStatistics.put("custom-k1", StatisticValues.stringValue("v1"));
+    updateStatistics.put("custom-k2", StatisticValues.stringValue("v2"));
+    table.supportsStatistics().updateStatistics(updateStatistics);
+
+    statistics = table.supportsStatistics().listStatistics();
+    Assertions.assertEquals(2, statistics.size());
+    statistics.sort(Comparator.comparing(Statistic::name));
+
+    Assertions.assertEquals(statistics.get(0).name(), "custom-k1");
+    Assertions.assertEquals(statistics.get(0).value().get(), 
StatisticValues.stringValue("v1"));
+    Assertions.assertEquals(statistics.get(1).name(), "custom-k2");
+    Assertions.assertEquals(statistics.get(1).value().get(), 
StatisticValues.stringValue("v2"));
+
+    // update partial statistics
+    updateStatistics.clear();
+    updateStatistics.put("custom-k1", StatisticValues.stringValue("v1-1"));
+    updateStatistics.put("custom-k3", StatisticValues.stringValue("v3"));
+    table.supportsStatistics().updateStatistics(updateStatistics);
+
+    statistics = table.supportsStatistics().listStatistics();
+    Assertions.assertEquals(3, statistics.size());
+    statistics.sort(Comparator.comparing(Statistic::name));
+
+    Assertions.assertEquals(statistics.get(0).name(), "custom-k1");
+    Assertions.assertEquals(statistics.get(0).value().get(), 
StatisticValues.stringValue("v1-1"));
+    Assertions.assertEquals(statistics.get(1).name(), "custom-k2");
+    Assertions.assertEquals(statistics.get(1).value().get(), 
StatisticValues.stringValue("v2"));
+    Assertions.assertEquals(statistics.get(2).name(), "custom-k3");
+    Assertions.assertEquals(statistics.get(2).value().get(), 
StatisticValues.stringValue("v3"));
+
+    // update illegal stats
+    Map<String, StatisticValue<?>> illegalUpdateStatistics = Maps.newHashMap();
+    illegalUpdateStatistics.put("k1", StatisticValues.stringValue("v1-2"));
+    Assertions.assertThrows(
+        IllegalStatisticNameException.class,
+        () -> 
table.supportsStatistics().updateStatistics(illegalUpdateStatistics));
+
+    // drop statistics
+    List<String> statisticsToDrop = Lists.newArrayList("custom-k1");
+    table.supportsStatistics().dropStatistics(statisticsToDrop);
+
+    statistics = table.supportsStatistics().listStatistics();
+    Assertions.assertEquals(2, statistics.size());
+
+    statistics.sort(Comparator.comparing(Statistic::name));
+    Assertions.assertEquals(statistics.get(0).name(), "custom-k2");
+    Assertions.assertEquals(statistics.get(0).value().get(), 
StatisticValues.stringValue("v2"));
+    Assertions.assertEquals(statistics.get(1).name(), "custom-k3");
+    Assertions.assertEquals(statistics.get(1).value().get(), 
StatisticValues.stringValue("v3"));
+  }
+
+  @Test
+  public void testPartitionStatisticLifeCycle() {
+    // list empty partition statistics
+    PartitionRange range = PartitionRange.downTo("p0", 
PartitionRange.BoundType.CLOSED);
+    table.supportsPartitionStatistics().listPartitionStatistics(range);
+
+    // update partition statistics
+    List<PartitionStatisticsUpdate> statisticsToUpdate = Lists.newArrayList();
+    Map<String, StatisticValue<?>> stats = Maps.newHashMap();
+    stats.put("custom-k1", StatisticValues.stringValue("v1"));
+    stats.put("custom-k2", StatisticValues.stringValue("v2"));
+    statisticsToUpdate.add(PartitionStatisticsModification.update("p1", 
stats));
+    
table.supportsPartitionStatistics().updatePartitionStatistics(statisticsToUpdate);
+
+    List<PartitionStatistics> listedStats =
+        table.supportsPartitionStatistics().listPartitionStatistics(range);
+    Assertions.assertEquals(1, listedStats.size());
+    PartitionStatistics partitionStatistics = listedStats.get(0);
+    Assertions.assertEquals("p1", partitionStatistics.partitionName());
+    Statistic[] statistics = partitionStatistics.statistics();
+    Assertions.assertEquals(2, statistics.length);
+    Arrays.sort(statistics, Comparator.comparing(Statistic::name));
+    Assertions.assertEquals(statistics[0].name(), "custom-k1");
+    Assertions.assertEquals(statistics[0].value().get(), 
StatisticValues.stringValue("v1"));
+    Assertions.assertEquals(statistics[1].name(), "custom-k2");
+    Assertions.assertEquals(statistics[1].value().get(), 
StatisticValues.stringValue("v2"));
+
+    // update partial partition statistics
+    statisticsToUpdate.clear();
+    stats.clear();
+    stats.put("custom-k1", StatisticValues.stringValue("v1-1"));
+    stats.put("custom-k3", StatisticValues.stringValue("v3"));
+    statisticsToUpdate.add(PartitionStatisticsModification.update("p0", 
stats));
+    statisticsToUpdate.add(PartitionStatisticsModification.update("p1", 
stats));
+    
table.supportsPartitionStatistics().updatePartitionStatistics(statisticsToUpdate);
+
+    listedStats = 
table.supportsPartitionStatistics().listPartitionStatistics(range);
+    Assertions.assertEquals(2, listedStats.size());
+    listedStats.sort(Comparator.comparing(PartitionStatistics::partitionName));
+    partitionStatistics = listedStats.get(0);
+    Assertions.assertEquals("p0", partitionStatistics.partitionName());
+    statistics = partitionStatistics.statistics();
+    Assertions.assertEquals(2, statistics.length);
+    Arrays.sort(statistics, Comparator.comparing(Statistic::name));
+    Assertions.assertEquals(statistics[0].name(), "custom-k1");
+    Assertions.assertEquals(statistics[0].value().get(), 
StatisticValues.stringValue("v1-1"));
+    Assertions.assertEquals(statistics[1].name(), "custom-k3");
+    Assertions.assertEquals(statistics[1].value().get(), 
StatisticValues.stringValue("v3"));
+    partitionStatistics = listedStats.get(1);
+    Assertions.assertEquals("p1", partitionStatistics.partitionName());
+    statistics = partitionStatistics.statistics();
+    Assertions.assertEquals(3, statistics.length);
+    Arrays.sort(statistics, Comparator.comparing(Statistic::name));
+    Assertions.assertEquals(statistics[0].name(), "custom-k1");
+    Assertions.assertEquals(statistics[0].value().get(), 
StatisticValues.stringValue("v1-1"));
+    Assertions.assertEquals(statistics[1].name(), "custom-k2");
+    Assertions.assertEquals(statistics[1].value().get(), 
StatisticValues.stringValue("v2"));
+    Assertions.assertEquals(statistics[2].name(), "custom-k3");
+    Assertions.assertEquals(statistics[2].value().get(), 
StatisticValues.stringValue("v3"));
+
+    // update illegal partition stats
+    Map<String, StatisticValue<?>> illegalUpdateStatistics = Maps.newHashMap();
+    illegalUpdateStatistics.put("k1", StatisticValues.stringValue("v1-2"));
+    Assertions.assertThrows(
+        IllegalStatisticNameException.class,
+        () ->
+            table
+                .supportsPartitionStatistics()
+                .updatePartitionStatistics(
+                    Lists.newArrayList(
+                        PartitionStatisticsModification.update("p0", 
illegalUpdateStatistics))));
+
+    // drop partition statistics
+    List<PartitionStatisticsDrop> statisticsToDrop = Lists.newArrayList();
+    statisticsToDrop.add(
+        PartitionStatisticsModification.drop("p0", 
Lists.newArrayList("custom-k1")));
+
+    
table.supportsPartitionStatistics().dropPartitionStatistics(statisticsToDrop);
+
+    listedStats = 
table.supportsPartitionStatistics().listPartitionStatistics(range);
+    Assertions.assertEquals(2, listedStats.size());
+    listedStats.sort(Comparator.comparing(PartitionStatistics::partitionName));
+    partitionStatistics = listedStats.get(0);
+    Assertions.assertEquals("p0", partitionStatistics.partitionName());
+    statistics = partitionStatistics.statistics();
+    Assertions.assertEquals(1, statistics.length);
+    Assertions.assertEquals(statistics[0].name(), "custom-k3");
+    Assertions.assertEquals(statistics[0].value().get(), 
StatisticValues.stringValue("v3"));
+
+    partitionStatistics = listedStats.get(1);
+    Assertions.assertEquals("p1", partitionStatistics.partitionName());
+    statistics = partitionStatistics.statistics();
+    Assertions.assertEquals(3, statistics.length);
+    Arrays.sort(statistics, Comparator.comparing(Statistic::name));
+    Assertions.assertEquals(statistics[0].name(), "custom-k1");
+    Assertions.assertEquals(statistics[0].value().get(), 
StatisticValues.stringValue("v1-1"));
+    Assertions.assertEquals(statistics[1].name(), "custom-k2");
+    Assertions.assertEquals(statistics[1].value().get(), 
StatisticValues.stringValue("v2"));
+    Assertions.assertEquals(statistics[2].name(), "custom-k3");
+    Assertions.assertEquals(statistics[2].value().get(), 
StatisticValues.stringValue("v3"));
+
+    // Test range cases
+    range = PartitionRange.downTo("p1", PartitionRange.BoundType.OPEN);
+    listedStats = 
table.supportsPartitionStatistics().listPartitionStatistics(range);
+    Assertions.assertTrue(listedStats.isEmpty());
+
+    range = PartitionRange.upTo("p1", PartitionRange.BoundType.OPEN);
+    listedStats = 
table.supportsPartitionStatistics().listPartitionStatistics(range);
+    Assertions.assertEquals(1, listedStats.size());
+
+    range = PartitionRange.upTo("p1", PartitionRange.BoundType.CLOSED);
+    listedStats = 
table.supportsPartitionStatistics().listPartitionStatistics(range);
+    Assertions.assertEquals(2, listedStats.size());
+  }
+
+  protected void startNecessaryContainer() {
+    containerSuite.startHiveContainer();
+
+    HIVE_METASTORE_URIS =
+        String.format(
+            "thrift://%s:%d",
+            containerSuite.getHiveContainer().getContainerIpAddress(),
+            HiveContainer.HIVE_METASTORE_PORT);
+  }
+
+  private void createMetalake() {
+    GravitinoMetalake[] gravitinoMetalakes = client.listMetalakes();
+    Assertions.assertEquals(0, gravitinoMetalakes.length);
+
+    client.createMetalake(metalakeName, "comment", Collections.emptyMap());
+    GravitinoMetalake loadMetalake = client.loadMetalake(metalakeName);
+    Assertions.assertEquals(metalakeName, loadMetalake.name());
+
+    metalake = loadMetalake;
+  }
+
+  protected void createCatalog() {
+    Map<String, String> properties = Maps.newHashMap();
+    properties.put("metastore.uris", HIVE_METASTORE_URIS);
+
+    metalake.createCatalog(catalogName, Catalog.Type.RELATIONAL, provider, 
"comment", properties);
+
+    catalog = metalake.loadCatalog(catalogName);
+  }
+
+  private void createSchema() {
+    Map<String, String> schemaProperties = createSchemaProperties();
+    String comment = "comment";
+    catalog.asSchemas().createSchema(schemaName, comment, schemaProperties);
+  }
+
+  private void createTable() {
+    Column[] columns = createColumns();
+
+    NameIdentifier nameIdentifier = NameIdentifier.of(schemaName, tableName);
+
+    Distribution distribution =
+        Distributions.of(Strategy.EVEN, 10, 
NamedReference.field(HIVE_COL_NAME1));
+
+    final SortOrder[] sortOrders =
+        new SortOrder[] {
+          SortOrders.of(
+              NamedReference.field(HIVE_COL_NAME2),
+              SortDirection.DESCENDING,
+              NullOrdering.NULLS_FIRST)
+        };
+
+    Map<String, String> properties = createProperties();
+    table =
+        catalog
+            .asTableCatalog()
+            .createTable(
+                nameIdentifier,
+                columns,
+                TABLE_COMMENT,
+                properties,
+                Transforms.EMPTY_TRANSFORM,
+                distribution,
+                sortOrders);
+  }
+
+  @AfterAll
+  public void stop() {
+    if (client != null) {
+      Arrays.stream(catalog.asSchemas().listSchemas())
+          .filter(schema -> !schema.equals("default"))
+          .forEach(
+              (schema -> {
+                catalog.asSchemas().dropSchema(schema, true);
+              }));
+      Arrays.stream(metalake.listCatalogs())
+          .forEach(
+              catalogName -> {
+                metalake.dropCatalog(catalogName, true);
+              });
+      client.dropMetalake(metalakeName, true);
+    }
+    try {
+      closer.close();
+    } catch (Exception e) {
+      LOG.error("Failed to close CloseableGroup", e);
+    }
+
+    client = null;
+  }
+
+  protected Map<String, String> createSchemaProperties() {
+    Map<String, String> properties = new HashMap<>();
+    properties.put("key1", "val1");
+    properties.put("key2", "val2");
+    properties.put(
+        "location",
+        String.format(
+            "hdfs://%s:%d/user/hive/warehouse/%s.db",
+            containerSuite.getHiveContainer().getContainerIpAddress(),
+            HiveContainer.HDFS_DEFAULTFS_PORT,
+            schemaName.toLowerCase()));
+    return properties;
+  }
+
+  private Column[] createColumns() {
+    Column col1 = Column.of(HIVE_COL_NAME1, Types.ByteType.get(), 
"col_1_comment");
+    Column col2 = Column.of(HIVE_COL_NAME2, Types.DateType.get(), 
"col_2_comment");
+    Column col3 = Column.of(HIVE_COL_NAME3, Types.StringType.get(), 
"col_3_comment");
+    return new Column[] {col1, col2, col3};
+  }
+
+  protected Map<String, String> createProperties() {
+    Map<String, String> properties = Maps.newHashMap();
+    properties.put("key1", "val1");
+    properties.put("key2", "val2");
+    return properties;
+  }
+}
diff --git 
a/common/src/main/java/org/apache/gravitino/dto/responses/ErrorResponse.java 
b/common/src/main/java/org/apache/gravitino/dto/responses/ErrorResponse.java
index 015743bbfe..10b510a581 100644
--- a/common/src/main/java/org/apache/gravitino/dto/responses/ErrorResponse.java
+++ b/common/src/main/java/org/apache/gravitino/dto/responses/ErrorResponse.java
@@ -326,7 +326,9 @@ public class ErrorResponse extends BaseResponse {
   public static ErrorResponse unsupportedOperation(String message, Throwable 
throwable) {
     return new ErrorResponse(
         ErrorConstants.UNSUPPORTED_OPERATION_CODE,
-        UnsupportedOperationException.class.getSimpleName(),
+        throwable == null
+            ? UnsupportedOperationException.class.getSimpleName()
+            : throwable.getClass().getSimpleName(),
         message,
         getStackTrace(throwable));
   }
diff --git 
a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/postgresql/StatisticPostgresSQLProvider.java
 
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/postgresql/StatisticPostgresSQLProvider.java
index f0e0e7d215..de94eef949 100644
--- 
a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/postgresql/StatisticPostgresSQLProvider.java
+++ 
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/postgresql/StatisticPostgresSQLProvider.java
@@ -56,7 +56,7 @@ public class StatisticPostgresSQLProvider extends 
StatisticBaseSQLProvider {
         + "  audit_info = EXCLUDED.audit_info,"
         + "  current_version = EXCLUDED.current_version,"
         + "  last_version = EXCLUDED.last_version,"
-        + "  deleted_at = EXCLUDE.deleted_at"
+        + "  deleted_at = EXCLUDED.deleted_at"
         + "</script>";
   }
 }
diff --git a/scripts/postgresql/schema-1.0.0-postgresql.sql 
b/scripts/postgresql/schema-1.0.0-postgresql.sql
index b493110412..e571156318 100644
--- a/scripts/postgresql/schema-1.0.0-postgresql.sql
+++ b/scripts/postgresql/schema-1.0.0-postgresql.sql
@@ -664,7 +664,7 @@ COMMENT ON COLUMN policy_relation_meta.last_version IS 
'policy relation last ver
 COMMENT ON COLUMN policy_relation_meta.deleted_at IS 'policy relation deleted 
at';
 
 CREATE TABLE IF NOT EXISTS statistic_meta (
-    id BIGINT NOT NULL,
+    id BIGINT NOT NULL GENERATED BY DEFAULT AS IDENTITY,
     statistic_id BIGINT NOT NULL,
     statistic_name VARCHAR(128) NOT NULL,
     metalake_id BIGINT NOT NULL,
diff --git a/scripts/postgresql/upgrade-0.9.0-to-1.0.0-postgresql.sql 
b/scripts/postgresql/upgrade-0.9.0-to-1.0.0-postgresql.sql
index e9e7d63c56..2c25a0240e 100644
--- a/scripts/postgresql/upgrade-0.9.0-to-1.0.0-postgresql.sql
+++ b/scripts/postgresql/upgrade-0.9.0-to-1.0.0-postgresql.sql
@@ -100,7 +100,7 @@ ALTER TABLE model_version_info ADD CONSTRAINT 
uk_mid_ver_uri_del UNIQUE (model_i
 ALTER TABLE model_version_info ALTER COLUMN model_version_uri_name DROP 
DEFAULT;
 
 CREATE TABLE IF NOT EXISTS statistic_meta (
-    id BIGINT NOT NULL,
+    id BIGINT NOT NULL GENERATED BY DEFAULT AS IDENTITY,
     statistic_id BIGINT NOT NULL,
     statistic_name VARCHAR(128) NOT NULL,
     metalake_id BIGINT NOT NULL,
diff --git 
a/server/src/main/java/org/apache/gravitino/server/web/rest/ExceptionHandlers.java
 
b/server/src/main/java/org/apache/gravitino/server/web/rest/ExceptionHandlers.java
index a9dc5ff630..0ea4787626 100644
--- 
a/server/src/main/java/org/apache/gravitino/server/web/rest/ExceptionHandlers.java
+++ 
b/server/src/main/java/org/apache/gravitino/server/web/rest/ExceptionHandlers.java
@@ -939,6 +939,9 @@ public class ExceptionHandlers {
       } else if (e instanceof NotFoundException) {
         return Utils.notFound(errorMsg, e);
 
+      } else if (e instanceof UnsupportedOperationException) {
+        return Utils.unsupportedOperation(errorMsg, e);
+
       } else {
         return super.handle(op, name, object, e);
       }
@@ -968,6 +971,9 @@ public class ExceptionHandlers {
       } else if (e instanceof NotFoundException) {
         return Utils.notFound(errorMsg, e);
 
+      } else if (e instanceof UnsupportedOperationException) {
+        return Utils.unsupportedOperation(errorMsg, e);
+
       } else {
         return super.handle(op, partition, table, e);
       }
diff --git 
a/server/src/test/java/org/apache/gravitino/server/web/rest/TestStatisticOperations.java
 
b/server/src/test/java/org/apache/gravitino/server/web/rest/TestStatisticOperations.java
index 3a4c20e68f..18d00de09a 100644
--- 
a/server/src/test/java/org/apache/gravitino/server/web/rest/TestStatisticOperations.java
+++ 
b/server/src/test/java/org/apache/gravitino/server/web/rest/TestStatisticOperations.java
@@ -60,6 +60,7 @@ import org.apache.gravitino.dto.stats.StatisticDTO;
 import org.apache.gravitino.dto.util.DTOConverters;
 import org.apache.gravitino.exceptions.IllegalStatisticNameException;
 import org.apache.gravitino.exceptions.NoSuchMetadataObjectException;
+import org.apache.gravitino.exceptions.UnmodifiableStatisticException;
 import org.apache.gravitino.lock.LockManager;
 import org.apache.gravitino.meta.AuditInfo;
 import org.apache.gravitino.rest.RESTUtils;
@@ -338,6 +339,35 @@ public class TestStatisticOperations extends JerseyTest {
     Assertions.assertEquals(ErrorConstants.ILLEGAL_ARGUMENTS_CODE, 
errorResp3.getCode());
     Assertions.assertEquals(
         IllegalStatisticNameException.class.getSimpleName(), 
errorResp3.getType());
+
+    // Test throw UnmodifiableStatisticException
+    doThrow(new UnmodifiableStatisticException("mock error"))
+        .when(manager)
+        .updateStatistics(any(), any(), any());
+    statsMap.clear();
+    statsMap.put(Statistic.CUSTOM_PREFIX + "test1", 
StatisticValues.stringValue("test"));
+    req = new StatisticsUpdateRequest(statsMap);
+
+    Response resp4 =
+        target(
+                "/metalakes/"
+                    + metalake
+                    + "/objects/"
+                    + tableObject.type()
+                    + "/"
+                    + tableObject.fullName()
+                    + "/statistics")
+            .request(MediaType.APPLICATION_JSON_TYPE)
+            .accept("application/vnd.gravitino.v1+json")
+            .put(entity(req, MediaType.APPLICATION_JSON_TYPE));
+
+    
Assertions.assertEquals(Response.Status.METHOD_NOT_ALLOWED.getStatusCode(), 
resp4.getStatus());
+    Assertions.assertEquals(MediaType.APPLICATION_JSON_TYPE, 
resp4.getMediaType());
+
+    ErrorResponse errorResp4 = resp4.readEntity(ErrorResponse.class);
+    Assertions.assertEquals(ErrorConstants.UNSUPPORTED_OPERATION_CODE, 
errorResp4.getCode());
+    Assertions.assertEquals(
+        UnmodifiableStatisticException.class.getSimpleName(), 
errorResp4.getType());
   }
 
   @Test
@@ -415,6 +445,31 @@ public class TestStatisticOperations extends JerseyTest {
     ErrorResponse errorResp2 = resp2.readEntity(ErrorResponse.class);
     Assertions.assertEquals(ErrorConstants.INTERNAL_ERROR_CODE, 
errorResp2.getCode());
     Assertions.assertEquals(RuntimeException.class.getSimpleName(), 
errorResp2.getType());
+
+    // Test throw UnmodifiableStatisticException
+    doThrow(new UnmodifiableStatisticException("mock error"))
+        .when(manager)
+        .dropStatistics(any(), any(), any());
+    Response resp3 =
+        target(
+                "/metalakes/"
+                    + metalake
+                    + "/objects/"
+                    + tableObject.type()
+                    + "/"
+                    + tableObject.fullName()
+                    + "/statistics")
+            .request(MediaType.APPLICATION_JSON_TYPE)
+            .accept("application/vnd.gravitino.v1+json")
+            .post(entity(req, MediaType.APPLICATION_JSON_TYPE));
+
+    
Assertions.assertEquals(Response.Status.METHOD_NOT_ALLOWED.getStatusCode(), 
resp3.getStatus());
+    Assertions.assertEquals(MediaType.APPLICATION_JSON_TYPE, 
resp3.getMediaType());
+
+    ErrorResponse errorResp3 = resp3.readEntity(ErrorResponse.class);
+    Assertions.assertEquals(ErrorConstants.UNSUPPORTED_OPERATION_CODE, 
errorResp3.getCode());
+    Assertions.assertEquals(
+        UnmodifiableStatisticException.class.getSimpleName(), 
errorResp3.getType());
   }
 
   @Test
@@ -645,6 +700,35 @@ public class TestStatisticOperations extends JerseyTest {
     Assertions.assertEquals(ErrorConstants.ILLEGAL_ARGUMENTS_CODE, 
errorResp3.getCode());
     Assertions.assertEquals(
         IllegalStatisticNameException.class.getSimpleName(), 
errorResp3.getType());
+
+    // Test throw UnmodifiableStatisticException
+    statsMap.clear();
+    statsMap.put(Statistic.CUSTOM_PREFIX + "test1", 
StatisticValues.longValue(1L));
+    doThrow(new UnmodifiableStatisticException("mock error"))
+        .when(manager)
+        .updatePartitionStatistics(any(), any(), any());
+
+    req = new PartitionStatisticsUpdateRequest(partitionStatsList);
+    Response resp4 =
+        target(
+                "/metalakes/"
+                    + metalake
+                    + "/objects/"
+                    + tableObject.type()
+                    + "/"
+                    + tableObject.fullName()
+                    + "/statistics/partitions")
+            .request(MediaType.APPLICATION_JSON_TYPE)
+            .accept("application/vnd.gravitino.v1+json")
+            .put(entity(req, MediaType.APPLICATION_JSON_TYPE));
+
+    
Assertions.assertEquals(Response.Status.METHOD_NOT_ALLOWED.getStatusCode(), 
resp4.getStatus());
+    Assertions.assertEquals(MediaType.APPLICATION_JSON_TYPE, 
resp4.getMediaType());
+
+    ErrorResponse errorResp4 = resp4.readEntity(ErrorResponse.class);
+    Assertions.assertEquals(ErrorConstants.UNSUPPORTED_OPERATION_CODE, 
errorResp4.getCode());
+    Assertions.assertEquals(
+        UnmodifiableStatisticException.class.getSimpleName(), 
errorResp4.getType());
   }
 
   @Test
@@ -729,6 +813,31 @@ public class TestStatisticOperations extends JerseyTest {
     ErrorResponse errorResp2 = resp2.readEntity(ErrorResponse.class);
     Assertions.assertEquals(ErrorConstants.INTERNAL_ERROR_CODE, 
errorResp2.getCode());
     Assertions.assertEquals(RuntimeException.class.getSimpleName(), 
errorResp2.getType());
+
+    // Test throw UnmodifiableStatisticException
+    doThrow(new UnmodifiableStatisticException("mock error"))
+        .when(manager)
+        .dropPartitionStatistics(any(), any(), any());
+    Response resp3 =
+        target(
+                "/metalakes/"
+                    + metalake
+                    + "/objects/"
+                    + tableObject.type()
+                    + "/"
+                    + tableObject.fullName()
+                    + "/statistics/partitions")
+            .request(MediaType.APPLICATION_JSON_TYPE)
+            .accept("application/vnd.gravitino.v1+json")
+            .post(entity(req, MediaType.APPLICATION_JSON_TYPE));
+
+    
Assertions.assertEquals(Response.Status.METHOD_NOT_ALLOWED.getStatusCode(), 
resp3.getStatus());
+    Assertions.assertEquals(MediaType.APPLICATION_JSON_TYPE, 
resp3.getMediaType());
+
+    ErrorResponse errorResp3 = resp3.readEntity(ErrorResponse.class);
+    Assertions.assertEquals(ErrorConstants.UNSUPPORTED_OPERATION_CODE, 
errorResp3.getCode());
+    Assertions.assertEquals(
+        UnmodifiableStatisticException.class.getSimpleName(), 
errorResp3.getType());
   }
 
   @Test

Reply via email to