This is an automated email from the ASF dual-hosted git repository. jshao pushed a commit to branch branch-0.9 in repository https://gitbox.apache.org/repos/asf/gravitino.git
commit 8cdfda2c6e0e542e77b2fd7307b7664fccb4a708 Author: Jarvis <[email protected]> AuthorDate: Wed Jul 30 17:01:33 2025 +0800 [#3302][Sub-Task] StarRocks catalog Partition ops (#7791) <!-- 1. Title: [#<issue>] <type>(<scope>): <subject> Examples: - "[#123] feat(operator): support xxx" - "[#233] fix: check null before access result in xxx" - "[MINOR] refactor: fix typo in variable name" - "[MINOR] docs: fix typo in README" - "[#255] test: fix flaky test NameOfTheTest" Reference: https://www.conventionalcommits.org/en/v1.0.0/ 2. If the PR is unfinished, please mark this PR as draft. --> ### What changes were proposed in this pull request? Add the table partition operations implementation for the StarRocks catalog. ### Why are the changes needed? To support the StarRocks catalog. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? By E2E tests; the tests are in a separate PR: https://github.com/apache/gravitino/pull/7792 --- .../StarRocksColumnDefaultValueConverter.java | 2 +- .../StarRocksTablePartitionOperations.java | 176 ++++++++++++++++++++- 2 files changed, 169 insertions(+), 9 deletions(-) diff --git a/catalogs/catalog-jdbc-starrocks/src/main/java/org/apache/gravitino/catalog/starrocks/converter/StarRocksColumnDefaultValueConverter.java b/catalogs/catalog-jdbc-starrocks/src/main/java/org/apache/gravitino/catalog/starrocks/converter/StarRocksColumnDefaultValueConverter.java index a9f8c5bd86..d449f29b1a 100644 --- a/catalogs/catalog-jdbc-starrocks/src/main/java/org/apache/gravitino/catalog/starrocks/converter/StarRocksColumnDefaultValueConverter.java +++ b/catalogs/catalog-jdbc-starrocks/src/main/java/org/apache/gravitino/catalog/starrocks/converter/StarRocksColumnDefaultValueConverter.java @@ -62,7 +62,7 @@ public class StarRocksColumnDefaultValueConverter extends JdbcColumnDefaultValue if (columnDefaultValue.equals(CURRENT_TIMESTAMP)) { return DEFAULT_VALUE_OF_CURRENT_TIMESTAMP; } - // The parsing of Doris expressions is complex, so we are not currently undertaking the + // The 
parsing of StarRocks expressions is complex, so we are not currently undertaking the // parsing. return UnparsedExpression.of(columnDefaultValue); } diff --git a/catalogs/catalog-jdbc-starrocks/src/main/java/org/apache/gravitino/catalog/starrocks/operations/StarRocksTablePartitionOperations.java b/catalogs/catalog-jdbc-starrocks/src/main/java/org/apache/gravitino/catalog/starrocks/operations/StarRocksTablePartitionOperations.java index 97d038b547..61d534f916 100644 --- a/catalogs/catalog-jdbc-starrocks/src/main/java/org/apache/gravitino/catalog/starrocks/operations/StarRocksTablePartitionOperations.java +++ b/catalogs/catalog-jdbc-starrocks/src/main/java/org/apache/gravitino/catalog/starrocks/operations/StarRocksTablePartitionOperations.java @@ -20,23 +20,46 @@ package org.apache.gravitino.catalog.starrocks.operations; import static com.google.common.base.Preconditions.checkArgument; +import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import java.sql.Connection; +import java.sql.DatabaseMetaData; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.Arrays; +import java.util.Collections; +import java.util.Map; +import java.util.Objects; import javax.sql.DataSource; -import org.apache.commons.lang3.NotImplementedException; import org.apache.gravitino.catalog.jdbc.JdbcTable; import org.apache.gravitino.catalog.jdbc.converter.JdbcExceptionConverter; import org.apache.gravitino.catalog.jdbc.converter.JdbcTypeConverter; import org.apache.gravitino.catalog.jdbc.operation.JdbcTablePartitionOperations; +import org.apache.gravitino.catalog.starrocks.utils.StarRocksUtils; +import org.apache.gravitino.exceptions.GravitinoRuntimeException; import org.apache.gravitino.exceptions.NoSuchPartitionException; import org.apache.gravitino.exceptions.PartitionAlreadyExistsException; +import org.apache.gravitino.rel.expressions.literals.Literal; 
+import org.apache.gravitino.rel.expressions.transforms.Transform; +import org.apache.gravitino.rel.expressions.transforms.Transforms; +import org.apache.gravitino.rel.partitions.ListPartition; import org.apache.gravitino.rel.partitions.Partition; +import org.apache.gravitino.rel.partitions.Partitions; +import org.apache.gravitino.rel.partitions.RangePartition; +import org.apache.gravitino.rel.types.Type; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** Table partition operations for StarRocks. */ public final class StarRocksTablePartitionOperations extends JdbcTablePartitionOperations { - @SuppressWarnings("unused") + private static final Logger log = + LoggerFactory.getLogger(StarRocksTablePartitionOperations.class); + private final JdbcExceptionConverter exceptionConverter; - @SuppressWarnings("unused") private final JdbcTypeConverter typeConverter; public StarRocksTablePartitionOperations( @@ -53,26 +76,163 @@ public final class StarRocksTablePartitionOperations extends JdbcTablePartitionO @Override public String[] listPartitionNames() { - throw new NotImplementedException("To be implemented in the future"); + try (Connection connection = getConnection(loadedTable.databaseName())) { + String showPartitionsSql = String.format("SHOW PARTITIONS FROM `%s`", loadedTable.name()); + try (Statement statement = connection.createStatement(); + ResultSet result = statement.executeQuery(showPartitionsSql)) { + ImmutableList.Builder<String> partitionNames = ImmutableList.builder(); + while (result.next()) { + partitionNames.add(result.getString("PartitionName")); + } + return partitionNames.build().toArray(new String[0]); + } + } catch (SQLException e) { + throw exceptionConverter.toGravitinoException(e); + } } @Override public Partition[] listPartitions() { - throw new NotImplementedException("To be implemented in the future"); + try (Connection connection = getConnection(loadedTable.databaseName())) { + Transform partitionInfo = 
loadedTable.partitioning()[0]; + Map<String, Type> columnTypes = getColumnType(connection); + String showPartitionsSql = String.format("SHOW PARTITIONS FROM `%s`", loadedTable.name()); + try (Statement statement = connection.createStatement(); + ResultSet result = statement.executeQuery(showPartitionsSql)) { + ImmutableList.Builder<Partition> partitions = ImmutableList.builder(); + while (result.next()) { + partitions.add( + StarRocksUtils.fromStarRocksPartition( + loadedTable.name(), result, partitionInfo, columnTypes)); + } + return partitions.build().toArray(new Partition[0]); + } + } catch (SQLException e) { + throw exceptionConverter.toGravitinoException(e); + } } @Override public Partition getPartition(String partitionName) throws NoSuchPartitionException { - throw new NotImplementedException("To be implemented in the future"); + try (Connection connection = getConnection(loadedTable.databaseName())) { + Transform partitionInfo = loadedTable.partitioning()[0]; + Map<String, Type> columnTypes = getColumnType(connection); + String showPartitionsSql = + String.format( + "SHOW PARTITIONS FROM `%s` WHERE PartitionName = \"%s\"", + loadedTable.name(), partitionName); + try (Statement statement = connection.createStatement(); + ResultSet result = statement.executeQuery(showPartitionsSql)) { + if (result.next()) { + return StarRocksUtils.fromStarRocksPartition( + loadedTable.name(), result, partitionInfo, columnTypes); + } + } + } catch (SQLException e) { + throw exceptionConverter.toGravitinoException(e); + } + throw new NoSuchPartitionException("Partition %s does not exist", partitionName); } @Override public Partition addPartition(Partition partition) throws PartitionAlreadyExistsException { - throw new NotImplementedException("To be implemented in the future"); + try (Connection connection = getConnection(loadedTable.databaseName())) { + Transform partitionInfo = loadedTable.partitioning()[0]; + + String addPartitionSqlFormat = "ALTER TABLE `%s` ADD %s"; + String 
partitionSqlFragment; + Partition added; + + if (partition instanceof RangePartition) { + Preconditions.checkArgument( + partitionInfo instanceof Transforms.RangeTransform, + "Table %s is non-range-partitioned, but trying to add a range partition", + loadedTable.name()); + + RangePartition rangePartition = (RangePartition) partition; + partitionSqlFragment = StarRocksUtils.generatePartitionSqlFragment(rangePartition); + + // The partition properties actually cannot be passed into StarRocks, we just return an + // empty + // map instead. + added = + Partitions.range( + rangePartition.name(), + rangePartition.upper(), + rangePartition.lower(), + Collections.emptyMap()); + } else if (partition instanceof ListPartition) { + Preconditions.checkArgument( + partitionInfo instanceof Transforms.ListTransform, + "Table %s is non-list-partitioned, but trying to add a list partition", + loadedTable.name()); + + ListPartition listPartition = (ListPartition) partition; + Literal<?>[][] lists = listPartition.lists(); + Preconditions.checkArgument( + lists.length > 0, "The number of values in list partition must be greater than 0"); + Preconditions.checkArgument( + Arrays.stream(lists) + .allMatch( + part -> + part.length + == ((Transforms.ListTransform) partitionInfo).fieldNames().length), + "The number of partitioning columns must be consistent"); + + partitionSqlFragment = StarRocksUtils.generatePartitionSqlFragment(listPartition); + + added = + Partitions.list(listPartition.name(), listPartition.lists(), Collections.emptyMap()); + } else { + throw new IllegalArgumentException("Unsupported partition type of StarRocks"); + } + log.info("Generated add partition sql : {}", partitionSqlFragment); + try (Statement statement = connection.createStatement()) { + statement.executeUpdate( + String.format(addPartitionSqlFormat, loadedTable.name(), partitionSqlFragment)); + return added; + } + } catch (SQLException e) { + throw exceptionConverter.toGravitinoException(e); + } } @Override 
public boolean dropPartition(String partitionName) { - throw new NotImplementedException("To be implemented in the future"); + try (Connection connection = getConnection(loadedTable.databaseName())) { + String dropPartitionsSql = + String.format("ALTER TABLE `%s` DROP PARTITION `%s`", loadedTable.name(), partitionName); + try (Statement statement = connection.createStatement()) { + statement.executeUpdate(dropPartitionsSql); + return true; + } + } catch (SQLException e) { + GravitinoRuntimeException exception = exceptionConverter.toGravitinoException(e); + if (exception instanceof NoSuchPartitionException) { + return false; + } + throw exception; + } + } + + private Map<String, Type> getColumnType(Connection connection) throws SQLException { + DatabaseMetaData metaData = connection.getMetaData(); + try (ResultSet result = + metaData.getColumns( + connection.getCatalog(), connection.getSchema(), loadedTable.name(), null)) { + ImmutableMap.Builder<String, Type> columnTypes = ImmutableMap.builder(); + while (result.next()) { + if (Objects.equals(result.getString("TABLE_NAME"), loadedTable.name())) { + JdbcTypeConverter.JdbcTypeBean typeBean = + new JdbcTypeConverter.JdbcTypeBean(result.getString("TYPE_NAME")); + typeBean.setColumnSize(result.getInt("COLUMN_SIZE")); + typeBean.setScale(result.getInt("DECIMAL_DIGITS")); + Type gravitinoType = typeConverter.toGravitino(typeBean); + String columnName = result.getString("COLUMN_NAME"); + columnTypes.put(columnName, gravitinoType); + } + } + return columnTypes.build(); + } } }
