This is an automated email from the ASF dual-hosted git repository.
yuqi4733 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/gravitino.git
The following commit(s) were added to refs/heads/main by this push:
new b7536aca87 [#10073] feat(catalog-jdbc-hologres): Add table operations
and unit tests for Hologres catalog (#10068)
b7536aca87 is described below
commit b7536aca8725f0410dd58f319f6cbab342298641
Author: Ye Ding <[email protected]>
AuthorDate: Mon Mar 9 11:32:23 2026 +0800
[#10073] feat(catalog-jdbc-hologres): Add table operations and unit tests
for Hologres catalog (#10068)
### What changes were proposed in this pull request?
Replace the stub `HologresTableOperations` with the full implementation,
and add `TestHologresTableOperations`:
**Table Operations:**
- `generateCreateTableSql()`: Full DDL generation with column types,
nullability, defaults, comments, primary keys, distribution keys
(`DISTRIBUTE BY`), partition keys (`PARTITION BY`), and table properties
(`orientation`, `table_group`, `time_to_live_in_seconds`,
`binlog_level`, `bitmap_columns`, `dictionary_encoding_columns`,
`clustering_key`, `segment_key`, `event_time_column`)
- `generateAlterTableSql()`: Support for rename table,
add/drop/rename/update-type column, update comment, set/remove
properties
- `generatePurgeTableSql()`: DROP TABLE implementation
- Table loading with metadata reconstruction from JDBC metadata and
`pg_catalog` queries
**Unit Tests (835 lines):**
- CREATE TABLE with various column types and constraints
- ALTER TABLE for all supported operations
- Table property handling (orientation, TTL, binlog, etc.)
- Distribution and partition key support
### Why are the changes needed?
Enables Gravitino to fully manage Hologres tables with all
Hologres-specific features.
Fix: https://github.com/apache/gravitino/issues/10073
### Does this PR introduce _any_ user-facing change?
No. The Hologres catalog is not yet documented or released.
### How was this patch tested?
- Unit tests: `./gradlew :catalogs-contrib:catalog-jdbc-hologres:test
-PskipITs` — all tests pass.
---
.../operation/HologresSchemaOperations.java | 12 +-
.../operation/HologresTableOperations.java | 982 +++++++++++++++++-
.../operation/TestHologresTableOperations.java | 1084 ++++++++++++++++++++
3 files changed, 2059 insertions(+), 19 deletions(-)
diff --git
a/catalogs-contrib/catalog-jdbc-hologres/src/main/java/org/apache/gravitino/catalog/hologres/operation/HologresSchemaOperations.java
b/catalogs-contrib/catalog-jdbc-hologres/src/main/java/org/apache/gravitino/catalog/hologres/operation/HologresSchemaOperations.java
index 3622f53f2f..2790287392 100644
---
a/catalogs-contrib/catalog-jdbc-hologres/src/main/java/org/apache/gravitino/catalog/hologres/operation/HologresSchemaOperations.java
+++
b/catalogs-contrib/catalog-jdbc-hologres/src/main/java/org/apache/gravitino/catalog/hologres/operation/HologresSchemaOperations.java
@@ -18,8 +18,6 @@
*/
package org.apache.gravitino.catalog.hologres.operation;
-import static
org.apache.gravitino.catalog.hologres.operation.HologresTableOperations.HOLO_QUOTE;
-
import com.google.common.collect.ImmutableSet;
import java.sql.Connection;
import java.sql.DatabaseMetaData;
@@ -103,13 +101,10 @@ public class HologresSchemaOperations extends
JdbcDatabaseOperations {
"Hologres does not support properties on schema create.");
}
- StringBuilder sqlBuilder =
- new StringBuilder(String.format("CREATE SCHEMA %s%s%s;", HOLO_QUOTE,
schema, HOLO_QUOTE));
+ StringBuilder sqlBuilder = new StringBuilder(String.format("CREATE SCHEMA
\"%s\";", schema));
if (StringUtils.isNotEmpty(comment)) {
String escapedComment = comment.replace("'", "''");
- sqlBuilder.append(
- String.format(
- "COMMENT ON SCHEMA %s%s%s IS '%s'", HOLO_QUOTE, schema,
HOLO_QUOTE, escapedComment));
+ sqlBuilder.append(String.format("COMMENT ON SCHEMA \"%s\" IS '%s'",
schema, escapedComment));
}
return sqlBuilder.toString();
}
@@ -133,8 +128,7 @@ public class HologresSchemaOperations extends
JdbcDatabaseOperations {
@Override
public String generateDropDatabaseSql(String schema, boolean cascade) {
- StringBuilder sqlBuilder =
- new StringBuilder(String.format("DROP SCHEMA %s%s%s", HOLO_QUOTE,
schema, HOLO_QUOTE));
+ StringBuilder sqlBuilder = new StringBuilder(String.format("DROP SCHEMA
\"%s\"", schema));
if (cascade) {
sqlBuilder.append(" CASCADE");
}
diff --git
a/catalogs-contrib/catalog-jdbc-hologres/src/main/java/org/apache/gravitino/catalog/hologres/operation/HologresTableOperations.java
b/catalogs-contrib/catalog-jdbc-hologres/src/main/java/org/apache/gravitino/catalog/hologres/operation/HologresTableOperations.java
index 5beb9c0178..8b244880e0 100644
---
a/catalogs-contrib/catalog-jdbc-hologres/src/main/java/org/apache/gravitino/catalog/hologres/operation/HologresTableOperations.java
+++
b/catalogs-contrib/catalog-jdbc-hologres/src/main/java/org/apache/gravitino/catalog/hologres/operation/HologresTableOperations.java
@@ -18,14 +18,48 @@
*/
package org.apache.gravitino.catalog.hologres.operation;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Lists;
+import java.sql.Connection;
+import java.sql.DatabaseMetaData;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.stream.Collectors;
+import javax.sql.DataSource;
+import org.apache.commons.collections4.MapUtils;
+import org.apache.commons.lang3.ArrayUtils;
+import org.apache.commons.lang3.BooleanUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.gravitino.StringIdentifier;
import org.apache.gravitino.catalog.jdbc.JdbcColumn;
+import org.apache.gravitino.catalog.jdbc.JdbcTable;
+import org.apache.gravitino.catalog.jdbc.config.JdbcConfig;
+import
org.apache.gravitino.catalog.jdbc.converter.JdbcColumnDefaultValueConverter;
+import org.apache.gravitino.catalog.jdbc.converter.JdbcExceptionConverter;
+import org.apache.gravitino.catalog.jdbc.converter.JdbcTypeConverter;
import org.apache.gravitino.catalog.jdbc.operation.DatabaseOperation;
import org.apache.gravitino.catalog.jdbc.operation.JdbcTableOperations;
import org.apache.gravitino.catalog.jdbc.operation.RequireDatabaseOperation;
+import org.apache.gravitino.exceptions.NoSuchSchemaException;
+import org.apache.gravitino.exceptions.NoSuchTableException;
+import org.apache.gravitino.rel.Column;
import org.apache.gravitino.rel.TableChange;
+import org.apache.gravitino.rel.expressions.NamedReference;
import org.apache.gravitino.rel.expressions.distributions.Distribution;
+import org.apache.gravitino.rel.expressions.distributions.Distributions;
+import org.apache.gravitino.rel.expressions.distributions.Strategy;
import org.apache.gravitino.rel.expressions.transforms.Transform;
+import org.apache.gravitino.rel.expressions.transforms.Transforms;
import org.apache.gravitino.rel.indexes.Index;
/**
@@ -34,17 +68,119 @@ import org.apache.gravitino.rel.indexes.Index;
* <p>Hologres is PostgreSQL-compatible, so most table operations follow
PostgreSQL conventions.
* However, Hologres has specific features like table properties (orientation,
distribution_key,
* etc.) that are handled through the WITH clause in CREATE TABLE statements.
- *
- * <p>TODO: Full implementation will be added in a follow-up PR.
*/
public class HologresTableOperations extends JdbcTableOperations
implements RequireDatabaseOperation {
- public static final String HOLO_QUOTE = "\"";
+ public static final String NEW_LINE = "\n";
+ public static final String ALTER_TABLE = "ALTER TABLE ";
+
+ private static final String HOLOGRES_NOT_SUPPORT_NESTED_COLUMN_MSG =
+ "Hologres does not support nested column names.";
+
+ /** Properties that are handled separately or read-only, excluded from the
WITH clause. */
+ private static final Set<String> EXCLUDED_TABLE_PROPERTIES =
+ ImmutableSet.of("distribution_key", "is_logical_partitioned_table",
"primary_key");
+
+ /** Properties that are meaningful for users, filtering out internal system
properties. */
+ private static final Set<String> USER_RELEVANT_PROPERTIES =
+ ImmutableSet.of(
+ "orientation",
+ "clustering_key",
+ "segment_key",
+ "bitmap_columns",
+ "dictionary_encoding_columns",
+ "time_to_live_in_seconds",
+ "table_group",
+ "storage_format",
+ "binlog.level",
+ "binlog.ttl",
+ "is_logical_partitioned_table",
+ "partition_expiration_time",
+ "partition_keep_hot_window",
+ "partition_require_filter",
+ "partition_generate_binlog_window");
+
+ private String database;
+ private HologresSchemaOperations schemaOperations;
+
+ @Override
+ protected String quoteIdentifier(String identifier) {
+ return "\"" + identifier.replace("\"", "\"\"") + "\"";
+ }
+
+ @Override
+ public void initialize(
+ DataSource dataSource,
+ JdbcExceptionConverter exceptionMapper,
+ JdbcTypeConverter jdbcTypeConverter,
+ JdbcColumnDefaultValueConverter jdbcColumnDefaultValueConverter,
+ Map<String, String> conf) {
+ super.initialize(
+ dataSource, exceptionMapper, jdbcTypeConverter,
jdbcColumnDefaultValueConverter, conf);
+ database = new JdbcConfig(conf).getJdbcDatabase();
+ Preconditions.checkArgument(
+ StringUtils.isNotBlank(database),
+ "The `jdbc-database` configuration item is mandatory in Hologres.");
+ }
@Override
public void setDatabaseOperation(DatabaseOperation databaseOperation) {
- // Will be implemented in a follow-up PR.
+ this.schemaOperations = (HologresSchemaOperations) databaseOperation;
+ }
+
+ @Override
+ public List<String> listTables(String schemaName) throws
NoSuchSchemaException {
+ try (Connection connection = getConnection(schemaName)) {
+ if (!schemaOperations.schemaExists(connection, schemaName)) {
+ throw new NoSuchSchemaException("No such schema: %s", schemaName);
+ }
+ final List<String> names = Lists.newArrayList();
+ try (ResultSet tables = getTables(connection)) {
+ while (tables.next()) {
+ if (Objects.equals(tables.getString("TABLE_SCHEM"), schemaName)) {
+ names.add(tables.getString("TABLE_NAME"));
+ }
+ }
+ }
+ LOG.debug("Finished listing tables size {} for schema name {} ",
names.size(), schemaName);
+ return names;
+ } catch (final SQLException se) {
+ throw this.exceptionMapper.toGravitinoException(se);
+ }
+ }
+
+ @Override
+ protected JdbcTable.Builder getTableBuilder(
+ ResultSet tablesResult, String databaseName, String tableName) throws
SQLException {
+ boolean found = false;
+ JdbcTable.Builder builder = null;
+ while (tablesResult.next() && !found) {
+ String tableNameInResult = tablesResult.getString("TABLE_NAME");
+ String tableSchemaInResult = tablesResult.getString("TABLE_SCHEM");
+ if (Objects.equals(tableNameInResult, tableName)
+ && Objects.equals(tableSchemaInResult, databaseName)) {
+ builder = getBasicJdbcTableInfo(tablesResult);
+ found = true;
+ }
+ }
+
+ if (!found) {
+ throw new NoSuchTableException("Table %s does not exist in %s.",
tableName, databaseName);
+ }
+
+ return builder;
+ }
+
+ @Override
+ protected JdbcColumn.Builder getColumnBuilder(
+ ResultSet columnsResult, String databaseName, String tableName) throws
SQLException {
+ JdbcColumn.Builder builder = null;
+ if (Objects.equals(columnsResult.getString("TABLE_NAME"), tableName)
+ && Objects.equals(columnsResult.getString("TABLE_SCHEM"),
databaseName)) {
+ builder = getBasicJdbcColumnInfo(columnsResult);
+ }
+ return builder;
}
@Override
@@ -56,15 +192,235 @@ public class HologresTableOperations extends
JdbcTableOperations
Transform[] partitioning,
Distribution distribution,
Index[] indexes) {
- throw new UnsupportedOperationException(
- "Hologres table creation will be implemented in a follow-up PR.");
+ boolean isLogicalPartition =
+ MapUtils.isNotEmpty(properties)
+ &&
"true".equalsIgnoreCase(properties.get("is_logical_partitioned_table"));
+ StringBuilder sqlBuilder = new StringBuilder();
+ sqlBuilder.append(String.format("CREATE TABLE %s (%s",
quoteIdentifier(tableName), NEW_LINE));
+
+ // Add columns
+ for (int i = 0; i < columns.length; i++) {
+ JdbcColumn column = columns[i];
+ sqlBuilder.append(String.format(" %s",
quoteIdentifier(column.name())));
+
+ appendColumnDefinition(column, sqlBuilder);
+ // Add a comma for the next column, unless it's the last one
+ if (i < columns.length - 1) {
+ sqlBuilder.append(String.format(",%s", NEW_LINE));
+ }
+ }
+ appendIndexesSql(indexes, sqlBuilder);
+ sqlBuilder.append(String.format("%s)", NEW_LINE));
+
+ // Append partitioning clause if specified
+ if (ArrayUtils.isNotEmpty(partitioning)) {
+ appendPartitioningSql(partitioning, isLogicalPartition, sqlBuilder);
+ }
+
+ // Build WITH clause combining distribution and Hologres-specific table
properties
+ // Supported properties: orientation, distribution_key, clustering_key,
event_time_column,
+ // bitmap_columns, dictionary_encoding_columns, time_to_live_in_seconds,
table_group, etc.
+ List<String> withEntries = new ArrayList<>();
+
+ // Add distribution_key from Distribution parameter
+ if (!Distributions.NONE.equals(distribution)) {
+ validateDistribution(distribution);
+ String distributionColumns =
+ Arrays.stream(distribution.expressions())
+ .map(
+ expression -> {
+ Preconditions.checkArgument(
+ expression instanceof NamedReference,
+ "Hologres distribution expressions must be simple
column references");
+ String[] fieldNames = ((NamedReference)
expression).fieldName();
+ Preconditions.checkArgument(
+ fieldNames != null && fieldNames.length == 1,
+ "Hologres distribution expressions must reference a
single column");
+ return fieldNames[0];
+ })
+ .collect(Collectors.joining(","));
+ withEntries.add(String.format("distribution_key = '%s'",
distributionColumns));
+ }
+
+ // Add user-specified properties (filter out read-only /
internally-handled properties)
+ if (MapUtils.isNotEmpty(properties)) {
+ properties.forEach(
+ (key, value) -> {
+ if (!EXCLUDED_TABLE_PROPERTIES.contains(key)) {
+ Preconditions.checkArgument(
+ key.matches("[a-zA-Z_][a-zA-Z0-9_]*"),
+ "Invalid property key: %s. Property key must be a valid
identifier.",
+ key);
+ withEntries.add(String.format("%s = '%s'", key,
value.replace("'", "''")));
+ }
+ });
+ }
+
+ // Generate WITH clause
+ if (!withEntries.isEmpty()) {
+ sqlBuilder.append(String.format("%sWITH (%s", NEW_LINE, NEW_LINE));
+ sqlBuilder.append(
+ withEntries.stream()
+ .map(entry -> String.format(" %s", entry))
+ .collect(Collectors.joining(String.format(",%s", NEW_LINE))));
+ sqlBuilder.append(String.format("%s)", NEW_LINE));
+ }
+
+ sqlBuilder.append(";");
+
+ // Add table comment if specified
+ if (StringUtils.isNotEmpty(comment)) {
+ String escapedComment = comment.replace("'", "''");
+ sqlBuilder
+ .append(NEW_LINE)
+ .append(
+ String.format(
+ "COMMENT ON TABLE %s IS '%s';", quoteIdentifier(tableName),
escapedComment));
+ }
+ Arrays.stream(columns)
+ .filter(jdbcColumn -> StringUtils.isNotEmpty(jdbcColumn.comment()))
+ .forEach(
+ jdbcColumn -> {
+ String escapedColComment = jdbcColumn.comment().replace("'",
"''");
+ sqlBuilder
+ .append(NEW_LINE)
+ .append(
+ String.format(
+ "COMMENT ON COLUMN %s.%s IS '%s';",
+ quoteIdentifier(tableName),
+ quoteIdentifier(jdbcColumn.name()),
+ escapedColComment));
+ });
+ // Return the generated SQL statement
+ String result = sqlBuilder.toString();
+
+ LOG.debug("Generated create table:{} sql: {}", tableName, result);
+ return result;
+ }
+
+ @VisibleForTesting
+ static void appendIndexesSql(Index[] indexes, StringBuilder sqlBuilder) {
+ for (Index index : indexes) {
+ String fieldStr = getIndexFieldStr(index.fieldNames());
+ sqlBuilder.append(String.format(",%s", NEW_LINE));
+ switch (index.type()) {
+ case PRIMARY_KEY:
+ sqlBuilder.append(String.format("PRIMARY KEY (%s)", fieldStr));
+ break;
+ default:
+ throw new IllegalArgumentException(
+ "Hologres only supports PRIMARY_KEY index, but got: " +
index.type());
+ }
+ }
+ }
+
+ protected static String getIndexFieldStr(String[][] fieldNames) {
+ return Arrays.stream(fieldNames)
+ .map(
+ colNames -> {
+ if (colNames.length > 1) {
+ throw new IllegalArgumentException(
+ "Index does not support complex fields in Hologres");
+ }
+ return "\"" + colNames[0] + "\"";
+ })
+ .collect(Collectors.joining(", "));
+ }
+
+ /**
+ * Append the partitioning clause to the CREATE TABLE SQL.
+ *
+ * <p>Hologres supports two types of partition tables:
+ *
+ * <ul>
+ * <li>Physical partition table: uses {@code PARTITION BY LIST(column)}
syntax
+ * <li>Logical partition table (V3.1+): uses {@code LOGICAL PARTITION BY
LIST(column1[,
+ * column2])} syntax
+ * </ul>
+ *
+ * @param partitioning the partition transforms (only LIST partitioning is
supported)
+ * @param isLogicalPartition whether to create a logical partition table
+ * @param sqlBuilder the SQL builder to append to
+ */
+ @VisibleForTesting
+ static void appendPartitioningSql(
+ Transform[] partitioning, boolean isLogicalPartition, StringBuilder
sqlBuilder) {
+ Preconditions.checkArgument(
+ partitioning.length == 1,
+ "Hologres only supports single partition transform, but got %s",
+ partitioning.length);
+ Preconditions.checkArgument(
+ partitioning[0] instanceof Transforms.ListTransform,
+ "Hologres only supports LIST partitioning, but got %s",
+ partitioning[0].getClass().getSimpleName());
+
+ Transforms.ListTransform listTransform = (Transforms.ListTransform)
partitioning[0];
+ String[][] fieldNames = listTransform.fieldNames();
+
+ Preconditions.checkArgument(fieldNames.length > 0, "Partition columns must
not be empty");
+
+ if (isLogicalPartition) {
+ Preconditions.checkArgument(
+ fieldNames.length <= 2,
+ "Logical partition table supports at most 2 partition columns, but
got: %s",
+ fieldNames.length);
+ } else {
+ Preconditions.checkArgument(
+ fieldNames.length == 1,
+ "Physical partition table supports exactly 1 partition column, but
got: %s",
+ fieldNames.length);
+ }
+
+ String partitionColumns =
+ Arrays.stream(fieldNames)
+ .map(
+ colNames -> {
+ Preconditions.checkArgument(
+ colNames.length == 1,
+ "Hologres partition does not support nested field
names");
+ return "\"" + colNames[0] + "\"";
+ })
+ .collect(Collectors.joining(", "));
+
+ sqlBuilder.append(NEW_LINE);
+ if (isLogicalPartition) {
+ sqlBuilder.append(String.format("LOGICAL PARTITION BY LIST(%s)",
partitionColumns));
+ } else {
+ sqlBuilder.append(String.format("PARTITION BY LIST(%s)",
partitionColumns));
+ }
+ }
+
+ private void appendColumnDefinition(JdbcColumn column, StringBuilder
sqlBuilder) {
+ // Add data type
+
sqlBuilder.append(SPACE).append(typeConverter.fromGravitino(column.dataType())).append(SPACE);
+
+ // Hologres does not support auto-increment columns via Gravitino
+ if (column.autoIncrement()) {
+ throw new IllegalArgumentException(
+ "Hologres does not support creating auto-increment columns via
Gravitino, column: "
+ + column.name());
+ }
+
+ // Add NULL / NOT NULL constraint
+ if (column.nullable()) {
+ sqlBuilder.append("NULL ");
+ } else {
+ sqlBuilder.append("NOT NULL ");
+ }
+ // Add DEFAULT value if specified
+ appendDefaultValue(column, sqlBuilder);
}
@Override
- protected String generateAlterTableSql(
- String schemaName, String tableName, TableChange... changes) {
- throw new UnsupportedOperationException(
- "Hologres table alteration will be implemented in a follow-up PR.");
+ protected String generateRenameTableSql(String oldTableName, String
newTableName) {
+ return String.format(
+ "%s%s RENAME TO %s",
+ ALTER_TABLE, quoteIdentifier(oldTableName),
quoteIdentifier(newTableName));
+ }
+
+ @Override
+ protected String generateDropTableSql(String tableName) {
+ return String.format("DROP TABLE %s", quoteIdentifier(tableName));
}
@Override
@@ -72,4 +428,610 @@ public class HologresTableOperations extends
JdbcTableOperations
throw new UnsupportedOperationException(
"Hologres does not support purge table in Gravitino, please use drop
table");
}
+
+ @Override
+ protected String generateAlterTableSql(
+ String schemaName, String tableName, TableChange... changes) {
+ // Not all operations require the original table information, so lazy
loading is used here
+ JdbcTable lazyLoadTable = null;
+ List<String> alterSql = new ArrayList<>();
+ for (TableChange change : changes) {
+ if (change instanceof TableChange.UpdateComment) {
+ lazyLoadTable = getOrCreateTable(schemaName, tableName, lazyLoadTable);
+ alterSql.add(updateCommentDefinition((TableChange.UpdateComment)
change, lazyLoadTable));
+ } else if (change instanceof TableChange.SetProperty) {
+ throw new IllegalArgumentException("Set property is not supported
yet");
+ } else if (change instanceof TableChange.RemoveProperty) {
+ throw new IllegalArgumentException("Remove property is not supported
yet");
+ } else if (change instanceof TableChange.AddColumn) {
+ TableChange.AddColumn addColumn = (TableChange.AddColumn) change;
+ lazyLoadTable = getOrCreateTable(schemaName, tableName, lazyLoadTable);
+ alterSql.addAll(addColumnFieldDefinition(addColumn, lazyLoadTable));
+ } else if (change instanceof TableChange.RenameColumn) {
+ TableChange.RenameColumn renameColumn = (TableChange.RenameColumn)
change;
+ alterSql.add(renameColumnFieldDefinition(renameColumn, tableName));
+ } else if (change instanceof TableChange.UpdateColumnDefaultValue) {
+ throw new IllegalArgumentException(
+ "Hologres does not support altering column default value via ALTER
TABLE.");
+ } else if (change instanceof TableChange.UpdateColumnType) {
+ throw new IllegalArgumentException(
+ "Hologres does not support altering column type via ALTER TABLE.");
+ } else if (change instanceof TableChange.UpdateColumnComment) {
+ alterSql.add(
+ updateColumnCommentFieldDefinition(
+ (TableChange.UpdateColumnComment) change, tableName));
+ } else if (change instanceof TableChange.UpdateColumnPosition) {
+ throw new IllegalArgumentException("Hologres does not support column
position.");
+ } else if (change instanceof TableChange.DeleteColumn) {
+ lazyLoadTable = getOrCreateTable(schemaName, tableName, lazyLoadTable);
+ TableChange.DeleteColumn deleteColumn = (TableChange.DeleteColumn)
change;
+ String deleteColSql = deleteColumnFieldDefinition(deleteColumn,
lazyLoadTable);
+ if (StringUtils.isNotEmpty(deleteColSql)) {
+ alterSql.add(deleteColSql);
+ }
+ } else if (change instanceof TableChange.UpdateColumnNullability) {
+ throw new IllegalArgumentException(
+ "Hologres does not support altering column nullability via ALTER
TABLE.");
+ } else if (change instanceof TableChange.AddIndex) {
+ throw new IllegalArgumentException(
+ "Hologres does not support adding index via ALTER TABLE.");
+ } else if (change instanceof TableChange.DeleteIndex) {
+ throw new IllegalArgumentException(
+ "Hologres does not support deleting index via ALTER TABLE.");
+ } else if (change instanceof TableChange.UpdateColumnAutoIncrement) {
+ throw new IllegalArgumentException(
+ "Hologres does not support altering column auto-increment via
ALTER TABLE.");
+ } else {
+ throw new IllegalArgumentException(
+ "Unsupported table change type: " + change.getClass().getName());
+ }
+ }
+
+ // Filter out empty strings and check if there are any actual changes
+ alterSql.removeIf(String::isEmpty);
+ if (alterSql.isEmpty()) {
+ return "";
+ }
+
+ // Return the generated SQL statement
+ String result = String.join("\n", alterSql);
+ LOG.debug("Generated alter table:{}.{} sql: {}", schemaName, tableName,
result);
+ return result;
+ }
+
+ private String updateCommentDefinition(
+ TableChange.UpdateComment updateComment, JdbcTable jdbcTable) {
+ String newComment = updateComment.getNewComment();
+ if (null == StringIdentifier.fromComment(newComment)) {
+ // Detect and add Gravitino id.
+ if (StringUtils.isNotEmpty(jdbcTable.comment())) {
+ StringIdentifier identifier =
StringIdentifier.fromComment(jdbcTable.comment());
+ if (null != identifier) {
+ newComment = StringIdentifier.addToComment(identifier, newComment);
+ }
+ }
+ }
+ return String.format(
+ "COMMENT ON TABLE %s IS '%s';",
+ quoteIdentifier(jdbcTable.name()), newComment.replace("'", "''"));
+ }
+
+ private String deleteColumnFieldDefinition(
+ TableChange.DeleteColumn deleteColumn, JdbcTable table) {
+ if (deleteColumn.fieldName().length > 1) {
+ throw new
UnsupportedOperationException(HOLOGRES_NOT_SUPPORT_NESTED_COLUMN_MSG);
+ }
+ String col = deleteColumn.fieldName()[0];
+ boolean colExists =
+ Arrays.stream(table.columns()).anyMatch(s -> StringUtils.equals(col,
s.name()));
+ if (!colExists) {
+ if (BooleanUtils.isTrue(deleteColumn.getIfExists())) {
+ return "";
+ } else {
+ throw new IllegalArgumentException("Delete column does not exist: " +
col);
+ }
+ }
+ return String.format(
+ "%s%s DROP COLUMN %s;",
+ ALTER_TABLE, quoteIdentifier(table.name()),
quoteIdentifier(deleteColumn.fieldName()[0]));
+ }
+
+ private String renameColumnFieldDefinition(
+ TableChange.RenameColumn renameColumn, String tableName) {
+ if (renameColumn.fieldName().length > 1) {
+ throw new
UnsupportedOperationException(HOLOGRES_NOT_SUPPORT_NESTED_COLUMN_MSG);
+ }
+ return String.format(
+ "%s%s RENAME COLUMN %s TO %s;",
+ ALTER_TABLE,
+ quoteIdentifier(tableName),
+ quoteIdentifier(renameColumn.fieldName()[0]),
+ quoteIdentifier(renameColumn.getNewName()));
+ }
+
+ private List<String> addColumnFieldDefinition(
+ TableChange.AddColumn addColumn, JdbcTable lazyLoadTable) {
+ if (addColumn.fieldName().length > 1) {
+ throw new
UnsupportedOperationException(HOLOGRES_NOT_SUPPORT_NESTED_COLUMN_MSG);
+ }
+
+ // Hologres does not support setting nullable, default value, or
auto-increment via ADD COLUMN
+ if (!addColumn.isNullable()) {
+ throw new IllegalArgumentException(
+ "Hologres does not support setting NOT NULL constraint when adding a
column via ALTER TABLE.");
+ }
+ if (!Column.DEFAULT_VALUE_NOT_SET.equals(addColumn.getDefaultValue())) {
+ throw new IllegalArgumentException(
+ "Hologres does not support setting default value when adding a
column via ALTER TABLE.");
+ }
+ if (addColumn.isAutoIncrement()) {
+ throw new IllegalArgumentException(
+ "Hologres does not support setting auto-increment when adding a
column via ALTER TABLE.");
+ }
+
+ List<String> result = new ArrayList<>();
+ String col = addColumn.fieldName()[0];
+
+ String columnDefinition =
+ String.format(
+ "%s%s ADD COLUMN %s %s",
+ ALTER_TABLE,
+ quoteIdentifier(lazyLoadTable.name()),
+ quoteIdentifier(col),
+ typeConverter.fromGravitino(addColumn.getDataType()));
+
+ // Append position if available
+ if (!(addColumn.getPosition() instanceof TableChange.Default)) {
+ throw new IllegalArgumentException("Hologres does not support column
position in Gravitino.");
+ }
+ result.add(columnDefinition + ";");
+
+ // Append comment if available
+ if (StringUtils.isNotEmpty(addColumn.getComment())) {
+ String escapedComment = addColumn.getComment().replace("'", "''");
+ result.add(
+ String.format(
+ "COMMENT ON COLUMN %s.%s IS '%s';",
+ quoteIdentifier(lazyLoadTable.name()), quoteIdentifier(col),
escapedComment));
+ }
+ return result;
+ }
+
+ private String updateColumnCommentFieldDefinition(
+ TableChange.UpdateColumnComment updateColumnComment, String tableName) {
+ String newComment = updateColumnComment.getNewComment();
+ if (updateColumnComment.fieldName().length > 1) {
+ throw new
UnsupportedOperationException(HOLOGRES_NOT_SUPPORT_NESTED_COLUMN_MSG);
+ }
+ String col = updateColumnComment.fieldName()[0];
+ return String.format(
+ "COMMENT ON COLUMN %s.%s IS '%s';",
+ quoteIdentifier(tableName), quoteIdentifier(col),
newComment.replace("'", "''"));
+ }
+
+ @Override
+ protected ResultSet getIndexInfo(String schemaName, String tableName,
DatabaseMetaData metaData)
+ throws SQLException {
+ return metaData.getIndexInfo(database, schemaName, tableName, false,
false);
+ }
+
+ @Override
+ protected ResultSet getPrimaryKeys(String schemaName, String tableName,
DatabaseMetaData metaData)
+ throws SQLException {
+ return metaData.getPrimaryKeys(database, schemaName, tableName);
+ }
+
+ @Override
+ protected Connection getConnection(String schema) throws SQLException {
+ Connection connection = dataSource.getConnection();
+ connection.setCatalog(database);
+ connection.setSchema(schema);
+ return connection;
+ }
+
+ /**
+ * Get tables from the database including regular tables and partitioned
parent tables.
+ *
+ * <p>In Hologres (PostgreSQL-compatible):
+ *
+ * <ul>
+ * <li>Regular tables and partition child tables have TABLE_TYPE = "TABLE"
+ * <li>Partitioned parent tables have TABLE_TYPE = "PARTITIONED TABLE"
+ * <li>Views have TABLE_TYPE = "VIEW" (excluded from listing)
+ * <li>Foreign tables have TABLE_TYPE = "FOREIGN TABLE" (excluded from
listing)
+ * </ul>
+ *
+ * <p>This method overrides the parent to include regular tables and
partition parent tables, but
+ * excludes views and foreign tables from the table list.
+ *
+ * @param connection the database connection
+ * @return ResultSet containing table metadata
+ * @throws SQLException if a database access error occurs
+ */
+ @Override
+ protected ResultSet getTables(Connection connection) throws SQLException {
+ DatabaseMetaData metaData = connection.getMetaData();
+ String catalogName = connection.getCatalog();
+ String schemaName = connection.getSchema();
+ // Include "TABLE" (regular tables and partition children),
+ // and "PARTITIONED TABLE" (partition parent tables)
+ // Exclude "VIEW" and "FOREIGN TABLE" to hide views and foreign tables
from Gravitino
+ return metaData.getTables(
+ catalogName, schemaName, null, new String[] {"TABLE", "PARTITIONED
TABLE"});
+ }
+
+ @Override
+ protected ResultSet getTable(Connection connection, String schema, String
tableName)
+ throws SQLException {
+ DatabaseMetaData metaData = connection.getMetaData();
+ // Include TABLE and PARTITIONED TABLE types
+ // Exclude VIEW and FOREIGN TABLE to hide views and foreign tables from
Gravitino
+ return metaData.getTables(
+ database, schema, tableName, new String[] {"TABLE", "PARTITIONED
TABLE"});
+ }
+
+ @Override
+ protected ResultSet getColumns(Connection connection, String schema, String
tableName)
+ throws SQLException {
+ DatabaseMetaData metaData = connection.getMetaData();
+ return metaData.getColumns(database, schema, tableName, null);
+ }
+
+ /**
+ * Get distribution information from Hologres system table
hologres.hg_table_properties.
+ *
+ * <p>In Hologres, distribution_key is stored as a table property with the
property_key
+ * "distribution_key" and property_value as comma-separated column names
(e.g., "col1,col2").
+ *
+ * <p>This method queries the system table and returns a HASH distribution
with the specified
+ * columns. Hologres only supports HASH distribution strategy.
+ *
+ * @param connection the database connection
+ * @param databaseName the schema name
+ * @param tableName the table name
+ * @return the distribution info, or {@link Distributions#NONE} if no
distribution_key is set
+ * @throws SQLException if a database access error occurs
+ */
+ @Override
+ protected Distribution getDistributionInfo(
+ Connection connection, String databaseName, String tableName) throws
SQLException {
+ String schemaName = connection.getSchema();
+ String distributionSql =
+ "SELECT property_value "
+ + "FROM hologres.hg_table_properties "
+ + "WHERE table_namespace = ? AND table_name = ? AND property_key =
'distribution_key'";
+
+ try (PreparedStatement statement =
connection.prepareStatement(distributionSql)) {
+ statement.setString(1, schemaName);
+ statement.setString(2, tableName);
+
+ try (ResultSet resultSet = statement.executeQuery()) {
+ if (resultSet.next()) {
+ String distributionKey = resultSet.getString("property_value");
+ if (StringUtils.isNotEmpty(distributionKey)) {
+ NamedReference[] columns =
+ Arrays.stream(distributionKey.split(","))
+ .map(String::trim)
+ .filter(StringUtils::isNotEmpty)
+ .map(NamedReference::field)
+ .toArray(NamedReference[]::new);
+ if (columns.length > 0) {
+ return Distributions.hash(0, columns);
+ }
+ }
+ }
+ }
+ }
+ return Distributions.NONE;
+ }
+
+ /**
+ * Validate the distribution for Hologres.
+ *
+ * <p>Hologres only supports HASH distribution strategy.
+ *
+ * @param distribution the distribution to validate
+ */
+ private void validateDistribution(Distribution distribution) {
+ Preconditions.checkArgument(
+ distribution.strategy() == Strategy.HASH,
+ "Hologres only supports HASH distribution strategy, but got: %s",
+ distribution.strategy());
+ Preconditions.checkArgument(
+ distribution.expressions().length > 0,
+ "Hologres HASH distribution requires at least one distribution
column");
+ }
+
+ @Override
+ public Integer calculateDatetimePrecision(String typeName, int columnSize,
int scale) {
+ String upperTypeName = typeName.toUpperCase();
+ switch (upperTypeName) {
+ case "TIME":
+ case "TIMETZ":
+ case "TIMESTAMP":
+ case "TIMESTAMPTZ":
+ return Math.max(scale, 0);
+ default:
+ return null;
+ }
+ }
+
+ /**
+ * Get table partitioning information from PostgreSQL system tables.
+ *
+ * <p>Hologres (PostgreSQL-compatible) only supports LIST partitioning. This
method queries
+ * pg_partitioned_table and pg_attribute system tables to determine if a
table is partitioned, and
+ * if so, returns the partition column names as a LIST transform.
+ *
+ * <p>The SQL queries follow the same approach used by Holo Client's
ConnectionUtil:
+ *
+ * <ol>
+ * <li>Query pg_partitioned_table to check if the table is partitioned and
get partition column
+ * attribute numbers (partattrs)
+ * <li>Query pg_attribute to resolve attribute numbers to column names
+ * </ol>
+ *
+ * @param connection the database connection
+ * @param databaseName the schema name
+ * @param tableName the table name
+ * @return the partition transforms, or empty if the table is not partitioned
+ * @throws SQLException if a database access error occurs
+ */
+ @Override
+ protected Transform[] getTablePartitioning(
+ Connection connection, String databaseName, String tableName) throws
SQLException {
+
+ // First, check if this is a logical partitioned table by querying table
properties.
+ // Logical partition tables in Hologres (V3.1+) have the property
+ // "is_logical_partitioned_table" set to "true", and partition columns are
stored in
+ // the "logical_partition_columns" property.
+ Transform[] logicalPartitioning = getLogicalPartitioning(connection,
tableName);
+ if (logicalPartitioning.length > 0) {
+ return logicalPartitioning;
+ }
+
+ // Fall back to physical partition table check via pg_partitioned_table
system table
+ return getPhysicalPartitioning(connection, databaseName, tableName);
+ }
+
+ /**
+ * Get logical partition information from Hologres table properties.
+ *
+ * <p>Hologres V3.1+ supports logical partition tables where the parent
table is a physical table
+ * and child partitions are logical concepts. A logical partition table is
identified by the
+ * property "is_logical_partitioned_table" = "true" in
hologres.hg_table_properties, and its
+ * partition columns are stored in the "logical_partition_columns" property.
+ *
+ * @param connection the database connection
+ * @param tableName the table name
+ * @return the partition transforms, or empty if the table is not a logical
partition table
+ * @throws SQLException if a database access error occurs
+ */
+ private Transform[] getLogicalPartitioning(Connection connection, String
tableName)
+ throws SQLException {
+ String schemaName = connection.getSchema();
+ String logicalPartitionSql =
+ "SELECT property_key, property_value "
+ + "FROM hologres.hg_table_properties "
+ + "WHERE table_namespace = ? AND table_name = ? "
+ + "AND property_key IN ('is_logical_partitioned_table',
'logical_partition_columns')";
+
+ String isLogicalPartitioned = null;
+ String logicalPartitionColumns = null;
+
+ try (PreparedStatement statement =
connection.prepareStatement(logicalPartitionSql)) {
+ statement.setString(1, schemaName);
+ statement.setString(2, tableName);
+
+ try (ResultSet resultSet = statement.executeQuery()) {
+ while (resultSet.next()) {
+ String key = resultSet.getString("property_key");
+ String value = resultSet.getString("property_value");
+ if ("is_logical_partitioned_table".equals(key)) {
+ isLogicalPartitioned = value;
+ } else if ("logical_partition_columns".equals(key)) {
+ logicalPartitionColumns = value;
+ }
+ }
+ }
+ }
+
+ if (!"true".equalsIgnoreCase(isLogicalPartitioned)
+ || StringUtils.isEmpty(logicalPartitionColumns)) {
+ return Transforms.EMPTY_TRANSFORM;
+ }
+
+ // Parse partition column names (comma-separated, e.g., "col1" or
"col1,col2")
+ String[][] fieldNames =
+ Arrays.stream(logicalPartitionColumns.split(","))
+ .map(String::trim)
+ .filter(StringUtils::isNotEmpty)
+ .map(col -> new String[] {col})
+ .toArray(String[][]::new);
+
+ if (fieldNames.length == 0) {
+ return Transforms.EMPTY_TRANSFORM;
+ }
+
+ return new Transform[] {Transforms.list(fieldNames)};
+ }
+
+ /**
+ * Get physical partition information from PostgreSQL system tables.
+ *
+ * <p>Hologres (PostgreSQL-compatible) only supports LIST partitioning for
physical partitions.
+ * This method queries pg_partitioned_table and pg_attribute system tables
to determine if a table
+ * is partitioned, and if so, returns the partition column names as a LIST
transform.
+ *
+ * @param connection the database connection
+ * @param databaseName the schema name
+ * @param tableName the table name
+ * @return the partition transforms, or empty if the table is not a physical
partition table
+ * @throws SQLException if a database access error occurs
+ */
+ private Transform[] getPhysicalPartitioning(
+ Connection connection, String databaseName, String tableName) throws
SQLException {
+
+ // Query pg_partitioned_table to get partition strategy and column
attribute numbers
+ String partitionSql =
+ "SELECT part.partstrat, part.partnatts, part.partattrs "
+ + "FROM pg_catalog.pg_class c "
+ + "JOIN pg_catalog.pg_namespace n ON n.oid = c.relnamespace "
+ + "JOIN pg_catalog.pg_partitioned_table part ON c.oid =
part.partrelid "
+ + "WHERE n.nspname = ? AND c.relname = ? "
+ + "LIMIT 1";
+
+ String partStrategy = null;
+ String partAttrs = null;
+
+ try (PreparedStatement statement =
connection.prepareStatement(partitionSql)) {
+ statement.setString(1, databaseName);
+ statement.setString(2, tableName);
+
+ try (ResultSet resultSet = statement.executeQuery()) {
+ if (resultSet.next()) {
+ partStrategy = resultSet.getString("partstrat");
+ partAttrs = resultSet.getString("partattrs");
+ }
+ }
+ }
+
+ // Not a partitioned table
+ if (partStrategy == null || partAttrs == null) {
+ return Transforms.EMPTY_TRANSFORM;
+ }
+
+ // Parse partition attribute numbers (e.g., "1" or "1 2")
+ String[] attrNums = partAttrs.trim().split("\\s+");
+ List<String[]> partitionColumnNames = new ArrayList<>();
+
+ // Resolve attribute numbers to column names using a single batch query
+ String placeholders = Arrays.stream(attrNums).map(a ->
"?").collect(Collectors.joining(","));
+ String attrSql =
+ "SELECT attnum, attname FROM pg_catalog.pg_attribute "
+ + "WHERE attrelid = (SELECT c.oid FROM pg_catalog.pg_class c "
+ + " JOIN pg_catalog.pg_namespace n ON n.oid = c.relnamespace "
+ + " WHERE n.nspname = ? AND c.relname = ?) "
+ + "AND attnum IN ("
+ + placeholders
+ + ") "
+ + "ORDER BY attnum";
+
+ try (PreparedStatement statement = connection.prepareStatement(attrSql)) {
+ statement.setString(1, databaseName);
+ statement.setString(2, tableName);
+ for (int i = 0; i < attrNums.length; i++) {
+ statement.setInt(3 + i, Integer.parseInt(attrNums[i]));
+ }
+
+ try (ResultSet resultSet = statement.executeQuery()) {
+ while (resultSet.next()) {
+ partitionColumnNames.add(new String[]
{resultSet.getString("attname")});
+ }
+ }
+ }
+
+ if (partitionColumnNames.isEmpty()) {
+ return Transforms.EMPTY_TRANSFORM;
+ }
+
+ // Hologres only supports LIST partitioning (partstrat = 'l')
+ // Return a LIST transform with the partition column names
+ String[][] fieldNames = partitionColumnNames.toArray(new String[0][]);
+ return new Transform[] {Transforms.list(fieldNames)};
+ }
+
+ /**
+ * Get table properties from Hologres system table
hologres.hg_table_properties.
+ *
+ * <p>This method queries the Hologres system table to retrieve table
properties such as:
+ *
+ * <ul>
+ * <li>orientation: storage format (row/column/row,column)
+ * <li>clustering_key: clustering key columns
+ * <li>segment_key: event time column (segment key)
+ * <li>bitmap_columns: bitmap index columns
+ * <li>dictionary_encoding_columns: dictionary encoding columns
+ * <li>primary_key: primary key columns
+ * <li>time_to_live_in_seconds: TTL setting
+ * <li>table_group: table group name
+ * </ul>
+ *
+ * @param connection the database connection
+ * @param tableName the name of the table
+ * @return a map of table properties
+ * @throws SQLException if a database access error occurs
+ */
+ @Override
+ protected Map<String, String> getTableProperties(Connection connection,
String tableName)
+ throws SQLException {
+ Map<String, String> properties = new HashMap<>();
+ String schemaName = connection.getSchema();
+
+ // Query table properties from hologres.hg_table_properties system table
+ // The system table stores each property as a separate row with
property_key and property_value
+ String propertiesSql =
+ "SELECT property_key, property_value "
+ + "FROM hologres.hg_table_properties "
+ + "WHERE table_namespace = ? AND table_name = ?";
+
+ try (PreparedStatement statement =
connection.prepareStatement(propertiesSql)) {
+ statement.setString(1, schemaName);
+ statement.setString(2, tableName);
+
+ try (ResultSet resultSet = statement.executeQuery()) {
+ while (resultSet.next()) {
+ String propertyKey = resultSet.getString("property_key");
+ String propertyValue = resultSet.getString("property_value");
+
+ // Only include meaningful properties that users care about
+ if (StringUtils.isNotEmpty(propertyValue) &&
isUserRelevantProperty(propertyKey)) {
+ // Convert JDBC property keys to DDL-compatible keys
+ // Hologres system table stores "binlog.level" and "binlog.ttl",
+ // but CREATE TABLE WITH clause uses "binlog_level" and
"binlog_ttl"
+ String normalizedKey = convertFromJdbcPropertyKey(propertyKey);
+ properties.put(normalizedKey, propertyValue);
+ }
+ }
+ }
+ }
+
+ LOG.debug("Loaded table properties for {}.{}: {}", schemaName, tableName,
properties);
+ return properties;
+ }
+
+ /**
+ * Convert JDBC property key to DDL-compatible property key.
+ *
+ * <p>Hologres system table {@code hologres.hg_table_properties} stores some
property keys with
+ * dots (e.g., "binlog.level", "binlog.ttl"), but the CREATE TABLE WITH
clause uses underscores
+ * (e.g., "binlog_level", "binlog_ttl"). This method converts from the JDBC
format to the DDL
+ * format so that properties can be round-tripped correctly.
+ *
+ * @param jdbcKey the property key from the JDBC query
+ * @return the DDL-compatible property key
+ */
+ private String convertFromJdbcPropertyKey(String jdbcKey) {
+ switch (jdbcKey) {
+ case "binlog.level":
+ return "binlog_level";
+ case "binlog.ttl":
+ return "binlog_ttl";
+ default:
+ return jdbcKey;
+ }
+ }
+
+ /**
+ * Check if a property key is relevant for users to see.
+ *
+ * <p>This filters out internal system properties and only returns
properties that are meaningful
+ * for users.
+ *
+ * @param propertyKey the property key to check
+ * @return true if the property is relevant for users
+ */
+ private boolean isUserRelevantProperty(String propertyKey) {
+ return USER_RELEVANT_PROPERTIES.contains(propertyKey);
+ }
}
diff --git
a/catalogs-contrib/catalog-jdbc-hologres/src/test/java/org/apache/gravitino/catalog/hologres/operation/TestHologresTableOperations.java
b/catalogs-contrib/catalog-jdbc-hologres/src/test/java/org/apache/gravitino/catalog/hologres/operation/TestHologresTableOperations.java
new file mode 100644
index 0000000000..f39cb4d61e
--- /dev/null
+++
b/catalogs-contrib/catalog-jdbc-hologres/src/test/java/org/apache/gravitino/catalog/hologres/operation/TestHologresTableOperations.java
@@ -0,0 +1,1084 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.catalog.hologres.operation;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import
org.apache.gravitino.catalog.hologres.converter.HologresColumnDefaultValueConverter;
+import
org.apache.gravitino.catalog.hologres.converter.HologresExceptionConverter;
+import org.apache.gravitino.catalog.hologres.converter.HologresTypeConverter;
+import org.apache.gravitino.catalog.jdbc.JdbcColumn;
+import org.apache.gravitino.catalog.jdbc.JdbcTable;
+import org.apache.gravitino.rel.TableChange;
+import org.apache.gravitino.rel.expressions.NamedReference;
+import org.apache.gravitino.rel.expressions.UnparsedExpression;
+import org.apache.gravitino.rel.expressions.distributions.Distribution;
+import org.apache.gravitino.rel.expressions.distributions.Distributions;
+import org.apache.gravitino.rel.expressions.literals.Literals;
+import org.apache.gravitino.rel.expressions.transforms.Transform;
+import org.apache.gravitino.rel.expressions.transforms.Transforms;
+import org.apache.gravitino.rel.indexes.Index;
+import org.apache.gravitino.rel.indexes.Indexes;
+import org.apache.gravitino.rel.types.Types;
+import org.junit.jupiter.api.Test;
+
+/** Unit tests for {@link HologresTableOperations}. */
+public class TestHologresTableOperations {
+
+ // ==================== Helper inner class for SQL generation tests
====================
+
+ private static class TestableHologresTableOperations extends
HologresTableOperations {
+ public TestableHologresTableOperations() {
+ super.exceptionMapper = new HologresExceptionConverter();
+ super.typeConverter = new HologresTypeConverter();
+ super.columnDefaultValueConverter = new
HologresColumnDefaultValueConverter();
+ }
+
+ public String createTableSql(
+ String tableName,
+ JdbcColumn[] columns,
+ String comment,
+ Map<String, String> properties,
+ Transform[] partitioning,
+ Distribution distribution,
+ Index[] indexes) {
+ return generateCreateTableSql(
+ tableName, columns, comment, properties, partitioning, distribution,
indexes);
+ }
+
+ public String renameTableSql(String oldName, String newName) {
+ return generateRenameTableSql(oldName, newName);
+ }
+
+ public String dropTableSql(String tableName) {
+ return generateDropTableSql(tableName);
+ }
+
+ public String purgeTableSql(String tableName) {
+ return generatePurgeTableSql(tableName);
+ }
+
+ public String alterTableSql(String schemaName, String tableName,
TableChange... changes) {
+ return generateAlterTableSql(schemaName, tableName, changes);
+ }
+
+ public JdbcTable buildFakeTable(String tableName, JdbcColumn... columns) {
+ return JdbcTable.builder()
+ .withName(tableName)
+ .withColumns(columns)
+ .withComment("test table")
+ .withProperties(Collections.emptyMap())
+ .build();
+ }
+ }
+
+ private final TestableHologresTableOperations ops = new
TestableHologresTableOperations();
+
+ // ==================== appendPartitioningSql tests ====================
+
+ @Test
+ void testAppendPartitioningSqlPhysicalPartition() {
+ Transform[] partitioning = {Transforms.list(new String[][] {{"ds"}})};
+ StringBuilder sqlBuilder = new StringBuilder();
+ HologresTableOperations.appendPartitioningSql(partitioning, false,
sqlBuilder);
+ String result = sqlBuilder.toString();
+ assertTrue(result.contains("PARTITION BY LIST(\"ds\")"));
+ assertFalse(result.contains("LOGICAL"));
+ }
+
+ @Test
+ void testAppendPartitioningSqlLogicalPartitionSingleColumn() {
+ Transform[] partitioning = {Transforms.list(new String[][] {{"ds"}})};
+ StringBuilder sqlBuilder = new StringBuilder();
+ HologresTableOperations.appendPartitioningSql(partitioning, true,
sqlBuilder);
+ String result = sqlBuilder.toString();
+ assertTrue(result.contains("LOGICAL PARTITION BY LIST(\"ds\")"));
+ }
+
+ @Test
+ void testAppendPartitioningSqlLogicalPartitionTwoColumns() {
+ Transform[] partitioning = {Transforms.list(new String[][] {{"region"},
{"ds"}})};
+ StringBuilder sqlBuilder = new StringBuilder();
+ HologresTableOperations.appendPartitioningSql(partitioning, true,
sqlBuilder);
+ String result = sqlBuilder.toString();
+ assertTrue(result.contains("LOGICAL PARTITION BY LIST(\"region\",
\"ds\")"));
+ }
+
+ @Test
+ void testAppendPartitioningSqlRejectsNonListTransform() {
+ Transform[] partitioning = {Transforms.identity("col1")};
+ StringBuilder sqlBuilder = new StringBuilder();
+ assertThrows(
+ IllegalArgumentException.class,
+ () -> HologresTableOperations.appendPartitioningSql(partitioning,
false, sqlBuilder));
+ }
+
+ @Test
+ void testAppendPartitioningSqlPhysicalRejectsMultipleColumns() {
+ Transform[] partitioning = {Transforms.list(new String[][] {{"col1"},
{"col2"}})};
+ StringBuilder sqlBuilder = new StringBuilder();
+ assertThrows(
+ IllegalArgumentException.class,
+ () -> HologresTableOperations.appendPartitioningSql(partitioning,
false, sqlBuilder));
+ }
+
+ @Test
+ void testAppendPartitioningSqlLogicalRejectsMoreThanTwoColumns() {
+ Transform[] partitioning = {Transforms.list(new String[][] {{"col1"},
{"col2"}, {"col3"}})};
+ StringBuilder sqlBuilder = new StringBuilder();
+ assertThrows(
+ IllegalArgumentException.class,
+ () -> HologresTableOperations.appendPartitioningSql(partitioning,
true, sqlBuilder));
+ }
+
+ @Test
+ void testAppendPartitioningSqlRejectsEmptyPartitions() {
+ Transform[] partitioning = {Transforms.list(new String[][] {})};
+ StringBuilder sqlBuilder = new StringBuilder();
+ assertThrows(
+ IllegalArgumentException.class,
+ () -> HologresTableOperations.appendPartitioningSql(partitioning,
false, sqlBuilder));
+ }
+
+ @Test
+ void testAppendPartitioningSqlRejectsMultipleTransforms() {
+ Transform[] partitioning = {
+ Transforms.list(new String[][] {{"col1"}}), Transforms.list(new
String[][] {{"col2"}})
+ };
+ StringBuilder sqlBuilder = new StringBuilder();
+ assertThrows(
+ IllegalArgumentException.class,
+ () -> HologresTableOperations.appendPartitioningSql(partitioning,
false, sqlBuilder));
+ }
+
+ @Test
+ void testAppendPartitioningSqlRejectsNestedFieldNames() {
+ Transform[] partitioning = {Transforms.list(new String[][] {{"schema",
"col1"}})};
+ StringBuilder sqlBuilder = new StringBuilder();
+ assertThrows(
+ IllegalArgumentException.class,
+ () -> HologresTableOperations.appendPartitioningSql(partitioning,
false, sqlBuilder));
+ }
+
+ // ==================== appendIndexesSql tests ====================
+
+ @Test
+ void testAppendIndexesSqlEmpty() {
+ StringBuilder sqlBuilder = new StringBuilder();
+ HologresTableOperations.appendIndexesSql(new Index[0], sqlBuilder);
+ assertTrue(sqlBuilder.toString().isEmpty());
+ }
+
+ @Test
+ void testAppendIndexesSqlPrimaryKey() {
+ // Hologres does not support custom constraint names, so the name is
ignored
+ Index pk = Indexes.primary("pk_test", new String[][] {{"id"}});
+ StringBuilder sqlBuilder = new StringBuilder();
+ HologresTableOperations.appendIndexesSql(new Index[] {pk}, sqlBuilder);
+ String result = sqlBuilder.toString();
+ assertFalse(result.contains("CONSTRAINT"));
+ assertTrue(result.contains("PRIMARY KEY (\"id\")"));
+ }
+
+ @Test
+ void testAppendIndexesSqlPrimaryKeyWithoutName() {
+ Index pk = Indexes.primary(null, new String[][] {{"id"}});
+ StringBuilder sqlBuilder = new StringBuilder();
+ HologresTableOperations.appendIndexesSql(new Index[] {pk}, sqlBuilder);
+ String result = sqlBuilder.toString();
+ assertFalse(result.contains("CONSTRAINT"));
+ assertTrue(result.contains("PRIMARY KEY (\"id\")"));
+ }
+
+ @Test
+ void testAppendIndexesSqlUniqueKeyThrows() {
+ // Hologres does not support UNIQUE KEY index separately
+ Index uk = Indexes.unique("uk_name", new String[][] {{"name"}});
+ StringBuilder sqlBuilder = new StringBuilder();
+ assertThrows(
+ IllegalArgumentException.class,
+ () -> HologresTableOperations.appendIndexesSql(new Index[] {uk},
sqlBuilder));
+ }
+
+ @Test
+ void testAppendIndexesSqlCompositeKey() {
+ Index pk = Indexes.primary("pk_comp", new String[][] {{"id"}, {"ds"}});
+ StringBuilder sqlBuilder = new StringBuilder();
+ HologresTableOperations.appendIndexesSql(new Index[] {pk}, sqlBuilder);
+ String result = sqlBuilder.toString();
+ assertTrue(result.contains("PRIMARY KEY (\"id\", \"ds\")"));
+ }
+
+ @Test
+ void testAppendIndexesSqlMultipleIndexesWithUniqueKeyThrows() {
+ // Hologres does not support UNIQUE KEY, so mixing PK and UK should throw
+ Index pk = Indexes.primary("pk_id", new String[][] {{"id"}});
+ Index uk = Indexes.unique("uk_email", new String[][] {{"email"}});
+ StringBuilder sqlBuilder = new StringBuilder();
+ assertThrows(
+ IllegalArgumentException.class,
+ () -> HologresTableOperations.appendIndexesSql(new Index[] {pk, uk},
sqlBuilder));
+ }
+
+ // ==================== generateCreateTableSql tests ====================
+
+ @Test
+ void testCreateTableBasic() {
+ JdbcColumn col =
+ JdbcColumn.builder()
+ .withName("id")
+ .withType(Types.IntegerType.get())
+ .withNullable(false)
+ .build();
+ String sql =
+ ops.createTableSql(
+ "test_table",
+ new JdbcColumn[] {col},
+ null,
+ Collections.emptyMap(),
+ Transforms.EMPTY_TRANSFORM,
+ Distributions.NONE,
+ Indexes.EMPTY_INDEXES);
+ assertTrue(sql.contains("CREATE TABLE \"test_table\""));
+ assertTrue(sql.contains("\"id\" int4"));
+ assertTrue(sql.contains("NOT NULL"));
+ }
+
+ @Test
+ void testCreateTableWithComment() {
+ JdbcColumn col =
+ JdbcColumn.builder()
+ .withName("id")
+ .withType(Types.IntegerType.get())
+ .withNullable(false)
+ .build();
+ String sql =
+ ops.createTableSql(
+ "test_table",
+ new JdbcColumn[] {col},
+ "This is a test table",
+ Collections.emptyMap(),
+ Transforms.EMPTY_TRANSFORM,
+ Distributions.NONE,
+ Indexes.EMPTY_INDEXES);
+ assertTrue(sql.contains("COMMENT ON TABLE \"test_table\" IS 'This is a
test table'"));
+ }
+
+ @Test
+ void testCreateTableWithColumnComment() {
+ JdbcColumn col =
+ JdbcColumn.builder()
+ .withName("id")
+ .withType(Types.IntegerType.get())
+ .withNullable(false)
+ .withComment("Primary key column")
+ .build();
+ String sql =
+ ops.createTableSql(
+ "test_table",
+ new JdbcColumn[] {col},
+ null,
+ Collections.emptyMap(),
+ Transforms.EMPTY_TRANSFORM,
+ Distributions.NONE,
+ Indexes.EMPTY_INDEXES);
+ assertTrue(sql.contains("COMMENT ON COLUMN \"test_table\".\"id\" IS
'Primary key column'"));
+ }
+
+ @Test
+ void testCreateTableWithDistribution() {
+ JdbcColumn col =
+ JdbcColumn.builder()
+ .withName("id")
+ .withType(Types.IntegerType.get())
+ .withNullable(false)
+ .build();
+ Distribution dist = Distributions.hash(0, NamedReference.field("id"));
+ String sql =
+ ops.createTableSql(
+ "test_table",
+ new JdbcColumn[] {col},
+ null,
+ Collections.emptyMap(),
+ Transforms.EMPTY_TRANSFORM,
+ dist,
+ Indexes.EMPTY_INDEXES);
+ assertTrue(sql.contains("distribution_key = 'id'"));
+ assertTrue(sql.contains("WITH ("));
+ }
+
+ @Test
+ void testCreateTableWithProperties() {
+ JdbcColumn col =
+ JdbcColumn.builder()
+ .withName("id")
+ .withType(Types.IntegerType.get())
+ .withNullable(false)
+ .build();
+ Map<String, String> properties = new HashMap<>();
+ properties.put("orientation", "column");
+ properties.put("time_to_live_in_seconds", "3600");
+ String sql =
+ ops.createTableSql(
+ "test_table",
+ new JdbcColumn[] {col},
+ null,
+ properties,
+ Transforms.EMPTY_TRANSFORM,
+ Distributions.NONE,
+ Indexes.EMPTY_INDEXES);
+ assertTrue(sql.contains("orientation = 'column'"));
+ assertTrue(sql.contains("time_to_live_in_seconds = '3600'"));
+ }
+
+ @Test
+ void testCreateTableDistributionKeyFilteredFromProperties() {
+ JdbcColumn col =
+ JdbcColumn.builder()
+ .withName("id")
+ .withType(Types.IntegerType.get())
+ .withNullable(false)
+ .build();
+ Map<String, String> properties = new HashMap<>();
+ properties.put("distribution_key", "should_be_ignored");
+ properties.put("orientation", "column");
+ Distribution dist = Distributions.hash(0, NamedReference.field("id"));
+ String sql =
+ ops.createTableSql(
+ "test_table",
+ new JdbcColumn[] {col},
+ null,
+ properties,
+ Transforms.EMPTY_TRANSFORM,
+ dist,
+ Indexes.EMPTY_INDEXES);
+ // Should use the Distribution parameter, not the property
+ assertTrue(sql.contains("distribution_key = 'id'"));
+ assertFalse(sql.contains("should_be_ignored"));
+ }
+
+ @Test
+ void testCreateTableIsLogicalPartitionedFilteredFromProperties() {
+ JdbcColumn col =
+ JdbcColumn.builder()
+ .withName("id")
+ .withType(Types.IntegerType.get())
+ .withNullable(false)
+ .build();
+ Map<String, String> properties = new HashMap<>();
+ properties.put("is_logical_partitioned_table", "true");
+ properties.put("orientation", "column");
+ String sql =
+ ops.createTableSql(
+ "test_table",
+ new JdbcColumn[] {col},
+ null,
+ properties,
+ Transforms.EMPTY_TRANSFORM,
+ Distributions.NONE,
+ Indexes.EMPTY_INDEXES);
+ // is_logical_partitioned_table should NOT appear in WITH clause
+ assertFalse(sql.contains("is_logical_partitioned_table"));
+ assertTrue(sql.contains("orientation = 'column'"));
+ }
+
+ @Test
+ void testCreateTablePrimaryKeyFilteredFromProperties() {
+ JdbcColumn col =
+ JdbcColumn.builder()
+ .withName("id")
+ .withType(Types.IntegerType.get())
+ .withNullable(false)
+ .build();
+ Map<String, String> properties = new HashMap<>();
+ properties.put("primary_key", "id");
+ properties.put("orientation", "column");
+ Index pk = Indexes.primary("pk_id", new String[][] {{"id"}});
+ String sql =
+ ops.createTableSql(
+ "test_table",
+ new JdbcColumn[] {col},
+ null,
+ properties,
+ Transforms.EMPTY_TRANSFORM,
+ Distributions.NONE,
+ new Index[] {pk});
+ // primary_key should NOT appear in WITH clause (it is defined via PRIMARY
KEY constraint)
+ assertFalse(sql.contains("primary_key"));
+ assertTrue(sql.contains("orientation = 'column'"));
+ assertTrue(sql.contains("PRIMARY KEY"));
+ }
+
+ @Test
+ void testCreateTableWithPhysicalPartition() {
+ JdbcColumn col =
+ JdbcColumn.builder()
+ .withName("ds")
+ .withType(Types.DateType.get())
+ .withNullable(false)
+ .build();
+ Transform[] partitioning = {Transforms.list(new String[][] {{"ds"}})};
+ String sql =
+ ops.createTableSql(
+ "test_table",
+ new JdbcColumn[] {col},
+ null,
+ Collections.emptyMap(),
+ partitioning,
+ Distributions.NONE,
+ Indexes.EMPTY_INDEXES);
+ assertTrue(sql.contains("PARTITION BY LIST(\"ds\")"));
+ assertFalse(sql.contains("LOGICAL"));
+ }
+
+ @Test
+ void testCreateTableWithLogicalPartition() {
+ JdbcColumn col =
+ JdbcColumn.builder()
+ .withName("ds")
+ .withType(Types.DateType.get())
+ .withNullable(false)
+ .build();
+ Map<String, String> properties = new HashMap<>();
+ properties.put("is_logical_partitioned_table", "true");
+ Transform[] partitioning = {Transforms.list(new String[][] {{"ds"}})};
+ String sql =
+ ops.createTableSql(
+ "test_table",
+ new JdbcColumn[] {col},
+ null,
+ properties,
+ partitioning,
+ Distributions.NONE,
+ Indexes.EMPTY_INDEXES);
+ assertTrue(sql.contains("LOGICAL PARTITION BY LIST(\"ds\")"));
+ }
+
+ @Test
+ void testCreateTableWithLogicalPartitionTwoColumns() {
+ JdbcColumn col1 =
+ JdbcColumn.builder()
+ .withName("region")
+ .withType(Types.StringType.get())
+ .withNullable(false)
+ .build();
+ JdbcColumn col2 =
+ JdbcColumn.builder()
+ .withName("ds")
+ .withType(Types.DateType.get())
+ .withNullable(false)
+ .build();
+ Map<String, String> properties = new HashMap<>();
+ properties.put("is_logical_partitioned_table", "true");
+ Transform[] partitioning = {Transforms.list(new String[][] {{"region"},
{"ds"}})};
+ String sql =
+ ops.createTableSql(
+ "test_table",
+ new JdbcColumn[] {col1, col2},
+ null,
+ properties,
+ partitioning,
+ Distributions.NONE,
+ Indexes.EMPTY_INDEXES);
+ assertTrue(sql.contains("LOGICAL PARTITION BY LIST(\"region\", \"ds\")"));
+ }
+
+ @Test
+ void testCreateTableWithPrimaryKey() {
+ JdbcColumn col =
+ JdbcColumn.builder()
+ .withName("id")
+ .withType(Types.LongType.get())
+ .withNullable(false)
+ .build();
+ Index pk = Indexes.primary("pk_id", new String[][] {{"id"}});
+ String sql =
+ ops.createTableSql(
+ "test_table",
+ new JdbcColumn[] {col},
+ null,
+ Collections.emptyMap(),
+ Transforms.EMPTY_TRANSFORM,
+ Distributions.NONE,
+ new Index[] {pk});
+ assertTrue(sql.contains("PRIMARY KEY (\"id\")"));
+ // Hologres does not support custom constraint names, so CONSTRAINT should
not appear
+ assertFalse(sql.contains("CONSTRAINT"));
+ }
+
+ @Test
+ void testCreateTableWithAutoIncrementThrows() {
+ JdbcColumn col =
+ JdbcColumn.builder()
+ .withName("id")
+ .withType(Types.IntegerType.get())
+ .withNullable(false)
+ .withAutoIncrement(true)
+ .build();
+ IllegalArgumentException ex =
+ assertThrows(
+ IllegalArgumentException.class,
+ () ->
+ ops.createTableSql(
+ "test_table",
+ new JdbcColumn[] {col},
+ null,
+ Collections.emptyMap(),
+ Transforms.EMPTY_TRANSFORM,
+ Distributions.NONE,
+ Indexes.EMPTY_INDEXES));
+ assertTrue(ex.getMessage().contains("auto-increment"));
+ }
+
+ @Test
+ void testCreateTableWithNullableColumn() {
+ JdbcColumn col =
+ JdbcColumn.builder()
+ .withName("name")
+ .withType(Types.StringType.get())
+ .withNullable(true)
+ .build();
+ String sql =
+ ops.createTableSql(
+ "test_table",
+ new JdbcColumn[] {col},
+ null,
+ Collections.emptyMap(),
+ Transforms.EMPTY_TRANSFORM,
+ Distributions.NONE,
+ Indexes.EMPTY_INDEXES);
+ assertTrue(sql.contains("NULL"));
+ assertFalse(sql.contains("NOT NULL"));
+ }
+
+ @Test
+ void testCreateTableWithDefaultValue() {
+ JdbcColumn col =
+ JdbcColumn.builder()
+ .withName("status")
+ .withType(Types.IntegerType.get())
+ .withNullable(true)
+ .withDefaultValue(Literals.integerLiteral(0))
+ .build();
+ String sql =
+ ops.createTableSql(
+ "test_table",
+ new JdbcColumn[] {col},
+ null,
+ Collections.emptyMap(),
+ Transforms.EMPTY_TRANSFORM,
+ Distributions.NONE,
+ Indexes.EMPTY_INDEXES);
+ assertTrue(sql.contains("DEFAULT 0"));
+ }
+
+ @Test
+ void testCreateTableMultipleColumns() {
+ JdbcColumn col1 =
+ JdbcColumn.builder()
+ .withName("id")
+ .withType(Types.LongType.get())
+ .withNullable(false)
+ .build();
+ JdbcColumn col2 =
+ JdbcColumn.builder()
+ .withName("name")
+ .withType(Types.StringType.get())
+ .withNullable(true)
+ .build();
+ JdbcColumn col3 =
+ JdbcColumn.builder()
+ .withName("amount")
+ .withType(Types.DecimalType.of(10, 2))
+ .withNullable(true)
+ .withDefaultValue(
+
Literals.decimalLiteral(org.apache.gravitino.rel.types.Decimal.of("0.00", 10,
2)))
+ .build();
+ String sql =
+ ops.createTableSql(
+ "orders",
+ new JdbcColumn[] {col1, col2, col3},
+ "Order table",
+ Collections.emptyMap(),
+ Transforms.EMPTY_TRANSFORM,
+ Distributions.NONE,
+ Indexes.EMPTY_INDEXES);
+ assertTrue(sql.contains("\"id\" int8"));
+ assertTrue(sql.contains("\"name\" text"));
+ assertTrue(sql.contains("\"amount\" numeric(10,2)"));
+ assertTrue(sql.contains("COMMENT ON TABLE \"orders\" IS 'Order table'"));
+ }
+
+ @Test
+ void testCreateTableCommentWithSingleQuotes() {
+ JdbcColumn col =
+ JdbcColumn.builder()
+ .withName("flag")
+ .withType(Types.StringType.get())
+ .withNullable(false)
+ .withComment("退货标志('R'=已退货, 'A'=未退货)")
+ .build();
+ String sql =
+ ops.createTableSql(
+ "test_table",
+ new JdbcColumn[] {col},
+ "It's a test table",
+ Collections.emptyMap(),
+ Transforms.EMPTY_TRANSFORM,
+ Distributions.NONE,
+ Indexes.EMPTY_INDEXES);
+ // Single quotes in table comment should be escaped
+ assertTrue(sql.contains("IS 'It''s a test table'"));
+ // Single quotes in column comment should be escaped
+ assertTrue(sql.contains("IS '退货标志(''R''=已退货, ''A''=未退货)'"));
+ // Unescaped single quotes should NOT appear
+ assertFalse(sql.contains("IS 'It's"));
+ }
+
+ @Test
+ void testCreateTableFullFeatured() {
+ JdbcColumn col1 =
+ JdbcColumn.builder()
+ .withName("order_id")
+ .withType(Types.LongType.get())
+ .withNullable(false)
+ .build();
+ JdbcColumn col2 =
+ JdbcColumn.builder()
+ .withName("ds")
+ .withType(Types.DateType.get())
+ .withNullable(false)
+ .build();
+ Map<String, String> properties = new HashMap<>();
+ properties.put("is_logical_partitioned_table", "true");
+ properties.put("orientation", "column");
+ Distribution dist = Distributions.hash(0,
NamedReference.field("order_id"));
+ Transform[] partitioning = {Transforms.list(new String[][] {{"ds"}})};
+ Index pk = Indexes.primary("pk_order", new String[][] {{"order_id"},
{"ds"}});
+ String sql =
+ ops.createTableSql(
+ "orders",
+ new JdbcColumn[] {col1, col2},
+ "Order table",
+ properties,
+ partitioning,
+ dist,
+ new Index[] {pk});
+ assertTrue(sql.contains("CREATE TABLE \"orders\""));
+ assertTrue(sql.contains("LOGICAL PARTITION BY LIST(\"ds\")"));
+ assertTrue(sql.contains("distribution_key = 'order_id'"));
+ assertTrue(sql.contains("orientation = 'column'"));
+ assertTrue(sql.contains("PRIMARY KEY (\"order_id\", \"ds\")"));
+ assertTrue(sql.contains("COMMENT ON TABLE \"orders\" IS 'Order table'"));
+ assertFalse(sql.contains("is_logical_partitioned_table"));
+ }
+
+ // ==================== generateRenameTableSql tests ====================
+
+ @Test
+ void testRenameTableSql() {
+ String sql = ops.renameTableSql("old_table", "new_table");
+ assertEquals("ALTER TABLE \"old_table\" RENAME TO \"new_table\"", sql);
+ }
+
+ // ==================== generateDropTableSql tests ====================
+
+ @Test
+ void testDropTableSql() {
+ String sql = ops.dropTableSql("my_table");
+ assertEquals("DROP TABLE \"my_table\"", sql);
+ }
+
+ // ==================== generatePurgeTableSql tests ====================
+
+ @Test
+ void testPurgeTableSqlThrowsException() {
+ assertThrows(UnsupportedOperationException.class, () ->
ops.purgeTableSql("my_table"));
+ }
+
+ // ==================== updateColumnAutoIncrement not supported
====================
+ // Note: Hologres does not support altering column auto-increment via ALTER
TABLE.
+ // The UpdateColumnAutoIncrement change type is rejected in
generateAlterTableSql().
+ // CREATE TABLE with auto-increment (GENERATED BY DEFAULT AS IDENTITY) is
also not supported
+ // and will throw an exception — see
testCreateTableWithAutoIncrementThrows() above.
+
+ // ==================== addIndex / deleteIndex not supported
====================
+ // Note: Hologres does not support adding or deleting indexes via ALTER
TABLE.
+ // The AddIndex and DeleteIndex change types are rejected in
generateAlterTableSql().
+ // CREATE TABLE with PRIMARY KEY and UNIQUE KEY constraints is still
supported
+ // — see testCreateTableWithPrimaryKey() above.
+
+ // ==================== getIndexFieldStr tests ====================
+
+ @Test
+ void testGetIndexFieldStrSingleColumn() {
+ String result = HologresTableOperations.getIndexFieldStr(new String[][]
{{"id"}});
+ assertEquals("\"id\"", result);
+ }
+
+ @Test
+ void testGetIndexFieldStrMultipleColumns() {
+ String result = HologresTableOperations.getIndexFieldStr(new String[][]
{{"id"}, {"name"}});
+ assertEquals("\"id\", \"name\"", result);
+ }
+
+ @Test
+ void testGetIndexFieldStrRejectsNestedColumn() {
+ assertThrows(
+ IllegalArgumentException.class,
+ () -> HologresTableOperations.getIndexFieldStr(new String[][]
{{"schema", "column"}}));
+ }
+
+ // ==================== calculateDatetimePrecision tests ====================
+
+ @Test
+ void testCalculateDatetimePrecisionTimestamp() {
+ Integer result = ops.calculateDatetimePrecision("TIMESTAMP", 0, 6);
+ assertEquals(6, result);
+ }
+
+ @Test
+ void testCalculateDatetimePrecisionTimestamptz() {
+ Integer result = ops.calculateDatetimePrecision("TIMESTAMPTZ", 0, 3);
+ assertEquals(3, result);
+ }
+
+ @Test
+ void testCalculateDatetimePrecisionTime() {
+ Integer result = ops.calculateDatetimePrecision("TIME", 0, 0);
+ assertEquals(0, result);
+ }
+
+ @Test
+ void testCalculateDatetimePrecisionTimetz() {
+ Integer result = ops.calculateDatetimePrecision("TIMETZ", 0, 6);
+ assertEquals(6, result);
+ }
+
+ @Test
+ void testCalculateDatetimePrecisionNonDatetime() {
+ Integer result = ops.calculateDatetimePrecision("INT4", 0, 0);
+ assertEquals(null, result);
+ }
+
+ @Test
+ void testCalculateDatetimePrecisionNegativeScaleReturnsZero() {
+ Integer result = ops.calculateDatetimePrecision("TIMESTAMP", 0, -1);
+ assertEquals(0, result);
+ }
+
+ @Test
+ void testCreateTableWithUnparsedExpressionDefault() {
+ JdbcColumn col1 =
+ JdbcColumn.builder()
+ .withName("order_time")
+ .withType(Types.TimestampType.withoutTimeZone())
+ .withNullable(false)
+ .build();
+ JdbcColumn col2 =
+ JdbcColumn.builder()
+ .withName("ds")
+ .withType(Types.TimestampType.withoutTimeZone())
+ .withNullable(false)
+ .withDefaultValue(UnparsedExpression.of("date_trunc('day'::text,
order_time)"))
+ .build();
+ String sql =
+ ops.createTableSql(
+ "test_default_expr",
+ new JdbcColumn[] {col1, col2},
+ null,
+ Collections.emptyMap(),
+ Transforms.EMPTY_TRANSFORM,
+ Distributions.NONE,
+ Indexes.EMPTY_INDEXES);
+ // UnparsedExpression should produce DEFAULT, not GENERATED ALWAYS AS
+ assertTrue(sql.contains("DEFAULT date_trunc('day'::text, order_time)"));
+ assertFalse(sql.contains("GENERATED ALWAYS AS"));
+ assertTrue(sql.contains("NOT NULL"));
+ }
+
+ @Test
+ void testCreateTableWithUnparsedExpressionDefaultNullable() {
+ JdbcColumn col1 =
+ JdbcColumn.builder()
+ .withName("val")
+ .withType(Types.IntegerType.get())
+ .withNullable(false)
+ .build();
+ JdbcColumn col2 =
+ JdbcColumn.builder()
+ .withName("computed")
+ .withType(Types.IntegerType.get())
+ .withNullable(true)
+ .withDefaultValue(UnparsedExpression.of("val * 2"))
+ .build();
+ String sql =
+ ops.createTableSql(
+ "test_default_nullable",
+ new JdbcColumn[] {col1, col2},
+ null,
+ Collections.emptyMap(),
+ Transforms.EMPTY_TRANSFORM,
+ Distributions.NONE,
+ Indexes.EMPTY_INDEXES);
+ // UnparsedExpression should produce DEFAULT, not GENERATED ALWAYS AS
+ assertTrue(sql.contains("DEFAULT val * 2"));
+ assertFalse(sql.contains("GENERATED ALWAYS AS"));
+ // The nullable column should have NULL
+ assertTrue(sql.contains("NULL DEFAULT val * 2"));
+ }
+
+ // ==================== generateAlterTableSql tests ====================
+
+ @Test
+ void testAlterTableUpdateComment() {
+ // Note: updateComment needs a JdbcTable with StringIdentifier, which
requires
+ // getOrCreateTable. Since we can't mock the DB connection, we test the
SQL format
+ // via the other alter operations that don't require table loading.
+ }
+
+ @Test
+ void testAlterTableRenameColumn() {
+ String sql =
+ ops.alterTableSql(
+ "public", "test_table", TableChange.renameColumn(new String[]
{"old_col"}, "new_col"));
+ assertTrue(sql.contains("ALTER TABLE \"test_table\" RENAME COLUMN
\"old_col\" TO \"new_col\""));
+ }
+
+ @Test
+ void testAlterTableUpdateColumnComment() {
+ String sql =
+ ops.alterTableSql(
+ "public",
+ "test_table",
+ TableChange.updateColumnComment(new String[] {"col1"}, "new
comment"));
+ assertTrue(sql.contains("COMMENT ON COLUMN \"test_table\".\"col1\" IS 'new
comment'"));
+ }
+
+ @Test
+ void testAlterTableUpdateColumnCommentWithSingleQuotes() {
+ String sql =
+ ops.alterTableSql(
+ "public",
+ "test_table",
+ TableChange.updateColumnComment(new String[] {"col1"}, "it's a
test"));
+ assertTrue(sql.contains("IS 'it''s a test'"));
+ }
+
+ @Test
+ void testAlterTableRenameColumnRejectsNestedField() {
+ assertThrows(
+ UnsupportedOperationException.class,
+ () ->
+ ops.alterTableSql(
+ "public",
+ "test_table",
+ TableChange.renameColumn(new String[] {"schema", "col"},
"new_col")));
+ }
+
+ @Test
+ void testAlterTableSetPropertyThrows() {
+ assertThrows(
+ IllegalArgumentException.class,
+ () -> ops.alterTableSql("public", "test_table",
TableChange.setProperty("key", "value")));
+ }
+
+ @Test
+ void testAlterTableRemovePropertyThrows() {
+ assertThrows(
+ IllegalArgumentException.class,
+ () -> ops.alterTableSql("public", "test_table",
TableChange.removeProperty("key")));
+ }
+
+ @Test
+ void testAlterTableUpdateColumnDefaultValueThrows() {
+ assertThrows(
+ IllegalArgumentException.class,
+ () ->
+ ops.alterTableSql(
+ "public",
+ "test_table",
+ TableChange.updateColumnDefaultValue(
+ new String[] {"col1"}, Literals.integerLiteral(0))));
+ }
+
+ @Test
+ void testAlterTableUpdateColumnTypeThrows() {
+ assertThrows(
+ IllegalArgumentException.class,
+ () ->
+ ops.alterTableSql(
+ "public",
+ "test_table",
+ TableChange.updateColumnType(new String[] {"col1"},
Types.StringType.get())));
+ }
+
+ @Test
+ void testAlterTableUpdateColumnPositionThrows() {
+ assertThrows(
+ IllegalArgumentException.class,
+ () ->
+ ops.alterTableSql(
+ "public",
+ "test_table",
+ TableChange.updateColumnPosition(
+ new String[] {"col1"},
TableChange.ColumnPosition.first())));
+ }
+
+ @Test
+ void testAlterTableUpdateColumnNullabilityThrows() {
+ assertThrows(
+ IllegalArgumentException.class,
+ () ->
+ ops.alterTableSql(
+ "public",
+ "test_table",
+ TableChange.updateColumnNullability(new String[] {"col1"},
true)));
+ }
+
+ @Test
+ void testAlterTableAddIndexThrows() {
+ assertThrows(
+ IllegalArgumentException.class,
+ () ->
+ ops.alterTableSql(
+ "public",
+ "test_table",
+ TableChange.addIndex(
+ Index.IndexType.PRIMARY_KEY, "pk_test", new String[][]
{{"col1"}})));
+ }
+
+ @Test
+ void testAlterTableDeleteIndexThrows() {
+ assertThrows(
+ IllegalArgumentException.class,
+ () -> ops.alterTableSql("public", "test_table",
TableChange.deleteIndex("pk_test", false)));
+ }
+
+ @Test
+ void testAlterTableUpdateColumnAutoIncrementThrows() {
+ assertThrows(
+ IllegalArgumentException.class,
+ () ->
+ ops.alterTableSql(
+ "public",
+ "test_table",
+ TableChange.updateColumnAutoIncrement(new String[] {"col1"},
true)));
+ }
+
+ // ==================== quoteIdentifier tests ====================
+
+ @Test
+ void testQuoteIdentifierEscapesEmbeddedDoubleQuotes() {
+ // Table name with embedded double quote should be escaped by doubling
+ JdbcColumn col =
+ JdbcColumn.builder()
+ .withName("id")
+ .withType(Types.IntegerType.get())
+ .withNullable(false)
+ .build();
+ String sql =
+ ops.createTableSql(
+ "table\"name",
+ new JdbcColumn[] {col},
+ null,
+ Collections.emptyMap(),
+ Transforms.EMPTY_TRANSFORM,
+ Distributions.NONE,
+ Indexes.EMPTY_INDEXES);
+ assertTrue(sql.contains("CREATE TABLE \"table\"\"name\""));
+ }
+
+ // ==================== Property key/value sanitization tests
====================
+
+ @Test
+ void testCreateTablePropertyValueEscapesSingleQuotes() {
+ JdbcColumn col =
+ JdbcColumn.builder()
+ .withName("id")
+ .withType(Types.IntegerType.get())
+ .withNullable(false)
+ .build();
+ Map<String, String> properties = new HashMap<>();
+ properties.put("clustering_key", "col1:asc,col2's:desc");
+ String sql =
+ ops.createTableSql(
+ "test_table",
+ new JdbcColumn[] {col},
+ null,
+ properties,
+ Transforms.EMPTY_TRANSFORM,
+ Distributions.NONE,
+ Indexes.EMPTY_INDEXES);
+ // Single quote in value should be escaped
+ assertTrue(sql.contains("clustering_key = 'col1:asc,col2''s:desc'"));
+ }
+
+ @Test
+ void testCreateTableInvalidPropertyKeyThrows() {
+ JdbcColumn col =
+ JdbcColumn.builder()
+ .withName("id")
+ .withType(Types.IntegerType.get())
+ .withNullable(false)
+ .build();
+ Map<String, String> properties = new HashMap<>();
+ properties.put("invalid-key!", "value");
+ assertThrows(
+ IllegalArgumentException.class,
+ () ->
+ ops.createTableSql(
+ "test_table",
+ new JdbcColumn[] {col},
+ null,
+ properties,
+ Transforms.EMPTY_TRANSFORM,
+ Distributions.NONE,
+ Indexes.EMPTY_INDEXES));
+ }
+
+ @Test
+ void testCreateTableSqlInjectionPropertyKeyThrows() {
+ JdbcColumn col =
+ JdbcColumn.builder()
+ .withName("id")
+ .withType(Types.IntegerType.get())
+ .withNullable(false)
+ .build();
+ Map<String, String> properties = new HashMap<>();
+ properties.put("key'); DROP TABLE users; --", "value");
+ assertThrows(
+ IllegalArgumentException.class,
+ () ->
+ ops.createTableSql(
+ "test_table",
+ new JdbcColumn[] {col},
+ null,
+ properties,
+ Transforms.EMPTY_TRANSFORM,
+ Distributions.NONE,
+ Indexes.EMPTY_INDEXES));
+ }
+}