yuqi1129 commented on code in PR #9779: URL: https://github.com/apache/gravitino/pull/9779#discussion_r2715682885
########## core/src/main/java/org/apache/gravitino/stats/storage/MysqlPartitionStatisticStorageFactory.java: ########## @@ -0,0 +1,200 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.gravitino.stats.storage; + +import java.sql.SQLException; +import java.time.Duration; +import java.util.Map; +import org.apache.commons.dbcp2.BasicDataSource; +import org.apache.gravitino.exceptions.GravitinoRuntimeException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Factory for creating {@link MysqlPartitionStatisticStorage} instances. + * + * <p>This factory creates a MySQL-based partition statistics storage using Apache Commons DBCP2 for + * connection pooling. It configures the connection pool with appropriate settings for partition + * statistics workloads. + * + * <p>Configuration properties: + * + * <ul> + * <li>jdbc-url (required): JDBC connection URL for MySQL + * <li>jdbc-user (required): Database username + * <li>jdbc-password (required): Database password + * <li>jdbc-driver (optional): JDBC driver class name (defaults to com.mysql.cj.jdbc.Driver) + * <li>pool-max-size (optional): Maximum connection pool size (default: 10) + * <li>pool-min-idle (optional): Minimum idle connections (default: 2) + * <li>connection-timeout-ms (optional): Connection timeout in milliseconds (default: 30000) + * <li>test-on-borrow (optional): Test connections before use (default: true) + * </ul> + */ +public class MysqlPartitionStatisticStorageFactory implements PartitionStatisticStorageFactory { + + private static final Logger LOG = + LoggerFactory.getLogger(MysqlPartitionStatisticStorageFactory.class); + + // Configuration keys + private static final String JDBC_URL = "jdbc-url"; + private static final String JDBC_USER = "jdbc-user"; + private static final String JDBC_PASSWORD = "jdbc-password"; + private static final String JDBC_DRIVER = "jdbc-driver"; + private static final String POOL_MAX_SIZE = "pool-max-size"; + private static final String POOL_MIN_IDLE = "pool-min-idle"; + private static final String CONNECTION_TIMEOUT_MS = "connection-timeout-ms"; + private static final String TEST_ON_BORROW = "test-on-borrow"; + + // Default values + private static final String DEFAULT_JDBC_DRIVER = "com.mysql.cj.jdbc.Driver"; + private static final int DEFAULT_POOL_MAX_SIZE = 10; + private static final int DEFAULT_POOL_MIN_IDLE = 2; + private static final long DEFAULT_CONNECTION_TIMEOUT_MS = 30000L; + private static final boolean DEFAULT_TEST_ON_BORROW = true; + private static final String VALIDATION_QUERY = "SELECT 1"; + + // Keep reference to DataSource for cleanup + private BasicDataSource dataSource; + + @Override + public PartitionStatisticStorage create(Map<String, String> properties) { + LOG.info( + "Creating 
MysqlPartitionStatisticStorage with properties: {}", + maskSensitiveProperties(properties)); + + validateRequiredProperties(properties); + + try { + dataSource = createDataSource(properties); + return new MysqlPartitionStatisticStorage(dataSource); + } catch (Exception e) { + if (dataSource != null) { + try { + dataSource.close(); + } catch (SQLException closeException) { + LOG.error("Failed to close data source after creation error", closeException); + } + } + throw new GravitinoRuntimeException(e, "Failed to create MysqlPartitionStatisticStorage"); + } + } + + /** + * Creates and configures a BasicDataSource from the provided properties. + * + * @param properties configuration properties + * @return configured DataSource + */ + private BasicDataSource createDataSource(Map<String, String> properties) { + BasicDataSource ds = new BasicDataSource(); + + // Required properties + String jdbcUrl = properties.get(JDBC_URL); + String jdbcUser = properties.get(JDBC_USER); + String jdbcPassword = properties.get(JDBC_PASSWORD); + + ds.setUrl(jdbcUrl); + ds.setUsername(jdbcUser); + ds.setPassword(jdbcPassword); + + // Optional properties with defaults + String driverClassName = properties.getOrDefault(JDBC_DRIVER, DEFAULT_JDBC_DRIVER); + ds.setDriverClassName(driverClassName); + + int maxSize = + Integer.parseInt( + properties.getOrDefault(POOL_MAX_SIZE, String.valueOf(DEFAULT_POOL_MAX_SIZE))); + ds.setMaxTotal(maxSize); + + int minIdle = + Integer.parseInt( + properties.getOrDefault(POOL_MIN_IDLE, String.valueOf(DEFAULT_POOL_MIN_IDLE))); + ds.setMinIdle(minIdle); + + long timeoutMs = + Long.parseLong( + properties.getOrDefault( + CONNECTION_TIMEOUT_MS, String.valueOf(DEFAULT_CONNECTION_TIMEOUT_MS))); + ds.setMaxWait(Duration.ofMillis(timeoutMs)); + + boolean testOnBorrow = + Boolean.parseBoolean( + properties.getOrDefault(TEST_ON_BORROW, String.valueOf(DEFAULT_TEST_ON_BORROW))); + ds.setTestOnBorrow(testOnBorrow); + + if (testOnBorrow) { + ds.setValidationQuery(VALIDATION_QUERY); + } + + LOG.info( + "Created MySQL DataSource: url={}, driver={}, maxPoolSize={}, minIdle={}, timeout={}ms", + jdbcUrl, + driverClassName, + maxSize, + minIdle, + timeoutMs); + + return ds; + } + + /** + * Validates that all required properties are present. + * + * @param properties configuration properties + * @throws IllegalArgumentException if required properties are missing + */ + private void validateRequiredProperties(Map<String, String> properties) { + if (!properties.containsKey(JDBC_URL)) { + throw new IllegalArgumentException("Missing required property: " + JDBC_URL); + } + if (!properties.containsKey(JDBC_USER)) { + throw new IllegalArgumentException("Missing required property: " + JDBC_USER); + } + if (!properties.containsKey(JDBC_PASSWORD)) { + throw new IllegalArgumentException("Missing required property: " + JDBC_PASSWORD); + } + } + + /** + * Creates a masked copy of properties for logging (hides password). + * + * @param properties original properties + * @return masked properties map + */ + private Map<String, String> maskSensitiveProperties(Map<String, String> properties) { + Map<String, String> masked = new java.util.HashMap<>(properties); Review Comment: Remove `java.util.` ########## core/src/main/java/org/apache/gravitino/stats/storage/MysqlPartitionStatisticStorage.java: ########## @@ -0,0 +1,416 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
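Regarding the `Remove java.util.` comment above: a minimal sketch of the suggested cleanup, assuming `java.util.HashMap` is added to the factory's imports. The password masking shown is illustrative only, since the rest of `maskSensitiveProperties` is not visible in the hunk.

```java
import java.util.HashMap;
import java.util.Map;

class MaskSketch {
  private static final String JDBC_PASSWORD = "jdbc-password";

  // Same behavior, but relies on a plain import instead of the fully qualified name.
  static Map<String, String> maskSensitiveProperties(Map<String, String> properties) {
    Map<String, String> masked = new HashMap<>(properties);
    if (masked.containsKey(JDBC_PASSWORD)) {
      masked.put(JDBC_PASSWORD, "******"); // illustrative masking, not the PR's exact logic
    }
    return masked;
  }
}
```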
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.gravitino.stats.storage; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.google.common.collect.Lists; +import java.io.IOException; +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.time.Instant; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; +import javax.sql.DataSource; +import org.apache.gravitino.Entity; +import org.apache.gravitino.EntityStore; +import org.apache.gravitino.GravitinoEnv; +import org.apache.gravitino.MetadataObject; +import org.apache.gravitino.NameIdentifier; +import org.apache.gravitino.json.JsonUtils; +import org.apache.gravitino.meta.AuditInfo; +import org.apache.gravitino.meta.TableEntity; +import org.apache.gravitino.stats.PartitionRange; +import org.apache.gravitino.stats.PartitionStatisticsDrop; +import org.apache.gravitino.stats.PartitionStatisticsUpdate; +import org.apache.gravitino.stats.StatisticValue; +import org.apache.gravitino.utils.MetadataObjectUtil; +import org.apache.gravitino.utils.PrincipalUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * MySQL-based implementation of {@link PartitionStatisticStorage}. + * + * <p>This implementation stores partition statistics in a MySQL database table, using Apache + * Commons DBCP2 for connection pooling. Statistics are stored as JSON-serialized values along with + * audit information. + */ +public class MysqlPartitionStatisticStorage implements PartitionStatisticStorage { + + private static final Logger LOG = LoggerFactory.getLogger(MysqlPartitionStatisticStorage.class); + + private final DataSource dataSource; + private final EntityStore entityStore; + + // SQL statements + private static final String INSERT_OR_UPDATE_SQL = + "INSERT INTO partition_statistic_meta " + + "(table_id, partition_name, statistic_name, statistic_value, audit_info, created_at, updated_at) " + + "VALUES (?, ?, ?, ?, ?, ?, ?) " + + "ON DUPLICATE KEY UPDATE " + + "statistic_value = VALUES(statistic_value), " + + "audit_info = VALUES(audit_info), " + + "updated_at = VALUES(updated_at)"; + + private static final String SELECT_STATISTICS_SQL = + "SELECT partition_name, statistic_name, statistic_value, audit_info " + + "FROM partition_statistic_meta " + + "WHERE table_id = ? "; + + private static final String DELETE_STATISTICS_SQL = + "DELETE FROM partition_statistic_meta " + + "WHERE table_id = ? AND partition_name = ? AND statistic_name = ?"; + + /** + * Constructs a new MysqlPartitionStatisticStorage. 
+ * + * @param dataSource the JDBC DataSource for MySQL connections + */ + public MysqlPartitionStatisticStorage(DataSource dataSource) { + this.dataSource = dataSource; + this.entityStore = GravitinoEnv.getInstance().entityStore(); + } + + @Override + public List<PersistedPartitionStatistics> listStatistics( + String metalake, MetadataObject metadataObject, PartitionRange partitionRange) + throws IOException { + LOG.debug( + "Listing statistics for metalake: {}, object: {}, range: {}", + metalake, + metadataObject.fullName(), + partitionRange); + + Long tableId = resolveTableId(metalake, metadataObject); + String rangeFilter = buildPartitionRangeFilter(partitionRange); + + String sql = SELECT_STATISTICS_SQL + rangeFilter + " ORDER BY partition_name, statistic_name"; + + try (Connection conn = dataSource.getConnection(); + PreparedStatement stmt = conn.prepareStatement(sql)) { + + int paramIndex = 1; + stmt.setLong(paramIndex++, tableId); + + // Set partition range parameters + setPartitionRangeParameters(stmt, partitionRange, paramIndex); Review Comment: Why not use `2` directly for `paramIndex`? ########## core/src/main/java/org/apache/gravitino/stats/storage/MysqlPartitionStatisticStorageFactory.java: ########## @@ -0,0 +1,200 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.gravitino.stats.storage; + +import java.sql.SQLException; +import java.time.Duration; +import java.util.Map; +import org.apache.commons.dbcp2.BasicDataSource; +import org.apache.gravitino.exceptions.GravitinoRuntimeException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Factory for creating {@link MysqlPartitionStatisticStorage} instances. + * + * <p>This factory creates a MySQL-based partition statistics storage using Apache Commons DBCP2 for + * connection pooling. It configures the connection pool with appropriate settings for partition + * statistics workloads. 
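Regarding the `Why not use 2 directly for paramIndex?` comment above: a sketch of the simplification for the range-based `listStatistics`, where exactly one fixed parameter (`table_id`) precedes the range bounds. The class name and the stub binder are hypothetical, kept only so the snippet compiles on its own.

```java
import java.sql.PreparedStatement;
import java.sql.SQLException;

class RangeBindingSketch {
  // Stand-in for the PR's setPartitionRangeParameters helper.
  static void setPartitionRangeParameters(PreparedStatement stmt, Object range, int startIndex)
      throws SQLException {
    // range bounds, if present, would be bound starting at startIndex
  }

  static void bind(PreparedStatement stmt, long tableId, Object partitionRange)
      throws SQLException {
    stmt.setLong(1, tableId);                              // the only fixed parameter
    setPartitionRangeParameters(stmt, partitionRange, 2);  // range parameters start at index 2
  }
}
```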
+ * + * <p>Configuration properties: + * + * <ul> + * <li>jdbc-url (required): JDBC connection URL for MySQL + * <li>jdbc-user (required): Database username + * <li>jdbc-password (required): Database password + * <li>jdbc-driver (optional): JDBC driver class name (defaults to com.mysql.cj.jdbc.Driver) + * <li>pool-max-size (optional): Maximum connection pool size (default: 10) + * <li>pool-min-idle (optional): Minimum idle connections (default: 2) + * <li>connection-timeout-ms (optional): Connection timeout in milliseconds (default: 30000) + * <li>test-on-borrow (optional): Test connections before use (default: true) + * </ul> + */ +public class MysqlPartitionStatisticStorageFactory implements PartitionStatisticStorageFactory { + + private static final Logger LOG = + LoggerFactory.getLogger(MysqlPartitionStatisticStorageFactory.class); + + // Configuration keys + private static final String JDBC_URL = "jdbc-url"; + private static final String JDBC_USER = "jdbc-user"; + private static final String JDBC_PASSWORD = "jdbc-password"; + private static final String JDBC_DRIVER = "jdbc-driver"; + private static final String POOL_MAX_SIZE = "pool-max-size"; + private static final String POOL_MIN_IDLE = "pool-min-idle"; + private static final String CONNECTION_TIMEOUT_MS = "connection-timeout-ms"; + private static final String TEST_ON_BORROW = "test-on-borrow"; + + // Default values + private static final String DEFAULT_JDBC_DRIVER = "com.mysql.cj.jdbc.Driver"; + private static final int DEFAULT_POOL_MAX_SIZE = 10; + private static final int DEFAULT_POOL_MIN_IDLE = 2; + private static final long DEFAULT_CONNECTION_TIMEOUT_MS = 30000L; + private static final boolean DEFAULT_TEST_ON_BORROW = true; + private static final String VALIDATION_QUERY = "SELECT 1"; + + // Keep reference to DataSource for cleanup + private BasicDataSource dataSource; + + @Override + public PartitionStatisticStorage create(Map<String, String> properties) { + LOG.info( + "Creating MysqlPartitionStatisticStorage with properties: {}", + maskSensitiveProperties(properties)); + + validateRequiredProperties(properties); + + try { + dataSource = createDataSource(properties); + return new MysqlPartitionStatisticStorage(dataSource); + } catch (Exception e) { + if (dataSource != null) { + try { + dataSource.close(); + } catch (SQLException closeException) { + LOG.error("Failed to close data source after creation error", closeException); + } + } + throw new GravitinoRuntimeException(e, "Failed to create MysqlPartitionStatisticStorage"); + } + } + + /** + * Creates and configures a BasicDataSource from the provided properties. 
+ * + * @param properties configuration properties + * @return configured DataSource + */ + private BasicDataSource createDataSource(Map<String, String> properties) { + BasicDataSource ds = new BasicDataSource(); + + // Required properties + String jdbcUrl = properties.get(JDBC_URL); + String jdbcUser = properties.get(JDBC_USER); + String jdbcPassword = properties.get(JDBC_PASSWORD); + + ds.setUrl(jdbcUrl); + ds.setUsername(jdbcUser); + ds.setPassword(jdbcPassword); + + // Optional properties with defaults + String driverClassName = properties.getOrDefault(JDBC_DRIVER, DEFAULT_JDBC_DRIVER); + ds.setDriverClassName(driverClassName); + + int maxSize = + Integer.parseInt( + properties.getOrDefault(POOL_MAX_SIZE, String.valueOf(DEFAULT_POOL_MAX_SIZE))); + ds.setMaxTotal(maxSize); + + int minIdle = + Integer.parseInt( + properties.getOrDefault(POOL_MIN_IDLE, String.valueOf(DEFAULT_POOL_MIN_IDLE))); + ds.setMinIdle(minIdle); + + long timeoutMs = + Long.parseLong( + properties.getOrDefault( + CONNECTION_TIMEOUT_MS, String.valueOf(DEFAULT_CONNECTION_TIMEOUT_MS))); + ds.setMaxWait(Duration.ofMillis(timeoutMs)); + + boolean testOnBorrow = + Boolean.parseBoolean( + properties.getOrDefault(TEST_ON_BORROW, String.valueOf(DEFAULT_TEST_ON_BORROW))); + ds.setTestOnBorrow(testOnBorrow); + + if (testOnBorrow) { + ds.setValidationQuery(VALIDATION_QUERY); + } + + LOG.info( + "Created MySQL DataSource: url={}, driver={}, maxPoolSize={}, minIdle={}, timeout={}ms", + jdbcUrl, + driverClassName, + maxSize, + minIdle, + timeoutMs); + + return ds; + } + + /** + * Validates that all required properties are present. + * + * @param properties configuration properties + * @throws IllegalArgumentException if required properties are missing + */ + private void validateRequiredProperties(Map<String, String> properties) { + if (!properties.containsKey(JDBC_URL)) { + throw new IllegalArgumentException("Missing required property: " + JDBC_URL); Review Comment: You'd better check that the value of `JDBC_URL` is a valid one, at least it should be a non-empty one. ########## core/src/main/java/org/apache/gravitino/stats/storage/MysqlPartitionStatisticStorage.java: ########## @@ -0,0 +1,416 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
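Regarding the comment above about validating `jdbc-url`: a sketch of a stricter check that rejects empty or blank values, and optionally obviously wrong URLs. Guava is assumed to be available (the storage class already imports `com.google.common.collect.Lists`); the class name is hypothetical.

```java
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import java.util.Map;

class JdbcUrlValidationSketch {
  private static final String JDBC_URL = "jdbc-url";

  static void checkJdbcUrl(Map<String, String> properties) {
    String url = properties.get(JDBC_URL);
    // Reject missing, empty, and blank values, not just a missing key.
    Preconditions.checkArgument(
        !Strings.isNullOrEmpty(url) && !url.trim().isEmpty(),
        "Missing or empty required property: %s", JDBC_URL);
    // Optionally catch clearly wrong values early as well.
    Preconditions.checkArgument(
        url.startsWith("jdbc:mysql:"),
        "%s should be a MySQL JDBC URL, got: %s", JDBC_URL, url);
  }
}
```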
+ */ +package org.apache.gravitino.stats.storage; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.google.common.collect.Lists; +import java.io.IOException; +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.time.Instant; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; +import javax.sql.DataSource; +import org.apache.gravitino.Entity; +import org.apache.gravitino.EntityStore; +import org.apache.gravitino.GravitinoEnv; +import org.apache.gravitino.MetadataObject; +import org.apache.gravitino.NameIdentifier; +import org.apache.gravitino.json.JsonUtils; +import org.apache.gravitino.meta.AuditInfo; +import org.apache.gravitino.meta.TableEntity; +import org.apache.gravitino.stats.PartitionRange; +import org.apache.gravitino.stats.PartitionStatisticsDrop; +import org.apache.gravitino.stats.PartitionStatisticsUpdate; +import org.apache.gravitino.stats.StatisticValue; +import org.apache.gravitino.utils.MetadataObjectUtil; +import org.apache.gravitino.utils.PrincipalUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * MySQL-based implementation of {@link PartitionStatisticStorage}. + * + * <p>This implementation stores partition statistics in a MySQL database table, using Apache + * Commons DBCP2 for connection pooling. Statistics are stored as JSON-serialized values along with + * audit information. + */ +public class MysqlPartitionStatisticStorage implements PartitionStatisticStorage { + + private static final Logger LOG = LoggerFactory.getLogger(MysqlPartitionStatisticStorage.class); + + private final DataSource dataSource; + private final EntityStore entityStore; + + // SQL statements + private static final String INSERT_OR_UPDATE_SQL = + "INSERT INTO partition_statistic_meta " + + "(table_id, partition_name, statistic_name, statistic_value, audit_info, created_at, updated_at) " + + "VALUES (?, ?, ?, ?, ?, ?, ?) " + + "ON DUPLICATE KEY UPDATE " + + "statistic_value = VALUES(statistic_value), " + + "audit_info = VALUES(audit_info), " + + "updated_at = VALUES(updated_at)"; + + private static final String SELECT_STATISTICS_SQL = + "SELECT partition_name, statistic_name, statistic_value, audit_info " + + "FROM partition_statistic_meta " + + "WHERE table_id = ? "; + + private static final String DELETE_STATISTICS_SQL = + "DELETE FROM partition_statistic_meta " + + "WHERE table_id = ? AND partition_name = ? AND statistic_name = ?"; + + /** + * Constructs a new MysqlPartitionStatisticStorage. 
+ * + * @param dataSource the JDBC DataSource for MySQL connections + */ + public MysqlPartitionStatisticStorage(DataSource dataSource) { + this.dataSource = dataSource; + this.entityStore = GravitinoEnv.getInstance().entityStore(); + } + + @Override + public List<PersistedPartitionStatistics> listStatistics( + String metalake, MetadataObject metadataObject, PartitionRange partitionRange) + throws IOException { + LOG.debug( + "Listing statistics for metalake: {}, object: {}, range: {}", + metalake, + metadataObject.fullName(), + partitionRange); + + Long tableId = resolveTableId(metalake, metadataObject); + String rangeFilter = buildPartitionRangeFilter(partitionRange); + + String sql = SELECT_STATISTICS_SQL + rangeFilter + " ORDER BY partition_name, statistic_name"; + + try (Connection conn = dataSource.getConnection(); + PreparedStatement stmt = conn.prepareStatement(sql)) { + + int paramIndex = 1; + stmt.setLong(paramIndex++, tableId); Review Comment: It seems that paramIndex is always 1? ########## core/src/main/java/org/apache/gravitino/stats/storage/MysqlPartitionStatisticStorage.java: ########## @@ -0,0 +1,416 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.gravitino.stats.storage; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.google.common.collect.Lists; +import java.io.IOException; +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.time.Instant; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; +import javax.sql.DataSource; +import org.apache.gravitino.Entity; +import org.apache.gravitino.EntityStore; +import org.apache.gravitino.GravitinoEnv; +import org.apache.gravitino.MetadataObject; +import org.apache.gravitino.NameIdentifier; +import org.apache.gravitino.json.JsonUtils; +import org.apache.gravitino.meta.AuditInfo; +import org.apache.gravitino.meta.TableEntity; +import org.apache.gravitino.stats.PartitionRange; +import org.apache.gravitino.stats.PartitionStatisticsDrop; +import org.apache.gravitino.stats.PartitionStatisticsUpdate; +import org.apache.gravitino.stats.StatisticValue; +import org.apache.gravitino.utils.MetadataObjectUtil; +import org.apache.gravitino.utils.PrincipalUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * MySQL-based implementation of {@link PartitionStatisticStorage}. + * + * <p>This implementation stores partition statistics in a MySQL database table, using Apache + * Commons DBCP2 for connection pooling. Statistics are stored as JSON-serialized values along with + * audit information. 
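Regarding the `It seems that paramIndex is always 1?` comment above: in the range overload the counter only ever binds `table_id` at index 1, so literal indexes read more clearly there; the running index earns its keep in the `List<String> partitionNames` overload, where the parameter count varies. A sketch contrasting the two (hypothetical class and method names):

```java
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.List;

class ParameterIndexSketch {
  // Range overload: one fixed parameter, so a counter adds nothing.
  static void bindRangeQuery(PreparedStatement stmt, long tableId) throws SQLException {
    stmt.setLong(1, tableId);
  }

  // IN-clause overload: the counter is justified because the number of
  // partition-name placeholders varies with the input list.
  static void bindInClauseQuery(
      PreparedStatement stmt, long tableId, List<String> partitionNames) throws SQLException {
    int paramIndex = 1;
    stmt.setLong(paramIndex++, tableId);
    for (String partitionName : partitionNames) {
      stmt.setString(paramIndex++, partitionName);
    }
  }
}
```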
+ */ +public class MysqlPartitionStatisticStorage implements PartitionStatisticStorage { + + private static final Logger LOG = LoggerFactory.getLogger(MysqlPartitionStatisticStorage.class); + + private final DataSource dataSource; + private final EntityStore entityStore; + + // SQL statements + private static final String INSERT_OR_UPDATE_SQL = + "INSERT INTO partition_statistic_meta " + + "(table_id, partition_name, statistic_name, statistic_value, audit_info, created_at, updated_at) " + + "VALUES (?, ?, ?, ?, ?, ?, ?) " + + "ON DUPLICATE KEY UPDATE " + + "statistic_value = VALUES(statistic_value), " + + "audit_info = VALUES(audit_info), " + + "updated_at = VALUES(updated_at)"; + + private static final String SELECT_STATISTICS_SQL = + "SELECT partition_name, statistic_name, statistic_value, audit_info " + + "FROM partition_statistic_meta " + + "WHERE table_id = ? "; + + private static final String DELETE_STATISTICS_SQL = + "DELETE FROM partition_statistic_meta " + + "WHERE table_id = ? AND partition_name = ? AND statistic_name = ?"; + + /** + * Constructs a new MysqlPartitionStatisticStorage. + * + * @param dataSource the JDBC DataSource for MySQL connections + */ + public MysqlPartitionStatisticStorage(DataSource dataSource) { + this.dataSource = dataSource; + this.entityStore = GravitinoEnv.getInstance().entityStore(); + } + + @Override + public List<PersistedPartitionStatistics> listStatistics( + String metalake, MetadataObject metadataObject, PartitionRange partitionRange) + throws IOException { + LOG.debug( + "Listing statistics for metalake: {}, object: {}, range: {}", + metalake, + metadataObject.fullName(), + partitionRange); + + Long tableId = resolveTableId(metalake, metadataObject); + String rangeFilter = buildPartitionRangeFilter(partitionRange); + + String sql = SELECT_STATISTICS_SQL + rangeFilter + " ORDER BY partition_name, statistic_name"; + + try (Connection conn = dataSource.getConnection(); + PreparedStatement stmt = conn.prepareStatement(sql)) { + + int paramIndex = 1; + stmt.setLong(paramIndex++, tableId); + + // Set partition range parameters + setPartitionRangeParameters(stmt, partitionRange, paramIndex); + + try (ResultSet rs = stmt.executeQuery()) { + return parseResultSet(rs); + } + + } catch (SQLException e) { + throw new IOException("Failed to list statistics for " + metadataObject.fullName(), e); + } + } + + @Override + public List<PersistedPartitionStatistics> listStatistics( + String metalake, MetadataObject metadataObject, List<String> partitionNames) + throws IOException { + LOG.debug( + "Listing statistics for metalake: {}, object: {}, partitions: {}", + metalake, + metadataObject.fullName(), + partitionNames); + + if (partitionNames.isEmpty()) { + return Lists.newArrayList(); + } + + Long tableId = resolveTableId(metalake, metadataObject); + + // Build IN clause + String inClause = + partitionNames.stream().map(p -> "?").collect(Collectors.joining(", ", "(", ")")); + String sql = + SELECT_STATISTICS_SQL + + "AND partition_name IN " + + inClause + + " ORDER BY partition_name, statistic_name"; + + try (Connection conn = dataSource.getConnection(); + PreparedStatement stmt = conn.prepareStatement(sql)) { + + int paramIndex = 1; + stmt.setLong(paramIndex++, tableId); + + for (String partitionName : partitionNames) { + stmt.setString(paramIndex++, partitionName); + } + + try (ResultSet rs = stmt.executeQuery()) { + return parseResultSet(rs); + } + + } catch (SQLException e) { + throw new IOException( + "Failed to list statistics for partitions " + + 
partitionNames + + " in " + + metadataObject.fullName(), + e); + } + } + + @Override + public int dropStatistics(String metalake, List<MetadataObjectStatisticsDrop> statisticsToDrop) + throws IOException { + LOG.debug( + "Dropping statistics for metalake: {}, {} objects", metalake, statisticsToDrop.size()); + + int totalDropped = 0; + + try (Connection conn = dataSource.getConnection()) { + conn.setAutoCommit(false); + + try (PreparedStatement stmt = conn.prepareStatement(DELETE_STATISTICS_SQL)) { + for (MetadataObjectStatisticsDrop objectDrop : statisticsToDrop) { + Long tableId = resolveTableId(metalake, objectDrop.metadataObject()); + + for (PartitionStatisticsDrop drop : objectDrop.drops()) { + String partitionName = drop.partitionName(); + for (String statisticName : drop.statisticNames()) { + stmt.setLong(1, tableId); + stmt.setString(2, partitionName); + stmt.setString(3, statisticName); + stmt.addBatch(); + } + } + } + + int[] results = stmt.executeBatch(); + for (int result : results) { + if (result > 0 || result == PreparedStatement.SUCCESS_NO_INFO) { + totalDropped++; Review Comment: Can you give me the docs about why we need to check that the result is not `PreparedStatement.SUCCESS_NO_INFO`? ########## core/src/main/java/org/apache/gravitino/stats/storage/MysqlPartitionStatisticStorage.java: ########## @@ -0,0 +1,416 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.gravitino.stats.storage; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.google.common.collect.Lists; +import java.io.IOException; +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.time.Instant; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; +import javax.sql.DataSource; +import org.apache.gravitino.Entity; +import org.apache.gravitino.EntityStore; +import org.apache.gravitino.GravitinoEnv; +import org.apache.gravitino.MetadataObject; +import org.apache.gravitino.NameIdentifier; +import org.apache.gravitino.json.JsonUtils; +import org.apache.gravitino.meta.AuditInfo; +import org.apache.gravitino.meta.TableEntity; +import org.apache.gravitino.stats.PartitionRange; +import org.apache.gravitino.stats.PartitionStatisticsDrop; +import org.apache.gravitino.stats.PartitionStatisticsUpdate; +import org.apache.gravitino.stats.StatisticValue; +import org.apache.gravitino.utils.MetadataObjectUtil; +import org.apache.gravitino.utils.PrincipalUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * MySQL-based implementation of {@link PartitionStatisticStorage}. 
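On the `SUCCESS_NO_INFO` question above: per the javadoc of `java.sql.Statement#SUCCESS_NO_INFO` (value `-2`), a batch element can report that it executed successfully without the driver knowing how many rows were affected; the MySQL driver may return this, for example, when it rewrites batched statements. Treating it as "one statistic dropped" is therefore a guess. A sketch of one alternative that counts only positive update counts (class name is hypothetical, and the constant is conventionally referenced via `Statement` rather than `PreparedStatement`):

```java
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.sql.Statement;

class BatchCountSketch {
  static int countDropped(PreparedStatement stmt) throws SQLException {
    int totalDropped = 0;
    for (int result : stmt.executeBatch()) {
      if (result > 0) {
        // With a unique key on (table_id, partition_name, statistic_name), as the
        // ON DUPLICATE KEY UPDATE upsert implies, each positive count should be 1.
        totalDropped += result;
      } else if (result == Statement.SUCCESS_NO_INFO) {
        // Executed successfully, but the row count is unknown; whether to count
        // it is a policy decision rather than something the driver can answer.
      }
    }
    return totalDropped;
  }
}
```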
+ * + * <p>This implementation stores partition statistics in a MySQL database table, using Apache + * Commons DBCP2 for connection pooling. Statistics are stored as JSON-serialized values along with + * audit information. + */ +public class MysqlPartitionStatisticStorage implements PartitionStatisticStorage { + + private static final Logger LOG = LoggerFactory.getLogger(MysqlPartitionStatisticStorage.class); + + private final DataSource dataSource; + private final EntityStore entityStore; + + // SQL statements + private static final String INSERT_OR_UPDATE_SQL = + "INSERT INTO partition_statistic_meta " + + "(table_id, partition_name, statistic_name, statistic_value, audit_info, created_at, updated_at) " + + "VALUES (?, ?, ?, ?, ?, ?, ?) " + + "ON DUPLICATE KEY UPDATE " + + "statistic_value = VALUES(statistic_value), " + + "audit_info = VALUES(audit_info), " + + "updated_at = VALUES(updated_at)"; + + private static final String SELECT_STATISTICS_SQL = + "SELECT partition_name, statistic_name, statistic_value, audit_info " + + "FROM partition_statistic_meta " + + "WHERE table_id = ? "; + + private static final String DELETE_STATISTICS_SQL = + "DELETE FROM partition_statistic_meta " + + "WHERE table_id = ? AND partition_name = ? AND statistic_name = ?"; + + /** + * Constructs a new MysqlPartitionStatisticStorage. + * + * @param dataSource the JDBC DataSource for MySQL connections + */ + public MysqlPartitionStatisticStorage(DataSource dataSource) { + this.dataSource = dataSource; + this.entityStore = GravitinoEnv.getInstance().entityStore(); + } + + @Override + public List<PersistedPartitionStatistics> listStatistics( + String metalake, MetadataObject metadataObject, PartitionRange partitionRange) + throws IOException { + LOG.debug( + "Listing statistics for metalake: {}, object: {}, range: {}", + metalake, + metadataObject.fullName(), + partitionRange); + + Long tableId = resolveTableId(metalake, metadataObject); + String rangeFilter = buildPartitionRangeFilter(partitionRange); + + String sql = SELECT_STATISTICS_SQL + rangeFilter + " ORDER BY partition_name, statistic_name"; + + try (Connection conn = dataSource.getConnection(); + PreparedStatement stmt = conn.prepareStatement(sql)) { + + int paramIndex = 1; + stmt.setLong(paramIndex++, tableId); + + // Set partition range parameters + setPartitionRangeParameters(stmt, partitionRange, paramIndex); + + try (ResultSet rs = stmt.executeQuery()) { + return parseResultSet(rs); + } + + } catch (SQLException e) { + throw new IOException("Failed to list statistics for " + metadataObject.fullName(), e); + } + } + + @Override + public List<PersistedPartitionStatistics> listStatistics( + String metalake, MetadataObject metadataObject, List<String> partitionNames) + throws IOException { + LOG.debug( + "Listing statistics for metalake: {}, object: {}, partitions: {}", + metalake, + metadataObject.fullName(), + partitionNames); + + if (partitionNames.isEmpty()) { + return Lists.newArrayList(); + } + + Long tableId = resolveTableId(metalake, metadataObject); + + // Build IN clause + String inClause = + partitionNames.stream().map(p -> "?").collect(Collectors.joining(", ", "(", ")")); + String sql = + SELECT_STATISTICS_SQL + + "AND partition_name IN " + + inClause + + " ORDER BY partition_name, statistic_name"; + + try (Connection conn = dataSource.getConnection(); + PreparedStatement stmt = conn.prepareStatement(sql)) { + + int paramIndex = 1; + stmt.setLong(paramIndex++, tableId); + + for (String partitionName : partitionNames) { + 
stmt.setString(paramIndex++, partitionName); + } + + try (ResultSet rs = stmt.executeQuery()) { + return parseResultSet(rs); + } + + } catch (SQLException e) { + throw new IOException( + "Failed to list statistics for partitions " + + partitionNames + + " in " + + metadataObject.fullName(), + e); + } + } + + @Override + public int dropStatistics(String metalake, List<MetadataObjectStatisticsDrop> statisticsToDrop) + throws IOException { + LOG.debug( + "Dropping statistics for metalake: {}, {} objects", metalake, statisticsToDrop.size()); + + int totalDropped = 0; + + try (Connection conn = dataSource.getConnection()) { + conn.setAutoCommit(false); + + try (PreparedStatement stmt = conn.prepareStatement(DELETE_STATISTICS_SQL)) { + for (MetadataObjectStatisticsDrop objectDrop : statisticsToDrop) { + Long tableId = resolveTableId(metalake, objectDrop.metadataObject()); + + for (PartitionStatisticsDrop drop : objectDrop.drops()) { + String partitionName = drop.partitionName(); + for (String statisticName : drop.statisticNames()) { + stmt.setLong(1, tableId); + stmt.setString(2, partitionName); + stmt.setString(3, statisticName); + stmt.addBatch(); + } + } + } + + int[] results = stmt.executeBatch(); + for (int result : results) { + if (result > 0 || result == PreparedStatement.SUCCESS_NO_INFO) { + totalDropped++; + } + } + + conn.commit(); + LOG.debug("Successfully dropped {} statistics", totalDropped); + + } catch (Exception e) { + conn.rollback(); + throw e; + } finally { + conn.setAutoCommit(true); + } + + } catch (SQLException e) { + throw new IOException("Failed to drop statistics", e); + } + + return totalDropped; + } + + @Override + public void updateStatistics( + String metalake, List<MetadataObjectStatisticsUpdate> statisticsToUpdate) throws IOException { + LOG.debug( + "Updating statistics for metalake: {}, {} objects", metalake, statisticsToUpdate.size()); + + try (Connection conn = dataSource.getConnection()) { + conn.setAutoCommit(false); + + try (PreparedStatement stmt = conn.prepareStatement(INSERT_OR_UPDATE_SQL)) { + for (MetadataObjectStatisticsUpdate objectUpdate : statisticsToUpdate) { + Long tableId = resolveTableId(metalake, objectUpdate.metadataObject()); + + for (PartitionStatisticsUpdate update : objectUpdate.partitionUpdates()) { + String partitionName = update.partitionName(); + + for (Map.Entry<String, StatisticValue<?>> stat : update.statistics().entrySet()) { + String statisticName = stat.getKey(); + StatisticValue<?> statisticValue = stat.getValue(); + + // Create audit info + String currentUser = PrincipalUtils.getCurrentUserName(); + Instant now = Instant.now(); + AuditInfo auditInfo = + AuditInfo.builder() + .withCreator(currentUser) + .withCreateTime(now) + .withLastModifier(currentUser) + .withLastModifiedTime(now) + .build(); + + // Serialize to JSON + String statisticValueJson = + JsonUtils.anyFieldMapper().writeValueAsString(statisticValue); + String auditInfoJson = JsonUtils.anyFieldMapper().writeValueAsString(auditInfo); + + long timestamp = now.toEpochMilli(); + + stmt.setLong(1, tableId); + stmt.setString(2, partitionName); + stmt.setString(3, statisticName); + stmt.setString(4, statisticValueJson); + stmt.setString(5, auditInfoJson); + stmt.setLong(6, timestamp); + stmt.setLong(7, timestamp); + + stmt.addBatch(); + } + } + } + + stmt.executeBatch(); + conn.commit(); + LOG.debug("Successfully updated statistics"); + + } catch (Exception e) { + conn.rollback(); + throw e; + } finally { + conn.setAutoCommit(true); + } + + } catch (SQLException | 
JsonProcessingException e) { + throw new IOException("Failed to update statistics", e); + } + } + + @Override + public void close() throws IOException { + // DataSource lifecycle is managed externally by the factory + LOG.debug("Closing MysqlPartitionStatisticStorage"); + } + + /** + * Resolves the table ID for a given metadata object. + * + * @param metalake the metalake name + * @param metadataObject the metadata object + * @return the table ID + * @throws IOException if unable to resolve the table ID + */ + private Long resolveTableId(String metalake, MetadataObject metadataObject) throws IOException { + NameIdentifier identifier = MetadataObjectUtil.toEntityIdent(metalake, metadataObject); + Entity.EntityType type = MetadataObjectUtil.toEntityType(metadataObject); + + TableEntity tableEntity = entityStore.get(identifier, type, TableEntity.class); + return tableEntity.id(); + } + + /** + * Builds the SQL filter clause for partition range. + * + * @param range the partition range + * @return the SQL filter clause + */ + private String buildPartitionRangeFilter(PartitionRange range) { + StringBuilder filter = new StringBuilder(); + + range + .lowerPartitionName() + .ifPresent( + name -> { + String op = + range + .lowerBoundType() + .map(t -> t == PartitionRange.BoundType.CLOSED ? ">=" : ">") + .orElse(">="); + filter.append("AND partition_name ").append(op).append(" ? "); + }); + + range + .upperPartitionName() + .ifPresent( + name -> { + String op = + range + .upperBoundType() + .map(t -> t == PartitionRange.BoundType.CLOSED ? "<=" : "<") + .orElse("<="); + filter.append("AND partition_name ").append(op).append(" ? "); + }); + + return filter.toString(); + } + + /** + * Sets partition range parameters in the prepared statement. + * + * @param stmt the prepared statement + * @param range the partition range + * @param startIndex the starting parameter index + * @return the next parameter index + * @throws SQLException if setting parameters fails + */ + private int setPartitionRangeParameters( Review Comment: It seems that the return value is never used, can you try to optimize it? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
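Regarding the last comment above (the unused return value of `setPartitionRangeParameters`): a sketch of the void variant. `Range` is a trimmed, hypothetical stand-in for `PartitionRange`, kept only so the snippet compiles on its own.

```java
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.Optional;

class RangeBinderSketch {
  // Hypothetical stand-in for PartitionRange: just the optional bound names.
  static class Range {
    Optional<String> lowerPartitionName = Optional.empty();
    Optional<String> upperPartitionName = Optional.empty();
  }

  // Returning void drops the unused value; the range contributes at most two
  // placeholders, matching the filter built by buildPartitionRangeFilter.
  static void setPartitionRangeParameters(PreparedStatement stmt, Range range, int startIndex)
      throws SQLException {
    int index = startIndex;
    if (range.lowerPartitionName.isPresent()) {
      stmt.setString(index++, range.lowerPartitionName.get());
    }
    if (range.upperPartitionName.isPresent()) {
      stmt.setString(index, range.upperPartitionName.get());
    }
  }
}
```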
