This is an automated email from the ASF dual-hosted git repository.

junday pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/gravitino.git


The following commit(s) were added to refs/heads/main by this push:
     new 5c992dd736 [#9838] feat(storage): Add JDBC storage backend for 
partition statistics (#9779)
5c992dd736 is described below

commit 5c992dd736e2ca00163c89b58d0fea332c71133e
Author: cc5566 <[email protected]>
AuthorDate: Tue Feb 3 13:22:55 2026 -0800

    [#9838] feat(storage): Add JDBC storage backend for partition statistics 
(#9779)
    
    ## What changes were proposed in this pull request?
    
    This PR adds JDBC (MySQL, PostgreSQL, H2) as an alternative storage
    backend for partition statistics, complementing the existing Lance
    storage implementation.
    
    ## Why are the changes needed?
    
    Fix #9838
    
    Currently, partition statistics can only be stored using Lance format.
    This PR adds JDBC as an option for users who:
    - Prefer relational database storage
    - Already have database infrastructure (MySQL, PostgreSQL, H2)
    - Need transactional guarantees for statistics updates
    - Want to use existing database backup/recovery tools
    - Want to use the same database as Gravitino's entity storage
    
    ## Does this PR introduce any user-facing change?
    
    No breaking changes. The storage backend is pluggable via configuration:
    
    ```properties
    # Use JDBC storage (this is now the default; set LancePartitionStatisticStorageFactory for Lance)
    gravitino.stats.partition.storageFactoryClass = 
org.apache.gravitino.stats.storage.JdbcPartitionStatisticStorageFactory
    
    # JDBC connection settings
    gravitino.stats.partition.storageOption.jdbcUrl = 
jdbc:mysql://localhost:3306/gravitino
    gravitino.stats.partition.storageOption.jdbcUser = root
    gravitino.stats.partition.storageOption.jdbcPassword = gravitino
    gravitino.stats.partition.storageOption.jdbcDriver = 
com.mysql.cj.jdbc.Driver
    ```
    
    ---------
    
    Co-authored-by: Claude <[email protected]>
---
 .../main/java/org/apache/gravitino/Configs.java    |   9 +-
 .../apache/gravitino/stats/StatisticManager.java   |  53 +-
 .../storage/JdbcPartitionStatisticStorage.java     | 490 +++++++++++++
 .../JdbcPartitionStatisticStorageFactory.java      | 204 ++++++
 .../storage/TestJdbcPartitionStatisticStorage.java | 347 +++++++++
 .../TestJdbcPartitionStatisticStorageFactory.java  | 291 ++++++++
 .../TestJdbcPartitionStatisticStorageIT.java       | 800 +++++++++++++++++++++
 docs/manage-statistics-in-gravitino.md             |  77 +-
 .../integration/test/util/TestDatabaseName.java    |   9 +
 scripts/h2/schema-1.2.0-h2.sql                     |  15 +
 scripts/h2/upgrade-1.1.0-to-1.2.0-h2.sql           |  13 +
 scripts/mysql/schema-1.2.0-mysql.sql               |  14 +
 scripts/mysql/upgrade-1.1.0-to-1.2.0-mysql.sql     |  12 +
 scripts/postgresql/schema-1.2.0-postgresql.sql     |  24 +
 .../upgrade-1.1.0-to-1.2.0-postgresql.sql          |  22 +
 15 files changed, 2372 insertions(+), 8 deletions(-)

diff --git a/core/src/main/java/org/apache/gravitino/Configs.java 
b/core/src/main/java/org/apache/gravitino/Configs.java
index fc8d85312d..3cc9d703e6 100644
--- a/core/src/main/java/org/apache/gravitino/Configs.java
+++ b/core/src/main/java/org/apache/gravitino/Configs.java
@@ -28,7 +28,7 @@ import org.apache.gravitino.audit.v2.SimpleFormatterV2;
 import org.apache.gravitino.config.ConfigBuilder;
 import org.apache.gravitino.config.ConfigConstants;
 import org.apache.gravitino.config.ConfigEntry;
-import 
org.apache.gravitino.stats.storage.LancePartitionStatisticStorageFactory;
+import org.apache.gravitino.stats.storage.JdbcPartitionStatisticStorageFactory;
 
 public class Configs {
 
@@ -502,8 +502,11 @@ public class Configs {
 
   public static final ConfigEntry<String> 
PARTITION_STATS_STORAGE_FACTORY_CLASS =
       new ConfigBuilder("gravitino.stats.partition.storageFactoryClass")
-          .doc("The partition stats storage factory class.")
+          .doc(
+              "The partition stats storage factory class. "
+                  + "Default is JDBC-based storage using the same database as 
entity storage. "
+                  + "Set to LancePartitionStatisticStorageFactory for 
Lance-based storage.")
           .version(ConfigConstants.VERSION_1_0_0)
           .stringConf()
-          
.createWithDefault(LancePartitionStatisticStorageFactory.class.getCanonicalName());
+          
.createWithDefault(JdbcPartitionStatisticStorageFactory.class.getCanonicalName());
 }
diff --git 
a/core/src/main/java/org/apache/gravitino/stats/StatisticManager.java 
b/core/src/main/java/org/apache/gravitino/stats/StatisticManager.java
index fc10c1a5f0..18369a748c 100644
--- a/core/src/main/java/org/apache/gravitino/stats/StatisticManager.java
+++ b/core/src/main/java/org/apache/gravitino/stats/StatisticManager.java
@@ -23,6 +23,7 @@ import com.google.common.collect.Lists;
 import java.io.Closeable;
 import java.io.IOException;
 import java.time.Instant;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
@@ -69,7 +70,7 @@ public class StatisticManager implements Closeable, 
StatisticDispatcher {
     this.store = store;
     this.idGenerator = idGenerator;
     String className = 
config.get(Configs.PARTITION_STATS_STORAGE_FACTORY_CLASS);
-    Map<String, String> options = config.getConfigsWithPrefix(OPTIONS_PREFIX);
+    Map<String, String> options = buildStorageOptions(config);
     try {
       PartitionStatisticStorageFactory factory =
           (PartitionStatisticStorageFactory)
@@ -85,6 +86,56 @@ public class StatisticManager implements Closeable, 
StatisticDispatcher {
     }
   }
 
+  /**
+   * Builds storage options map by merging entity store JDBC configs (as 
defaults) with
+   * partition-specific configs (which override defaults).
+   *
+   * @param config the configuration
+   * @return merged options map
+   */
+  private Map<String, String> buildStorageOptions(Config config) {
+    Map<String, String> options = new HashMap<>();
+
+    // First, add entity store JDBC configs as defaults
+    // These will be used if partition-specific configs are not provided
+    String entityJdbcUrl = 
config.get(Configs.ENTITY_RELATIONAL_JDBC_BACKEND_URL);
+    if (entityJdbcUrl != null && !entityJdbcUrl.isEmpty()) {
+      options.put("jdbcUrl", entityJdbcUrl);
+    }
+
+    String entityJdbcDriver = 
config.get(Configs.ENTITY_RELATIONAL_JDBC_BACKEND_DRIVER);
+    if (entityJdbcDriver != null && !entityJdbcDriver.isEmpty()) {
+      options.put("jdbcDriver", entityJdbcDriver);
+    }
+
+    String entityJdbcUser = 
config.get(Configs.ENTITY_RELATIONAL_JDBC_BACKEND_USER);
+    if (entityJdbcUser != null && !entityJdbcUser.isEmpty()) {
+      options.put("jdbcUser", entityJdbcUser);
+    }
+
+    String entityJdbcPassword = 
config.get(Configs.ENTITY_RELATIONAL_JDBC_BACKEND_PASSWORD);
+    if (entityJdbcPassword != null && !entityJdbcPassword.isEmpty()) {
+      options.put("jdbcPassword", entityJdbcPassword);
+    }
+
+    Integer entityMaxConnections =
+        config.get(Configs.ENTITY_RELATIONAL_JDBC_BACKEND_MAX_CONNECTIONS);
+    if (entityMaxConnections != null) {
+      options.put("poolMaxSize", String.valueOf(entityMaxConnections));
+    }
+
+    Long entityWaitMs = 
config.get(Configs.ENTITY_RELATIONAL_JDBC_BACKEND_WAIT_MILLISECONDS);
+    if (entityWaitMs != null) {
+      options.put("connectionTimeoutMs", String.valueOf(entityWaitMs));
+    }
+
+    // Then, overlay partition-specific configs (these override entity store 
configs)
+    Map<String, String> partitionOptions = 
config.getConfigsWithPrefix(OPTIONS_PREFIX);
+    options.putAll(partitionOptions);
+
+    return options;
+  }
+
   @Override
   public List<Statistic> listStatistics(String metalake, MetadataObject 
metadataObject) {
     try {
diff --git 
a/core/src/main/java/org/apache/gravitino/stats/storage/JdbcPartitionStatisticStorage.java
 
b/core/src/main/java/org/apache/gravitino/stats/storage/JdbcPartitionStatisticStorage.java
new file mode 100644
index 0000000000..a976489302
--- /dev/null
+++ 
b/core/src/main/java/org/apache/gravitino/stats/storage/JdbcPartitionStatisticStorage.java
@@ -0,0 +1,490 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.stats.storage;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.google.common.collect.Lists;
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.time.Instant;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import javax.sql.DataSource;
+import org.apache.gravitino.Entity;
+import org.apache.gravitino.EntityStore;
+import org.apache.gravitino.GravitinoEnv;
+import org.apache.gravitino.MetadataObject;
+import org.apache.gravitino.NameIdentifier;
+import org.apache.gravitino.json.JsonUtils;
+import org.apache.gravitino.meta.AuditInfo;
+import org.apache.gravitino.meta.TableEntity;
+import org.apache.gravitino.stats.PartitionRange;
+import org.apache.gravitino.stats.PartitionStatisticsDrop;
+import org.apache.gravitino.stats.PartitionStatisticsUpdate;
+import org.apache.gravitino.stats.StatisticValue;
+import org.apache.gravitino.utils.MetadataObjectUtil;
+import org.apache.gravitino.utils.PrincipalUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * JDBC-based implementation of {@link PartitionStatisticStorage}.
+ *
+ * <p>This implementation stores partition statistics in a JDBC-compatible 
database table, using
+ * Apache Commons DBCP2 for connection pooling. It supports multiple database 
backends (MySQL,
+ * PostgreSQL, H2). Statistics are stored as JSON-serialized values along with 
audit information.
+ */
+public class JdbcPartitionStatisticStorage implements 
PartitionStatisticStorage {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(JdbcPartitionStatisticStorage.class);
+
+  private final DataSource dataSource;
+  private final EntityStore entityStore;
+  private final DatabaseType databaseType;
+
+  /** Supported database types. */
+  private enum DatabaseType {
+    MYSQL,
+    POSTGRESQL,
+    H2
+  }
+
+  // SQL statements for MySQL and H2 (compatible syntax)
+  private static final String INSERT_OR_UPDATE_SQL_MYSQL =
+      "INSERT INTO partition_statistic_meta "
+          + "(table_id, partition_name, statistic_name, statistic_value, 
audit_info, created_at, updated_at) "
+          + "VALUES (?, ?, ?, ?, ?, ?, ?) "
+          + "ON DUPLICATE KEY UPDATE "
+          + "statistic_value = VALUES(statistic_value), "
+          + "audit_info = VALUES(audit_info), "
+          + "updated_at = VALUES(updated_at)";
+
+  // SQL statement for PostgreSQL
+  private static final String INSERT_OR_UPDATE_SQL_POSTGRESQL =
+      "INSERT INTO partition_statistic_meta "
+          + "(table_id, partition_name, statistic_name, statistic_value, 
audit_info, created_at, updated_at) "
+          + "VALUES (?, ?, ?, ?, ?, ?, ?) "
+          + "ON CONFLICT (table_id, partition_name, statistic_name) DO UPDATE 
SET "
+          + "statistic_value = EXCLUDED.statistic_value, "
+          + "audit_info = EXCLUDED.audit_info, "
+          + "updated_at = EXCLUDED.updated_at";
+
+  private static final String SELECT_STATISTICS_SQL =
+      "SELECT partition_name, statistic_name, statistic_value, audit_info "
+          + "FROM partition_statistic_meta "
+          + "WHERE table_id = ? ";
+
+  private static final String DELETE_STATISTICS_SQL =
+      "DELETE FROM partition_statistic_meta "
+          + "WHERE table_id = ? AND partition_name = ? AND statistic_name = ?";
+
+  /**
+   * Constructs a new JdbcPartitionStatisticStorage.
+   *
+   * @param dataSource the JDBC DataSource for database connections
+   */
+  public JdbcPartitionStatisticStorage(DataSource dataSource) {
+    this.dataSource = dataSource;
+    this.entityStore = GravitinoEnv.getInstance().entityStore();
+    this.databaseType = detectDatabaseType();
+  }
+
+  /**
+   * Detects the database type from the JDBC connection.
+   *
+   * @return the detected database type
+   */
+  private DatabaseType detectDatabaseType() {
+    try (Connection conn = dataSource.getConnection()) {
+      String productName = 
conn.getMetaData().getDatabaseProductName().toLowerCase();
+      LOG.info("Detected database product: {}", productName);
+
+      if (productName.contains("mysql")) {
+        return DatabaseType.MYSQL;
+      } else if (productName.contains("postgresql")) {
+        return DatabaseType.POSTGRESQL;
+      } else if (productName.contains("h2")) {
+        return DatabaseType.H2;
+      } else {
+        LOG.warn("Unknown database type: {}, defaulting to MySQL syntax", 
productName);
+        return DatabaseType.MYSQL;
+      }
+    } catch (SQLException e) {
+      LOG.error("Failed to detect database type, defaulting to MySQL", e);
+      return DatabaseType.MYSQL;
+    }
+  }
+
+  /**
+   * Gets the appropriate INSERT/UPDATE SQL for the current database type.
+   *
+   * @return the SQL statement
+   */
+  private String getInsertOrUpdateSql() {
+    if (databaseType == DatabaseType.POSTGRESQL) {
+      return INSERT_OR_UPDATE_SQL_POSTGRESQL;
+    } else {
+      // MySQL and H2 use the same syntax
+      return INSERT_OR_UPDATE_SQL_MYSQL;
+    }
+  }
+
+  @Override
+  public List<PersistedPartitionStatistics> listStatistics(
+      String metalake, MetadataObject metadataObject, PartitionRange 
partitionRange)
+      throws IOException {
+    LOG.debug(
+        "Listing statistics for metalake: {}, object: {}, range: {}",
+        metalake,
+        metadataObject.fullName(),
+        partitionRange);
+
+    Long tableId = resolveTableId(metalake, metadataObject);
+    String rangeFilter = buildPartitionRangeFilter(partitionRange);
+
+    String sql = SELECT_STATISTICS_SQL + rangeFilter + " ORDER BY 
partition_name, statistic_name";
+
+    try (Connection conn = dataSource.getConnection();
+        PreparedStatement stmt = conn.prepareStatement(sql)) {
+
+      stmt.setLong(1, tableId);
+
+      // Set partition range parameters
+      setPartitionRangeParameters(stmt, partitionRange, 2);
+
+      try (ResultSet rs = stmt.executeQuery()) {
+        return parseResultSet(rs);
+      }
+
+    } catch (SQLException e) {
+      throw new IOException("Failed to list statistics for " + 
metadataObject.fullName(), e);
+    }
+  }
+
+  @Override
+  public List<PersistedPartitionStatistics> listStatistics(
+      String metalake, MetadataObject metadataObject, List<String> 
partitionNames)
+      throws IOException {
+    LOG.debug(
+        "Listing statistics for metalake: {}, object: {}, partitions: {}",
+        metalake,
+        metadataObject.fullName(),
+        partitionNames);
+
+    if (partitionNames.isEmpty()) {
+      return Lists.newArrayList();
+    }
+
+    Long tableId = resolveTableId(metalake, metadataObject);
+
+    // Build IN clause
+    String inClause =
+        partitionNames.stream().map(p -> "?").collect(Collectors.joining(", ", 
"(", ")"));
+    String sql =
+        SELECT_STATISTICS_SQL
+            + "AND partition_name IN "
+            + inClause
+            + " ORDER BY partition_name, statistic_name";
+
+    try (Connection conn = dataSource.getConnection();
+        PreparedStatement stmt = conn.prepareStatement(sql)) {
+
+      stmt.setLong(1, tableId);
+
+      int paramIndex = 2;
+      for (String partitionName : partitionNames) {
+        stmt.setString(paramIndex++, partitionName);
+      }
+
+      try (ResultSet rs = stmt.executeQuery()) {
+        return parseResultSet(rs);
+      }
+
+    } catch (SQLException e) {
+      throw new IOException(
+          "Failed to list statistics for partitions "
+              + partitionNames
+              + " in "
+              + metadataObject.fullName(),
+          e);
+    }
+  }
+
+  @Override
+  public int dropStatistics(String metalake, 
List<MetadataObjectStatisticsDrop> statisticsToDrop)
+      throws IOException {
+    LOG.debug(
+        "Dropping statistics for metalake: {}, {} objects", metalake, 
statisticsToDrop.size());
+
+    int totalDropped = 0;
+
+    try (Connection conn = dataSource.getConnection()) {
+      conn.setAutoCommit(false);
+
+      try (PreparedStatement stmt = 
conn.prepareStatement(DELETE_STATISTICS_SQL)) {
+        for (MetadataObjectStatisticsDrop objectDrop : statisticsToDrop) {
+          Long tableId = resolveTableId(metalake, objectDrop.metadataObject());
+
+          for (PartitionStatisticsDrop drop : objectDrop.drops()) {
+            String partitionName = drop.partitionName();
+            for (String statisticName : drop.statisticNames()) {
+              stmt.setLong(1, tableId);
+              stmt.setString(2, partitionName);
+              stmt.setString(3, statisticName);
+              stmt.addBatch();
+            }
+          }
+        }
+
+        int[] results = stmt.executeBatch();
+        for (int result : results) {
+          // Count successful deletions. Per JDBC spec, executeBatch() returns:
+          // - Positive number: actual update count
+          // - SUCCESS_NO_INFO (-2): operation succeeded but driver doesn't 
know row count
+          // - EXECUTE_FAILED (-3): operation failed (we don't count this)
+          if (result > 0 || result == PreparedStatement.SUCCESS_NO_INFO) {
+            totalDropped++;
+          }
+        }
+
+        conn.commit();
+        LOG.debug("Successfully dropped {} statistics", totalDropped);
+
+      } catch (Exception e) {
+        conn.rollback();
+        throw e;
+      } finally {
+        conn.setAutoCommit(true);
+      }
+
+    } catch (SQLException e) {
+      throw new IOException("Failed to drop statistics", e);
+    }
+
+    return totalDropped;
+  }
+
+  @Override
+  public void updateStatistics(
+      String metalake, List<MetadataObjectStatisticsUpdate> 
statisticsToUpdate) throws IOException {
+    LOG.debug(
+        "Updating statistics for metalake: {}, {} objects", metalake, 
statisticsToUpdate.size());
+
+    try (Connection conn = dataSource.getConnection()) {
+      conn.setAutoCommit(false);
+
+      String sql = getInsertOrUpdateSql();
+      try (PreparedStatement stmt = conn.prepareStatement(sql)) {
+        for (MetadataObjectStatisticsUpdate objectUpdate : statisticsToUpdate) 
{
+          Long tableId = resolveTableId(metalake, 
objectUpdate.metadataObject());
+
+          for (PartitionStatisticsUpdate update : 
objectUpdate.partitionUpdates()) {
+            String partitionName = update.partitionName();
+
+            for (Map.Entry<String, StatisticValue<?>> stat : 
update.statistics().entrySet()) {
+              String statisticName = stat.getKey();
+              StatisticValue<?> statisticValue = stat.getValue();
+
+              // Create audit info
+              String currentUser = PrincipalUtils.getCurrentUserName();
+              Instant now = Instant.now();
+              AuditInfo auditInfo =
+                  AuditInfo.builder()
+                      .withCreator(currentUser)
+                      .withCreateTime(now)
+                      .withLastModifier(currentUser)
+                      .withLastModifiedTime(now)
+                      .build();
+
+              // Serialize to JSON
+              String statisticValueJson =
+                  
JsonUtils.anyFieldMapper().writeValueAsString(statisticValue);
+              String auditInfoJson = 
JsonUtils.anyFieldMapper().writeValueAsString(auditInfo);
+
+              long timestamp = now.toEpochMilli();
+
+              stmt.setLong(1, tableId);
+              stmt.setString(2, partitionName);
+              stmt.setString(3, statisticName);
+              stmt.setString(4, statisticValueJson);
+              stmt.setString(5, auditInfoJson);
+              stmt.setLong(6, timestamp);
+              stmt.setLong(7, timestamp);
+
+              stmt.addBatch();
+            }
+          }
+        }
+
+        stmt.executeBatch();
+        conn.commit();
+        LOG.debug("Successfully updated statistics");
+
+      } catch (Exception e) {
+        conn.rollback();
+        throw e;
+      } finally {
+        conn.setAutoCommit(true);
+      }
+
+    } catch (SQLException | JsonProcessingException e) {
+      throw new IOException("Failed to update statistics", e);
+    }
+  }
+
+  @Override
+  public void close() throws IOException {
+    // DataSource lifecycle is managed externally by the factory
+    LOG.debug("Closing JdbcPartitionStatisticStorage");
+  }
+
+  /**
+   * Resolves the table ID for a given metadata object.
+   *
+   * @param metalake the metalake name
+   * @param metadataObject the metadata object
+   * @return the table ID
+   * @throws IOException if unable to resolve the table ID
+   */
+  private Long resolveTableId(String metalake, MetadataObject metadataObject) 
throws IOException {
+    NameIdentifier identifier = MetadataObjectUtil.toEntityIdent(metalake, 
metadataObject);
+    Entity.EntityType type = MetadataObjectUtil.toEntityType(metadataObject);
+
+    TableEntity tableEntity = entityStore.get(identifier, type, 
TableEntity.class);
+    return tableEntity.id();
+  }
+
+  /**
+   * Builds the SQL filter clause for partition range.
+   *
+   * @param range the partition range
+   * @return the SQL filter clause
+   */
+  private String buildPartitionRangeFilter(PartitionRange range) {
+    StringBuilder filter = new StringBuilder();
+
+    range
+        .lowerPartitionName()
+        .ifPresent(
+            name -> {
+              String op =
+                  range
+                      .lowerBoundType()
+                      .map(t -> t == PartitionRange.BoundType.CLOSED ? ">=" : 
">")
+                      .orElse(">=");
+              filter.append("AND partition_name ").append(op).append(" ? ");
+            });
+
+    range
+        .upperPartitionName()
+        .ifPresent(
+            name -> {
+              String op =
+                  range
+                      .upperBoundType()
+                      .map(t -> t == PartitionRange.BoundType.CLOSED ? "<=" : 
"<")
+                      .orElse("<=");
+              filter.append("AND partition_name ").append(op).append(" ? ");
+            });
+
+    return filter.toString();
+  }
+
+  /**
+   * Sets partition range parameters in the prepared statement.
+   *
+   * @param stmt the prepared statement
+   * @param range the partition range
+   * @param startIndex the starting parameter index
+   * @throws SQLException if setting parameters fails
+   */
+  private void setPartitionRangeParameters(
+      PreparedStatement stmt, PartitionRange range, int startIndex) throws 
SQLException {
+    final int[] paramIndex = {startIndex};
+
+    range
+        .lowerPartitionName()
+        .ifPresent(
+            name -> {
+              try {
+                stmt.setString(paramIndex[0]++, name);
+              } catch (SQLException e) {
+                throw new RuntimeException("Failed to set lower partition name 
parameter", e);
+              }
+            });
+
+    range
+        .upperPartitionName()
+        .ifPresent(
+            name -> {
+              try {
+                stmt.setString(paramIndex[0]++, name);
+              } catch (SQLException e) {
+                throw new RuntimeException("Failed to set upper partition name 
parameter", e);
+              }
+            });
+  }
+
+  /**
+   * Parses a ResultSet into a list of PersistedPartitionStatistics.
+   *
+   * @param rs the result set
+   * @return list of persisted partition statistics
+   * @throws SQLException if reading from result set fails
+   */
+  private List<PersistedPartitionStatistics> parseResultSet(ResultSet rs) 
throws SQLException {
+    Map<String, List<PersistedStatistic>> partitionMap = new HashMap<>();
+
+    while (rs.next()) {
+      String partitionName = rs.getString("partition_name");
+      String statisticName = rs.getString("statistic_name");
+      String statisticValueJson = rs.getString("statistic_value");
+      String auditInfoJson = rs.getString("audit_info");
+
+      try {
+        StatisticValue<?> statisticValue =
+            JsonUtils.anyFieldMapper().readValue(statisticValueJson, 
StatisticValue.class);
+        AuditInfo auditInfo = 
JsonUtils.anyFieldMapper().readValue(auditInfoJson, AuditInfo.class);
+
+        PersistedStatistic persistedStatistic =
+            PersistedStatistic.of(statisticName, statisticValue, auditInfo);
+
+        partitionMap.computeIfAbsent(partitionName, k -> new 
ArrayList<>()).add(persistedStatistic);
+
+      } catch (JsonProcessingException e) {
+        LOG.error(
+            "Failed to parse statistic JSON for partition: {}, statistic: {}",
+            partitionName,
+            statisticName,
+            e);
+        throw new SQLException("Failed to parse statistic JSON", e);
+      }
+    }
+
+    return partitionMap.entrySet().stream()
+        .map(entry -> PersistedPartitionStatistics.of(entry.getKey(), 
entry.getValue()))
+        .collect(Collectors.toList());
+  }
+}
diff --git 
a/core/src/main/java/org/apache/gravitino/stats/storage/JdbcPartitionStatisticStorageFactory.java
 
b/core/src/main/java/org/apache/gravitino/stats/storage/JdbcPartitionStatisticStorageFactory.java
new file mode 100644
index 0000000000..fc4261c412
--- /dev/null
+++ 
b/core/src/main/java/org/apache/gravitino/stats/storage/JdbcPartitionStatisticStorageFactory.java
@@ -0,0 +1,204 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.stats.storage;
+
+import com.google.common.base.Preconditions;
+import java.sql.SQLException;
+import java.time.Duration;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.commons.dbcp2.BasicDataSource;
+import org.apache.gravitino.exceptions.GravitinoRuntimeException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Factory for creating {@link JdbcPartitionStatisticStorage} instances.
+ *
+ * <p>This factory creates a JDBC-based partition statistics storage using 
Apache Commons DBCP2 for
+ * connection pooling. It supports multiple database backends (MySQL, 
PostgreSQL, H2) and configures
+ * the connection pool with appropriate settings for partition statistics 
workloads.
+ *
+ * <p>Configuration properties:
+ *
+ * <ul>
+ *   <li>jdbcUrl (required): JDBC connection URL (e.g., 
jdbc:mysql://host:port/db,
+ *       jdbc:postgresql://host:port/db)
+ *   <li>jdbcUser (required): Database username
+ *   <li>jdbcPassword (required): Database password
+ *   <li>jdbcDriver (optional): JDBC driver class name (defaults to 
com.mysql.cj.jdbc.Driver)
+ *   <li>poolMaxSize (optional): Maximum connection pool size (default: 10)
+ *   <li>poolMinIdle (optional): Minimum idle connections (default: 2)
+ *   <li>connectionTimeoutMs (optional): Connection timeout in milliseconds 
(default: 30000)
+ *   <li>testOnBorrow (optional): Test connections before use (default: true)
+ * </ul>
+ */
+public class JdbcPartitionStatisticStorageFactory implements 
PartitionStatisticStorageFactory {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(JdbcPartitionStatisticStorageFactory.class);
+
+  // Configuration keys
+  private static final String JDBC_URL = "jdbcUrl";
+  private static final String JDBC_USER = "jdbcUser";
+  private static final String JDBC_PASSWORD = "jdbcPassword";
+  private static final String JDBC_DRIVER = "jdbcDriver";
+  private static final String POOL_MAX_SIZE = "poolMaxSize";
+  private static final String POOL_MIN_IDLE = "poolMinIdle";
+  private static final String CONNECTION_TIMEOUT_MS = "connectionTimeoutMs";
+  private static final String TEST_ON_BORROW = "testOnBorrow";
+
+  // Default values
+  private static final String DEFAULT_JDBC_DRIVER = "com.mysql.cj.jdbc.Driver";
+  private static final int DEFAULT_POOL_MAX_SIZE = 10;
+  private static final int DEFAULT_POOL_MIN_IDLE = 2;
+  private static final long DEFAULT_CONNECTION_TIMEOUT_MS = 30000L;
+  private static final boolean DEFAULT_TEST_ON_BORROW = true;
+  private static final String VALIDATION_QUERY = "SELECT 1";
+
+  // Keep reference to DataSource for cleanup
+  private BasicDataSource dataSource;
+
+  @Override
+  public PartitionStatisticStorage create(Map<String, String> properties) {
+    LOG.info(
+        "Creating JdbcPartitionStatisticStorage with properties: {}",
+        maskSensitiveProperties(properties));
+
+    validateRequiredProperties(properties);
+
+    try {
+      dataSource = createDataSource(properties);
+      return new JdbcPartitionStatisticStorage(dataSource);
+    } catch (Exception e) {
+      if (dataSource != null) {
+        try {
+          dataSource.close();
+        } catch (SQLException closeException) {
+          LOG.error("Failed to close data source after creation error", 
closeException);
+        }
+      }
+      throw new GravitinoRuntimeException(e, "Failed to create 
JdbcPartitionStatisticStorage");
+    }
+  }
+
+  /**
+   * Creates and configures a BasicDataSource from the provided properties.
+   *
+   * @param properties configuration properties
+   * @return configured DataSource
+   */
+  private BasicDataSource createDataSource(Map<String, String> properties) {
+    BasicDataSource ds = new BasicDataSource();
+
+    // Required properties
+    String jdbcUrl = properties.get(JDBC_URL);
+    String jdbcUser = properties.get(JDBC_USER);
+    String jdbcPassword = properties.get(JDBC_PASSWORD);
+
+    ds.setUrl(jdbcUrl);
+    ds.setUsername(jdbcUser);
+    ds.setPassword(jdbcPassword);
+
+    // Optional properties with defaults
+    String driverClassName = properties.getOrDefault(JDBC_DRIVER, 
DEFAULT_JDBC_DRIVER);
+    ds.setDriverClassName(driverClassName);
+
+    int maxSize =
+        Integer.parseInt(
+            properties.getOrDefault(POOL_MAX_SIZE, 
String.valueOf(DEFAULT_POOL_MAX_SIZE)));
+    ds.setMaxTotal(maxSize);
+
+    int minIdle =
+        Integer.parseInt(
+            properties.getOrDefault(POOL_MIN_IDLE, 
String.valueOf(DEFAULT_POOL_MIN_IDLE)));
+    ds.setMinIdle(minIdle);
+
+    long timeoutMs =
+        Long.parseLong(
+            properties.getOrDefault(
+                CONNECTION_TIMEOUT_MS, 
String.valueOf(DEFAULT_CONNECTION_TIMEOUT_MS)));
+    ds.setMaxWait(Duration.ofMillis(timeoutMs));
+
+    boolean testOnBorrow =
+        Boolean.parseBoolean(
+            properties.getOrDefault(TEST_ON_BORROW, 
String.valueOf(DEFAULT_TEST_ON_BORROW)));
+    ds.setTestOnBorrow(testOnBorrow);
+
+    if (testOnBorrow) {
+      ds.setValidationQuery(VALIDATION_QUERY);
+    }
+
+    LOG.info(
+        "Created JDBC DataSource: url={}, driver={}, maxPoolSize={}, 
minIdle={}, timeout={}ms",
+        jdbcUrl,
+        driverClassName,
+        maxSize,
+        minIdle,
+        timeoutMs);
+
+    return ds;
+  }
+
+  /**
+   * Validates that all required properties are present and non-empty.
+   *
+   * @param properties configuration properties
+   * @throws IllegalArgumentException if required properties are missing or 
empty
+   */
+  private void validateRequiredProperties(Map<String, String> properties) {
+    String jdbcUrl = properties.get(JDBC_URL);
+    Preconditions.checkArgument(
+        jdbcUrl != null && !jdbcUrl.trim().isEmpty(), "Property %s must be 
non-empty", JDBC_URL);
+
+    String jdbcUser = properties.get(JDBC_USER);
+    Preconditions.checkArgument(
+        jdbcUser != null && !jdbcUser.trim().isEmpty(), "Property %s must be 
non-empty", JDBC_USER);
+
+    String jdbcPassword = properties.get(JDBC_PASSWORD);
+    Preconditions.checkArgument(jdbcPassword != null, "Property %s must be 
present", JDBC_PASSWORD);
+  }
+
+  /**
+   * Creates a masked copy of properties for logging (hides password).
+   *
+   * @param properties original properties
+   * @return masked properties map
+   */
+  private Map<String, String> maskSensitiveProperties(Map<String, String> 
properties) {
+    Map<String, String> masked = new HashMap<>(properties);
+    if (masked.containsKey(JDBC_PASSWORD)) {
+      masked.put(JDBC_PASSWORD, "***");
+    }
+    return masked;
+  }
+
+  /**
+   * Closes the data source if it was created by this factory.
+   *
+   * @throws SQLException if closing fails
+   */
+  public void close() throws SQLException {
+    if (dataSource != null) {
+      LOG.info("Closing JDBC DataSource");
+      dataSource.close();
+      dataSource = null;
+    }
+  }
+}
diff --git 
a/core/src/test/java/org/apache/gravitino/stats/storage/TestJdbcPartitionStatisticStorage.java
 
b/core/src/test/java/org/apache/gravitino/stats/storage/TestJdbcPartitionStatisticStorage.java
new file mode 100644
index 0000000000..b5242eb967
--- /dev/null
+++ 
b/core/src/test/java/org/apache/gravitino/stats/storage/TestJdbcPartitionStatisticStorage.java
@@ -0,0 +1,347 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.stats.storage;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.google.common.collect.Lists;
+import java.sql.Connection;
+import java.sql.DatabaseMetaData;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import javax.sql.DataSource;
+import org.apache.commons.lang3.reflect.FieldUtils;
+import org.apache.gravitino.EntityStore;
+import org.apache.gravitino.GravitinoEnv;
+import org.apache.gravitino.MetadataObject;
+import org.apache.gravitino.MetadataObjects;
+import org.apache.gravitino.json.JsonUtils;
+import org.apache.gravitino.meta.TableEntity;
+import org.apache.gravitino.stats.PartitionRange;
+import org.apache.gravitino.stats.PartitionStatisticsModification;
+import org.apache.gravitino.stats.PartitionStatisticsUpdate;
+import org.apache.gravitino.stats.StatisticValue;
+import org.apache.gravitino.stats.StatisticValues;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.mockito.ArgumentCaptor;
+
+/** Unit tests for {@link JdbcPartitionStatisticStorage} using mocked JDBC 
components. */
+public class TestJdbcPartitionStatisticStorage {
+
+  private JdbcPartitionStatisticStorage storage;
+  private DataSource mockDataSource;
+  private Connection mockConnection;
+  private PreparedStatement mockPreparedStatement;
+  private ResultSet mockResultSet;
+  private EntityStore mockEntityStore;
+
+  private static final String METALAKE = "test_metalake";
+  private static final MetadataObject TEST_TABLE =
+      MetadataObjects.of(
+          Lists.newArrayList("catalog", "schema", "table"), 
MetadataObject.Type.TABLE);
+  private static final Long TABLE_ID = 100L;
+
+  @BeforeEach
+  public void setUp() throws Exception {
+    // Mock JDBC components
+    mockDataSource = mock(DataSource.class);
+    mockConnection = mock(Connection.class);
+    mockPreparedStatement = mock(PreparedStatement.class);
+    mockResultSet = mock(ResultSet.class);
+
+    when(mockDataSource.getConnection()).thenReturn(mockConnection);
+    
when(mockConnection.prepareStatement(anyString())).thenReturn(mockPreparedStatement);
+    when(mockPreparedStatement.executeQuery()).thenReturn(mockResultSet);
+
+    // Mock DatabaseMetaData for database type detection
+    DatabaseMetaData mockMetaData = mock(DatabaseMetaData.class);
+    when(mockConnection.getMetaData()).thenReturn(mockMetaData);
+    when(mockMetaData.getDatabaseProductName()).thenReturn("MySQL");
+
+    // Mock EntityStore
+    mockEntityStore = mock(EntityStore.class);
+    TableEntity mockTableEntity = mock(TableEntity.class);
+    when(mockEntityStore.get(any(), any(), any())).thenReturn(mockTableEntity);
+    when(mockTableEntity.id()).thenReturn(TABLE_ID);
+    FieldUtils.writeField(GravitinoEnv.getInstance(), "entityStore", 
mockEntityStore, true);
+
+    // Create storage with mocked DataSource
+    storage = new JdbcPartitionStatisticStorage(mockDataSource);
+  }
+
+  @AfterEach
+  public void tearDown() throws Exception {
+    if (storage != null) {
+      storage.close();
+    }
+  }
+
+  /** Helper method to serialize StatisticValue to JSON for mocking ResultSet. 
*/
+  private String toJson(StatisticValue<?> value) throws 
JsonProcessingException {
+    return JsonUtils.anyFieldMapper().writeValueAsString(value);
+  }
+
+  @Test
+  public void testUpdateStatistics() throws Exception {
+    // Prepare test data
+    Map<String, StatisticValue<?>> stats = new HashMap<>();
+    stats.put("custom-rowCount", StatisticValues.longValue(1000L));
+    stats.put("custom-sizeBytes", StatisticValues.longValue(5242880L));
+
+    PartitionStatisticsUpdate update =
+        PartitionStatisticsModification.update("partition_2024_01", stats);
+
+    List<MetadataObjectStatisticsUpdate> objectUpdates =
+        Lists.newArrayList(
+            MetadataObjectStatisticsUpdate.of(TEST_TABLE, 
Lists.newArrayList(update)));
+
+    // Mock successful execution
+    when(mockPreparedStatement.executeBatch()).thenReturn(new int[] {1, 1});
+
+    // Execute
+    storage.updateStatistics(METALAKE, objectUpdates);
+
+    // Verify connection and commit were called
+    verify(mockConnection, times(1)).commit();
+    // Connection.close() is called twice: once during database type detection 
in constructor,
+    // and once during updateStatistics
+    verify(mockConnection, times(2)).close();
+  }
+
+  @Test
+  public void testListStatisticsAllPartitions() throws Exception {
+    // Create actual JSON from StatisticValue objects
+    String rowCountJson = toJson(StatisticValues.longValue(1000L));
+    String sizeBytesJson = toJson(StatisticValues.longValue(5242880L));
+
+    // Mock ResultSet to return test data
+    when(mockResultSet.next()).thenReturn(true, true, false);
+    when(mockResultSet.getLong("table_id")).thenReturn(TABLE_ID);
+    when(mockResultSet.getString("partition_name"))
+        .thenReturn("partition_2024_01", "partition_2024_01");
+    when(mockResultSet.getString("statistic_name"))
+        .thenReturn("custom-rowCount", "custom-sizeBytes");
+    when(mockResultSet.getString("statistic_value")).thenReturn(rowCountJson, 
sizeBytesJson);
+    when(mockResultSet.getString("audit_info"))
+        .thenReturn(
+            "{\"creator\":\"test\",\"createTime\":\"2024-01-01T00:00:00Z\"}",
+            "{\"creator\":\"test\",\"createTime\":\"2024-01-01T00:00:00Z\"}");
+
+    // Execute
+    List<PersistedPartitionStatistics> results =
+        storage.listStatistics(METALAKE, TEST_TABLE, 
PartitionRange.ALL_PARTITIONS);
+
+    // Verify
+    assertNotNull(results);
+    assertEquals(1, results.size());
+    assertEquals("partition_2024_01", results.get(0).partitionName());
+    assertEquals(2, results.get(0).statistics().size());
+
+    // Verify statistics values
+    List<PersistedStatistic> statistics = results.get(0).statistics();
+    boolean hasRowCount = statistics.stream().anyMatch(s -> 
s.name().equals("custom-rowCount"));
+    boolean hasSizeBytes = statistics.stream().anyMatch(s -> 
s.name().equals("custom-sizeBytes"));
+    assertTrue(hasRowCount);
+    assertTrue(hasSizeBytes);
+  }
+
+  @Test
+  public void testListStatisticsWithClosedRange() throws Exception {
+    // Create actual JSON from StatisticValue object
+    String rowCountJson = toJson(StatisticValues.longValue(2000L));
+
+    // Mock ResultSet
+    when(mockResultSet.next()).thenReturn(true, false);
+    when(mockResultSet.getLong("table_id")).thenReturn(TABLE_ID);
+    
when(mockResultSet.getString("partition_name")).thenReturn("partition_2024_02");
+    
when(mockResultSet.getString("statistic_name")).thenReturn("custom-rowCount");
+    when(mockResultSet.getString("statistic_value")).thenReturn(rowCountJson);
+    when(mockResultSet.getString("audit_info"))
+        
.thenReturn("{\"creator\":\"test\",\"createTime\":\"2024-01-01T00:00:00Z\"}");
+
+    // Execute with range query
+    storage.listStatistics(
+        METALAKE,
+        TEST_TABLE,
+        PartitionRange.between(
+            "partition_2024_01",
+            PartitionRange.BoundType.CLOSED,
+            "partition_2024_03",
+            PartitionRange.BoundType.CLOSED));
+
+    // Verify query was executed with WHERE clause
+    ArgumentCaptor<String> sqlCaptor = ArgumentCaptor.forClass(String.class);
+    verify(mockConnection).prepareStatement(sqlCaptor.capture());
+    String executedSql = sqlCaptor.getValue();
+    assertTrue(executedSql.contains("WHERE"));
+    assertTrue(executedSql.contains("table_id"));
+  }
+
+  @Test
+  public void testDropStatistics() throws Exception {
+    // Prepare drop request
+    List<MetadataObjectStatisticsDrop> drops =
+        Lists.newArrayList(
+            MetadataObjectStatisticsDrop.of(
+                TEST_TABLE,
+                Lists.newArrayList(
+                    PartitionStatisticsModification.drop(
+                        "partition_2024_01", 
Lists.newArrayList("custom-rowCount")))));
+
+    // Mock successful batch execution
+    when(mockPreparedStatement.executeBatch()).thenReturn(new int[] {1});
+
+    // Execute
+    int dropped = storage.dropStatistics(METALAKE, drops);
+
+    // Verify
+    assertEquals(1, dropped);
+    verify(mockPreparedStatement, times(1)).executeBatch();
+  }
+
+  @Test
+  public void testTransactionRollback() throws Exception {
+    // Prepare test data
+    Map<String, StatisticValue<?>> stats = new HashMap<>();
+    stats.put("custom-rowCount", StatisticValues.longValue(1000L));
+
+    PartitionStatisticsUpdate update =
+        PartitionStatisticsModification.update("partition_2024_01", stats);
+
+    List<MetadataObjectStatisticsUpdate> objectUpdates =
+        Lists.newArrayList(
+            MetadataObjectStatisticsUpdate.of(TEST_TABLE, 
Lists.newArrayList(update)));
+
+    // Mock SQLException during batch execution
+    when(mockPreparedStatement.executeBatch()).thenThrow(new 
SQLException("Test error"));
+
+    // Execute and expect exception
+    assertThrows(Exception.class, () -> storage.updateStatistics(METALAKE, 
objectUpdates));
+
+    // Verify rollback was called
+    verify(mockConnection, times(1)).rollback();
+  }
+
+  @Test
+  public void testMultipleStatisticTypes() throws Exception {
+    // Create actual StatisticValue objects and serialize them to JSON
+    String longValueJson = toJson(StatisticValues.longValue(1000L));
+    String doubleValueJson = toJson(StatisticValues.doubleValue(256.5));
+    String stringValueJson = toJson(StatisticValues.stringValue("parquet"));
+    String booleanValueJson = toJson(StatisticValues.booleanValue(true));
+
+    // Mock ResultSet with different statistic types
+    when(mockResultSet.next()).thenReturn(true, true, true, true, false);
+    when(mockResultSet.getLong("table_id")).thenReturn(TABLE_ID);
+    when(mockResultSet.getString("partition_name"))
+        .thenReturn(
+            "partition_2024_01", "partition_2024_01", "partition_2024_01", 
"partition_2024_01");
+    when(mockResultSet.getString("statistic_name"))
+        .thenReturn("custom-rowCount", "custom-avgSize", "custom-format", 
"custom-isCompressed");
+    when(mockResultSet.getString("statistic_value"))
+        .thenReturn(longValueJson, doubleValueJson, stringValueJson, 
booleanValueJson);
+    when(mockResultSet.getString("audit_info"))
+        .thenReturn(
+            "{\"creator\":\"test\",\"createTime\":\"2024-01-01T00:00:00Z\"}",
+            "{\"creator\":\"test\",\"createTime\":\"2024-01-01T00:00:00Z\"}",
+            "{\"creator\":\"test\",\"createTime\":\"2024-01-01T00:00:00Z\"}",
+            "{\"creator\":\"test\",\"createTime\":\"2024-01-01T00:00:00Z\"}");
+
+    // Execute
+    List<PersistedPartitionStatistics> results =
+        storage.listStatistics(METALAKE, TEST_TABLE, 
PartitionRange.ALL_PARTITIONS);
+
+    // Verify all statistic types
+    assertEquals(1, results.size());
+    assertEquals(4, results.get(0).statistics().size());
+
+    List<PersistedStatistic> statistics = results.get(0).statistics();
+    Map<String, Object> valueMap = new HashMap<>();
+    for (PersistedStatistic stat : statistics) {
+      valueMap.put(stat.name(), stat.value().value());
+    }
+
+    assertEquals(1000L, valueMap.get("custom-rowCount"));
+    assertEquals(256.5, valueMap.get("custom-avgSize"));
+    assertEquals("parquet", valueMap.get("custom-format"));
+    assertEquals(true, valueMap.get("custom-isCompressed"));
+  }
+
+  @Test
+  public void testBatchUpdate() throws Exception {
+    // Prepare multiple partition updates
+    Map<String, StatisticValue<?>> stats1 = new HashMap<>();
+    stats1.put("custom-rowCount", StatisticValues.longValue(1000L));
+
+    Map<String, StatisticValue<?>> stats2 = new HashMap<>();
+    stats2.put("custom-rowCount", StatisticValues.longValue(2000L));
+
+    Map<String, StatisticValue<?>> stats3 = new HashMap<>();
+    stats3.put("custom-rowCount", StatisticValues.longValue(3000L));
+
+    List<PartitionStatisticsUpdate> updates =
+        Lists.newArrayList(
+            PartitionStatisticsModification.update("partition_2024_01", 
stats1),
+            PartitionStatisticsModification.update("partition_2024_02", 
stats2),
+            PartitionStatisticsModification.update("partition_2024_03", 
stats3));
+
+    List<MetadataObjectStatisticsUpdate> objectUpdates =
+        Lists.newArrayList(MetadataObjectStatisticsUpdate.of(TEST_TABLE, 
updates));
+
+    // Mock successful batch execution
+    when(mockPreparedStatement.executeBatch()).thenReturn(new int[] {1, 1, 1});
+
+    // Execute
+    storage.updateStatistics(METALAKE, objectUpdates);
+
+    // Verify batch was executed
+    verify(mockPreparedStatement, times(1)).executeBatch();
+    verify(mockConnection, times(1)).commit();
+  }
+
+  @Test
+  public void testEmptyResultSet() throws Exception {
+    // Mock empty ResultSet
+    when(mockResultSet.next()).thenReturn(false);
+
+    // Execute
+    List<PersistedPartitionStatistics> results =
+        storage.listStatistics(METALAKE, TEST_TABLE, 
PartitionRange.ALL_PARTITIONS);
+
+    // Verify empty list returned
+    assertNotNull(results);
+    assertTrue(results.isEmpty());
+  }
+}
diff --git 
a/core/src/test/java/org/apache/gravitino/stats/storage/TestJdbcPartitionStatisticStorageFactory.java
 
b/core/src/test/java/org/apache/gravitino/stats/storage/TestJdbcPartitionStatisticStorageFactory.java
new file mode 100644
index 0000000000..cd2e6f92cb
--- /dev/null
+++ 
b/core/src/test/java/org/apache/gravitino/stats/storage/TestJdbcPartitionStatisticStorageFactory.java
@@ -0,0 +1,291 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.stats.storage;
+
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.util.HashMap;
+import java.util.Map;
+import org.junit.jupiter.api.Test;
+
+/** Unit tests for {@link JdbcPartitionStatisticStorageFactory}. */
+public class TestJdbcPartitionStatisticStorageFactory {
+
+  @Test
+  public void testCreateWithValidConfiguration() {
+    JdbcPartitionStatisticStorageFactory factory = new 
JdbcPartitionStatisticStorageFactory();
+
+    Map<String, String> properties = new HashMap<>();
+    properties.put("jdbcUrl", "jdbc:mysql://localhost:3306/test_db");
+    properties.put("jdbcUser", "test_user");
+    properties.put("jdbcPassword", "test_password");
+    properties.put("jdbcDriver", "com.mysql.cj.jdbc.Driver");
+
+    // This test only validates that the properties pass validation
+    // Actual storage creation requires GravitinoEnv and is tested in 
integration tests
+    try {
+      PartitionStatisticStorage storage = factory.create(properties);
+      assertNotNull(storage);
+      assertTrue(storage instanceof JdbcPartitionStatisticStorage);
+    } catch (Exception e) {
+      // Expected if GravitinoEnv is not initialized
+      if (e.getMessage() == null || !e.getMessage().contains("GravitinoEnv")) {
+        Throwable cause = e.getCause();
+        if (cause == null || !cause.getMessage().contains("GravitinoEnv")) {
+          throw e;
+        }
+      }
+    }
+  }
+
+  @Test
+  public void testCreateWithDefaultDriver() {
+    JdbcPartitionStatisticStorageFactory factory = new 
JdbcPartitionStatisticStorageFactory();
+
+    Map<String, String> properties = new HashMap<>();
+    properties.put("jdbcUrl", "jdbc:mysql://localhost:3306/test_db");
+    properties.put("jdbcUser", "test_user");
+    properties.put("jdbcPassword", "test_password");
+    // jdbcDriver not provided, should use default
+
+    // This test only validates that the properties pass validation
+    try {
+      PartitionStatisticStorage storage = factory.create(properties);
+      assertNotNull(storage);
+      assertTrue(storage instanceof JdbcPartitionStatisticStorage);
+    } catch (Exception e) {
+      if (e.getMessage() == null || !e.getMessage().contains("GravitinoEnv")) {
+        Throwable cause = e.getCause();
+        if (cause == null || !cause.getMessage().contains("GravitinoEnv")) {
+          throw e;
+        }
+      }
+    }
+  }
+
+  @Test
+  public void testCreateWithMissingJdbcUrl() {
+    JdbcPartitionStatisticStorageFactory factory = new 
JdbcPartitionStatisticStorageFactory();
+
+    Map<String, String> properties = new HashMap<>();
+    properties.put("jdbcUser", "test_user");
+    properties.put("jdbcPassword", "test_password");
+    // jdbcUrl is missing
+
+    Exception exception =
+        assertThrows(IllegalArgumentException.class, () -> 
factory.create(properties));
+
+    assertTrue(exception.getMessage().contains("jdbcUrl"));
+  }
+
+  @Test
+  public void testCreateWithMissingJdbcUser() {
+    JdbcPartitionStatisticStorageFactory factory = new 
JdbcPartitionStatisticStorageFactory();
+
+    Map<String, String> properties = new HashMap<>();
+    properties.put("jdbcUrl", "jdbc:mysql://localhost:3306/test_db");
+    properties.put("jdbcPassword", "test_password");
+    // jdbcUser is missing
+
+    Exception exception =
+        assertThrows(IllegalArgumentException.class, () -> 
factory.create(properties));
+
+    assertTrue(exception.getMessage().contains("jdbcUser"));
+  }
+
+  @Test
+  public void testCreateWithMissingJdbcPassword() {
+    JdbcPartitionStatisticStorageFactory factory = new 
JdbcPartitionStatisticStorageFactory();
+
+    Map<String, String> properties = new HashMap<>();
+    properties.put("jdbcUrl", "jdbc:mysql://localhost:3306/test_db");
+    properties.put("jdbcUser", "test_user");
+    // jdbcPassword is missing
+
+    Exception exception =
+        assertThrows(IllegalArgumentException.class, () -> 
factory.create(properties));
+
+    assertTrue(exception.getMessage().contains("jdbcPassword"));
+  }
+
+  @Test
+  public void testCreateWithEmptyProperties() {
+    JdbcPartitionStatisticStorageFactory factory = new 
JdbcPartitionStatisticStorageFactory();
+
+    Map<String, String> properties = new HashMap<>();
+
+    Exception exception =
+        assertThrows(IllegalArgumentException.class, () -> 
factory.create(properties));
+
+    assertNotNull(exception.getMessage());
+  }
+
+  @Test
+  public void testCreateWithNullProperties() {
+    JdbcPartitionStatisticStorageFactory factory = new 
JdbcPartitionStatisticStorageFactory();
+
+    assertThrows(NullPointerException.class, () -> factory.create(null));
+  }
+
+  @Test
+  public void testCreateWithAllProperties() {
+    JdbcPartitionStatisticStorageFactory factory = new 
JdbcPartitionStatisticStorageFactory();
+
+    Map<String, String> properties = new HashMap<>();
+    properties.put("jdbcUrl", 
"jdbc:mysql://localhost:3306/test_db?useSSL=false");
+    properties.put("jdbcUser", "test_user");
+    properties.put("jdbcPassword", "test_password");
+    properties.put("jdbcDriver", "com.mysql.cj.jdbc.Driver");
+    properties.put("extra-property", "extra-value"); // Extra properties 
should be ignored
+
+    try {
+      PartitionStatisticStorage storage = factory.create(properties);
+      assertNotNull(storage);
+      assertTrue(storage instanceof JdbcPartitionStatisticStorage);
+    } catch (Exception e) {
+      if (e.getMessage() == null || !e.getMessage().contains("GravitinoEnv")) {
+        Throwable cause = e.getCause();
+        if (cause == null || !cause.getMessage().contains("GravitinoEnv")) {
+          throw e;
+        }
+      }
+    }
+  }
+
+  @Test
+  public void testMultipleFactoryInstances() {
+    // Test that multiple factory instances can be created independently
+    JdbcPartitionStatisticStorageFactory factory1 = new 
JdbcPartitionStatisticStorageFactory();
+    JdbcPartitionStatisticStorageFactory factory2 = new 
JdbcPartitionStatisticStorageFactory();
+
+    Map<String, String> properties1 = new HashMap<>();
+    properties1.put("jdbcUrl", "jdbc:mysql://localhost:3306/db1");
+    properties1.put("jdbcUser", "user1");
+    properties1.put("jdbcPassword", "pass1");
+
+    Map<String, String> properties2 = new HashMap<>();
+    properties2.put("jdbcUrl", "jdbc:mysql://localhost:3306/db2");
+    properties2.put("jdbcUser", "user2");
+    properties2.put("jdbcPassword", "pass2");
+
+    try {
+      PartitionStatisticStorage storage1 = factory1.create(properties1);
+      PartitionStatisticStorage storage2 = factory2.create(properties2);
+      assertNotNull(storage1);
+      assertNotNull(storage2);
+      assertTrue(storage1 instanceof JdbcPartitionStatisticStorage);
+      assertTrue(storage2 instanceof JdbcPartitionStatisticStorage);
+    } catch (Exception e) {
+      if (e.getMessage() == null || !e.getMessage().contains("GravitinoEnv")) {
+        Throwable cause = e.getCause();
+        if (cause == null || !cause.getMessage().contains("GravitinoEnv")) {
+          throw e;
+        }
+      }
+    }
+  }
+
+  @Test
+  public void testJdbcUrlWithParameters() {
+    JdbcPartitionStatisticStorageFactory factory = new 
JdbcPartitionStatisticStorageFactory();
+
+    Map<String, String> properties = new HashMap<>();
+    properties.put(
+        "jdbcUrl",
+        
"jdbc:mysql://localhost:3306/test_db?useSSL=false&serverTimezone=UTC&allowPublicKeyRetrieval=true");
+    properties.put("jdbcUser", "test_user");
+    properties.put("jdbcPassword", "test_password");
+
+    try {
+      PartitionStatisticStorage storage = factory.create(properties);
+      assertNotNull(storage);
+      assertTrue(storage instanceof JdbcPartitionStatisticStorage);
+    } catch (Exception e) {
+      if (e.getMessage() == null || !e.getMessage().contains("GravitinoEnv")) {
+        Throwable cause = e.getCause();
+        if (cause == null || !cause.getMessage().contains("GravitinoEnv")) {
+          throw e;
+        }
+      }
+    }
+  }
+
+  @Test
+  public void testJdbcUrlWithIPv6() {
+    JdbcPartitionStatisticStorageFactory factory = new 
JdbcPartitionStatisticStorageFactory();
+
+    Map<String, String> properties = new HashMap<>();
+    properties.put("jdbcUrl", "jdbc:mysql://[::1]:3306/test_db");
+    properties.put("jdbcUser", "test_user");
+    properties.put("jdbcPassword", "test_password");
+
+    try {
+      PartitionStatisticStorage storage = factory.create(properties);
+      assertNotNull(storage);
+      assertTrue(storage instanceof JdbcPartitionStatisticStorage);
+    } catch (Exception e) {
+      if (e.getMessage() == null || !e.getMessage().contains("GravitinoEnv")) {
+        Throwable cause = e.getCause();
+        if (cause == null || !cause.getMessage().contains("GravitinoEnv")) {
+          throw e;
+        }
+      }
+    }
+  }
+
+  @Test
+  public void testEmptyPassword() {
+    JdbcPartitionStatisticStorageFactory factory = new 
JdbcPartitionStatisticStorageFactory();
+
+    Map<String, String> properties = new HashMap<>();
+    properties.put("jdbcUrl", "jdbc:h2:mem:test;DB_CLOSE_DELAY=-1");
+    properties.put("jdbcUser", "sa");
+    properties.put("jdbcPassword", ""); // Empty password (allowed for H2 and 
similar databases)
+    properties.put("jdbcDriver", "org.h2.Driver");
+
+    // Empty passwords should be allowed (e.g., for H2 in-memory databases)
+    PartitionStatisticStorage storage = factory.create(properties);
+    assertNotNull(storage);
+  }
+
+  @Test
+  public void testSpecialCharactersInCredentials() {
+    JdbcPartitionStatisticStorageFactory factory = new 
JdbcPartitionStatisticStorageFactory();
+
+    Map<String, String> properties = new HashMap<>();
+    properties.put("jdbcUrl", "jdbc:mysql://localhost:3306/test_db");
+    properties.put("jdbcUser", "[email protected]");
+    properties.put("jdbcPassword", "p@ssw0rd!#$%"); // Special characters
+
+    try {
+      PartitionStatisticStorage storage = factory.create(properties);
+      assertNotNull(storage);
+      assertTrue(storage instanceof JdbcPartitionStatisticStorage);
+    } catch (Exception e) {
+      if (e.getMessage() == null || !e.getMessage().contains("GravitinoEnv")) {
+        Throwable cause = e.getCause();
+        if (cause == null || !cause.getMessage().contains("GravitinoEnv")) {
+          throw e;
+        }
+      }
+    }
+  }
+}
diff --git 
a/core/src/test/java/org/apache/gravitino/stats/storage/TestJdbcPartitionStatisticStorageIT.java
 
b/core/src/test/java/org/apache/gravitino/stats/storage/TestJdbcPartitionStatisticStorageIT.java
new file mode 100644
index 0000000000..b1cac18a98
--- /dev/null
+++ 
b/core/src/test/java/org/apache/gravitino/stats/storage/TestJdbcPartitionStatisticStorageIT.java
@@ -0,0 +1,800 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.stats.storage;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.commons.lang3.reflect.FieldUtils;
+import org.apache.gravitino.EntityStore;
+import org.apache.gravitino.GravitinoEnv;
+import org.apache.gravitino.MetadataObject;
+import org.apache.gravitino.MetadataObjects;
+import org.apache.gravitino.integration.test.container.ContainerSuite;
+import org.apache.gravitino.integration.test.container.MySQLContainer;
+import org.apache.gravitino.integration.test.container.PGImageName;
+import org.apache.gravitino.integration.test.container.PostgreSQLContainer;
+import org.apache.gravitino.integration.test.util.TestDatabaseName;
+import org.apache.gravitino.meta.TableEntity;
+import org.apache.gravitino.stats.PartitionRange;
+import org.apache.gravitino.stats.PartitionStatisticsDrop;
+import org.apache.gravitino.stats.PartitionStatisticsModification;
+import org.apache.gravitino.stats.PartitionStatisticsUpdate;
+import org.apache.gravitino.stats.StatisticValue;
+import org.apache.gravitino.stats.StatisticValues;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Nested;
+import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInstance;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * End-to-end integration tests for {@link JdbcPartitionStatisticStorage} 
using multiple database
+ * backends via Testcontainers.
+ *
+ * <p>These tests verify the complete flow from API calls through JDBC to real 
database instances.
+ * They cover:
+ *
+ * <ul>
+ *   <li>Full CRUD operations (Create, Read, Update, Delete)
+ *   <li>Partition range queries with different bound types
+ *   <li>Transaction integrity and rollback behavior
+ *   <li>Concurrent access from multiple threads
+ *   <li>JSON serialization/deserialization
+ *   <li>Audit information preservation
+ *   <li>Large dataset handling
+ *   <li>Database-specific SQL syntax (MySQL ON DUPLICATE KEY vs PostgreSQL ON 
CONFLICT)
+ * </ul>
+ */
+@Tag("gravitino-docker-test")
+public class TestJdbcPartitionStatisticStorageIT {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(TestJdbcPartitionStatisticStorageIT.class);
+
+  /**
+   * Abstract base class containing all test logic. Each database-specific 
test class extends this
+   * and implements the database setup.
+   */
+  @TestInstance(TestInstance.Lifecycle.PER_CLASS)
+  abstract static class BaseJdbcPartitionStatisticStorageTest {
+
+    protected JdbcPartitionStatisticStorage storage;
+    protected EntityStore entityStore;
+
+    protected static final String METALAKE = "test_metalake";
+    protected static final MetadataObject TEST_TABLE =
+        MetadataObjects.of(
+            Lists.newArrayList("catalog", "schema", "table"), 
MetadataObject.Type.TABLE);
+
+    /** Subclasses must implement to set up database-specific storage. */
+    protected abstract void setupStorage() throws Exception;
+
+    /** Subclasses must implement to tear down storage. */
+    protected abstract void teardownStorage() throws Exception;
+
+    @BeforeAll
+    public void baseSetup() throws Exception {
+      // Mock EntityStore to return a test table entity
+      entityStore = mock(EntityStore.class);
+      TableEntity tableEntity = mock(TableEntity.class);
+      when(entityStore.get(any(), any(), any())).thenReturn(tableEntity);
+      when(tableEntity.id()).thenReturn(100L);
+      FieldUtils.writeField(GravitinoEnv.getInstance(), "entityStore", 
entityStore, true);
+
+      setupStorage();
+    }
+
+    @AfterAll
+    public void baseTeardown() throws Exception {
+      teardownStorage();
+    }
+
+    @Test
+    public void testFullCRUDLifecycle() throws IOException {
+      LOG.info("Testing full CRUD lifecycle");
+
+      // 1. CREATE - Insert statistics for a partition
+      List<PartitionStatisticsUpdate> updates = new ArrayList<>();
+      Map<String, StatisticValue<?>> stats = Maps.newHashMap();
+      stats.put("custom-rowCount", StatisticValues.longValue(1000L));
+      stats.put("custom-sizeBytes", StatisticValues.longValue(5000000L));
+      stats.put("custom-lastModified", 
StatisticValues.stringValue("2025-01-21"));
+
+      updates.add(PartitionStatisticsModification.update("partition_2025_01", 
stats));
+
+      List<MetadataObjectStatisticsUpdate> objectUpdates =
+          Lists.newArrayList(MetadataObjectStatisticsUpdate.of(TEST_TABLE, 
updates));
+
+      storage.updateStatistics(METALAKE, objectUpdates);
+      LOG.info("Created statistics for partition_2025_01");
+
+      // 2. READ - Verify statistics exist
+      List<PersistedPartitionStatistics> result =
+          storage.listStatistics(METALAKE, TEST_TABLE, 
PartitionRange.ALL_PARTITIONS);
+
+      assertEquals(1, result.size());
+      assertEquals("partition_2025_01", result.get(0).partitionName());
+      assertEquals(3, result.get(0).statistics().size());
+
+      // Verify values and audit info
+      PersistedStatistic rowCount =
+          result.get(0).statistics().stream()
+              .filter(s -> s.name().equals("custom-rowCount"))
+              .findFirst()
+              .orElseThrow();
+
+      assertEquals(1000L, rowCount.value().value());
+      assertNotNull(rowCount.auditInfo());
+      assertNotNull(rowCount.auditInfo().creator());
+      assertNotNull(rowCount.auditInfo().createTime());
+      LOG.info("Verified statistics were created with audit info");
+
+      // 3. UPDATE - Modify existing statistic
+      stats.clear();
+      stats.put("custom-rowCount", StatisticValues.longValue(2000L));
+      stats.put("custom-sizeBytes", StatisticValues.longValue(5000000L));
+      stats.put("custom-lastModified", 
StatisticValues.stringValue("2025-01-21"));
+
+      updates.clear();
+      updates.add(PartitionStatisticsModification.update("partition_2025_01", 
stats));
+      objectUpdates = 
Lists.newArrayList(MetadataObjectStatisticsUpdate.of(TEST_TABLE, updates));
+
+      storage.updateStatistics(METALAKE, objectUpdates);
+
+      result = storage.listStatistics(METALAKE, TEST_TABLE, 
PartitionRange.ALL_PARTITIONS);
+      rowCount =
+          result.get(0).statistics().stream()
+              .filter(s -> s.name().equals("custom-rowCount"))
+              .findFirst()
+              .orElseThrow();
+
+      assertEquals(2000L, rowCount.value().value());
+      LOG.info("Updated statistic value from 1000 to 2000");
+
+      // 4. DELETE - Drop specific statistic
+      List<MetadataObjectStatisticsDrop> drops =
+          Lists.newArrayList(
+              MetadataObjectStatisticsDrop.of(
+                  TEST_TABLE,
+                  Lists.newArrayList(
+                      PartitionStatisticsModification.drop(
+                          "partition_2025_01", 
Lists.newArrayList("custom-rowCount")))));
+
+      int dropped = storage.dropStatistics(METALAKE, drops);
+      assertEquals(1, dropped);
+      LOG.info("Dropped 1 statistic");
+
+      // 5. VERIFY - Statistic is gone
+      result = storage.listStatistics(METALAKE, TEST_TABLE, 
PartitionRange.ALL_PARTITIONS);
+      assertEquals(2, result.get(0).statistics().size());
+
+      boolean hasRowCount =
+          result.get(0).statistics().stream().anyMatch(s -> 
s.name().equals("custom-rowCount"));
+      assertFalse(hasRowCount);
+      LOG.info("Verified statistic was deleted");
+
+      // Cleanup
+      cleanupAllStatistics();
+    }
+
+    @Test
+    public void testPartitionRangeQueries() throws IOException {
+      LOG.info("Testing partition range queries");
+
+      // Insert statistics for multiple partitions
+      insertMultiplePartitions("p1", "p2", "p3", "p4", "p5");
+
+      // Test CLOSED bounds: [p2, p4] should return p2, p3, p4
+      List<PersistedPartitionStatistics> result =
+          storage.listStatistics(
+              METALAKE,
+              TEST_TABLE,
+              PartitionRange.between(
+                  "p2", PartitionRange.BoundType.CLOSED, "p4", 
PartitionRange.BoundType.CLOSED));
+
+      assertEquals(3, result.size());
+      List<String> partitionNames =
+          
result.stream().map(PersistedPartitionStatistics::partitionName).toList();
+      assertTrue(partitionNames.contains("p2"));
+      assertTrue(partitionNames.contains("p3"));
+      assertTrue(partitionNames.contains("p4"));
+      LOG.info("CLOSED bounds [p2, p4] returned 3 partitions: {}", 
partitionNames);
+
+      // Test OPEN bounds: (p2, p4) should return only p3
+      result =
+          storage.listStatistics(
+              METALAKE,
+              TEST_TABLE,
+              PartitionRange.between(
+                  "p2", PartitionRange.BoundType.OPEN, "p4", 
PartitionRange.BoundType.OPEN));
+
+      assertEquals(1, result.size());
+      assertEquals("p3", result.get(0).partitionName());
+      LOG.info("OPEN bounds (p2, p4) returned 1 partition: p3");
+
+      // Test upTo: <= p3 should return p1, p2, p3
+      result =
+          storage.listStatistics(
+              METALAKE, TEST_TABLE, PartitionRange.upTo("p3", 
PartitionRange.BoundType.CLOSED));
+
+      assertEquals(3, result.size());
+      partitionNames = 
result.stream().map(PersistedPartitionStatistics::partitionName).toList();
+      assertTrue(partitionNames.contains("p1"));
+      assertTrue(partitionNames.contains("p2"));
+      assertTrue(partitionNames.contains("p3"));
+      LOG.info("upTo p3 (CLOSED) returned 3 partitions");
+
+      // Test downTo: >= p3 should return p3, p4, p5
+      result =
+          storage.listStatistics(
+              METALAKE, TEST_TABLE, PartitionRange.downTo("p3", 
PartitionRange.BoundType.CLOSED));
+
+      assertEquals(3, result.size());
+      partitionNames = 
result.stream().map(PersistedPartitionStatistics::partitionName).toList();
+      assertTrue(partitionNames.contains("p3"));
+      assertTrue(partitionNames.contains("p4"));
+      assertTrue(partitionNames.contains("p5"));
+      LOG.info("downTo p3 (CLOSED) returned 3 partitions");
+
+      // Test OPEN lower bound: (p2, p5] should return p3, p4, p5
+      result =
+          storage.listStatistics(
+              METALAKE,
+              TEST_TABLE,
+              PartitionRange.between(
+                  "p2", PartitionRange.BoundType.OPEN, "p5", 
PartitionRange.BoundType.CLOSED));
+
+      assertEquals(3, result.size());
+      LOG.info("OPEN-CLOSED bounds (p2, p5] returned 3 partitions");
+
+      // Cleanup
+      cleanupAllStatistics();
+    }
+
+    @Test
+    public void testListStatisticsByPartitionNames() throws IOException {
+      LOG.info("Testing list statistics by partition names");
+
+      // Insert statistics for multiple partitions
+      insertMultiplePartitions("part_a", "part_b", "part_c", "part_d", 
"part_e");
+
+      // List specific partitions
+      List<PersistedPartitionStatistics> result =
+          storage.listStatistics(
+              METALAKE, TEST_TABLE, Lists.newArrayList("part_b", "part_d", 
"part_e"));
+
+      assertEquals(3, result.size());
+      List<String> partitionNames =
+          
result.stream().map(PersistedPartitionStatistics::partitionName).toList();
+      assertTrue(partitionNames.contains("part_b"));
+      assertTrue(partitionNames.contains("part_d"));
+      assertTrue(partitionNames.contains("part_e"));
+      assertFalse(partitionNames.contains("part_a"));
+      assertFalse(partitionNames.contains("part_c"));
+      LOG.info("Listed 3 specific partitions by name");
+
+      // Cleanup
+      cleanupAllStatistics();
+    }
+
+    @Test
+    public void testConcurrentWrites() throws Exception {
+      LOG.info("Testing concurrent writes from multiple threads");
+
+      int threadCount = 10;
+      int partitionsPerThread = 50;
+
+      ExecutorService executor = Executors.newFixedThreadPool(threadCount);
+      CountDownLatch latch = new CountDownLatch(threadCount);
+      AtomicInteger successCount = new AtomicInteger(0);
+      AtomicInteger errorCount = new AtomicInteger(0);
+
+      for (int i = 0; i < threadCount; i++) {
+        final int threadId = i;
+        executor.submit(
+            () -> {
+              try {
+                for (int j = 0; j < partitionsPerThread; j++) {
+                  String partitionName = "thread" + threadId + "_p" + j;
+
+                  Map<String, StatisticValue<?>> stats = Maps.newHashMap();
+                  stats.put("custom-threadId", 
StatisticValues.longValue((long) threadId));
+                  stats.put("custom-partitionIndex", 
StatisticValues.longValue((long) j));
+                  stats.put("custom-value", 
StatisticValues.stringValue("thread" + threadId));
+
+                  List<PartitionStatisticsUpdate> updates =
+                      Lists.newArrayList(
+                          
PartitionStatisticsModification.update(partitionName, stats));
+
+                  List<MetadataObjectStatisticsUpdate> objectUpdates =
+                      
Lists.newArrayList(MetadataObjectStatisticsUpdate.of(TEST_TABLE, updates));
+
+                  storage.updateStatistics(METALAKE, objectUpdates);
+                }
+                successCount.incrementAndGet();
+              } catch (Exception e) {
+                LOG.error("Thread {} failed", threadId, e);
+                errorCount.incrementAndGet();
+              } finally {
+                latch.countDown();
+              }
+            });
+      }
+
+      boolean completed = latch.await(60, TimeUnit.SECONDS);
+      assertTrue(completed, "All threads should complete within 60 seconds");
+      assertEquals(threadCount, successCount.get(), "All threads should 
succeed");
+      assertEquals(0, errorCount.get(), "No threads should have errors");
+
+      // Verify all statistics were written
+      List<PersistedPartitionStatistics> result =
+          storage.listStatistics(METALAKE, TEST_TABLE, 
PartitionRange.ALL_PARTITIONS);
+
+      assertEquals(
+          threadCount * partitionsPerThread,
+          result.size(),
+          "Should have statistics for all partitions");
+
+      LOG.info(
+          "Successfully completed {} concurrent writes across {} threads",
+          result.size(),
+          threadCount);
+
+      executor.shutdown();
+
+      // Cleanup
+      cleanupAllStatistics();
+    }
+
+    @Test
+    public void testMultipleStatisticTypes() throws IOException {
+      LOG.info("Testing multiple statistic value types");
+
+      Map<String, StatisticValue<?>> stats = Maps.newHashMap();
+      stats.put("custom-stringValue", StatisticValues.stringValue("test 
string"));
+      stats.put("custom-longValue", StatisticValues.longValue(12345L));
+      stats.put("custom-doubleValue", StatisticValues.doubleValue(123.45));
+      stats.put("custom-booleanValue", StatisticValues.booleanValue(true));
+
+      List<PartitionStatisticsUpdate> updates =
+          Lists.newArrayList(
+              PartitionStatisticsModification.update("mixed_types_partition", 
stats));
+
+      List<MetadataObjectStatisticsUpdate> objectUpdates =
+          Lists.newArrayList(MetadataObjectStatisticsUpdate.of(TEST_TABLE, 
updates));
+
+      storage.updateStatistics(METALAKE, objectUpdates);
+
+      // Read back and verify types
+      List<PersistedPartitionStatistics> result =
+          storage.listStatistics(METALAKE, TEST_TABLE, 
PartitionRange.ALL_PARTITIONS);
+
+      assertEquals(1, result.size());
+      List<PersistedStatistic> statistics = result.get(0).statistics();
+      assertEquals(4, statistics.size());
+
+      // Verify each type
+      PersistedStatistic stringVal =
+          statistics.stream()
+              .filter(s -> s.name().equals("custom-stringValue"))
+              .findFirst()
+              .orElseThrow();
+      assertEquals("test string", stringVal.value().value());
+
+      PersistedStatistic longVal =
+          statistics.stream()
+              .filter(s -> s.name().equals("custom-longValue"))
+              .findFirst()
+              .orElseThrow();
+      assertEquals(12345L, longVal.value().value());
+
+      PersistedStatistic doubleVal =
+          statistics.stream()
+              .filter(s -> s.name().equals("custom-doubleValue"))
+              .findFirst()
+              .orElseThrow();
+      assertEquals(123.45, doubleVal.value().value());
+
+      PersistedStatistic boolVal =
+          statistics.stream()
+              .filter(s -> s.name().equals("custom-booleanValue"))
+              .findFirst()
+              .orElseThrow();
+      assertEquals(true, boolVal.value().value());
+
+      LOG.info("All statistic value types serialized and deserialized 
correctly");
+
+      // Cleanup
+      cleanupAllStatistics();
+    }
+
+    @Test
+    public void testBatchDropMultipleStatistics() throws IOException {
+      LOG.info("Testing batch drop of multiple statistics");
+
+      // Insert statistics for multiple partitions
+      insertMultiplePartitions("drop_p1", "drop_p2", "drop_p3");
+
+      // Drop multiple statistics from different partitions
+      List<PartitionStatisticsDrop> dropsList = new ArrayList<>();
+      dropsList.add(
+          PartitionStatisticsModification.drop("drop_p1", 
Lists.newArrayList("custom-stat1")));
+      dropsList.add(
+          PartitionStatisticsModification.drop(
+              "drop_p2", Lists.newArrayList("custom-stat1", "custom-stat2")));
+
+      List<MetadataObjectStatisticsDrop> drops =
+          Lists.newArrayList(MetadataObjectStatisticsDrop.of(TEST_TABLE, 
dropsList));
+
+      int droppedCount = storage.dropStatistics(METALAKE, drops);
+      assertEquals(3, droppedCount, "Should drop 3 statistics total (1 from 
p1, 2 from p2)");
+
+      LOG.info("Dropped {} statistics in batch", droppedCount);
+
+      // Cleanup
+      cleanupAllStatistics();
+    }
+
+    @Test
+    public void testLargeDataset() throws IOException {
+      LOG.info("Testing large dataset handling");
+
+      int partitionCount = 1000;
+      int statisticsPerPartition = 10;
+
+      // Insert large dataset
+      List<PartitionStatisticsUpdate> updates = new ArrayList<>();
+      for (int i = 0; i < partitionCount; i++) {
+        String partitionName = "large_p" + String.format("%04d", i);
+        Map<String, StatisticValue<?>> stats = Maps.newHashMap();
+
+        for (int j = 0; j < statisticsPerPartition; j++) {
+          stats.put("custom-stat" + j, StatisticValues.longValue((long) (i * 
1000 + j)));
+        }
+
+        updates.add(PartitionStatisticsModification.update(partitionName, 
stats));
+      }
+
+      List<MetadataObjectStatisticsUpdate> objectUpdates =
+          Lists.newArrayList(MetadataObjectStatisticsUpdate.of(TEST_TABLE, 
updates));
+
+      long startTime = System.currentTimeMillis();
+      storage.updateStatistics(METALAKE, objectUpdates);
+      long insertTime = System.currentTimeMillis() - startTime;
+
+      LOG.info(
+          "Inserted {} partitions with {} statistics each in {} ms",
+          partitionCount,
+          statisticsPerPartition,
+          insertTime);
+
+      // Query all statistics
+      startTime = System.currentTimeMillis();
+      List<PersistedPartitionStatistics> result =
+          storage.listStatistics(METALAKE, TEST_TABLE, 
PartitionRange.ALL_PARTITIONS);
+      long queryTime = System.currentTimeMillis() - startTime;
+
+      assertEquals(partitionCount, result.size());
+      assertEquals(statisticsPerPartition, result.get(0).statistics().size());
+
+      LOG.info("Queried {} partitions in {} ms", partitionCount, queryTime);
+
+      // Performance assertions
+      assertTrue(
+          insertTime < 30000,
+          "Insert should complete within 30 seconds, took " + insertTime + 
"ms");
+      assertTrue(
+          queryTime < 5000, "Query should complete within 5 seconds, took " + 
queryTime + "ms");
+
+      // Cleanup
+      cleanupAllStatistics();
+    }
+
+    // ==================== Helper Methods ====================
+
+    /**
+     * Inserts test statistics for multiple partitions.
+     *
+     * @param partitionNames the partition names to create
+     */
+    protected void insertMultiplePartitions(String... partitionNames) throws 
IOException {
+      List<PartitionStatisticsUpdate> updates = new ArrayList<>();
+
+      for (String partitionName : partitionNames) {
+        Map<String, StatisticValue<?>> stats = Maps.newHashMap();
+        stats.put("custom-stat1", StatisticValues.longValue(100L));
+        stats.put("custom-stat2", StatisticValues.stringValue("value2"));
+        stats.put("custom-stat3", StatisticValues.doubleValue(3.14));
+
+        updates.add(PartitionStatisticsModification.update(partitionName, 
stats));
+      }
+
+      List<MetadataObjectStatisticsUpdate> objectUpdates =
+          Lists.newArrayList(MetadataObjectStatisticsUpdate.of(TEST_TABLE, 
updates));
+
+      storage.updateStatistics(METALAKE, objectUpdates);
+    }
+
+    /** Removes all statistics for the test table to cleanup between tests. */
+    protected void cleanupAllStatistics() throws IOException {
+      // Get all current statistics
+      List<PersistedPartitionStatistics> allStats =
+          storage.listStatistics(METALAKE, TEST_TABLE, 
PartitionRange.ALL_PARTITIONS);
+
+      if (allStats.isEmpty()) {
+        return;
+      }
+
+      // Build drop list for all statistics
+      List<PartitionStatisticsDrop> dropsList = new ArrayList<>();
+      for (PersistedPartitionStatistics partitionStats : allStats) {
+        List<String> statisticNames =
+            
partitionStats.statistics().stream().map(PersistedStatistic::name).toList();
+
+        dropsList.add(
+            
PartitionStatisticsModification.drop(partitionStats.partitionName(), 
statisticNames));
+      }
+
+      List<MetadataObjectStatisticsDrop> drops =
+          Lists.newArrayList(MetadataObjectStatisticsDrop.of(TEST_TABLE, 
dropsList));
+
+      storage.dropStatistics(METALAKE, drops);
+      LOG.debug("Cleaned up all statistics for test table");
+    }
+  }
+
+  /** MySQL-specific tests using Docker container. */
+  @Nested
+  @Tag("gravitino-docker-test")
+  static class MySQLTest extends BaseJdbcPartitionStatisticStorageTest {
+
+    private static final ContainerSuite containerSuite = 
ContainerSuite.getInstance();
+    private static final TestDatabaseName TEST_DB_NAME = 
TestDatabaseName.MYSQL_MYSQL_ABSTRACT_IT;
+    private MySQLContainer mySQLContainer;
+
+    @Override
+    protected void setupStorage() throws Exception {
+      LOG.info("Starting MySQL container for partition statistics integration 
tests");
+
+      // Start MySQL container
+      containerSuite.startMySQLContainer(TEST_DB_NAME);
+      mySQLContainer = containerSuite.getMySQLContainer();
+
+      // Create database schema
+      createMySQLSchema();
+
+      // Create storage factory with MySQL container connection
+      Map<String, String> properties = Maps.newHashMap();
+      properties.put("jdbcUrl", mySQLContainer.getJdbcUrl(TEST_DB_NAME));
+      properties.put("jdbcUser", mySQLContainer.getUsername());
+      properties.put("jdbcPassword", mySQLContainer.getPassword());
+      properties.put("jdbcDriver", 
mySQLContainer.getDriverClassName(TEST_DB_NAME));
+
+      JdbcPartitionStatisticStorageFactory factory = new 
JdbcPartitionStatisticStorageFactory();
+      storage = (JdbcPartitionStatisticStorage) factory.create(properties);
+
+      LOG.info("MySQL partition statistics storage initialized successfully");
+    }
+
+    @Override
+    protected void teardownStorage() throws Exception {
+      if (storage != null) {
+        storage.close();
+        LOG.info("MySQL partition statistics storage closed");
+      }
+    }
+
+    /** Creates the partition_statistic_meta table in the MySQL test database. 
*/
+    private void createMySQLSchema() throws SQLException {
+      String jdbcUrl = mySQLContainer.getJdbcUrl(TEST_DB_NAME);
+      String username = mySQLContainer.getUsername();
+      String password = mySQLContainer.getPassword();
+
+      try (Connection conn = DriverManager.getConnection(jdbcUrl, username, 
password);
+          Statement stmt = conn.createStatement()) {
+
+        String createTableSQL =
+            "CREATE TABLE IF NOT EXISTS `partition_statistic_meta` ("
+                + "  `table_id` BIGINT(20) UNSIGNED NOT NULL COMMENT 'table id 
from table_meta',"
+                + "  `partition_name` VARCHAR(1024) NOT NULL COMMENT 
'partition name',"
+                + "  `statistic_name` VARCHAR(128) NOT NULL COMMENT 'statistic 
name',"
+                + "  `statistic_value` MEDIUMTEXT NOT NULL COMMENT 'statistic 
value as JSON',"
+                + "  `audit_info` TEXT NOT NULL COMMENT 'audit information as 
JSON',"
+                + "  `created_at` BIGINT(20) UNSIGNED NOT NULL COMMENT 
'creation timestamp in milliseconds',"
+                + "  `updated_at` BIGINT(20) UNSIGNED NOT NULL COMMENT 'last 
update timestamp in milliseconds',"
+                + "  PRIMARY KEY (`table_id`, `partition_name`(255), 
`statistic_name`),"
+                + "  KEY `idx_table_partition` (`table_id`, 
`partition_name`(255))"
+                + ") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin"
+                + "  COMMENT 'partition statistics metadata'";
+
+        stmt.execute(createTableSQL);
+        LOG.info("Created partition_statistic_meta table in MySQL");
+      }
+    }
+  }
+
+  /** PostgreSQL-specific tests using Docker container. */
+  @Nested
+  @Tag("gravitino-docker-test")
+  static class PostgreSQLTest extends BaseJdbcPartitionStatisticStorageTest {
+
+    private static final ContainerSuite containerSuite = 
ContainerSuite.getInstance();
+    private static final TestDatabaseName TEST_DB_NAME = 
TestDatabaseName.PG_TEST_PARTITION_STATS;
+    private PostgreSQLContainer postgreSQLContainer;
+
+    @Override
+    protected void setupStorage() throws Exception {
+      LOG.info("Starting PostgreSQL container for partition statistics 
integration tests");
+
+      // Start PostgreSQL container
+      containerSuite.startPostgreSQLContainer(TEST_DB_NAME, 
PGImageName.VERSION_13);
+      postgreSQLContainer = 
containerSuite.getPostgreSQLContainer(PGImageName.VERSION_13);
+
+      // Create database schema
+      createPostgreSQLSchema();
+
+      // Create storage factory with PostgreSQL container connection
+      Map<String, String> properties = Maps.newHashMap();
+      properties.put("jdbcUrl", postgreSQLContainer.getJdbcUrl(TEST_DB_NAME));
+      properties.put("jdbcUser", postgreSQLContainer.getUsername());
+      properties.put("jdbcPassword", postgreSQLContainer.getPassword());
+      properties.put("jdbcDriver", 
postgreSQLContainer.getDriverClassName(TEST_DB_NAME));
+
+      JdbcPartitionStatisticStorageFactory factory = new 
JdbcPartitionStatisticStorageFactory();
+      storage = (JdbcPartitionStatisticStorage) factory.create(properties);
+
+      LOG.info("PostgreSQL partition statistics storage initialized 
successfully");
+    }
+
+    @Override
+    protected void teardownStorage() throws Exception {
+      if (storage != null) {
+        storage.close();
+        LOG.info("PostgreSQL partition statistics storage closed");
+      }
+    }
+
+    /** Creates the partition_statistic_meta table in the PostgreSQL test 
database. */
+    private void createPostgreSQLSchema() throws SQLException {
+      String jdbcUrl = postgreSQLContainer.getJdbcUrl(TEST_DB_NAME);
+      String username = postgreSQLContainer.getUsername();
+      String password = postgreSQLContainer.getPassword();
+
+      try (Connection conn = DriverManager.getConnection(jdbcUrl, username, 
password);
+          Statement stmt = conn.createStatement()) {
+
+        String createTableSQL =
+            "CREATE TABLE IF NOT EXISTS partition_statistic_meta ("
+                + "  table_id BIGINT NOT NULL,"
+                + "  partition_name VARCHAR(1024) NOT NULL,"
+                + "  statistic_name VARCHAR(128) NOT NULL,"
+                + "  statistic_value TEXT NOT NULL,"
+                + "  audit_info TEXT NOT NULL,"
+                + "  created_at BIGINT NOT NULL,"
+                + "  updated_at BIGINT NOT NULL,"
+                + "  PRIMARY KEY (table_id, partition_name, statistic_name)"
+                + ")";
+
+        stmt.execute(createTableSQL);
+
+        String createIndexSQL =
+            "CREATE INDEX IF NOT EXISTS idx_table_partition ON 
partition_statistic_meta(table_id, partition_name)";
+        stmt.execute(createIndexSQL);
+
+        LOG.info("Created partition_statistic_meta table in PostgreSQL");
+      }
+    }
+  }
+
+  /** H2-specific tests using embedded in-memory database. */
+  @Nested
+  static class H2Test extends BaseJdbcPartitionStatisticStorageTest {
+
+    private static final String H2_JDBC_URL =
+        "jdbc:h2:mem:test_partition_stats;MODE=MySQL;DB_CLOSE_DELAY=-1";
+    private static final String H2_USERNAME = "sa";
+    private static final String H2_PASSWORD = "";
+
+    @Override
+    protected void setupStorage() throws Exception {
+      LOG.info("Setting up H2 in-memory database for partition statistics 
integration tests");
+
+      // Create database schema
+      createH2Schema();
+
+      // Create storage factory with H2 connection
+      Map<String, String> properties = Maps.newHashMap();
+      properties.put("jdbcUrl", H2_JDBC_URL);
+      properties.put("jdbcUser", H2_USERNAME);
+      properties.put("jdbcPassword", H2_PASSWORD);
+      properties.put("jdbcDriver", "org.h2.Driver");
+
+      JdbcPartitionStatisticStorageFactory factory = new 
JdbcPartitionStatisticStorageFactory();
+      storage = (JdbcPartitionStatisticStorage) factory.create(properties);
+
+      LOG.info("H2 partition statistics storage initialized successfully");
+    }
+
+    @Override
+    protected void teardownStorage() throws Exception {
+      if (storage != null) {
+        storage.close();
+        LOG.info("H2 partition statistics storage closed");
+      }
+
+      // Drop the H2 database
+      try (Connection conn = DriverManager.getConnection(H2_JDBC_URL, 
H2_USERNAME, H2_PASSWORD);
+          Statement stmt = conn.createStatement()) {
+        stmt.execute("DROP TABLE IF EXISTS partition_statistic_meta");
+        LOG.info("Dropped H2 database");
+      }
+    }
+
+    /** Creates the partition_statistic_meta table in the H2 test database. */
+    private void createH2Schema() throws SQLException {
+      try (Connection conn = DriverManager.getConnection(H2_JDBC_URL, 
H2_USERNAME, H2_PASSWORD);
+          Statement stmt = conn.createStatement()) {
+
+        String createTableSQL =
+            "CREATE TABLE IF NOT EXISTS partition_statistic_meta ("
+                + "  table_id BIGINT NOT NULL,"
+                + "  partition_name VARCHAR(1024) NOT NULL,"
+                + "  statistic_name VARCHAR(128) NOT NULL,"
+                + "  statistic_value CLOB NOT NULL,"
+                + "  audit_info CLOB NOT NULL,"
+                + "  created_at BIGINT NOT NULL,"
+                + "  updated_at BIGINT NOT NULL,"
+                + "  PRIMARY KEY (table_id, partition_name, statistic_name)"
+                + ")";
+
+        stmt.execute(createTableSQL);
+
+        String createIndexSQL =
+            "CREATE INDEX IF NOT EXISTS idx_table_partition ON 
partition_statistic_meta(table_id, partition_name)";
+        stmt.execute(createIndexSQL);
+
+        LOG.info("Created partition_statistic_meta table in H2");
+      }
+    }
+  }
+}
diff --git a/docs/manage-statistics-in-gravitino.md 
b/docs/manage-statistics-in-gravitino.md
index 139790fd39..bb064fdd19 100644
--- a/docs/manage-statistics-in-gravitino.md
+++ b/docs/manage-statistics-in-gravitino.md
@@ -234,11 +234,73 @@ table.dropPartitionStatistics(statisticsToDrop);
 
 ### Server configuration
 
-| Configuration item                              | Description                
                                                                                
                                                                                
                                          | Default value                       
                                       | Required  | Since version |
-|-------------------------------------------------|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|----------------------------------------------------------------------------|-----------|---------------|
-| `gravitino.stats.partition.storageFactoryClass` | The storage factory class 
for partition statistics, which is used to store partition statistics in the 
different storage. The 
`org.apache.gravitino.stats.storage.MemoryPartitionStatsStorageFactory`  can 
only be used for testing. | 
`org.apache.gravitino.stats.storage.LancePartitionStatisticStorageFactory` |  
No       | 1.0.0         |
+| Configuration item                              | Description                
                                                                                
                                                                                
                                          | Default value                       
                                     | Required  | Since version |
+|-------------------------------------------------|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|--------------------------------------------------------------------------|-----------|---------------|
+| `gravitino.stats.partition.storageFactoryClass` | The storage factory class 
for partition statistics, which is used to store partition statistics in the 
different storage. The 
`org.apache.gravitino.stats.storage.MemoryPartitionStatsStorageFactory`  can 
only be used for testing. | 
`org.apache.gravitino.stats.storage.JdbcPartitionStatisticStorageFactory` |  No 
      | 1.0.0         |
 
 
+#### JDBC Storage (Default)
+
+Starting from version 1.2.0, Gravitino uses JDBC-based storage as the default 
partition statistics storage backend.
+This provides a reliable, production-ready solution that supports multiple 
database backends:
+
+- **MySQL** (recommended for production)
+- **PostgreSQL**
+- **H2** (suitable for testing and development)
+
+To use JDBC storage, configure the following options by adding the prefix 
`gravitino.stats.partition.storageOption.`:
+
+| Configuration item                                              | 
Description                                                        | Default 
value              | Required | Since version |
+|-----------------------------------------------------------------|--------------------------------------------------------------------|----------------------------|----------|---------------|
+| `gravitino.stats.partition.storageOption.jdbcUrl`               | JDBC connection URL (e.g., `jdbc:mysql://localhost:3306/gravitino`) | None                       | Yes      | 1.2.0         |
+| `gravitino.stats.partition.storageOption.jdbcUser`              | Database 
username                                                  | None                
       | Yes      | 1.2.0         |
+| `gravitino.stats.partition.storageOption.jdbcPassword`          | Database 
password                                                  | None                
       | Yes      | 1.2.0         |
+| `gravitino.stats.partition.storageOption.jdbcDriver`            | JDBC 
driver class name                                             | 
`com.mysql.cj.jdbc.Driver` | No       | 1.2.0         |
+| `gravitino.stats.partition.storageOption.poolMaxSize`           | Maximum 
connection pool size                                       | `10`               
        | No       | 1.2.0         |
+| `gravitino.stats.partition.storageOption.poolMinIdle`           | Minimum 
idle connections in pool                                   | `2`                
        | No       | 1.2.0         |
+| `gravitino.stats.partition.storageOption.connectionTimeoutMs`   | Connection 
timeout in milliseconds                                 | `30000`               
     | No       | 1.2.0         |
+| `gravitino.stats.partition.storageOption.testOnBorrow`          | Test 
connections before use                                        | `true`          
           | No       | 1.2.0         |
+
+**Example MySQL Configuration:**
+
+```properties
+gravitino.stats.partition.storageFactoryClass = 
org.apache.gravitino.stats.storage.JdbcPartitionStatisticStorageFactory
+gravitino.stats.partition.storageOption.jdbcUrl = 
jdbc:mysql://localhost:3306/gravitino
+gravitino.stats.partition.storageOption.jdbcUser = gravitino
+gravitino.stats.partition.storageOption.jdbcPassword = gravitino123
+gravitino.stats.partition.storageOption.poolMaxSize = 20
+```
+
+**Example PostgreSQL Configuration:**
+
+```properties
+gravitino.stats.partition.storageFactoryClass = 
org.apache.gravitino.stats.storage.JdbcPartitionStatisticStorageFactory
+gravitino.stats.partition.storageOption.jdbcUrl = 
jdbc:postgresql://localhost:5432/gravitino
+gravitino.stats.partition.storageOption.jdbcUser = gravitino
+gravitino.stats.partition.storageOption.jdbcPassword = gravitino123
+gravitino.stats.partition.storageOption.jdbcDriver = org.postgresql.Driver
+```
+
+**Database Schema Setup:**
+
+Before using JDBC storage, you need to create the database schema. Schema 
files are provided for all supported databases:
+
+- MySQL: `scripts/mysql/schema-${GRAVITINO_VERSION}-mysql.sql`
+- PostgreSQL: `scripts/postgresql/schema-${GRAVITINO_VERSION}-postgresql.sql`
+- H2: `scripts/h2/schema-${GRAVITINO_VERSION}-h2.sql`
+
+For MySQL:
+```bash
+mysql -u root -p < scripts/mysql/schema-${GRAVITINO_VERSION}-mysql.sql
+```
+
+For PostgreSQL:
+```bash
+psql -U postgres -d gravitino -f 
scripts/postgresql/schema-${GRAVITINO_VERSION}-postgresql.sql
+```
+
+#### Lance Storage (Alternative)
+
 If you use [Lance](https://lancedb.github.io/lance/) as the partition 
statistics storage, you can set the options below, if you have other lance 
storage options, you can pass it by adding prefix 
`gravitino.stats.partition.storageOption.`.
 For example, if you set an extra property `foo` to `bar` for Lance storage 
option, you can add a configuration item 
`gravitino.stats.partition.storageOption.foo` with value `bar`.
 
@@ -258,7 +320,14 @@ For Lance remote storage, you can refer to the document 
[here](https://lancedb.g
 
 If you have many tables with a small number of partitions, you should set a 
smaller metadataFileCacheSizeBytes and indexCacheSizeBytes.
 
-### Implementation a custom partition storage
+**To use Lance storage, configure:**
+
+```properties
+gravitino.stats.partition.storageFactoryClass = 
org.apache.gravitino.stats.storage.LancePartitionStatisticStorageFactory
+gravitino.stats.partition.storageOption.location = /data/lance
+```
+
+### Implement a custom partition storage
 
 You can implement a custom partition storage by implementing the interface 
`org.apache.gravitino.stats.storage.PartitionStatisticStorageFactory` and
 setting the configuration item `gravitino.stats.partition.storageFactoryClass` 
to your class name.
diff --git 
a/integration-test-common/src/test/java/org/apache/gravitino/integration/test/util/TestDatabaseName.java
 
b/integration-test-common/src/test/java/org/apache/gravitino/integration/test/util/TestDatabaseName.java
index 806696c6a2..8f0de42219 100644
--- 
a/integration-test-common/src/test/java/org/apache/gravitino/integration/test/util/TestDatabaseName.java
+++ 
b/integration-test-common/src/test/java/org/apache/gravitino/integration/test/util/TestDatabaseName.java
@@ -115,4 +115,13 @@ public enum TestDatabaseName {
     }
   },
   PG_ICEBERG_AUTHZ_IT,
+
+  /** Represents the PostgreSQL database for partition statistics integration tests. */
+  PG_TEST_PARTITION_STATS {
+    /** PostgreSQL only accepts lowercase database names, so the enum name is lower-cased. */
+    // NOTE(review): uses default-locale toLowerCase(); consider Locale.ROOT if tests may run
+    // under locales with special casing rules (e.g. Turkish dotless i) — confirm.
+    @Override
+    public String toString() {
+      return this.name().toLowerCase();
+    }
+  },
 }
diff --git a/scripts/h2/schema-1.2.0-h2.sql b/scripts/h2/schema-1.2.0-h2.sql
index 92fa597fe3..4065ca170d 100644
--- a/scripts/h2/schema-1.2.0-h2.sql
+++ b/scripts/h2/schema-1.2.0-h2.sql
@@ -496,3 +496,18 @@ CREATE TABLE IF NOT EXISTS `view_meta` (
     KEY `idx_vecid` (`catalog_id`)
 ) ENGINE=InnoDB;
 
+-- This schema extends version 1.1.0 with partition statistics storage support
+-- The partition_statistic_meta table stores partition-level statistics for 
tables
+
+CREATE TABLE IF NOT EXISTS partition_statistic_meta (
+    table_id BIGINT NOT NULL COMMENT 'table id from table_meta',
+    partition_name VARCHAR(1024) NOT NULL COMMENT 'partition name',
+    statistic_name VARCHAR(128) NOT NULL COMMENT 'statistic name',
+    statistic_value CLOB NOT NULL COMMENT 'statistic value as JSON',
+    audit_info CLOB NOT NULL COMMENT 'audit information as JSON',
+    created_at BIGINT NOT NULL COMMENT 'creation timestamp in milliseconds',
+    updated_at BIGINT NOT NULL COMMENT 'last update timestamp in milliseconds',
+    PRIMARY KEY (table_id, partition_name, statistic_name)
+);
+
+-- NOTE(review): (table_id, partition_name) is a left prefix of the primary key, so this
+-- secondary index is likely redundant — confirm it is intentionally kept.
+CREATE INDEX IF NOT EXISTS idx_table_partition ON partition_statistic_meta(table_id, partition_name);
diff --git a/scripts/h2/upgrade-1.1.0-to-1.2.0-h2.sql 
b/scripts/h2/upgrade-1.1.0-to-1.2.0-h2.sql
index e6614b596d..b011cf17e0 100644
--- a/scripts/h2/upgrade-1.1.0-to-1.2.0-h2.sql
+++ b/scripts/h2/upgrade-1.1.0-to-1.2.0-h2.sql
@@ -69,3 +69,16 @@ CREATE TABLE IF NOT EXISTS `view_meta` (
     KEY `idx_vecid` (`catalog_id`)
 ) ENGINE=InnoDB;
 
+-- Add partition statistics storage support
+CREATE TABLE IF NOT EXISTS partition_statistic_meta (
+    table_id BIGINT NOT NULL COMMENT 'table id from table_meta',
+    partition_name VARCHAR(1024) NOT NULL COMMENT 'partition name',
+    statistic_name VARCHAR(128) NOT NULL COMMENT 'statistic name',
+    statistic_value CLOB NOT NULL COMMENT 'statistic value as JSON',
+    audit_info CLOB NOT NULL COMMENT 'audit information as JSON',
+    created_at BIGINT NOT NULL COMMENT 'creation timestamp in milliseconds',
+    updated_at BIGINT NOT NULL COMMENT 'last update timestamp in milliseconds',
+    PRIMARY KEY (table_id, partition_name, statistic_name)
+);
+
+-- NOTE(review): (table_id, partition_name) is a left prefix of the primary key, so this
+-- secondary index is likely redundant — confirm it is intentionally kept.
+CREATE INDEX IF NOT EXISTS idx_table_partition ON partition_statistic_meta(table_id, partition_name);
diff --git a/scripts/mysql/schema-1.2.0-mysql.sql 
b/scripts/mysql/schema-1.2.0-mysql.sql
index ddb4372a27..d795558a1f 100644
--- a/scripts/mysql/schema-1.2.0-mysql.sql
+++ b/scripts/mysql/schema-1.2.0-mysql.sql
@@ -487,3 +487,17 @@ CREATE TABLE IF NOT EXISTS `view_meta` (
     KEY `idx_vecid` (`catalog_id`)
     ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin COMMENT 'view 
metadata';
 
+-- This schema extends version 1.1.0 with partition statistics storage support
+-- The partition_statistic_meta table stores partition-level statistics for 
tables
+
+-- NOTE(review): partition_name uses a 255-character prefix in both the primary key and the
+-- secondary index, so uniqueness is only enforced on the first 255 characters of the name —
+-- confirm partition names cannot collide beyond that length.
+-- NOTE(review): idx_table_partition covers a left prefix of the primary key and is likely
+-- redundant under InnoDB (the PK is the clustered index) — confirm it is intentional.
+CREATE TABLE IF NOT EXISTS `partition_statistic_meta` (
+    `table_id` BIGINT(20) UNSIGNED NOT NULL COMMENT 'table id from table_meta',
+    `partition_name` VARCHAR(1024) NOT NULL COMMENT 'partition name',
+    `statistic_name` VARCHAR(128) NOT NULL COMMENT 'statistic name',
+    `statistic_value` MEDIUMTEXT NOT NULL COMMENT 'statistic value as JSON',
+    `audit_info` TEXT NOT NULL COMMENT 'audit information as JSON',
+    `created_at` BIGINT(20) UNSIGNED NOT NULL COMMENT 'creation timestamp in milliseconds',
+    `updated_at` BIGINT(20) UNSIGNED NOT NULL COMMENT 'last update timestamp in milliseconds',
+    PRIMARY KEY (`table_id`, `partition_name`(255), `statistic_name`),
+    KEY `idx_table_partition` (`table_id`, `partition_name`(255))
+) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin COMMENT 'partition statistics metadata';
diff --git a/scripts/mysql/upgrade-1.1.0-to-1.2.0-mysql.sql 
b/scripts/mysql/upgrade-1.1.0-to-1.2.0-mysql.sql
index 7140320410..120af6bfa2 100644
--- a/scripts/mysql/upgrade-1.1.0-to-1.2.0-mysql.sql
+++ b/scripts/mysql/upgrade-1.1.0-to-1.2.0-mysql.sql
@@ -69,3 +69,15 @@ CREATE TABLE IF NOT EXISTS `view_meta` (
     KEY `idx_vecid` (`catalog_id`)
     ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin COMMENT 'view 
metadata';
 
+-- Add partition statistics storage support
+-- NOTE(review): partition_name uses a 255-character prefix in both the primary key and the
+-- secondary index, so uniqueness is only enforced on the first 255 characters of the name —
+-- confirm partition names cannot collide beyond that length.
+-- NOTE(review): idx_table_partition covers a left prefix of the primary key and is likely
+-- redundant under InnoDB (the PK is the clustered index) — confirm it is intentional.
+CREATE TABLE IF NOT EXISTS `partition_statistic_meta` (
+    `table_id` BIGINT(20) UNSIGNED NOT NULL COMMENT 'table id from table_meta',
+    `partition_name` VARCHAR(1024) NOT NULL COMMENT 'partition name',
+    `statistic_name` VARCHAR(128) NOT NULL COMMENT 'statistic name',
+    `statistic_value` MEDIUMTEXT NOT NULL COMMENT 'statistic value as JSON',
+    `audit_info` TEXT NOT NULL COMMENT 'audit information as JSON',
+    `created_at` BIGINT(20) UNSIGNED NOT NULL COMMENT 'creation timestamp in milliseconds',
+    `updated_at` BIGINT(20) UNSIGNED NOT NULL COMMENT 'last update timestamp in milliseconds',
+    PRIMARY KEY (`table_id`, `partition_name`(255), `statistic_name`),
+    KEY `idx_table_partition` (`table_id`, `partition_name`(255))
+) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin COMMENT 'partition statistics metadata';
diff --git a/scripts/postgresql/schema-1.2.0-postgresql.sql 
b/scripts/postgresql/schema-1.2.0-postgresql.sql
index 3b4c188986..268b1f1c8c 100644
--- a/scripts/postgresql/schema-1.2.0-postgresql.sql
+++ b/scripts/postgresql/schema-1.2.0-postgresql.sql
@@ -862,3 +862,27 @@ COMMENT ON COLUMN view_meta.current_version IS 'view 
current version';
 COMMENT ON COLUMN view_meta.last_version IS 'view last version';
 COMMENT ON COLUMN view_meta.deleted_at IS 'view deleted at';
 
+-- This schema extends version 1.1.0 with partition statistics storage support
+-- The partition_statistic_meta table stores partition-level statistics for 
tables
+
+CREATE TABLE IF NOT EXISTS partition_statistic_meta (
+    table_id BIGINT NOT NULL,
+    partition_name VARCHAR(1024) NOT NULL,
+    statistic_name VARCHAR(128) NOT NULL,
+    statistic_value TEXT NOT NULL,
+    audit_info TEXT NOT NULL,
+    created_at BIGINT NOT NULL,
+    updated_at BIGINT NOT NULL,
+    PRIMARY KEY (table_id, partition_name, statistic_name)
+);
+
+-- NOTE(review): (table_id, partition_name) is a left prefix of the primary key's unique
+-- index, so this secondary index is likely redundant — confirm it is intentionally kept.
+CREATE INDEX IF NOT EXISTS idx_table_partition ON partition_statistic_meta(table_id, partition_name);
+
+COMMENT ON TABLE partition_statistic_meta IS 'partition statistics metadata';
+COMMENT ON COLUMN partition_statistic_meta.table_id IS 'table id from 
table_meta';
+COMMENT ON COLUMN partition_statistic_meta.partition_name IS 'partition name';
+COMMENT ON COLUMN partition_statistic_meta.statistic_name IS 'statistic name';
+COMMENT ON COLUMN partition_statistic_meta.statistic_value IS 'statistic value 
as JSON';
+COMMENT ON COLUMN partition_statistic_meta.audit_info IS 'audit information as 
JSON';
+COMMENT ON COLUMN partition_statistic_meta.created_at IS 'creation timestamp 
in milliseconds';
+COMMENT ON COLUMN partition_statistic_meta.updated_at IS 'last update 
timestamp in milliseconds';
diff --git a/scripts/postgresql/upgrade-1.1.0-to-1.2.0-postgresql.sql 
b/scripts/postgresql/upgrade-1.1.0-to-1.2.0-postgresql.sql
index b83c0c051e..b04c0dcaa7 100644
--- a/scripts/postgresql/upgrade-1.1.0-to-1.2.0-postgresql.sql
+++ b/scripts/postgresql/upgrade-1.1.0-to-1.2.0-postgresql.sql
@@ -105,3 +105,25 @@ COMMENT ON COLUMN view_meta.current_version IS 'view 
current version';
 COMMENT ON COLUMN view_meta.last_version IS 'view last version';
 COMMENT ON COLUMN view_meta.deleted_at IS 'view deleted at';
 
+-- Add partition statistics storage support
+CREATE TABLE IF NOT EXISTS partition_statistic_meta (
+    table_id BIGINT NOT NULL,
+    partition_name VARCHAR(1024) NOT NULL,
+    statistic_name VARCHAR(128) NOT NULL,
+    statistic_value TEXT NOT NULL,
+    audit_info TEXT NOT NULL,
+    created_at BIGINT NOT NULL,
+    updated_at BIGINT NOT NULL,
+    PRIMARY KEY (table_id, partition_name, statistic_name)
+);
+
+-- NOTE(review): (table_id, partition_name) is a left prefix of the primary key's unique
+-- index, so this secondary index is likely redundant — confirm it is intentionally kept.
+CREATE INDEX IF NOT EXISTS idx_table_partition ON partition_statistic_meta(table_id, partition_name);
+
+COMMENT ON TABLE partition_statistic_meta IS 'partition statistics metadata';
+COMMENT ON COLUMN partition_statistic_meta.table_id IS 'table id from 
table_meta';
+COMMENT ON COLUMN partition_statistic_meta.partition_name IS 'partition name';
+COMMENT ON COLUMN partition_statistic_meta.statistic_name IS 'statistic name';
+COMMENT ON COLUMN partition_statistic_meta.statistic_value IS 'statistic value 
as JSON';
+COMMENT ON COLUMN partition_statistic_meta.audit_info IS 'audit information as 
JSON';
+COMMENT ON COLUMN partition_statistic_meta.created_at IS 'creation timestamp 
in milliseconds';
+COMMENT ON COLUMN partition_statistic_meta.updated_at IS 'last update 
timestamp in milliseconds';

Reply via email to