This is an automated email from the ASF dual-hosted git repository.
junday pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/gravitino.git
The following commit(s) were added to refs/heads/main by this push:
new 5c992dd736 [#9838] feat(storage): Add JDBC storage backend for
partition statistics (#9779)
5c992dd736 is described below
commit 5c992dd736e2ca00163c89b58d0fea332c71133e
Author: cc5566 <[email protected]>
AuthorDate: Tue Feb 3 13:22:55 2026 -0800
[#9838] feat(storage): Add JDBC storage backend for partition statistics
(#9779)
## What changes were proposed in this pull request?
This PR adds JDBC (MySQL, PostgreSQL, H2) as an alternative storage
backend for partition statistics, complementing the existing Lance
storage implementation.
## Why are the changes needed?
Fix #9838
Currently, partition statistics can only be stored using Lance format.
This PR adds JDBC as an option for users who:
- Prefer relational database storage
- Already have database infrastructure (MySQL, PostgreSQL, H2)
- Need transactional guarantees for statistics updates
- Want to use existing database backup/recovery tools
- Want to use the same database as Gravitino's entity storage
## Does this PR introduce any user-facing change?
Yes — the default partition-statistics storage backend changes from Lance to JDBC (using the same database as Gravitino's entity storage). Users who want to keep the previous Lance behavior can select it explicitly; the backend remains pluggable via configuration:
```properties
# Use JDBC storage (the new default; set LancePartitionStatisticStorageFactory to keep the previous Lance-based storage)
gravitino.stats.partition.storageFactoryClass = org.apache.gravitino.stats.storage.JdbcPartitionStatisticStorageFactory

# JDBC connection settings
gravitino.stats.partition.storageOption.jdbcUrl = jdbc:mysql://localhost:3306/gravitino
gravitino.stats.partition.storageOption.jdbcUser = root
gravitino.stats.partition.storageOption.jdbcPassword = gravitino
gravitino.stats.partition.storageOption.jdbcDriver = com.mysql.cj.jdbc.Driver
```
---------
Co-authored-by: Claude <[email protected]>
---
.../main/java/org/apache/gravitino/Configs.java | 9 +-
.../apache/gravitino/stats/StatisticManager.java | 53 +-
.../storage/JdbcPartitionStatisticStorage.java | 490 +++++++++++++
.../JdbcPartitionStatisticStorageFactory.java | 204 ++++++
.../storage/TestJdbcPartitionStatisticStorage.java | 347 +++++++++
.../TestJdbcPartitionStatisticStorageFactory.java | 291 ++++++++
.../TestJdbcPartitionStatisticStorageIT.java | 800 +++++++++++++++++++++
docs/manage-statistics-in-gravitino.md | 77 +-
.../integration/test/util/TestDatabaseName.java | 9 +
scripts/h2/schema-1.2.0-h2.sql | 15 +
scripts/h2/upgrade-1.1.0-to-1.2.0-h2.sql | 13 +
scripts/mysql/schema-1.2.0-mysql.sql | 14 +
scripts/mysql/upgrade-1.1.0-to-1.2.0-mysql.sql | 12 +
scripts/postgresql/schema-1.2.0-postgresql.sql | 24 +
.../upgrade-1.1.0-to-1.2.0-postgresql.sql | 22 +
15 files changed, 2372 insertions(+), 8 deletions(-)
diff --git a/core/src/main/java/org/apache/gravitino/Configs.java
b/core/src/main/java/org/apache/gravitino/Configs.java
index fc8d85312d..3cc9d703e6 100644
--- a/core/src/main/java/org/apache/gravitino/Configs.java
+++ b/core/src/main/java/org/apache/gravitino/Configs.java
@@ -28,7 +28,7 @@ import org.apache.gravitino.audit.v2.SimpleFormatterV2;
import org.apache.gravitino.config.ConfigBuilder;
import org.apache.gravitino.config.ConfigConstants;
import org.apache.gravitino.config.ConfigEntry;
-import
org.apache.gravitino.stats.storage.LancePartitionStatisticStorageFactory;
+import org.apache.gravitino.stats.storage.JdbcPartitionStatisticStorageFactory;
public class Configs {
@@ -502,8 +502,11 @@ public class Configs {
public static final ConfigEntry<String>
PARTITION_STATS_STORAGE_FACTORY_CLASS =
new ConfigBuilder("gravitino.stats.partition.storageFactoryClass")
- .doc("The partition stats storage factory class.")
+ .doc(
+ "The partition stats storage factory class. "
+ + "Default is JDBC-based storage using the same database as
entity storage. "
+ + "Set to LancePartitionStatisticStorageFactory for
Lance-based storage.")
.version(ConfigConstants.VERSION_1_0_0)
.stringConf()
-
.createWithDefault(LancePartitionStatisticStorageFactory.class.getCanonicalName());
+
.createWithDefault(JdbcPartitionStatisticStorageFactory.class.getCanonicalName());
}
diff --git
a/core/src/main/java/org/apache/gravitino/stats/StatisticManager.java
b/core/src/main/java/org/apache/gravitino/stats/StatisticManager.java
index fc10c1a5f0..18369a748c 100644
--- a/core/src/main/java/org/apache/gravitino/stats/StatisticManager.java
+++ b/core/src/main/java/org/apache/gravitino/stats/StatisticManager.java
@@ -23,6 +23,7 @@ import com.google.common.collect.Lists;
import java.io.Closeable;
import java.io.IOException;
import java.time.Instant;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
@@ -69,7 +70,7 @@ public class StatisticManager implements Closeable,
StatisticDispatcher {
this.store = store;
this.idGenerator = idGenerator;
String className =
config.get(Configs.PARTITION_STATS_STORAGE_FACTORY_CLASS);
- Map<String, String> options = config.getConfigsWithPrefix(OPTIONS_PREFIX);
+ Map<String, String> options = buildStorageOptions(config);
try {
PartitionStatisticStorageFactory factory =
(PartitionStatisticStorageFactory)
@@ -85,6 +86,56 @@ public class StatisticManager implements Closeable,
StatisticDispatcher {
}
}
+ /**
+ * Builds storage options map by merging entity store JDBC configs (as
defaults) with
+ * partition-specific configs (which override defaults).
+ *
+ * @param config the configuration
+ * @return merged options map
+ */
+ private Map<String, String> buildStorageOptions(Config config) {
+ Map<String, String> options = new HashMap<>();
+
+ // First, add entity store JDBC configs as defaults
+ // These will be used if partition-specific configs are not provided
+ String entityJdbcUrl =
config.get(Configs.ENTITY_RELATIONAL_JDBC_BACKEND_URL);
+ if (entityJdbcUrl != null && !entityJdbcUrl.isEmpty()) {
+ options.put("jdbcUrl", entityJdbcUrl);
+ }
+
+ String entityJdbcDriver =
config.get(Configs.ENTITY_RELATIONAL_JDBC_BACKEND_DRIVER);
+ if (entityJdbcDriver != null && !entityJdbcDriver.isEmpty()) {
+ options.put("jdbcDriver", entityJdbcDriver);
+ }
+
+ String entityJdbcUser =
config.get(Configs.ENTITY_RELATIONAL_JDBC_BACKEND_USER);
+ if (entityJdbcUser != null && !entityJdbcUser.isEmpty()) {
+ options.put("jdbcUser", entityJdbcUser);
+ }
+
+ String entityJdbcPassword =
config.get(Configs.ENTITY_RELATIONAL_JDBC_BACKEND_PASSWORD);
+ if (entityJdbcPassword != null && !entityJdbcPassword.isEmpty()) {
+ options.put("jdbcPassword", entityJdbcPassword);
+ }
+
+ Integer entityMaxConnections =
+ config.get(Configs.ENTITY_RELATIONAL_JDBC_BACKEND_MAX_CONNECTIONS);
+ if (entityMaxConnections != null) {
+ options.put("poolMaxSize", String.valueOf(entityMaxConnections));
+ }
+
+ Long entityWaitMs =
config.get(Configs.ENTITY_RELATIONAL_JDBC_BACKEND_WAIT_MILLISECONDS);
+ if (entityWaitMs != null) {
+ options.put("connectionTimeoutMs", String.valueOf(entityWaitMs));
+ }
+
+ // Then, overlay partition-specific configs (these override entity store
configs)
+ Map<String, String> partitionOptions =
config.getConfigsWithPrefix(OPTIONS_PREFIX);
+ options.putAll(partitionOptions);
+
+ return options;
+ }
+
@Override
public List<Statistic> listStatistics(String metalake, MetadataObject
metadataObject) {
try {
diff --git
a/core/src/main/java/org/apache/gravitino/stats/storage/JdbcPartitionStatisticStorage.java
b/core/src/main/java/org/apache/gravitino/stats/storage/JdbcPartitionStatisticStorage.java
new file mode 100644
index 0000000000..a976489302
--- /dev/null
+++
b/core/src/main/java/org/apache/gravitino/stats/storage/JdbcPartitionStatisticStorage.java
@@ -0,0 +1,490 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.stats.storage;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.google.common.collect.Lists;
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.time.Instant;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import javax.sql.DataSource;
+import org.apache.gravitino.Entity;
+import org.apache.gravitino.EntityStore;
+import org.apache.gravitino.GravitinoEnv;
+import org.apache.gravitino.MetadataObject;
+import org.apache.gravitino.NameIdentifier;
+import org.apache.gravitino.json.JsonUtils;
+import org.apache.gravitino.meta.AuditInfo;
+import org.apache.gravitino.meta.TableEntity;
+import org.apache.gravitino.stats.PartitionRange;
+import org.apache.gravitino.stats.PartitionStatisticsDrop;
+import org.apache.gravitino.stats.PartitionStatisticsUpdate;
+import org.apache.gravitino.stats.StatisticValue;
+import org.apache.gravitino.utils.MetadataObjectUtil;
+import org.apache.gravitino.utils.PrincipalUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * JDBC-based implementation of {@link PartitionStatisticStorage}.
+ *
+ * <p>This implementation stores partition statistics in a JDBC-compatible
database table, using
+ * Apache Commons DBCP2 for connection pooling. It supports multiple database
backends (MySQL,
+ * PostgreSQL, H2). Statistics are stored as JSON-serialized values along with
audit information.
+ */
+public class JdbcPartitionStatisticStorage implements
PartitionStatisticStorage {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(JdbcPartitionStatisticStorage.class);
+
+ private final DataSource dataSource;
+ private final EntityStore entityStore;
+ private final DatabaseType databaseType;
+
+ /** Supported database types. */
+ private enum DatabaseType {
+ MYSQL,
+ POSTGRESQL,
+ H2
+ }
+
+ // SQL statements for MySQL and H2 (compatible syntax)
+ private static final String INSERT_OR_UPDATE_SQL_MYSQL =
+ "INSERT INTO partition_statistic_meta "
+ + "(table_id, partition_name, statistic_name, statistic_value,
audit_info, created_at, updated_at) "
+ + "VALUES (?, ?, ?, ?, ?, ?, ?) "
+ + "ON DUPLICATE KEY UPDATE "
+ + "statistic_value = VALUES(statistic_value), "
+ + "audit_info = VALUES(audit_info), "
+ + "updated_at = VALUES(updated_at)";
+
+ // SQL statement for PostgreSQL
+ private static final String INSERT_OR_UPDATE_SQL_POSTGRESQL =
+ "INSERT INTO partition_statistic_meta "
+ + "(table_id, partition_name, statistic_name, statistic_value,
audit_info, created_at, updated_at) "
+ + "VALUES (?, ?, ?, ?, ?, ?, ?) "
+ + "ON CONFLICT (table_id, partition_name, statistic_name) DO UPDATE
SET "
+ + "statistic_value = EXCLUDED.statistic_value, "
+ + "audit_info = EXCLUDED.audit_info, "
+ + "updated_at = EXCLUDED.updated_at";
+
+ private static final String SELECT_STATISTICS_SQL =
+ "SELECT partition_name, statistic_name, statistic_value, audit_info "
+ + "FROM partition_statistic_meta "
+ + "WHERE table_id = ? ";
+
+ private static final String DELETE_STATISTICS_SQL =
+ "DELETE FROM partition_statistic_meta "
+ + "WHERE table_id = ? AND partition_name = ? AND statistic_name = ?";
+
+ /**
+ * Constructs a new JdbcPartitionStatisticStorage.
+ *
+ * @param dataSource the JDBC DataSource for database connections
+ */
+ public JdbcPartitionStatisticStorage(DataSource dataSource) {
+ this.dataSource = dataSource;
+ this.entityStore = GravitinoEnv.getInstance().entityStore();
+ this.databaseType = detectDatabaseType();
+ }
+
+ /**
+ * Detects the database type from the JDBC connection.
+ *
+ * @return the detected database type
+ */
+ private DatabaseType detectDatabaseType() {
+ try (Connection conn = dataSource.getConnection()) {
+ String productName =
conn.getMetaData().getDatabaseProductName().toLowerCase();
+ LOG.info("Detected database product: {}", productName);
+
+ if (productName.contains("mysql")) {
+ return DatabaseType.MYSQL;
+ } else if (productName.contains("postgresql")) {
+ return DatabaseType.POSTGRESQL;
+ } else if (productName.contains("h2")) {
+ return DatabaseType.H2;
+ } else {
+ LOG.warn("Unknown database type: {}, defaulting to MySQL syntax",
productName);
+ return DatabaseType.MYSQL;
+ }
+ } catch (SQLException e) {
+ LOG.error("Failed to detect database type, defaulting to MySQL", e);
+ return DatabaseType.MYSQL;
+ }
+ }
+
+ /**
+ * Gets the appropriate INSERT/UPDATE SQL for the current database type.
+ *
+ * @return the SQL statement
+ */
+ private String getInsertOrUpdateSql() {
+ if (databaseType == DatabaseType.POSTGRESQL) {
+ return INSERT_OR_UPDATE_SQL_POSTGRESQL;
+ } else {
+ // MySQL and H2 use the same syntax
+ return INSERT_OR_UPDATE_SQL_MYSQL;
+ }
+ }
+
+ @Override
+ public List<PersistedPartitionStatistics> listStatistics(
+ String metalake, MetadataObject metadataObject, PartitionRange
partitionRange)
+ throws IOException {
+ LOG.debug(
+ "Listing statistics for metalake: {}, object: {}, range: {}",
+ metalake,
+ metadataObject.fullName(),
+ partitionRange);
+
+ Long tableId = resolveTableId(metalake, metadataObject);
+ String rangeFilter = buildPartitionRangeFilter(partitionRange);
+
+ String sql = SELECT_STATISTICS_SQL + rangeFilter + " ORDER BY
partition_name, statistic_name";
+
+ try (Connection conn = dataSource.getConnection();
+ PreparedStatement stmt = conn.prepareStatement(sql)) {
+
+ stmt.setLong(1, tableId);
+
+ // Set partition range parameters
+ setPartitionRangeParameters(stmt, partitionRange, 2);
+
+ try (ResultSet rs = stmt.executeQuery()) {
+ return parseResultSet(rs);
+ }
+
+ } catch (SQLException e) {
+ throw new IOException("Failed to list statistics for " +
metadataObject.fullName(), e);
+ }
+ }
+
+ @Override
+ public List<PersistedPartitionStatistics> listStatistics(
+ String metalake, MetadataObject metadataObject, List<String>
partitionNames)
+ throws IOException {
+ LOG.debug(
+ "Listing statistics for metalake: {}, object: {}, partitions: {}",
+ metalake,
+ metadataObject.fullName(),
+ partitionNames);
+
+ if (partitionNames.isEmpty()) {
+ return Lists.newArrayList();
+ }
+
+ Long tableId = resolveTableId(metalake, metadataObject);
+
+ // Build IN clause
+ String inClause =
+ partitionNames.stream().map(p -> "?").collect(Collectors.joining(", ",
"(", ")"));
+ String sql =
+ SELECT_STATISTICS_SQL
+ + "AND partition_name IN "
+ + inClause
+ + " ORDER BY partition_name, statistic_name";
+
+ try (Connection conn = dataSource.getConnection();
+ PreparedStatement stmt = conn.prepareStatement(sql)) {
+
+ stmt.setLong(1, tableId);
+
+ int paramIndex = 2;
+ for (String partitionName : partitionNames) {
+ stmt.setString(paramIndex++, partitionName);
+ }
+
+ try (ResultSet rs = stmt.executeQuery()) {
+ return parseResultSet(rs);
+ }
+
+ } catch (SQLException e) {
+ throw new IOException(
+ "Failed to list statistics for partitions "
+ + partitionNames
+ + " in "
+ + metadataObject.fullName(),
+ e);
+ }
+ }
+
+ @Override
+ public int dropStatistics(String metalake,
List<MetadataObjectStatisticsDrop> statisticsToDrop)
+ throws IOException {
+ LOG.debug(
+ "Dropping statistics for metalake: {}, {} objects", metalake,
statisticsToDrop.size());
+
+ int totalDropped = 0;
+
+ try (Connection conn = dataSource.getConnection()) {
+ conn.setAutoCommit(false);
+
+ try (PreparedStatement stmt =
conn.prepareStatement(DELETE_STATISTICS_SQL)) {
+ for (MetadataObjectStatisticsDrop objectDrop : statisticsToDrop) {
+ Long tableId = resolveTableId(metalake, objectDrop.metadataObject());
+
+ for (PartitionStatisticsDrop drop : objectDrop.drops()) {
+ String partitionName = drop.partitionName();
+ for (String statisticName : drop.statisticNames()) {
+ stmt.setLong(1, tableId);
+ stmt.setString(2, partitionName);
+ stmt.setString(3, statisticName);
+ stmt.addBatch();
+ }
+ }
+ }
+
+ int[] results = stmt.executeBatch();
+ for (int result : results) {
+ // Count successful deletions. Per JDBC spec, executeBatch() returns:
+ // - Positive number: actual update count
+ // - SUCCESS_NO_INFO (-2): operation succeeded but driver doesn't
know row count
+ // - EXECUTE_FAILED (-3): operation failed (we don't count this)
+ if (result > 0 || result == PreparedStatement.SUCCESS_NO_INFO) {
+ totalDropped++;
+ }
+ }
+
+ conn.commit();
+ LOG.debug("Successfully dropped {} statistics", totalDropped);
+
+ } catch (Exception e) {
+ conn.rollback();
+ throw e;
+ } finally {
+ conn.setAutoCommit(true);
+ }
+
+ } catch (SQLException e) {
+ throw new IOException("Failed to drop statistics", e);
+ }
+
+ return totalDropped;
+ }
+
+ @Override
+ public void updateStatistics(
+ String metalake, List<MetadataObjectStatisticsUpdate>
statisticsToUpdate) throws IOException {
+ LOG.debug(
+ "Updating statistics for metalake: {}, {} objects", metalake,
statisticsToUpdate.size());
+
+ try (Connection conn = dataSource.getConnection()) {
+ conn.setAutoCommit(false);
+
+ String sql = getInsertOrUpdateSql();
+ try (PreparedStatement stmt = conn.prepareStatement(sql)) {
+ for (MetadataObjectStatisticsUpdate objectUpdate : statisticsToUpdate)
{
+ Long tableId = resolveTableId(metalake,
objectUpdate.metadataObject());
+
+ for (PartitionStatisticsUpdate update :
objectUpdate.partitionUpdates()) {
+ String partitionName = update.partitionName();
+
+ for (Map.Entry<String, StatisticValue<?>> stat :
update.statistics().entrySet()) {
+ String statisticName = stat.getKey();
+ StatisticValue<?> statisticValue = stat.getValue();
+
+ // Create audit info
+ String currentUser = PrincipalUtils.getCurrentUserName();
+ Instant now = Instant.now();
+ AuditInfo auditInfo =
+ AuditInfo.builder()
+ .withCreator(currentUser)
+ .withCreateTime(now)
+ .withLastModifier(currentUser)
+ .withLastModifiedTime(now)
+ .build();
+
+ // Serialize to JSON
+ String statisticValueJson =
+
JsonUtils.anyFieldMapper().writeValueAsString(statisticValue);
+ String auditInfoJson =
JsonUtils.anyFieldMapper().writeValueAsString(auditInfo);
+
+ long timestamp = now.toEpochMilli();
+
+ stmt.setLong(1, tableId);
+ stmt.setString(2, partitionName);
+ stmt.setString(3, statisticName);
+ stmt.setString(4, statisticValueJson);
+ stmt.setString(5, auditInfoJson);
+ stmt.setLong(6, timestamp);
+ stmt.setLong(7, timestamp);
+
+ stmt.addBatch();
+ }
+ }
+ }
+
+ stmt.executeBatch();
+ conn.commit();
+ LOG.debug("Successfully updated statistics");
+
+ } catch (Exception e) {
+ conn.rollback();
+ throw e;
+ } finally {
+ conn.setAutoCommit(true);
+ }
+
+ } catch (SQLException | JsonProcessingException e) {
+ throw new IOException("Failed to update statistics", e);
+ }
+ }
+
+ @Override
+ public void close() throws IOException {
+ // DataSource lifecycle is managed externally by the factory
+ LOG.debug("Closing JdbcPartitionStatisticStorage");
+ }
+
+ /**
+ * Resolves the table ID for a given metadata object.
+ *
+ * @param metalake the metalake name
+ * @param metadataObject the metadata object
+ * @return the table ID
+ * @throws IOException if unable to resolve the table ID
+ */
+ private Long resolveTableId(String metalake, MetadataObject metadataObject)
throws IOException {
+ NameIdentifier identifier = MetadataObjectUtil.toEntityIdent(metalake,
metadataObject);
+ Entity.EntityType type = MetadataObjectUtil.toEntityType(metadataObject);
+
+ TableEntity tableEntity = entityStore.get(identifier, type,
TableEntity.class);
+ return tableEntity.id();
+ }
+
+ /**
+ * Builds the SQL filter clause for partition range.
+ *
+ * @param range the partition range
+ * @return the SQL filter clause
+ */
+ private String buildPartitionRangeFilter(PartitionRange range) {
+ StringBuilder filter = new StringBuilder();
+
+ range
+ .lowerPartitionName()
+ .ifPresent(
+ name -> {
+ String op =
+ range
+ .lowerBoundType()
+ .map(t -> t == PartitionRange.BoundType.CLOSED ? ">=" :
">")
+ .orElse(">=");
+ filter.append("AND partition_name ").append(op).append(" ? ");
+ });
+
+ range
+ .upperPartitionName()
+ .ifPresent(
+ name -> {
+ String op =
+ range
+ .upperBoundType()
+ .map(t -> t == PartitionRange.BoundType.CLOSED ? "<=" :
"<")
+ .orElse("<=");
+ filter.append("AND partition_name ").append(op).append(" ? ");
+ });
+
+ return filter.toString();
+ }
+
+ /**
+ * Sets partition range parameters in the prepared statement.
+ *
+ * @param stmt the prepared statement
+ * @param range the partition range
+ * @param startIndex the starting parameter index
+ * @throws SQLException if setting parameters fails
+ */
+ private void setPartitionRangeParameters(
+ PreparedStatement stmt, PartitionRange range, int startIndex) throws
SQLException {
+ final int[] paramIndex = {startIndex};
+
+ range
+ .lowerPartitionName()
+ .ifPresent(
+ name -> {
+ try {
+ stmt.setString(paramIndex[0]++, name);
+ } catch (SQLException e) {
+ throw new RuntimeException("Failed to set lower partition name
parameter", e);
+ }
+ });
+
+ range
+ .upperPartitionName()
+ .ifPresent(
+ name -> {
+ try {
+ stmt.setString(paramIndex[0]++, name);
+ } catch (SQLException e) {
+ throw new RuntimeException("Failed to set upper partition name
parameter", e);
+ }
+ });
+ }
+
+ /**
+ * Parses a ResultSet into a list of PersistedPartitionStatistics.
+ *
+ * @param rs the result set
+ * @return list of persisted partition statistics
+ * @throws SQLException if reading from result set fails
+ */
+ private List<PersistedPartitionStatistics> parseResultSet(ResultSet rs)
throws SQLException {
+ Map<String, List<PersistedStatistic>> partitionMap = new HashMap<>();
+
+ while (rs.next()) {
+ String partitionName = rs.getString("partition_name");
+ String statisticName = rs.getString("statistic_name");
+ String statisticValueJson = rs.getString("statistic_value");
+ String auditInfoJson = rs.getString("audit_info");
+
+ try {
+ StatisticValue<?> statisticValue =
+ JsonUtils.anyFieldMapper().readValue(statisticValueJson,
StatisticValue.class);
+ AuditInfo auditInfo =
JsonUtils.anyFieldMapper().readValue(auditInfoJson, AuditInfo.class);
+
+ PersistedStatistic persistedStatistic =
+ PersistedStatistic.of(statisticName, statisticValue, auditInfo);
+
+ partitionMap.computeIfAbsent(partitionName, k -> new
ArrayList<>()).add(persistedStatistic);
+
+ } catch (JsonProcessingException e) {
+ LOG.error(
+ "Failed to parse statistic JSON for partition: {}, statistic: {}",
+ partitionName,
+ statisticName,
+ e);
+ throw new SQLException("Failed to parse statistic JSON", e);
+ }
+ }
+
+ return partitionMap.entrySet().stream()
+ .map(entry -> PersistedPartitionStatistics.of(entry.getKey(),
entry.getValue()))
+ .collect(Collectors.toList());
+ }
+}
diff --git
a/core/src/main/java/org/apache/gravitino/stats/storage/JdbcPartitionStatisticStorageFactory.java
b/core/src/main/java/org/apache/gravitino/stats/storage/JdbcPartitionStatisticStorageFactory.java
new file mode 100644
index 0000000000..fc4261c412
--- /dev/null
+++
b/core/src/main/java/org/apache/gravitino/stats/storage/JdbcPartitionStatisticStorageFactory.java
@@ -0,0 +1,204 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.stats.storage;
+
+import com.google.common.base.Preconditions;
+import java.sql.SQLException;
+import java.time.Duration;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.commons.dbcp2.BasicDataSource;
+import org.apache.gravitino.exceptions.GravitinoRuntimeException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Factory for creating {@link JdbcPartitionStatisticStorage} instances.
+ *
+ * <p>This factory creates a JDBC-based partition statistics storage using
Apache Commons DBCP2 for
+ * connection pooling. It supports multiple database backends (MySQL,
PostgreSQL, H2) and configures
+ * the connection pool with appropriate settings for partition statistics
workloads.
+ *
+ * <p>Configuration properties:
+ *
+ * <ul>
+ * <li>jdbcUrl (required): JDBC connection URL (e.g.,
jdbc:mysql://host:port/db,
+ * jdbc:postgresql://host:port/db)
+ * <li>jdbcUser (required): Database username
+ * <li>jdbcPassword (required): Database password
+ * <li>jdbcDriver (optional): JDBC driver class name (defaults to
com.mysql.cj.jdbc.Driver)
+ * <li>poolMaxSize (optional): Maximum connection pool size (default: 10)
+ * <li>poolMinIdle (optional): Minimum idle connections (default: 2)
+ * <li>connectionTimeoutMs (optional): Connection timeout in milliseconds
(default: 30000)
+ * <li>testOnBorrow (optional): Test connections before use (default: true)
+ * </ul>
+ */
+public class JdbcPartitionStatisticStorageFactory implements
PartitionStatisticStorageFactory {
+
+ private static final Logger LOG =
+ LoggerFactory.getLogger(JdbcPartitionStatisticStorageFactory.class);
+
+ // Configuration keys
+ private static final String JDBC_URL = "jdbcUrl";
+ private static final String JDBC_USER = "jdbcUser";
+ private static final String JDBC_PASSWORD = "jdbcPassword";
+ private static final String JDBC_DRIVER = "jdbcDriver";
+ private static final String POOL_MAX_SIZE = "poolMaxSize";
+ private static final String POOL_MIN_IDLE = "poolMinIdle";
+ private static final String CONNECTION_TIMEOUT_MS = "connectionTimeoutMs";
+ private static final String TEST_ON_BORROW = "testOnBorrow";
+
+ // Default values
+ private static final String DEFAULT_JDBC_DRIVER = "com.mysql.cj.jdbc.Driver";
+ private static final int DEFAULT_POOL_MAX_SIZE = 10;
+ private static final int DEFAULT_POOL_MIN_IDLE = 2;
+ private static final long DEFAULT_CONNECTION_TIMEOUT_MS = 30000L;
+ private static final boolean DEFAULT_TEST_ON_BORROW = true;
+ private static final String VALIDATION_QUERY = "SELECT 1";
+
+ // Keep reference to DataSource for cleanup
+ private BasicDataSource dataSource;
+
+ @Override
+ public PartitionStatisticStorage create(Map<String, String> properties) {
+ LOG.info(
+ "Creating JdbcPartitionStatisticStorage with properties: {}",
+ maskSensitiveProperties(properties));
+
+ validateRequiredProperties(properties);
+
+ try {
+ dataSource = createDataSource(properties);
+ return new JdbcPartitionStatisticStorage(dataSource);
+ } catch (Exception e) {
+ if (dataSource != null) {
+ try {
+ dataSource.close();
+ } catch (SQLException closeException) {
+ LOG.error("Failed to close data source after creation error",
closeException);
+ }
+ }
+ throw new GravitinoRuntimeException(e, "Failed to create
JdbcPartitionStatisticStorage");
+ }
+ }
+
+ /**
+ * Creates and configures a BasicDataSource from the provided properties.
+ *
+ * @param properties configuration properties
+ * @return configured DataSource
+ */
+ private BasicDataSource createDataSource(Map<String, String> properties) {
+ BasicDataSource ds = new BasicDataSource();
+
+ // Required properties
+ String jdbcUrl = properties.get(JDBC_URL);
+ String jdbcUser = properties.get(JDBC_USER);
+ String jdbcPassword = properties.get(JDBC_PASSWORD);
+
+ ds.setUrl(jdbcUrl);
+ ds.setUsername(jdbcUser);
+ ds.setPassword(jdbcPassword);
+
+ // Optional properties with defaults
+ String driverClassName = properties.getOrDefault(JDBC_DRIVER,
DEFAULT_JDBC_DRIVER);
+ ds.setDriverClassName(driverClassName);
+
+ int maxSize =
+ Integer.parseInt(
+ properties.getOrDefault(POOL_MAX_SIZE,
String.valueOf(DEFAULT_POOL_MAX_SIZE)));
+ ds.setMaxTotal(maxSize);
+
+ int minIdle =
+ Integer.parseInt(
+ properties.getOrDefault(POOL_MIN_IDLE,
String.valueOf(DEFAULT_POOL_MIN_IDLE)));
+ ds.setMinIdle(minIdle);
+
+ long timeoutMs =
+ Long.parseLong(
+ properties.getOrDefault(
+ CONNECTION_TIMEOUT_MS,
String.valueOf(DEFAULT_CONNECTION_TIMEOUT_MS)));
+ ds.setMaxWait(Duration.ofMillis(timeoutMs));
+
+ boolean testOnBorrow =
+ Boolean.parseBoolean(
+ properties.getOrDefault(TEST_ON_BORROW,
String.valueOf(DEFAULT_TEST_ON_BORROW)));
+ ds.setTestOnBorrow(testOnBorrow);
+
+ if (testOnBorrow) {
+ ds.setValidationQuery(VALIDATION_QUERY);
+ }
+
+ LOG.info(
+ "Created JDBC DataSource: url={}, driver={}, maxPoolSize={},
minIdle={}, timeout={}ms",
+ jdbcUrl,
+ driverClassName,
+ maxSize,
+ minIdle,
+ timeoutMs);
+
+ return ds;
+ }
+
+ /**
+ * Validates that all required properties are present and non-empty.
+ *
+ * @param properties configuration properties
+ * @throws IllegalArgumentException if required properties are missing or
empty
+ */
+ private void validateRequiredProperties(Map<String, String> properties) {
+ String jdbcUrl = properties.get(JDBC_URL);
+ Preconditions.checkArgument(
+ jdbcUrl != null && !jdbcUrl.trim().isEmpty(), "Property %s must be
non-empty", JDBC_URL);
+
+ String jdbcUser = properties.get(JDBC_USER);
+ Preconditions.checkArgument(
+ jdbcUser != null && !jdbcUser.trim().isEmpty(), "Property %s must be
non-empty", JDBC_USER);
+
+ String jdbcPassword = properties.get(JDBC_PASSWORD);
+ Preconditions.checkArgument(jdbcPassword != null, "Property %s must be
present", JDBC_PASSWORD);
+ }
+
+ /**
+ * Creates a masked copy of properties for logging (hides password).
+ *
+ * @param properties original properties
+ * @return masked properties map
+ */
+ private Map<String, String> maskSensitiveProperties(Map<String, String>
properties) {
+ Map<String, String> masked = new HashMap<>(properties);
+ if (masked.containsKey(JDBC_PASSWORD)) {
+ masked.put(JDBC_PASSWORD, "***");
+ }
+ return masked;
+ }
+
+ /**
+ * Closes the data source if it was created by this factory.
+ *
+ * @throws SQLException if closing fails
+ */
+ public void close() throws SQLException {
+ if (dataSource != null) {
+ LOG.info("Closing JDBC DataSource");
+ dataSource.close();
+ dataSource = null;
+ }
+ }
+}
diff --git
a/core/src/test/java/org/apache/gravitino/stats/storage/TestJdbcPartitionStatisticStorage.java
b/core/src/test/java/org/apache/gravitino/stats/storage/TestJdbcPartitionStatisticStorage.java
new file mode 100644
index 0000000000..b5242eb967
--- /dev/null
+++
b/core/src/test/java/org/apache/gravitino/stats/storage/TestJdbcPartitionStatisticStorage.java
@@ -0,0 +1,347 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.stats.storage;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.google.common.collect.Lists;
+import java.sql.Connection;
+import java.sql.DatabaseMetaData;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import javax.sql.DataSource;
+import org.apache.commons.lang3.reflect.FieldUtils;
+import org.apache.gravitino.EntityStore;
+import org.apache.gravitino.GravitinoEnv;
+import org.apache.gravitino.MetadataObject;
+import org.apache.gravitino.MetadataObjects;
+import org.apache.gravitino.json.JsonUtils;
+import org.apache.gravitino.meta.TableEntity;
+import org.apache.gravitino.stats.PartitionRange;
+import org.apache.gravitino.stats.PartitionStatisticsModification;
+import org.apache.gravitino.stats.PartitionStatisticsUpdate;
+import org.apache.gravitino.stats.StatisticValue;
+import org.apache.gravitino.stats.StatisticValues;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.mockito.ArgumentCaptor;
+
+/** Unit tests for {@link JdbcPartitionStatisticStorage} using mocked JDBC
components. */
+public class TestJdbcPartitionStatisticStorage {
+
+ private JdbcPartitionStatisticStorage storage;
+ private DataSource mockDataSource;
+ private Connection mockConnection;
+ private PreparedStatement mockPreparedStatement;
+ private ResultSet mockResultSet;
+ private EntityStore mockEntityStore;
+
+ private static final String METALAKE = "test_metalake";
+ private static final MetadataObject TEST_TABLE =
+ MetadataObjects.of(
+ Lists.newArrayList("catalog", "schema", "table"),
MetadataObject.Type.TABLE);
+ private static final Long TABLE_ID = 100L;
+
+ @BeforeEach
+ public void setUp() throws Exception {
+ // Mock JDBC components
+ mockDataSource = mock(DataSource.class);
+ mockConnection = mock(Connection.class);
+ mockPreparedStatement = mock(PreparedStatement.class);
+ mockResultSet = mock(ResultSet.class);
+
+ when(mockDataSource.getConnection()).thenReturn(mockConnection);
+
when(mockConnection.prepareStatement(anyString())).thenReturn(mockPreparedStatement);
+ when(mockPreparedStatement.executeQuery()).thenReturn(mockResultSet);
+
+ // Mock DatabaseMetaData for database type detection
+ DatabaseMetaData mockMetaData = mock(DatabaseMetaData.class);
+ when(mockConnection.getMetaData()).thenReturn(mockMetaData);
+ when(mockMetaData.getDatabaseProductName()).thenReturn("MySQL");
+
+ // Mock EntityStore
+ mockEntityStore = mock(EntityStore.class);
+ TableEntity mockTableEntity = mock(TableEntity.class);
+ when(mockEntityStore.get(any(), any(), any())).thenReturn(mockTableEntity);
+ when(mockTableEntity.id()).thenReturn(TABLE_ID);
+ FieldUtils.writeField(GravitinoEnv.getInstance(), "entityStore",
mockEntityStore, true);
+
+ // Create storage with mocked DataSource
+ storage = new JdbcPartitionStatisticStorage(mockDataSource);
+ }
+
+ @AfterEach
+ public void tearDown() throws Exception {
+ if (storage != null) {
+ storage.close();
+ }
+ }
+
+ /** Helper method to serialize StatisticValue to JSON for mocking ResultSet.
*/
+ private String toJson(StatisticValue<?> value) throws
JsonProcessingException {
+ return JsonUtils.anyFieldMapper().writeValueAsString(value);
+ }
+
+ @Test
+ public void testUpdateStatistics() throws Exception {
+ // Prepare test data
+ Map<String, StatisticValue<?>> stats = new HashMap<>();
+ stats.put("custom-rowCount", StatisticValues.longValue(1000L));
+ stats.put("custom-sizeBytes", StatisticValues.longValue(5242880L));
+
+ PartitionStatisticsUpdate update =
+ PartitionStatisticsModification.update("partition_2024_01", stats);
+
+ List<MetadataObjectStatisticsUpdate> objectUpdates =
+ Lists.newArrayList(
+ MetadataObjectStatisticsUpdate.of(TEST_TABLE,
Lists.newArrayList(update)));
+
+ // Mock successful execution
+ when(mockPreparedStatement.executeBatch()).thenReturn(new int[] {1, 1});
+
+ // Execute
+ storage.updateStatistics(METALAKE, objectUpdates);
+
+ // Verify connection and commit were called
+ verify(mockConnection, times(1)).commit();
+ // Connection.close() is called twice: once during database type detection
in constructor,
+ // and once during updateStatistics
+ verify(mockConnection, times(2)).close();
+ }
+
+ @Test
+ public void testListStatisticsAllPartitions() throws Exception {
+ // Create actual JSON from StatisticValue objects
+ String rowCountJson = toJson(StatisticValues.longValue(1000L));
+ String sizeBytesJson = toJson(StatisticValues.longValue(5242880L));
+
+ // Mock ResultSet to return test data
+ when(mockResultSet.next()).thenReturn(true, true, false);
+ when(mockResultSet.getLong("table_id")).thenReturn(TABLE_ID);
+ when(mockResultSet.getString("partition_name"))
+ .thenReturn("partition_2024_01", "partition_2024_01");
+ when(mockResultSet.getString("statistic_name"))
+ .thenReturn("custom-rowCount", "custom-sizeBytes");
+ when(mockResultSet.getString("statistic_value")).thenReturn(rowCountJson,
sizeBytesJson);
+ when(mockResultSet.getString("audit_info"))
+ .thenReturn(
+ "{\"creator\":\"test\",\"createTime\":\"2024-01-01T00:00:00Z\"}",
+ "{\"creator\":\"test\",\"createTime\":\"2024-01-01T00:00:00Z\"}");
+
+ // Execute
+ List<PersistedPartitionStatistics> results =
+ storage.listStatistics(METALAKE, TEST_TABLE,
PartitionRange.ALL_PARTITIONS);
+
+ // Verify
+ assertNotNull(results);
+ assertEquals(1, results.size());
+ assertEquals("partition_2024_01", results.get(0).partitionName());
+ assertEquals(2, results.get(0).statistics().size());
+
+ // Verify statistics values
+ List<PersistedStatistic> statistics = results.get(0).statistics();
+ boolean hasRowCount = statistics.stream().anyMatch(s ->
s.name().equals("custom-rowCount"));
+ boolean hasSizeBytes = statistics.stream().anyMatch(s ->
s.name().equals("custom-sizeBytes"));
+ assertTrue(hasRowCount);
+ assertTrue(hasSizeBytes);
+ }
+
+ @Test
+ public void testListStatisticsWithClosedRange() throws Exception {
+ // Create actual JSON from StatisticValue object
+ String rowCountJson = toJson(StatisticValues.longValue(2000L));
+
+ // Mock ResultSet
+ when(mockResultSet.next()).thenReturn(true, false);
+ when(mockResultSet.getLong("table_id")).thenReturn(TABLE_ID);
+
when(mockResultSet.getString("partition_name")).thenReturn("partition_2024_02");
+
when(mockResultSet.getString("statistic_name")).thenReturn("custom-rowCount");
+ when(mockResultSet.getString("statistic_value")).thenReturn(rowCountJson);
+ when(mockResultSet.getString("audit_info"))
+
.thenReturn("{\"creator\":\"test\",\"createTime\":\"2024-01-01T00:00:00Z\"}");
+
+ // Execute with range query
+ storage.listStatistics(
+ METALAKE,
+ TEST_TABLE,
+ PartitionRange.between(
+ "partition_2024_01",
+ PartitionRange.BoundType.CLOSED,
+ "partition_2024_03",
+ PartitionRange.BoundType.CLOSED));
+
+ // Verify query was executed with WHERE clause
+ ArgumentCaptor<String> sqlCaptor = ArgumentCaptor.forClass(String.class);
+ verify(mockConnection).prepareStatement(sqlCaptor.capture());
+ String executedSql = sqlCaptor.getValue();
+ assertTrue(executedSql.contains("WHERE"));
+ assertTrue(executedSql.contains("table_id"));
+ }
+
+ @Test
+ public void testDropStatistics() throws Exception {
+ // Prepare drop request
+ List<MetadataObjectStatisticsDrop> drops =
+ Lists.newArrayList(
+ MetadataObjectStatisticsDrop.of(
+ TEST_TABLE,
+ Lists.newArrayList(
+ PartitionStatisticsModification.drop(
+ "partition_2024_01",
Lists.newArrayList("custom-rowCount")))));
+
+ // Mock successful batch execution
+ when(mockPreparedStatement.executeBatch()).thenReturn(new int[] {1});
+
+ // Execute
+ int dropped = storage.dropStatistics(METALAKE, drops);
+
+ // Verify
+ assertEquals(1, dropped);
+ verify(mockPreparedStatement, times(1)).executeBatch();
+ }
+
+ @Test
+ public void testTransactionRollback() throws Exception {
+ // Prepare test data
+ Map<String, StatisticValue<?>> stats = new HashMap<>();
+ stats.put("custom-rowCount", StatisticValues.longValue(1000L));
+
+ PartitionStatisticsUpdate update =
+ PartitionStatisticsModification.update("partition_2024_01", stats);
+
+ List<MetadataObjectStatisticsUpdate> objectUpdates =
+ Lists.newArrayList(
+ MetadataObjectStatisticsUpdate.of(TEST_TABLE,
Lists.newArrayList(update)));
+
+ // Mock SQLException during batch execution
+ when(mockPreparedStatement.executeBatch()).thenThrow(new
SQLException("Test error"));
+
+ // Execute and expect exception
+ assertThrows(Exception.class, () -> storage.updateStatistics(METALAKE,
objectUpdates));
+
+ // Verify rollback was called
+ verify(mockConnection, times(1)).rollback();
+ }
+
+ @Test
+ public void testMultipleStatisticTypes() throws Exception {
+ // Create actual StatisticValue objects and serialize them to JSON
+ String longValueJson = toJson(StatisticValues.longValue(1000L));
+ String doubleValueJson = toJson(StatisticValues.doubleValue(256.5));
+ String stringValueJson = toJson(StatisticValues.stringValue("parquet"));
+ String booleanValueJson = toJson(StatisticValues.booleanValue(true));
+
+ // Mock ResultSet with different statistic types
+ when(mockResultSet.next()).thenReturn(true, true, true, true, false);
+ when(mockResultSet.getLong("table_id")).thenReturn(TABLE_ID);
+ when(mockResultSet.getString("partition_name"))
+ .thenReturn(
+ "partition_2024_01", "partition_2024_01", "partition_2024_01",
"partition_2024_01");
+ when(mockResultSet.getString("statistic_name"))
+ .thenReturn("custom-rowCount", "custom-avgSize", "custom-format",
"custom-isCompressed");
+ when(mockResultSet.getString("statistic_value"))
+ .thenReturn(longValueJson, doubleValueJson, stringValueJson,
booleanValueJson);
+ when(mockResultSet.getString("audit_info"))
+ .thenReturn(
+ "{\"creator\":\"test\",\"createTime\":\"2024-01-01T00:00:00Z\"}",
+ "{\"creator\":\"test\",\"createTime\":\"2024-01-01T00:00:00Z\"}",
+ "{\"creator\":\"test\",\"createTime\":\"2024-01-01T00:00:00Z\"}",
+ "{\"creator\":\"test\",\"createTime\":\"2024-01-01T00:00:00Z\"}");
+
+ // Execute
+ List<PersistedPartitionStatistics> results =
+ storage.listStatistics(METALAKE, TEST_TABLE,
PartitionRange.ALL_PARTITIONS);
+
+ // Verify all statistic types
+ assertEquals(1, results.size());
+ assertEquals(4, results.get(0).statistics().size());
+
+ List<PersistedStatistic> statistics = results.get(0).statistics();
+ Map<String, Object> valueMap = new HashMap<>();
+ for (PersistedStatistic stat : statistics) {
+ valueMap.put(stat.name(), stat.value().value());
+ }
+
+ assertEquals(1000L, valueMap.get("custom-rowCount"));
+ assertEquals(256.5, valueMap.get("custom-avgSize"));
+ assertEquals("parquet", valueMap.get("custom-format"));
+ assertEquals(true, valueMap.get("custom-isCompressed"));
+ }
+
+ @Test
+ public void testBatchUpdate() throws Exception {
+ // Prepare multiple partition updates
+ Map<String, StatisticValue<?>> stats1 = new HashMap<>();
+ stats1.put("custom-rowCount", StatisticValues.longValue(1000L));
+
+ Map<String, StatisticValue<?>> stats2 = new HashMap<>();
+ stats2.put("custom-rowCount", StatisticValues.longValue(2000L));
+
+ Map<String, StatisticValue<?>> stats3 = new HashMap<>();
+ stats3.put("custom-rowCount", StatisticValues.longValue(3000L));
+
+ List<PartitionStatisticsUpdate> updates =
+ Lists.newArrayList(
+ PartitionStatisticsModification.update("partition_2024_01",
stats1),
+ PartitionStatisticsModification.update("partition_2024_02",
stats2),
+ PartitionStatisticsModification.update("partition_2024_03",
stats3));
+
+ List<MetadataObjectStatisticsUpdate> objectUpdates =
+ Lists.newArrayList(MetadataObjectStatisticsUpdate.of(TEST_TABLE,
updates));
+
+ // Mock successful batch execution
+ when(mockPreparedStatement.executeBatch()).thenReturn(new int[] {1, 1, 1});
+
+ // Execute
+ storage.updateStatistics(METALAKE, objectUpdates);
+
+ // Verify batch was executed
+ verify(mockPreparedStatement, times(1)).executeBatch();
+ verify(mockConnection, times(1)).commit();
+ }
+
+ @Test
+ public void testEmptyResultSet() throws Exception {
+ // Mock empty ResultSet
+ when(mockResultSet.next()).thenReturn(false);
+
+ // Execute
+ List<PersistedPartitionStatistics> results =
+ storage.listStatistics(METALAKE, TEST_TABLE,
PartitionRange.ALL_PARTITIONS);
+
+ // Verify empty list returned
+ assertNotNull(results);
+ assertTrue(results.isEmpty());
+ }
+}
diff --git
a/core/src/test/java/org/apache/gravitino/stats/storage/TestJdbcPartitionStatisticStorageFactory.java
b/core/src/test/java/org/apache/gravitino/stats/storage/TestJdbcPartitionStatisticStorageFactory.java
new file mode 100644
index 0000000000..cd2e6f92cb
--- /dev/null
+++
b/core/src/test/java/org/apache/gravitino/stats/storage/TestJdbcPartitionStatisticStorageFactory.java
@@ -0,0 +1,291 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.stats.storage;
+
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.util.HashMap;
+import java.util.Map;
+import org.junit.jupiter.api.Test;
+
+/** Unit tests for {@link JdbcPartitionStatisticStorageFactory}. */
+public class TestJdbcPartitionStatisticStorageFactory {
+
+ @Test
+ public void testCreateWithValidConfiguration() {
+ JdbcPartitionStatisticStorageFactory factory = new
JdbcPartitionStatisticStorageFactory();
+
+ Map<String, String> properties = new HashMap<>();
+ properties.put("jdbcUrl", "jdbc:mysql://localhost:3306/test_db");
+ properties.put("jdbcUser", "test_user");
+ properties.put("jdbcPassword", "test_password");
+ properties.put("jdbcDriver", "com.mysql.cj.jdbc.Driver");
+
+ // This test only validates that the properties pass validation
+ // Actual storage creation requires GravitinoEnv and is tested in
integration tests
+ try {
+ PartitionStatisticStorage storage = factory.create(properties);
+ assertNotNull(storage);
+ assertTrue(storage instanceof JdbcPartitionStatisticStorage);
+ } catch (Exception e) {
+ // Expected if GravitinoEnv is not initialized
+ if (e.getMessage() == null || !e.getMessage().contains("GravitinoEnv")) {
+ Throwable cause = e.getCause();
+ if (cause == null || !cause.getMessage().contains("GravitinoEnv")) {
+ throw e;
+ }
+ }
+ }
+ }
+
+ @Test
+ public void testCreateWithDefaultDriver() {
+ JdbcPartitionStatisticStorageFactory factory = new
JdbcPartitionStatisticStorageFactory();
+
+ Map<String, String> properties = new HashMap<>();
+ properties.put("jdbcUrl", "jdbc:mysql://localhost:3306/test_db");
+ properties.put("jdbcUser", "test_user");
+ properties.put("jdbcPassword", "test_password");
+ // jdbcDriver not provided, should use default
+
+ // This test only validates that the properties pass validation
+ try {
+ PartitionStatisticStorage storage = factory.create(properties);
+ assertNotNull(storage);
+ assertTrue(storage instanceof JdbcPartitionStatisticStorage);
+ } catch (Exception e) {
+ if (e.getMessage() == null || !e.getMessage().contains("GravitinoEnv")) {
+ Throwable cause = e.getCause();
+ if (cause == null || !cause.getMessage().contains("GravitinoEnv")) {
+ throw e;
+ }
+ }
+ }
+ }
+
+ @Test
+ public void testCreateWithMissingJdbcUrl() {
+ JdbcPartitionStatisticStorageFactory factory = new
JdbcPartitionStatisticStorageFactory();
+
+ Map<String, String> properties = new HashMap<>();
+ properties.put("jdbcUser", "test_user");
+ properties.put("jdbcPassword", "test_password");
+ // jdbcUrl is missing
+
+ Exception exception =
+ assertThrows(IllegalArgumentException.class, () ->
factory.create(properties));
+
+ assertTrue(exception.getMessage().contains("jdbcUrl"));
+ }
+
+ @Test
+ public void testCreateWithMissingJdbcUser() {
+ JdbcPartitionStatisticStorageFactory factory = new
JdbcPartitionStatisticStorageFactory();
+
+ Map<String, String> properties = new HashMap<>();
+ properties.put("jdbcUrl", "jdbc:mysql://localhost:3306/test_db");
+ properties.put("jdbcPassword", "test_password");
+ // jdbcUser is missing
+
+ Exception exception =
+ assertThrows(IllegalArgumentException.class, () ->
factory.create(properties));
+
+ assertTrue(exception.getMessage().contains("jdbcUser"));
+ }
+
+ @Test
+ public void testCreateWithMissingJdbcPassword() {
+ JdbcPartitionStatisticStorageFactory factory = new
JdbcPartitionStatisticStorageFactory();
+
+ Map<String, String> properties = new HashMap<>();
+ properties.put("jdbcUrl", "jdbc:mysql://localhost:3306/test_db");
+ properties.put("jdbcUser", "test_user");
+ // jdbcPassword is missing
+
+ Exception exception =
+ assertThrows(IllegalArgumentException.class, () ->
factory.create(properties));
+
+ assertTrue(exception.getMessage().contains("jdbcPassword"));
+ }
+
+ @Test
+ public void testCreateWithEmptyProperties() {
+ JdbcPartitionStatisticStorageFactory factory = new
JdbcPartitionStatisticStorageFactory();
+
+ Map<String, String> properties = new HashMap<>();
+
+ Exception exception =
+ assertThrows(IllegalArgumentException.class, () ->
factory.create(properties));
+
+ assertNotNull(exception.getMessage());
+ }
+
+ @Test
+ public void testCreateWithNullProperties() {
+ JdbcPartitionStatisticStorageFactory factory = new
JdbcPartitionStatisticStorageFactory();
+
+ assertThrows(NullPointerException.class, () -> factory.create(null));
+ }
+
+ @Test
+ public void testCreateWithAllProperties() {
+ JdbcPartitionStatisticStorageFactory factory = new
JdbcPartitionStatisticStorageFactory();
+
+ Map<String, String> properties = new HashMap<>();
+ properties.put("jdbcUrl",
"jdbc:mysql://localhost:3306/test_db?useSSL=false");
+ properties.put("jdbcUser", "test_user");
+ properties.put("jdbcPassword", "test_password");
+ properties.put("jdbcDriver", "com.mysql.cj.jdbc.Driver");
+ properties.put("extra-property", "extra-value"); // Extra properties
should be ignored
+
+ try {
+ PartitionStatisticStorage storage = factory.create(properties);
+ assertNotNull(storage);
+ assertTrue(storage instanceof JdbcPartitionStatisticStorage);
+ } catch (Exception e) {
+ if (e.getMessage() == null || !e.getMessage().contains("GravitinoEnv")) {
+ Throwable cause = e.getCause();
+ if (cause == null || !cause.getMessage().contains("GravitinoEnv")) {
+ throw e;
+ }
+ }
+ }
+ }
+
+ @Test
+ public void testMultipleFactoryInstances() {
+ // Test that multiple factory instances can be created independently
+ JdbcPartitionStatisticStorageFactory factory1 = new
JdbcPartitionStatisticStorageFactory();
+ JdbcPartitionStatisticStorageFactory factory2 = new
JdbcPartitionStatisticStorageFactory();
+
+ Map<String, String> properties1 = new HashMap<>();
+ properties1.put("jdbcUrl", "jdbc:mysql://localhost:3306/db1");
+ properties1.put("jdbcUser", "user1");
+ properties1.put("jdbcPassword", "pass1");
+
+ Map<String, String> properties2 = new HashMap<>();
+ properties2.put("jdbcUrl", "jdbc:mysql://localhost:3306/db2");
+ properties2.put("jdbcUser", "user2");
+ properties2.put("jdbcPassword", "pass2");
+
+ try {
+ PartitionStatisticStorage storage1 = factory1.create(properties1);
+ PartitionStatisticStorage storage2 = factory2.create(properties2);
+ assertNotNull(storage1);
+ assertNotNull(storage2);
+ assertTrue(storage1 instanceof JdbcPartitionStatisticStorage);
+ assertTrue(storage2 instanceof JdbcPartitionStatisticStorage);
+ } catch (Exception e) {
+ if (e.getMessage() == null || !e.getMessage().contains("GravitinoEnv")) {
+ Throwable cause = e.getCause();
+ if (cause == null || !cause.getMessage().contains("GravitinoEnv")) {
+ throw e;
+ }
+ }
+ }
+ }
+
+ @Test
+ public void testJdbcUrlWithParameters() {
+ JdbcPartitionStatisticStorageFactory factory = new
JdbcPartitionStatisticStorageFactory();
+
+ Map<String, String> properties = new HashMap<>();
+ properties.put(
+ "jdbcUrl",
+
"jdbc:mysql://localhost:3306/test_db?useSSL=false&serverTimezone=UTC&allowPublicKeyRetrieval=true");
+ properties.put("jdbcUser", "test_user");
+ properties.put("jdbcPassword", "test_password");
+
+ try {
+ PartitionStatisticStorage storage = factory.create(properties);
+ assertNotNull(storage);
+ assertTrue(storage instanceof JdbcPartitionStatisticStorage);
+ } catch (Exception e) {
+ if (e.getMessage() == null || !e.getMessage().contains("GravitinoEnv")) {
+ Throwable cause = e.getCause();
+ if (cause == null || !cause.getMessage().contains("GravitinoEnv")) {
+ throw e;
+ }
+ }
+ }
+ }
+
+ @Test
+ public void testJdbcUrlWithIPv6() {
+ JdbcPartitionStatisticStorageFactory factory = new
JdbcPartitionStatisticStorageFactory();
+
+ Map<String, String> properties = new HashMap<>();
+ properties.put("jdbcUrl", "jdbc:mysql://[::1]:3306/test_db");
+ properties.put("jdbcUser", "test_user");
+ properties.put("jdbcPassword", "test_password");
+
+ try {
+ PartitionStatisticStorage storage = factory.create(properties);
+ assertNotNull(storage);
+ assertTrue(storage instanceof JdbcPartitionStatisticStorage);
+ } catch (Exception e) {
+ if (e.getMessage() == null || !e.getMessage().contains("GravitinoEnv")) {
+ Throwable cause = e.getCause();
+ if (cause == null || !cause.getMessage().contains("GravitinoEnv")) {
+ throw e;
+ }
+ }
+ }
+ }
+
+ @Test
+ public void testEmptyPassword() {
+ JdbcPartitionStatisticStorageFactory factory = new
JdbcPartitionStatisticStorageFactory();
+
+ Map<String, String> properties = new HashMap<>();
+ properties.put("jdbcUrl", "jdbc:h2:mem:test;DB_CLOSE_DELAY=-1");
+ properties.put("jdbcUser", "sa");
+ properties.put("jdbcPassword", ""); // Empty password (allowed for H2 and
similar databases)
+ properties.put("jdbcDriver", "org.h2.Driver");
+
+ // Empty passwords should be allowed (e.g., for H2 in-memory databases)
+ PartitionStatisticStorage storage = factory.create(properties);
+ assertNotNull(storage);
+ }
+
+ @Test
+ public void testSpecialCharactersInCredentials() {
+ JdbcPartitionStatisticStorageFactory factory = new
JdbcPartitionStatisticStorageFactory();
+
+ Map<String, String> properties = new HashMap<>();
+ properties.put("jdbcUrl", "jdbc:mysql://localhost:3306/test_db");
+ properties.put("jdbcUser", "[email protected]");
+ properties.put("jdbcPassword", "p@ssw0rd!#$%"); // Special characters
+
+ try {
+ PartitionStatisticStorage storage = factory.create(properties);
+ assertNotNull(storage);
+ assertTrue(storage instanceof JdbcPartitionStatisticStorage);
+ } catch (Exception e) {
+ if (e.getMessage() == null || !e.getMessage().contains("GravitinoEnv")) {
+ Throwable cause = e.getCause();
+ if (cause == null || !cause.getMessage().contains("GravitinoEnv")) {
+ throw e;
+ }
+ }
+ }
+ }
+}
diff --git
a/core/src/test/java/org/apache/gravitino/stats/storage/TestJdbcPartitionStatisticStorageIT.java
b/core/src/test/java/org/apache/gravitino/stats/storage/TestJdbcPartitionStatisticStorageIT.java
new file mode 100644
index 0000000000..b1cac18a98
--- /dev/null
+++
b/core/src/test/java/org/apache/gravitino/stats/storage/TestJdbcPartitionStatisticStorageIT.java
@@ -0,0 +1,800 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.stats.storage;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.commons.lang3.reflect.FieldUtils;
+import org.apache.gravitino.EntityStore;
+import org.apache.gravitino.GravitinoEnv;
+import org.apache.gravitino.MetadataObject;
+import org.apache.gravitino.MetadataObjects;
+import org.apache.gravitino.integration.test.container.ContainerSuite;
+import org.apache.gravitino.integration.test.container.MySQLContainer;
+import org.apache.gravitino.integration.test.container.PGImageName;
+import org.apache.gravitino.integration.test.container.PostgreSQLContainer;
+import org.apache.gravitino.integration.test.util.TestDatabaseName;
+import org.apache.gravitino.meta.TableEntity;
+import org.apache.gravitino.stats.PartitionRange;
+import org.apache.gravitino.stats.PartitionStatisticsDrop;
+import org.apache.gravitino.stats.PartitionStatisticsModification;
+import org.apache.gravitino.stats.PartitionStatisticsUpdate;
+import org.apache.gravitino.stats.StatisticValue;
+import org.apache.gravitino.stats.StatisticValues;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Nested;
+import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInstance;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * End-to-end integration tests for {@link JdbcPartitionStatisticStorage}
using multiple database
+ * backends via Testcontainers.
+ *
+ * <p>These tests verify the complete flow from API calls through JDBC to real
database instances.
+ * They cover:
+ *
+ * <ul>
+ * <li>Full CRUD operations (Create, Read, Update, Delete)
+ * <li>Partition range queries with different bound types
+ * <li>Transaction integrity and rollback behavior
+ * <li>Concurrent access from multiple threads
+ * <li>JSON serialization/deserialization
+ * <li>Audit information preservation
+ * <li>Large dataset handling
+ * <li>Database-specific SQL syntax (MySQL ON DUPLICATE KEY vs PostgreSQL ON
CONFLICT)
+ * </ul>
+ */
+@Tag("gravitino-docker-test")
+public class TestJdbcPartitionStatisticStorageIT {
+
+ private static final Logger LOG =
+ LoggerFactory.getLogger(TestJdbcPartitionStatisticStorageIT.class);
+
+ /**
+ * Abstract base class containing all test logic. Each database-specific
test class extends this
+ * and implements the database setup.
+ */
+ @TestInstance(TestInstance.Lifecycle.PER_CLASS)
+ abstract static class BaseJdbcPartitionStatisticStorageTest {
+
+ protected JdbcPartitionStatisticStorage storage;
+ protected EntityStore entityStore;
+
+ protected static final String METALAKE = "test_metalake";
+ protected static final MetadataObject TEST_TABLE =
+ MetadataObjects.of(
+ Lists.newArrayList("catalog", "schema", "table"),
MetadataObject.Type.TABLE);
+
+ /** Subclasses must implement to set up database-specific storage. */
+ protected abstract void setupStorage() throws Exception;
+
+ /** Subclasses must implement to tear down storage. */
+ protected abstract void teardownStorage() throws Exception;
+
+ @BeforeAll
+ public void baseSetup() throws Exception {
+ // Mock EntityStore to return a test table entity
+ entityStore = mock(EntityStore.class);
+ TableEntity tableEntity = mock(TableEntity.class);
+ when(entityStore.get(any(), any(), any())).thenReturn(tableEntity);
+ when(tableEntity.id()).thenReturn(100L);
+ FieldUtils.writeField(GravitinoEnv.getInstance(), "entityStore",
entityStore, true);
+
+ setupStorage();
+ }
+
+ @AfterAll
+ public void baseTeardown() throws Exception {
+ teardownStorage();
+ }
+
+ @Test
+ public void testFullCRUDLifecycle() throws IOException {
+ LOG.info("Testing full CRUD lifecycle");
+
+ // 1. CREATE - Insert statistics for a partition
+ List<PartitionStatisticsUpdate> updates = new ArrayList<>();
+ Map<String, StatisticValue<?>> stats = Maps.newHashMap();
+ stats.put("custom-rowCount", StatisticValues.longValue(1000L));
+ stats.put("custom-sizeBytes", StatisticValues.longValue(5000000L));
+ stats.put("custom-lastModified",
StatisticValues.stringValue("2025-01-21"));
+
+ updates.add(PartitionStatisticsModification.update("partition_2025_01",
stats));
+
+ List<MetadataObjectStatisticsUpdate> objectUpdates =
+ Lists.newArrayList(MetadataObjectStatisticsUpdate.of(TEST_TABLE,
updates));
+
+ storage.updateStatistics(METALAKE, objectUpdates);
+ LOG.info("Created statistics for partition_2025_01");
+
+ // 2. READ - Verify statistics exist
+ List<PersistedPartitionStatistics> result =
+ storage.listStatistics(METALAKE, TEST_TABLE,
PartitionRange.ALL_PARTITIONS);
+
+ assertEquals(1, result.size());
+ assertEquals("partition_2025_01", result.get(0).partitionName());
+ assertEquals(3, result.get(0).statistics().size());
+
+ // Verify values and audit info
+ PersistedStatistic rowCount =
+ result.get(0).statistics().stream()
+ .filter(s -> s.name().equals("custom-rowCount"))
+ .findFirst()
+ .orElseThrow();
+
+ assertEquals(1000L, rowCount.value().value());
+ assertNotNull(rowCount.auditInfo());
+ assertNotNull(rowCount.auditInfo().creator());
+ assertNotNull(rowCount.auditInfo().createTime());
+ LOG.info("Verified statistics were created with audit info");
+
+ // 3. UPDATE - Modify existing statistic
+ stats.clear();
+ stats.put("custom-rowCount", StatisticValues.longValue(2000L));
+ stats.put("custom-sizeBytes", StatisticValues.longValue(5000000L));
+ stats.put("custom-lastModified",
StatisticValues.stringValue("2025-01-21"));
+
+ updates.clear();
+ updates.add(PartitionStatisticsModification.update("partition_2025_01",
stats));
+ objectUpdates =
Lists.newArrayList(MetadataObjectStatisticsUpdate.of(TEST_TABLE, updates));
+
+ storage.updateStatistics(METALAKE, objectUpdates);
+
+ result = storage.listStatistics(METALAKE, TEST_TABLE,
PartitionRange.ALL_PARTITIONS);
+ rowCount =
+ result.get(0).statistics().stream()
+ .filter(s -> s.name().equals("custom-rowCount"))
+ .findFirst()
+ .orElseThrow();
+
+ assertEquals(2000L, rowCount.value().value());
+ LOG.info("Updated statistic value from 1000 to 2000");
+
+ // 4. DELETE - Drop specific statistic
+ List<MetadataObjectStatisticsDrop> drops =
+ Lists.newArrayList(
+ MetadataObjectStatisticsDrop.of(
+ TEST_TABLE,
+ Lists.newArrayList(
+ PartitionStatisticsModification.drop(
+ "partition_2025_01",
Lists.newArrayList("custom-rowCount")))));
+
+ int dropped = storage.dropStatistics(METALAKE, drops);
+ assertEquals(1, dropped);
+ LOG.info("Dropped 1 statistic");
+
+ // 5. VERIFY - Statistic is gone
+ result = storage.listStatistics(METALAKE, TEST_TABLE,
PartitionRange.ALL_PARTITIONS);
+ assertEquals(2, result.get(0).statistics().size());
+
+ boolean hasRowCount =
+ result.get(0).statistics().stream().anyMatch(s ->
s.name().equals("custom-rowCount"));
+ assertFalse(hasRowCount);
+ LOG.info("Verified statistic was deleted");
+
+ // Cleanup
+ cleanupAllStatistics();
+ }
+
+ @Test
+ public void testPartitionRangeQueries() throws IOException {
+ LOG.info("Testing partition range queries");
+
+ // Insert statistics for multiple partitions
+ insertMultiplePartitions("p1", "p2", "p3", "p4", "p5");
+
+ // Test CLOSED bounds: [p2, p4] should return p2, p3, p4
+ List<PersistedPartitionStatistics> result =
+ storage.listStatistics(
+ METALAKE,
+ TEST_TABLE,
+ PartitionRange.between(
+ "p2", PartitionRange.BoundType.CLOSED, "p4",
PartitionRange.BoundType.CLOSED));
+
+ assertEquals(3, result.size());
+ List<String> partitionNames =
+
result.stream().map(PersistedPartitionStatistics::partitionName).toList();
+ assertTrue(partitionNames.contains("p2"));
+ assertTrue(partitionNames.contains("p3"));
+ assertTrue(partitionNames.contains("p4"));
+ LOG.info("CLOSED bounds [p2, p4] returned 3 partitions: {}",
partitionNames);
+
+ // Test OPEN bounds: (p2, p4) should return only p3
+ result =
+ storage.listStatistics(
+ METALAKE,
+ TEST_TABLE,
+ PartitionRange.between(
+ "p2", PartitionRange.BoundType.OPEN, "p4",
PartitionRange.BoundType.OPEN));
+
+ assertEquals(1, result.size());
+ assertEquals("p3", result.get(0).partitionName());
+ LOG.info("OPEN bounds (p2, p4) returned 1 partition: p3");
+
+ // Test upTo: <= p3 should return p1, p2, p3
+ result =
+ storage.listStatistics(
+ METALAKE, TEST_TABLE, PartitionRange.upTo("p3",
PartitionRange.BoundType.CLOSED));
+
+ assertEquals(3, result.size());
+ partitionNames =
result.stream().map(PersistedPartitionStatistics::partitionName).toList();
+ assertTrue(partitionNames.contains("p1"));
+ assertTrue(partitionNames.contains("p2"));
+ assertTrue(partitionNames.contains("p3"));
+ LOG.info("upTo p3 (CLOSED) returned 3 partitions");
+
+ // Test downTo: >= p3 should return p3, p4, p5
+ result =
+ storage.listStatistics(
+ METALAKE, TEST_TABLE, PartitionRange.downTo("p3",
PartitionRange.BoundType.CLOSED));
+
+ assertEquals(3, result.size());
+ partitionNames =
result.stream().map(PersistedPartitionStatistics::partitionName).toList();
+ assertTrue(partitionNames.contains("p3"));
+ assertTrue(partitionNames.contains("p4"));
+ assertTrue(partitionNames.contains("p5"));
+ LOG.info("downTo p3 (CLOSED) returned 3 partitions");
+
+ // Test OPEN lower bound: (p2, p5] should return p3, p4, p5
+ result =
+ storage.listStatistics(
+ METALAKE,
+ TEST_TABLE,
+ PartitionRange.between(
+ "p2", PartitionRange.BoundType.OPEN, "p5",
PartitionRange.BoundType.CLOSED));
+
+ assertEquals(3, result.size());
+ LOG.info("OPEN-CLOSED bounds (p2, p5] returned 3 partitions");
+
+ // Cleanup
+ cleanupAllStatistics();
+ }
+
+ @Test
+ public void testListStatisticsByPartitionNames() throws IOException {
+ LOG.info("Testing list statistics by partition names");
+
+ // Insert statistics for multiple partitions
+ insertMultiplePartitions("part_a", "part_b", "part_c", "part_d",
"part_e");
+
+ // List specific partitions
+ List<PersistedPartitionStatistics> result =
+ storage.listStatistics(
+ METALAKE, TEST_TABLE, Lists.newArrayList("part_b", "part_d",
"part_e"));
+
+ assertEquals(3, result.size());
+ List<String> partitionNames =
+
result.stream().map(PersistedPartitionStatistics::partitionName).toList();
+ assertTrue(partitionNames.contains("part_b"));
+ assertTrue(partitionNames.contains("part_d"));
+ assertTrue(partitionNames.contains("part_e"));
+ assertFalse(partitionNames.contains("part_a"));
+ assertFalse(partitionNames.contains("part_c"));
+ LOG.info("Listed 3 specific partitions by name");
+
+ // Cleanup
+ cleanupAllStatistics();
+ }
+
+ @Test
+ public void testConcurrentWrites() throws Exception {
+ LOG.info("Testing concurrent writes from multiple threads");
+
+ int threadCount = 10;
+ int partitionsPerThread = 50;
+
+ ExecutorService executor = Executors.newFixedThreadPool(threadCount);
+ CountDownLatch latch = new CountDownLatch(threadCount);
+ AtomicInteger successCount = new AtomicInteger(0);
+ AtomicInteger errorCount = new AtomicInteger(0);
+
+ for (int i = 0; i < threadCount; i++) {
+ final int threadId = i;
+ executor.submit(
+ () -> {
+ try {
+ for (int j = 0; j < partitionsPerThread; j++) {
+ String partitionName = "thread" + threadId + "_p" + j;
+
+ Map<String, StatisticValue<?>> stats = Maps.newHashMap();
+ stats.put("custom-threadId",
StatisticValues.longValue((long) threadId));
+ stats.put("custom-partitionIndex",
StatisticValues.longValue((long) j));
+ stats.put("custom-value",
StatisticValues.stringValue("thread" + threadId));
+
+ List<PartitionStatisticsUpdate> updates =
+ Lists.newArrayList(
+
PartitionStatisticsModification.update(partitionName, stats));
+
+ List<MetadataObjectStatisticsUpdate> objectUpdates =
+
Lists.newArrayList(MetadataObjectStatisticsUpdate.of(TEST_TABLE, updates));
+
+ storage.updateStatistics(METALAKE, objectUpdates);
+ }
+ successCount.incrementAndGet();
+ } catch (Exception e) {
+ LOG.error("Thread {} failed", threadId, e);
+ errorCount.incrementAndGet();
+ } finally {
+ latch.countDown();
+ }
+ });
+ }
+
+ boolean completed = latch.await(60, TimeUnit.SECONDS);
+ assertTrue(completed, "All threads should complete within 60 seconds");
+ assertEquals(threadCount, successCount.get(), "All threads should
succeed");
+ assertEquals(0, errorCount.get(), "No threads should have errors");
+
+ // Verify all statistics were written
+ List<PersistedPartitionStatistics> result =
+ storage.listStatistics(METALAKE, TEST_TABLE,
PartitionRange.ALL_PARTITIONS);
+
+ assertEquals(
+ threadCount * partitionsPerThread,
+ result.size(),
+ "Should have statistics for all partitions");
+
+ LOG.info(
+ "Successfully completed {} concurrent writes across {} threads",
+ result.size(),
+ threadCount);
+
+ executor.shutdown();
+
+ // Cleanup
+ cleanupAllStatistics();
+ }
+
+ @Test
+ public void testMultipleStatisticTypes() throws IOException {
+ LOG.info("Testing multiple statistic value types");
+
+ Map<String, StatisticValue<?>> stats = Maps.newHashMap();
+ stats.put("custom-stringValue", StatisticValues.stringValue("test
string"));
+ stats.put("custom-longValue", StatisticValues.longValue(12345L));
+ stats.put("custom-doubleValue", StatisticValues.doubleValue(123.45));
+ stats.put("custom-booleanValue", StatisticValues.booleanValue(true));
+
+ List<PartitionStatisticsUpdate> updates =
+ Lists.newArrayList(
+ PartitionStatisticsModification.update("mixed_types_partition",
stats));
+
+ List<MetadataObjectStatisticsUpdate> objectUpdates =
+ Lists.newArrayList(MetadataObjectStatisticsUpdate.of(TEST_TABLE,
updates));
+
+ storage.updateStatistics(METALAKE, objectUpdates);
+
+ // Read back and verify types
+ List<PersistedPartitionStatistics> result =
+ storage.listStatistics(METALAKE, TEST_TABLE,
PartitionRange.ALL_PARTITIONS);
+
+ assertEquals(1, result.size());
+ List<PersistedStatistic> statistics = result.get(0).statistics();
+ assertEquals(4, statistics.size());
+
+ // Verify each type
+ PersistedStatistic stringVal =
+ statistics.stream()
+ .filter(s -> s.name().equals("custom-stringValue"))
+ .findFirst()
+ .orElseThrow();
+ assertEquals("test string", stringVal.value().value());
+
+ PersistedStatistic longVal =
+ statistics.stream()
+ .filter(s -> s.name().equals("custom-longValue"))
+ .findFirst()
+ .orElseThrow();
+ assertEquals(12345L, longVal.value().value());
+
+ PersistedStatistic doubleVal =
+ statistics.stream()
+ .filter(s -> s.name().equals("custom-doubleValue"))
+ .findFirst()
+ .orElseThrow();
+ assertEquals(123.45, doubleVal.value().value());
+
+ PersistedStatistic boolVal =
+ statistics.stream()
+ .filter(s -> s.name().equals("custom-booleanValue"))
+ .findFirst()
+ .orElseThrow();
+ assertEquals(true, boolVal.value().value());
+
+ LOG.info("All statistic value types serialized and deserialized
correctly");
+
+ // Cleanup
+ cleanupAllStatistics();
+ }
+
+ @Test
+ public void testBatchDropMultipleStatistics() throws IOException {
+ LOG.info("Testing batch drop of multiple statistics");
+
+ // Insert statistics for multiple partitions
+ insertMultiplePartitions("drop_p1", "drop_p2", "drop_p3");
+
+ // Drop multiple statistics from different partitions
+ List<PartitionStatisticsDrop> dropsList = new ArrayList<>();
+ dropsList.add(
+ PartitionStatisticsModification.drop("drop_p1",
Lists.newArrayList("custom-stat1")));
+ dropsList.add(
+ PartitionStatisticsModification.drop(
+ "drop_p2", Lists.newArrayList("custom-stat1", "custom-stat2")));
+
+ List<MetadataObjectStatisticsDrop> drops =
+ Lists.newArrayList(MetadataObjectStatisticsDrop.of(TEST_TABLE,
dropsList));
+
+ int droppedCount = storage.dropStatistics(METALAKE, drops);
+ assertEquals(3, droppedCount, "Should drop 3 statistics total (1 from
p1, 2 from p2)");
+
+ LOG.info("Dropped {} statistics in batch", droppedCount);
+
+ // Cleanup
+ cleanupAllStatistics();
+ }
+
+ @Test
+ public void testLargeDataset() throws IOException {
+ LOG.info("Testing large dataset handling");
+
+ int partitionCount = 1000;
+ int statisticsPerPartition = 10;
+
+ // Insert large dataset
+ List<PartitionStatisticsUpdate> updates = new ArrayList<>();
+ for (int i = 0; i < partitionCount; i++) {
+ String partitionName = "large_p" + String.format("%04d", i);
+ Map<String, StatisticValue<?>> stats = Maps.newHashMap();
+
+ for (int j = 0; j < statisticsPerPartition; j++) {
+ stats.put("custom-stat" + j, StatisticValues.longValue((long) (i *
1000 + j)));
+ }
+
+ updates.add(PartitionStatisticsModification.update(partitionName,
stats));
+ }
+
+ List<MetadataObjectStatisticsUpdate> objectUpdates =
+ Lists.newArrayList(MetadataObjectStatisticsUpdate.of(TEST_TABLE,
updates));
+
+ long startTime = System.currentTimeMillis();
+ storage.updateStatistics(METALAKE, objectUpdates);
+ long insertTime = System.currentTimeMillis() - startTime;
+
+ LOG.info(
+ "Inserted {} partitions with {} statistics each in {} ms",
+ partitionCount,
+ statisticsPerPartition,
+ insertTime);
+
+ // Query all statistics
+ startTime = System.currentTimeMillis();
+ List<PersistedPartitionStatistics> result =
+ storage.listStatistics(METALAKE, TEST_TABLE,
PartitionRange.ALL_PARTITIONS);
+ long queryTime = System.currentTimeMillis() - startTime;
+
+ assertEquals(partitionCount, result.size());
+ assertEquals(statisticsPerPartition, result.get(0).statistics().size());
+
+ LOG.info("Queried {} partitions in {} ms", partitionCount, queryTime);
+
+ // Performance assertions
+ assertTrue(
+ insertTime < 30000,
+ "Insert should complete within 30 seconds, took " + insertTime +
"ms");
+ assertTrue(
+ queryTime < 5000, "Query should complete within 5 seconds, took " +
queryTime + "ms");
+
+ // Cleanup
+ cleanupAllStatistics();
+ }
+
+ // ==================== Helper Methods ====================
+
+ /**
+ * Inserts test statistics for multiple partitions.
+ *
+ * @param partitionNames the partition names to create
+ */
+ protected void insertMultiplePartitions(String... partitionNames) throws
IOException {
+ List<PartitionStatisticsUpdate> updates = new ArrayList<>();
+
+ for (String partitionName : partitionNames) {
+ Map<String, StatisticValue<?>> stats = Maps.newHashMap();
+ stats.put("custom-stat1", StatisticValues.longValue(100L));
+ stats.put("custom-stat2", StatisticValues.stringValue("value2"));
+ stats.put("custom-stat3", StatisticValues.doubleValue(3.14));
+
+ updates.add(PartitionStatisticsModification.update(partitionName,
stats));
+ }
+
+ List<MetadataObjectStatisticsUpdate> objectUpdates =
+ Lists.newArrayList(MetadataObjectStatisticsUpdate.of(TEST_TABLE,
updates));
+
+ storage.updateStatistics(METALAKE, objectUpdates);
+ }
+
+    /** Removes all statistics for the test table to clean up between tests. */
+ protected void cleanupAllStatistics() throws IOException {
+ // Get all current statistics
+ List<PersistedPartitionStatistics> allStats =
+ storage.listStatistics(METALAKE, TEST_TABLE,
PartitionRange.ALL_PARTITIONS);
+
+ if (allStats.isEmpty()) {
+ return;
+ }
+
+ // Build drop list for all statistics
+ List<PartitionStatisticsDrop> dropsList = new ArrayList<>();
+ for (PersistedPartitionStatistics partitionStats : allStats) {
+ List<String> statisticNames =
+
partitionStats.statistics().stream().map(PersistedStatistic::name).toList();
+
+ dropsList.add(
+
PartitionStatisticsModification.drop(partitionStats.partitionName(),
statisticNames));
+ }
+
+ List<MetadataObjectStatisticsDrop> drops =
+ Lists.newArrayList(MetadataObjectStatisticsDrop.of(TEST_TABLE,
dropsList));
+
+ storage.dropStatistics(METALAKE, drops);
+ LOG.debug("Cleaned up all statistics for test table");
+ }
+ }
+
+  /** MySQL-specific tests using a Docker container. */
+ @Nested
+ @Tag("gravitino-docker-test")
+ static class MySQLTest extends BaseJdbcPartitionStatisticStorageTest {
+
+ private static final ContainerSuite containerSuite =
ContainerSuite.getInstance();
+ private static final TestDatabaseName TEST_DB_NAME =
TestDatabaseName.MYSQL_MYSQL_ABSTRACT_IT;
+ private MySQLContainer mySQLContainer;
+
+ @Override
+ protected void setupStorage() throws Exception {
+ LOG.info("Starting MySQL container for partition statistics integration
tests");
+
+ // Start MySQL container
+ containerSuite.startMySQLContainer(TEST_DB_NAME);
+ mySQLContainer = containerSuite.getMySQLContainer();
+
+ // Create database schema
+ createMySQLSchema();
+
+ // Create storage factory with MySQL container connection
+ Map<String, String> properties = Maps.newHashMap();
+ properties.put("jdbcUrl", mySQLContainer.getJdbcUrl(TEST_DB_NAME));
+ properties.put("jdbcUser", mySQLContainer.getUsername());
+ properties.put("jdbcPassword", mySQLContainer.getPassword());
+ properties.put("jdbcDriver",
mySQLContainer.getDriverClassName(TEST_DB_NAME));
+
+ JdbcPartitionStatisticStorageFactory factory = new
JdbcPartitionStatisticStorageFactory();
+ storage = (JdbcPartitionStatisticStorage) factory.create(properties);
+
+ LOG.info("MySQL partition statistics storage initialized successfully");
+ }
+
+ @Override
+ protected void teardownStorage() throws Exception {
+ if (storage != null) {
+ storage.close();
+ LOG.info("MySQL partition statistics storage closed");
+ }
+ }
+
+ /** Creates the partition_statistic_meta table in the MySQL test database.
*/
+ private void createMySQLSchema() throws SQLException {
+ String jdbcUrl = mySQLContainer.getJdbcUrl(TEST_DB_NAME);
+ String username = mySQLContainer.getUsername();
+ String password = mySQLContainer.getPassword();
+
+ try (Connection conn = DriverManager.getConnection(jdbcUrl, username,
password);
+ Statement stmt = conn.createStatement()) {
+
+ String createTableSQL =
+ "CREATE TABLE IF NOT EXISTS `partition_statistic_meta` ("
+ + " `table_id` BIGINT(20) UNSIGNED NOT NULL COMMENT 'table id
from table_meta',"
+ + " `partition_name` VARCHAR(1024) NOT NULL COMMENT
'partition name',"
+ + " `statistic_name` VARCHAR(128) NOT NULL COMMENT 'statistic
name',"
+ + " `statistic_value` MEDIUMTEXT NOT NULL COMMENT 'statistic
value as JSON',"
+ + " `audit_info` TEXT NOT NULL COMMENT 'audit information as
JSON',"
+ + " `created_at` BIGINT(20) UNSIGNED NOT NULL COMMENT
'creation timestamp in milliseconds',"
+ + " `updated_at` BIGINT(20) UNSIGNED NOT NULL COMMENT 'last
update timestamp in milliseconds',"
+ + " PRIMARY KEY (`table_id`, `partition_name`(255),
`statistic_name`),"
+ + " KEY `idx_table_partition` (`table_id`,
`partition_name`(255))"
+ + ") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin"
+ + " COMMENT 'partition statistics metadata'";
+
+ stmt.execute(createTableSQL);
+ LOG.info("Created partition_statistic_meta table in MySQL");
+ }
+ }
+ }
+
+  /** PostgreSQL-specific tests using a Docker container. */
+ @Nested
+ @Tag("gravitino-docker-test")
+ static class PostgreSQLTest extends BaseJdbcPartitionStatisticStorageTest {
+
+ private static final ContainerSuite containerSuite =
ContainerSuite.getInstance();
+ private static final TestDatabaseName TEST_DB_NAME =
TestDatabaseName.PG_TEST_PARTITION_STATS;
+ private PostgreSQLContainer postgreSQLContainer;
+
+ @Override
+ protected void setupStorage() throws Exception {
+ LOG.info("Starting PostgreSQL container for partition statistics
integration tests");
+
+ // Start PostgreSQL container
+ containerSuite.startPostgreSQLContainer(TEST_DB_NAME,
PGImageName.VERSION_13);
+ postgreSQLContainer =
containerSuite.getPostgreSQLContainer(PGImageName.VERSION_13);
+
+ // Create database schema
+ createPostgreSQLSchema();
+
+ // Create storage factory with PostgreSQL container connection
+ Map<String, String> properties = Maps.newHashMap();
+ properties.put("jdbcUrl", postgreSQLContainer.getJdbcUrl(TEST_DB_NAME));
+ properties.put("jdbcUser", postgreSQLContainer.getUsername());
+ properties.put("jdbcPassword", postgreSQLContainer.getPassword());
+ properties.put("jdbcDriver",
postgreSQLContainer.getDriverClassName(TEST_DB_NAME));
+
+ JdbcPartitionStatisticStorageFactory factory = new
JdbcPartitionStatisticStorageFactory();
+ storage = (JdbcPartitionStatisticStorage) factory.create(properties);
+
+ LOG.info("PostgreSQL partition statistics storage initialized
successfully");
+ }
+
+ @Override
+ protected void teardownStorage() throws Exception {
+ if (storage != null) {
+ storage.close();
+ LOG.info("PostgreSQL partition statistics storage closed");
+ }
+ }
+
+ /** Creates the partition_statistic_meta table in the PostgreSQL test
database. */
+ private void createPostgreSQLSchema() throws SQLException {
+ String jdbcUrl = postgreSQLContainer.getJdbcUrl(TEST_DB_NAME);
+ String username = postgreSQLContainer.getUsername();
+ String password = postgreSQLContainer.getPassword();
+
+ try (Connection conn = DriverManager.getConnection(jdbcUrl, username,
password);
+ Statement stmt = conn.createStatement()) {
+
+ String createTableSQL =
+ "CREATE TABLE IF NOT EXISTS partition_statistic_meta ("
+ + " table_id BIGINT NOT NULL,"
+ + " partition_name VARCHAR(1024) NOT NULL,"
+ + " statistic_name VARCHAR(128) NOT NULL,"
+ + " statistic_value TEXT NOT NULL,"
+ + " audit_info TEXT NOT NULL,"
+ + " created_at BIGINT NOT NULL,"
+ + " updated_at BIGINT NOT NULL,"
+ + " PRIMARY KEY (table_id, partition_name, statistic_name)"
+ + ")";
+
+ stmt.execute(createTableSQL);
+
+ String createIndexSQL =
+ "CREATE INDEX IF NOT EXISTS idx_table_partition ON
partition_statistic_meta(table_id, partition_name)";
+ stmt.execute(createIndexSQL);
+
+ LOG.info("Created partition_statistic_meta table in PostgreSQL");
+ }
+ }
+ }
+
+  /** H2-specific tests using an embedded in-memory database. */
+ @Nested
+ static class H2Test extends BaseJdbcPartitionStatisticStorageTest {
+
+ private static final String H2_JDBC_URL =
+ "jdbc:h2:mem:test_partition_stats;MODE=MySQL;DB_CLOSE_DELAY=-1";
+ private static final String H2_USERNAME = "sa";
+ private static final String H2_PASSWORD = "";
+
+ @Override
+ protected void setupStorage() throws Exception {
+ LOG.info("Setting up H2 in-memory database for partition statistics
integration tests");
+
+ // Create database schema
+ createH2Schema();
+
+ // Create storage factory with H2 connection
+ Map<String, String> properties = Maps.newHashMap();
+ properties.put("jdbcUrl", H2_JDBC_URL);
+ properties.put("jdbcUser", H2_USERNAME);
+ properties.put("jdbcPassword", H2_PASSWORD);
+ properties.put("jdbcDriver", "org.h2.Driver");
+
+ JdbcPartitionStatisticStorageFactory factory = new
JdbcPartitionStatisticStorageFactory();
+ storage = (JdbcPartitionStatisticStorage) factory.create(properties);
+
+ LOG.info("H2 partition statistics storage initialized successfully");
+ }
+
+ @Override
+ protected void teardownStorage() throws Exception {
+ if (storage != null) {
+ storage.close();
+ LOG.info("H2 partition statistics storage closed");
+ }
+
+ // Drop the H2 database
+ try (Connection conn = DriverManager.getConnection(H2_JDBC_URL,
H2_USERNAME, H2_PASSWORD);
+ Statement stmt = conn.createStatement()) {
+ stmt.execute("DROP TABLE IF EXISTS partition_statistic_meta");
+ LOG.info("Dropped H2 database");
+ }
+ }
+
+ /** Creates the partition_statistic_meta table in the H2 test database. */
+ private void createH2Schema() throws SQLException {
+ try (Connection conn = DriverManager.getConnection(H2_JDBC_URL,
H2_USERNAME, H2_PASSWORD);
+ Statement stmt = conn.createStatement()) {
+
+ String createTableSQL =
+ "CREATE TABLE IF NOT EXISTS partition_statistic_meta ("
+ + " table_id BIGINT NOT NULL,"
+ + " partition_name VARCHAR(1024) NOT NULL,"
+ + " statistic_name VARCHAR(128) NOT NULL,"
+ + " statistic_value CLOB NOT NULL,"
+ + " audit_info CLOB NOT NULL,"
+ + " created_at BIGINT NOT NULL,"
+ + " updated_at BIGINT NOT NULL,"
+ + " PRIMARY KEY (table_id, partition_name, statistic_name)"
+ + ")";
+
+ stmt.execute(createTableSQL);
+
+ String createIndexSQL =
+ "CREATE INDEX IF NOT EXISTS idx_table_partition ON
partition_statistic_meta(table_id, partition_name)";
+ stmt.execute(createIndexSQL);
+
+ LOG.info("Created partition_statistic_meta table in H2");
+ }
+ }
+ }
+}
diff --git a/docs/manage-statistics-in-gravitino.md
b/docs/manage-statistics-in-gravitino.md
index 139790fd39..bb064fdd19 100644
--- a/docs/manage-statistics-in-gravitino.md
+++ b/docs/manage-statistics-in-gravitino.md
@@ -234,11 +234,73 @@ table.dropPartitionStatistics(statisticsToDrop);
### Server configuration
-| Configuration item | Description
| Default value
| Required | Since version |
-|-------------------------------------------------|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|----------------------------------------------------------------------------|-----------|---------------|
-| `gravitino.stats.partition.storageFactoryClass` | The storage factory class
for partition statistics, which is used to store partition statistics in the
different storage. The
`org.apache.gravitino.stats.storage.MemoryPartitionStatsStorageFactory` can
only be used for testing. |
`org.apache.gravitino.stats.storage.LancePartitionStatisticStorageFactory` |
No | 1.0.0 |
+| Configuration item | Description
| Default value
| Required | Since version |
+|-------------------------------------------------|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|--------------------------------------------------------------------------|-----------|---------------|
+| `gravitino.stats.partition.storageFactoryClass` | The storage factory class
for partition statistics, which is used to store partition statistics in the
different storage. The
`org.apache.gravitino.stats.storage.MemoryPartitionStatsStorageFactory` can
only be used for testing. |
`org.apache.gravitino.stats.storage.JdbcPartitionStatisticStorageFactory` | No
| 1.0.0 |
+#### JDBC Storage (Default)
+
+Starting from version 1.2.0, Gravitino uses JDBC-based storage as the default
partition statistics storage backend.
+This provides a reliable, production-ready solution that supports multiple
database backends:
+
+- **MySQL** (recommended for production)
+- **PostgreSQL**
+- **H2** (suitable for testing and development)
+
+To use JDBC storage, configure the following options by adding the prefix
`gravitino.stats.partition.storageOption.`:
+
+| Configuration item |
Description | Default
value | Required | Since version |
+|-----------------------------------------------------------------|--------------------------------------------------------------------|----------------------------|----------|---------------|
+| `gravitino.stats.partition.storageOption.jdbcUrl` | JDBC
connection URL (e.g., jdbc:mysql://localhost:3306/gravitino) | None
| Yes | 1.2.0 |
+| `gravitino.stats.partition.storageOption.jdbcUser` | Database
username | None
| Yes | 1.2.0 |
+| `gravitino.stats.partition.storageOption.jdbcPassword` | Database
password | None
| Yes | 1.2.0 |
+| `gravitino.stats.partition.storageOption.jdbcDriver` | JDBC
driver class name |
`com.mysql.cj.jdbc.Driver` | No | 1.2.0 |
+| `gravitino.stats.partition.storageOption.poolMaxSize` | Maximum
connection pool size | `10`
| No | 1.2.0 |
+| `gravitino.stats.partition.storageOption.poolMinIdle` | Minimum
idle connections in pool | `2`
| No | 1.2.0 |
+| `gravitino.stats.partition.storageOption.connectionTimeoutMs` | Connection
timeout in milliseconds | `30000`
| No | 1.2.0 |
+| `gravitino.stats.partition.storageOption.testOnBorrow` | Test
connections before use | `true`
| No | 1.2.0 |
+
+**Example MySQL Configuration:**
+
+```properties
+gravitino.stats.partition.storageFactoryClass =
org.apache.gravitino.stats.storage.JdbcPartitionStatisticStorageFactory
+gravitino.stats.partition.storageOption.jdbcUrl =
jdbc:mysql://localhost:3306/gravitino
+gravitino.stats.partition.storageOption.jdbcUser = gravitino
+gravitino.stats.partition.storageOption.jdbcPassword = gravitino123
+gravitino.stats.partition.storageOption.poolMaxSize = 20
+```
+
+**Example PostgreSQL Configuration:**
+
+```properties
+gravitino.stats.partition.storageFactoryClass =
org.apache.gravitino.stats.storage.JdbcPartitionStatisticStorageFactory
+gravitino.stats.partition.storageOption.jdbcUrl =
jdbc:postgresql://localhost:5432/gravitino
+gravitino.stats.partition.storageOption.jdbcUser = gravitino
+gravitino.stats.partition.storageOption.jdbcPassword = gravitino123
+gravitino.stats.partition.storageOption.jdbcDriver = org.postgresql.Driver
+```
+
+**Database Schema Setup:**
+
+Before using JDBC storage, you need to create the database schema. Schema
files are provided for all supported databases:
+
+- MySQL: `scripts/mysql/schema-${GRAVITINO_VERSION}-mysql.sql`
+- PostgreSQL: `scripts/postgresql/schema-${GRAVITINO_VERSION}-postgresql.sql`
+- H2: `scripts/h2/schema-${GRAVITINO_VERSION}-h2.sql`
+
+For MySQL:
+```bash
+mysql -u root -p < scripts/mysql/schema-${GRAVITINO_VERSION}-mysql.sql
+```
+
+For PostgreSQL:
+```bash
+psql -U postgres -d gravitino -f
scripts/postgresql/schema-${GRAVITINO_VERSION}-postgresql.sql
+```
+
+#### Lance Storage (Alternative)
+
If you use [Lance](https://lancedb.github.io/lance/) as the partition
statistics storage, you can set the options below, if you have other lance
storage options, you can pass it by adding prefix
`gravitino.stats.partition.storageOption.`.
For example, if you set an extra property `foo` to `bar` for Lance storage
option, you can add a configuration item
`gravitino.stats.partition.storageOption.foo` with value `bar`.
@@ -258,7 +320,14 @@ For Lance remote storage, you can refer to the document
[here](https://lancedb.g
If you have many tables with a small number of partitions, you should set
smaller values for `metadataFileCacheSizeBytes` and `indexCacheSizeBytes`.
-### Implementation a custom partition storage
+**To use Lance storage, configure:**
+
+```properties
+gravitino.stats.partition.storageFactoryClass =
org.apache.gravitino.stats.storage.LancePartitionStatisticStorageFactory
+gravitino.stats.partition.storageOption.location = /data/lance
+```
+
+### Implement a custom partition storage
You can implement a custom partition storage by implementing the interface
`org.apache.gravitino.stats.storage.PartitionStatisticStorageFactory` and
setting the configuration item `gravitino.stats.partition.storageFactoryClass`
to your class name.
diff --git
a/integration-test-common/src/test/java/org/apache/gravitino/integration/test/util/TestDatabaseName.java
b/integration-test-common/src/test/java/org/apache/gravitino/integration/test/util/TestDatabaseName.java
index 806696c6a2..8f0de42219 100644
---
a/integration-test-common/src/test/java/org/apache/gravitino/integration/test/util/TestDatabaseName.java
+++
b/integration-test-common/src/test/java/org/apache/gravitino/integration/test/util/TestDatabaseName.java
@@ -115,4 +115,13 @@ public enum TestDatabaseName {
}
},
PG_ICEBERG_AUTHZ_IT,
+
+ /** Represents the PostgreSQL database for partition statistics integration
tests. */
+ PG_TEST_PARTITION_STATS {
+ /** PostgreSQL only accepts lowercase database names. */
+ @Override
+ public String toString() {
+ return this.name().toLowerCase();
+ }
+ },
}
diff --git a/scripts/h2/schema-1.2.0-h2.sql b/scripts/h2/schema-1.2.0-h2.sql
index 92fa597fe3..4065ca170d 100644
--- a/scripts/h2/schema-1.2.0-h2.sql
+++ b/scripts/h2/schema-1.2.0-h2.sql
@@ -496,3 +496,18 @@ CREATE TABLE IF NOT EXISTS `view_meta` (
KEY `idx_vecid` (`catalog_id`)
) ENGINE=InnoDB;
+-- This schema extends version 1.1.0 with partition statistics storage support
+-- The partition_statistic_meta table stores partition-level statistics for
tables
+
+CREATE TABLE IF NOT EXISTS partition_statistic_meta (
+ table_id BIGINT NOT NULL COMMENT 'table id from table_meta',
+ partition_name VARCHAR(1024) NOT NULL COMMENT 'partition name',
+ statistic_name VARCHAR(128) NOT NULL COMMENT 'statistic name',
+ statistic_value CLOB NOT NULL COMMENT 'statistic value as JSON',
+ audit_info CLOB NOT NULL COMMENT 'audit information as JSON',
+ created_at BIGINT NOT NULL COMMENT 'creation timestamp in milliseconds',
+ updated_at BIGINT NOT NULL COMMENT 'last update timestamp in milliseconds',
+ PRIMARY KEY (table_id, partition_name, statistic_name)
+);
+
+CREATE INDEX IF NOT EXISTS idx_table_partition ON
partition_statistic_meta(table_id, partition_name);
diff --git a/scripts/h2/upgrade-1.1.0-to-1.2.0-h2.sql
b/scripts/h2/upgrade-1.1.0-to-1.2.0-h2.sql
index e6614b596d..b011cf17e0 100644
--- a/scripts/h2/upgrade-1.1.0-to-1.2.0-h2.sql
+++ b/scripts/h2/upgrade-1.1.0-to-1.2.0-h2.sql
@@ -69,3 +69,16 @@ CREATE TABLE IF NOT EXISTS `view_meta` (
KEY `idx_vecid` (`catalog_id`)
) ENGINE=InnoDB;
+-- Add partition statistics storage support
+CREATE TABLE IF NOT EXISTS partition_statistic_meta (
+ table_id BIGINT NOT NULL COMMENT 'table id from table_meta',
+ partition_name VARCHAR(1024) NOT NULL COMMENT 'partition name',
+ statistic_name VARCHAR(128) NOT NULL COMMENT 'statistic name',
+ statistic_value CLOB NOT NULL COMMENT 'statistic value as JSON',
+ audit_info CLOB NOT NULL COMMENT 'audit information as JSON',
+ created_at BIGINT NOT NULL COMMENT 'creation timestamp in milliseconds',
+ updated_at BIGINT NOT NULL COMMENT 'last update timestamp in milliseconds',
+ PRIMARY KEY (table_id, partition_name, statistic_name)
+);
+
+CREATE INDEX IF NOT EXISTS idx_table_partition ON
partition_statistic_meta(table_id, partition_name);
diff --git a/scripts/mysql/schema-1.2.0-mysql.sql
b/scripts/mysql/schema-1.2.0-mysql.sql
index ddb4372a27..d795558a1f 100644
--- a/scripts/mysql/schema-1.2.0-mysql.sql
+++ b/scripts/mysql/schema-1.2.0-mysql.sql
@@ -487,3 +487,17 @@ CREATE TABLE IF NOT EXISTS `view_meta` (
KEY `idx_vecid` (`catalog_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin COMMENT 'view
metadata';
+-- This schema extends version 1.1.0 with partition statistics storage support
+-- The partition_statistic_meta table stores partition-level statistics for
tables
+
+CREATE TABLE IF NOT EXISTS `partition_statistic_meta` (
+ `table_id` BIGINT(20) UNSIGNED NOT NULL COMMENT 'table id from table_meta',
+ `partition_name` VARCHAR(1024) NOT NULL COMMENT 'partition name',
+ `statistic_name` VARCHAR(128) NOT NULL COMMENT 'statistic name',
+ `statistic_value` MEDIUMTEXT NOT NULL COMMENT 'statistic value as JSON',
+ `audit_info` TEXT NOT NULL COMMENT 'audit information as JSON',
+ `created_at` BIGINT(20) UNSIGNED NOT NULL COMMENT 'creation timestamp in
milliseconds',
+ `updated_at` BIGINT(20) UNSIGNED NOT NULL COMMENT 'last update timestamp
in milliseconds',
+ PRIMARY KEY (`table_id`, `partition_name`(255), `statistic_name`),
+ KEY `idx_table_partition` (`table_id`, `partition_name`(255))
+) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin COMMENT 'partition
statistics metadata';
diff --git a/scripts/mysql/upgrade-1.1.0-to-1.2.0-mysql.sql
b/scripts/mysql/upgrade-1.1.0-to-1.2.0-mysql.sql
index 7140320410..120af6bfa2 100644
--- a/scripts/mysql/upgrade-1.1.0-to-1.2.0-mysql.sql
+++ b/scripts/mysql/upgrade-1.1.0-to-1.2.0-mysql.sql
@@ -69,3 +69,15 @@ CREATE TABLE IF NOT EXISTS `view_meta` (
KEY `idx_vecid` (`catalog_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin COMMENT 'view
metadata';
+-- Add partition statistics storage support
+CREATE TABLE IF NOT EXISTS `partition_statistic_meta` (
+ `table_id` BIGINT(20) UNSIGNED NOT NULL COMMENT 'table id from table_meta',
+ `partition_name` VARCHAR(1024) NOT NULL COMMENT 'partition name',
+ `statistic_name` VARCHAR(128) NOT NULL COMMENT 'statistic name',
+ `statistic_value` MEDIUMTEXT NOT NULL COMMENT 'statistic value as JSON',
+ `audit_info` TEXT NOT NULL COMMENT 'audit information as JSON',
+ `created_at` BIGINT(20) UNSIGNED NOT NULL COMMENT 'creation timestamp in
milliseconds',
+ `updated_at` BIGINT(20) UNSIGNED NOT NULL COMMENT 'last update timestamp
in milliseconds',
+ PRIMARY KEY (`table_id`, `partition_name`(255), `statistic_name`),
+ KEY `idx_table_partition` (`table_id`, `partition_name`(255))
+) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin COMMENT 'partition
statistics metadata';
diff --git a/scripts/postgresql/schema-1.2.0-postgresql.sql
b/scripts/postgresql/schema-1.2.0-postgresql.sql
index 3b4c188986..268b1f1c8c 100644
--- a/scripts/postgresql/schema-1.2.0-postgresql.sql
+++ b/scripts/postgresql/schema-1.2.0-postgresql.sql
@@ -862,3 +862,27 @@ COMMENT ON COLUMN view_meta.current_version IS 'view
current version';
COMMENT ON COLUMN view_meta.last_version IS 'view last version';
COMMENT ON COLUMN view_meta.deleted_at IS 'view deleted at';
+-- This schema extends version 1.1.0 with partition statistics storage support
+-- The partition_statistic_meta table stores partition-level statistics for
tables
+
+CREATE TABLE IF NOT EXISTS partition_statistic_meta (
+ table_id BIGINT NOT NULL,
+ partition_name VARCHAR(1024) NOT NULL,
+ statistic_name VARCHAR(128) NOT NULL,
+ statistic_value TEXT NOT NULL,
+ audit_info TEXT NOT NULL,
+ created_at BIGINT NOT NULL,
+ updated_at BIGINT NOT NULL,
+ PRIMARY KEY (table_id, partition_name, statistic_name)
+);
+
+CREATE INDEX IF NOT EXISTS idx_table_partition ON
partition_statistic_meta(table_id, partition_name);
+
+COMMENT ON TABLE partition_statistic_meta IS 'partition statistics metadata';
+COMMENT ON COLUMN partition_statistic_meta.table_id IS 'table id from
table_meta';
+COMMENT ON COLUMN partition_statistic_meta.partition_name IS 'partition name';
+COMMENT ON COLUMN partition_statistic_meta.statistic_name IS 'statistic name';
+COMMENT ON COLUMN partition_statistic_meta.statistic_value IS 'statistic value
as JSON';
+COMMENT ON COLUMN partition_statistic_meta.audit_info IS 'audit information as
JSON';
+COMMENT ON COLUMN partition_statistic_meta.created_at IS 'creation timestamp
in milliseconds';
+COMMENT ON COLUMN partition_statistic_meta.updated_at IS 'last update
timestamp in milliseconds';
diff --git a/scripts/postgresql/upgrade-1.1.0-to-1.2.0-postgresql.sql
b/scripts/postgresql/upgrade-1.1.0-to-1.2.0-postgresql.sql
index b83c0c051e..b04c0dcaa7 100644
--- a/scripts/postgresql/upgrade-1.1.0-to-1.2.0-postgresql.sql
+++ b/scripts/postgresql/upgrade-1.1.0-to-1.2.0-postgresql.sql
@@ -105,3 +105,25 @@ COMMENT ON COLUMN view_meta.current_version IS 'view
current version';
COMMENT ON COLUMN view_meta.last_version IS 'view last version';
COMMENT ON COLUMN view_meta.deleted_at IS 'view deleted at';
+-- Add partition statistics storage support
+CREATE TABLE IF NOT EXISTS partition_statistic_meta (
+ table_id BIGINT NOT NULL,
+ partition_name VARCHAR(1024) NOT NULL,
+ statistic_name VARCHAR(128) NOT NULL,
+ statistic_value TEXT NOT NULL,
+ audit_info TEXT NOT NULL,
+ created_at BIGINT NOT NULL,
+ updated_at BIGINT NOT NULL,
+ PRIMARY KEY (table_id, partition_name, statistic_name)
+);
+
+CREATE INDEX IF NOT EXISTS idx_table_partition ON
partition_statistic_meta(table_id, partition_name);
+
+COMMENT ON TABLE partition_statistic_meta IS 'partition statistics metadata';
+COMMENT ON COLUMN partition_statistic_meta.table_id IS 'table id from
table_meta';
+COMMENT ON COLUMN partition_statistic_meta.partition_name IS 'partition name';
+COMMENT ON COLUMN partition_statistic_meta.statistic_name IS 'statistic name';
+COMMENT ON COLUMN partition_statistic_meta.statistic_value IS 'statistic value
as JSON';
+COMMENT ON COLUMN partition_statistic_meta.audit_info IS 'audit information as
JSON';
+COMMENT ON COLUMN partition_statistic_meta.created_at IS 'creation timestamp
in milliseconds';
+COMMENT ON COLUMN partition_statistic_meta.updated_at IS 'last update
timestamp in milliseconds';