This is an automated email from the ASF dual-hosted git repository.

yuqi4733 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/gravitino.git


The following commit(s) were added to refs/heads/main by this push:
     new b7536aca87 [#10073] feat(catalog-jdbc-hologres): Add table operations 
and unit tests for Hologres catalog (#10068)
b7536aca87 is described below

commit b7536aca8725f0410dd58f319f6cbab342298641
Author: Ye Ding <[email protected]>
AuthorDate: Mon Mar 9 11:32:23 2026 +0800

    [#10073] feat(catalog-jdbc-hologres): Add table operations and unit tests 
for Hologres catalog (#10068)
    
    ### What changes were proposed in this pull request?
    
    Replace the stub `HologresTableOperations` with the full implementation,
    and add `TestHologresTableOperations`:
    
    **Table Operations:**
    - `generateCreateTableSql()`: Full DDL generation with column types,
    nullability, defaults, comments, primary keys, distribution keys
    (`DISTRIBUTE BY`), partition keys (`PARTITION BY`), and table properties
    (`orientation`, `table_group`, `time_to_live_in_seconds`,
    `binlog_level`, `bitmap_columns`, `dictionary_encoding_columns`,
    `clustering_key`, `segment_key`, `event_time_column`)
    - `generateAlterTableSql()`: Support for rename table,
    add/drop/rename/update-type column, update comment, set/remove
    properties
    - `generatePurgeTableSql()`: DROP TABLE implementation
    - Table loading with metadata reconstruction from JDBC metadata and
    `pg_catalog` queries
    
    **Unit Tests (1084 lines):**
    - CREATE TABLE with various column types and constraints
    - ALTER TABLE for all supported operations
    - Table property handling (orientation, TTL, binlog, etc.)
    - Distribution and partition key support
    
    ### Why are the changes needed?
    
    Enables Gravitino to fully manage Hologres tables with all
    Hologres-specific features.
    
    Fix: https://github.com/apache/gravitino/issues/10073
    
    ### Does this PR introduce _any_ user-facing change?
    
    No. The Hologres catalog is not yet documented or released.
    
    ### How was this patch tested?
    
    - Unit tests: `./gradlew :catalogs-contrib:catalog-jdbc-hologres:test
    -PskipITs` — all tests pass.
---
 .../operation/HologresSchemaOperations.java        |   12 +-
 .../operation/HologresTableOperations.java         |  982 +++++++++++++++++-
 .../operation/TestHologresTableOperations.java     | 1084 ++++++++++++++++++++
 3 files changed, 2059 insertions(+), 19 deletions(-)

diff --git 
a/catalogs-contrib/catalog-jdbc-hologres/src/main/java/org/apache/gravitino/catalog/hologres/operation/HologresSchemaOperations.java
 
b/catalogs-contrib/catalog-jdbc-hologres/src/main/java/org/apache/gravitino/catalog/hologres/operation/HologresSchemaOperations.java
index 3622f53f2f..2790287392 100644
--- 
a/catalogs-contrib/catalog-jdbc-hologres/src/main/java/org/apache/gravitino/catalog/hologres/operation/HologresSchemaOperations.java
+++ 
b/catalogs-contrib/catalog-jdbc-hologres/src/main/java/org/apache/gravitino/catalog/hologres/operation/HologresSchemaOperations.java
@@ -18,8 +18,6 @@
  */
 package org.apache.gravitino.catalog.hologres.operation;
 
-import static 
org.apache.gravitino.catalog.hologres.operation.HologresTableOperations.HOLO_QUOTE;
-
 import com.google.common.collect.ImmutableSet;
 import java.sql.Connection;
 import java.sql.DatabaseMetaData;
@@ -103,13 +101,10 @@ public class HologresSchemaOperations extends 
JdbcDatabaseOperations {
           "Hologres does not support properties on schema create.");
     }
 
-    StringBuilder sqlBuilder =
-        new StringBuilder(String.format("CREATE SCHEMA %s%s%s;", HOLO_QUOTE, 
schema, HOLO_QUOTE));
+    StringBuilder sqlBuilder = new StringBuilder(String.format("CREATE SCHEMA 
\"%s\";", schema));
     if (StringUtils.isNotEmpty(comment)) {
       String escapedComment = comment.replace("'", "''");
-      sqlBuilder.append(
-          String.format(
-              "COMMENT ON SCHEMA %s%s%s IS '%s'", HOLO_QUOTE, schema, 
HOLO_QUOTE, escapedComment));
+      sqlBuilder.append(String.format("COMMENT ON SCHEMA \"%s\" IS '%s'", 
schema, escapedComment));
     }
     return sqlBuilder.toString();
   }
@@ -133,8 +128,7 @@ public class HologresSchemaOperations extends 
JdbcDatabaseOperations {
 
   @Override
   public String generateDropDatabaseSql(String schema, boolean cascade) {
-    StringBuilder sqlBuilder =
-        new StringBuilder(String.format("DROP SCHEMA %s%s%s", HOLO_QUOTE, 
schema, HOLO_QUOTE));
+    StringBuilder sqlBuilder = new StringBuilder(String.format("DROP SCHEMA 
\"%s\"", schema));
     if (cascade) {
       sqlBuilder.append(" CASCADE");
     }
diff --git 
a/catalogs-contrib/catalog-jdbc-hologres/src/main/java/org/apache/gravitino/catalog/hologres/operation/HologresTableOperations.java
 
b/catalogs-contrib/catalog-jdbc-hologres/src/main/java/org/apache/gravitino/catalog/hologres/operation/HologresTableOperations.java
index 5beb9c0178..8b244880e0 100644
--- 
a/catalogs-contrib/catalog-jdbc-hologres/src/main/java/org/apache/gravitino/catalog/hologres/operation/HologresTableOperations.java
+++ 
b/catalogs-contrib/catalog-jdbc-hologres/src/main/java/org/apache/gravitino/catalog/hologres/operation/HologresTableOperations.java
@@ -18,14 +18,48 @@
  */
 package org.apache.gravitino.catalog.hologres.operation;
 
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Lists;
+import java.sql.Connection;
+import java.sql.DatabaseMetaData;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.stream.Collectors;
+import javax.sql.DataSource;
+import org.apache.commons.collections4.MapUtils;
+import org.apache.commons.lang3.ArrayUtils;
+import org.apache.commons.lang3.BooleanUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.gravitino.StringIdentifier;
 import org.apache.gravitino.catalog.jdbc.JdbcColumn;
+import org.apache.gravitino.catalog.jdbc.JdbcTable;
+import org.apache.gravitino.catalog.jdbc.config.JdbcConfig;
+import 
org.apache.gravitino.catalog.jdbc.converter.JdbcColumnDefaultValueConverter;
+import org.apache.gravitino.catalog.jdbc.converter.JdbcExceptionConverter;
+import org.apache.gravitino.catalog.jdbc.converter.JdbcTypeConverter;
 import org.apache.gravitino.catalog.jdbc.operation.DatabaseOperation;
 import org.apache.gravitino.catalog.jdbc.operation.JdbcTableOperations;
 import org.apache.gravitino.catalog.jdbc.operation.RequireDatabaseOperation;
+import org.apache.gravitino.exceptions.NoSuchSchemaException;
+import org.apache.gravitino.exceptions.NoSuchTableException;
+import org.apache.gravitino.rel.Column;
 import org.apache.gravitino.rel.TableChange;
+import org.apache.gravitino.rel.expressions.NamedReference;
 import org.apache.gravitino.rel.expressions.distributions.Distribution;
+import org.apache.gravitino.rel.expressions.distributions.Distributions;
+import org.apache.gravitino.rel.expressions.distributions.Strategy;
 import org.apache.gravitino.rel.expressions.transforms.Transform;
+import org.apache.gravitino.rel.expressions.transforms.Transforms;
 import org.apache.gravitino.rel.indexes.Index;
 
 /**
@@ -34,17 +68,119 @@ import org.apache.gravitino.rel.indexes.Index;
  * <p>Hologres is PostgreSQL-compatible, so most table operations follow 
PostgreSQL conventions.
  * However, Hologres has specific features like table properties (orientation, 
distribution_key,
  * etc.) that are handled through the WITH clause in CREATE TABLE statements.
- *
- * <p>TODO: Full implementation will be added in a follow-up PR.
  */
 public class HologresTableOperations extends JdbcTableOperations
     implements RequireDatabaseOperation {
 
-  public static final String HOLO_QUOTE = "\"";
+  public static final String NEW_LINE = "\n";
+  public static final String ALTER_TABLE = "ALTER TABLE ";
+
+  private static final String HOLOGRES_NOT_SUPPORT_NESTED_COLUMN_MSG =
+      "Hologres does not support nested column names.";
+
+  /** Properties that are handled separately or read-only, excluded from the 
WITH clause. */
+  private static final Set<String> EXCLUDED_TABLE_PROPERTIES =
+      ImmutableSet.of("distribution_key", "is_logical_partitioned_table", 
"primary_key");
+
+  /** Properties that are meaningful for users, filtering out internal system 
properties. */
+  private static final Set<String> USER_RELEVANT_PROPERTIES =
+      ImmutableSet.of(
+          "orientation",
+          "clustering_key",
+          "segment_key",
+          "bitmap_columns",
+          "dictionary_encoding_columns",
+          "time_to_live_in_seconds",
+          "table_group",
+          "storage_format",
+          "binlog.level",
+          "binlog.ttl",
+          "is_logical_partitioned_table",
+          "partition_expiration_time",
+          "partition_keep_hot_window",
+          "partition_require_filter",
+          "partition_generate_binlog_window");
+
+  private String database;
+  private HologresSchemaOperations schemaOperations;
+
+  @Override
+  protected String quoteIdentifier(String identifier) {
+    return "\"" + identifier.replace("\"", "\"\"") + "\"";
+  }
+
+  @Override
+  public void initialize(
+      DataSource dataSource,
+      JdbcExceptionConverter exceptionMapper,
+      JdbcTypeConverter jdbcTypeConverter,
+      JdbcColumnDefaultValueConverter jdbcColumnDefaultValueConverter,
+      Map<String, String> conf) {
+    super.initialize(
+        dataSource, exceptionMapper, jdbcTypeConverter, 
jdbcColumnDefaultValueConverter, conf);
+    database = new JdbcConfig(conf).getJdbcDatabase();
+    Preconditions.checkArgument(
+        StringUtils.isNotBlank(database),
+        "The `jdbc-database` configuration item is mandatory in Hologres.");
+  }
 
   @Override
   public void setDatabaseOperation(DatabaseOperation databaseOperation) {
-    // Will be implemented in a follow-up PR.
+    this.schemaOperations = (HologresSchemaOperations) databaseOperation;
+  }
+
+  @Override
+  public List<String> listTables(String schemaName) throws 
NoSuchSchemaException {
+    try (Connection connection = getConnection(schemaName)) {
+      if (!schemaOperations.schemaExists(connection, schemaName)) {
+        throw new NoSuchSchemaException("No such schema: %s", schemaName);
+      }
+      final List<String> names = Lists.newArrayList();
+      try (ResultSet tables = getTables(connection)) {
+        while (tables.next()) {
+          if (Objects.equals(tables.getString("TABLE_SCHEM"), schemaName)) {
+            names.add(tables.getString("TABLE_NAME"));
+          }
+        }
+      }
+      LOG.debug("Finished listing tables size {} for schema name {} ", 
names.size(), schemaName);
+      return names;
+    } catch (final SQLException se) {
+      throw this.exceptionMapper.toGravitinoException(se);
+    }
+  }
+
+  @Override
+  protected JdbcTable.Builder getTableBuilder(
+      ResultSet tablesResult, String databaseName, String tableName) throws 
SQLException {
+    boolean found = false;
+    JdbcTable.Builder builder = null;
+    while (tablesResult.next() && !found) {
+      String tableNameInResult = tablesResult.getString("TABLE_NAME");
+      String tableSchemaInResult = tablesResult.getString("TABLE_SCHEM");
+      if (Objects.equals(tableNameInResult, tableName)
+          && Objects.equals(tableSchemaInResult, databaseName)) {
+        builder = getBasicJdbcTableInfo(tablesResult);
+        found = true;
+      }
+    }
+
+    if (!found) {
+      throw new NoSuchTableException("Table %s does not exist in %s.", 
tableName, databaseName);
+    }
+
+    return builder;
+  }
+
+  @Override
+  protected JdbcColumn.Builder getColumnBuilder(
+      ResultSet columnsResult, String databaseName, String tableName) throws 
SQLException {
+    JdbcColumn.Builder builder = null;
+    if (Objects.equals(columnsResult.getString("TABLE_NAME"), tableName)
+        && Objects.equals(columnsResult.getString("TABLE_SCHEM"), 
databaseName)) {
+      builder = getBasicJdbcColumnInfo(columnsResult);
+    }
+    return builder;
   }
 
   @Override
@@ -56,15 +192,235 @@ public class HologresTableOperations extends 
JdbcTableOperations
       Transform[] partitioning,
       Distribution distribution,
       Index[] indexes) {
-    throw new UnsupportedOperationException(
-        "Hologres table creation will be implemented in a follow-up PR.");
+    boolean isLogicalPartition =
+        MapUtils.isNotEmpty(properties)
+            && 
"true".equalsIgnoreCase(properties.get("is_logical_partitioned_table"));
+    StringBuilder sqlBuilder = new StringBuilder();
+    sqlBuilder.append(String.format("CREATE TABLE %s (%s", 
quoteIdentifier(tableName), NEW_LINE));
+
+    // Add columns
+    for (int i = 0; i < columns.length; i++) {
+      JdbcColumn column = columns[i];
+      sqlBuilder.append(String.format("    %s", 
quoteIdentifier(column.name())));
+
+      appendColumnDefinition(column, sqlBuilder);
+      // Add a comma for the next column, unless it's the last one
+      if (i < columns.length - 1) {
+        sqlBuilder.append(String.format(",%s", NEW_LINE));
+      }
+    }
+    appendIndexesSql(indexes, sqlBuilder);
+    sqlBuilder.append(String.format("%s)", NEW_LINE));
+
+    // Append partitioning clause if specified
+    if (ArrayUtils.isNotEmpty(partitioning)) {
+      appendPartitioningSql(partitioning, isLogicalPartition, sqlBuilder);
+    }
+
+    // Build WITH clause combining distribution and Hologres-specific table 
properties
+    // Supported properties: orientation, distribution_key, clustering_key, 
event_time_column,
+    // bitmap_columns, dictionary_encoding_columns, time_to_live_in_seconds, 
table_group, etc.
+    List<String> withEntries = new ArrayList<>();
+
+    // Add distribution_key from Distribution parameter
+    if (!Distributions.NONE.equals(distribution)) {
+      validateDistribution(distribution);
+      String distributionColumns =
+          Arrays.stream(distribution.expressions())
+              .map(
+                  expression -> {
+                    Preconditions.checkArgument(
+                        expression instanceof NamedReference,
+                        "Hologres distribution expressions must be simple 
column references");
+                    String[] fieldNames = ((NamedReference) 
expression).fieldName();
+                    Preconditions.checkArgument(
+                        fieldNames != null && fieldNames.length == 1,
+                        "Hologres distribution expressions must reference a 
single column");
+                    return fieldNames[0];
+                  })
+              .collect(Collectors.joining(","));
+      withEntries.add(String.format("distribution_key = '%s'", 
distributionColumns));
+    }
+
+    // Add user-specified properties (filter out read-only / 
internally-handled properties)
+    if (MapUtils.isNotEmpty(properties)) {
+      properties.forEach(
+          (key, value) -> {
+            if (!EXCLUDED_TABLE_PROPERTIES.contains(key)) {
+              Preconditions.checkArgument(
+                  key.matches("[a-zA-Z_][a-zA-Z0-9_]*"),
+                  "Invalid property key: %s. Property key must be a valid 
identifier.",
+                  key);
+              withEntries.add(String.format("%s = '%s'", key, 
value.replace("'", "''")));
+            }
+          });
+    }
+
+    // Generate WITH clause
+    if (!withEntries.isEmpty()) {
+      sqlBuilder.append(String.format("%sWITH (%s", NEW_LINE, NEW_LINE));
+      sqlBuilder.append(
+          withEntries.stream()
+              .map(entry -> String.format("    %s", entry))
+              .collect(Collectors.joining(String.format(",%s", NEW_LINE))));
+      sqlBuilder.append(String.format("%s)", NEW_LINE));
+    }
+
+    sqlBuilder.append(";");
+
+    // Add table comment if specified
+    if (StringUtils.isNotEmpty(comment)) {
+      String escapedComment = comment.replace("'", "''");
+      sqlBuilder
+          .append(NEW_LINE)
+          .append(
+              String.format(
+                  "COMMENT ON TABLE %s IS '%s';", quoteIdentifier(tableName), 
escapedComment));
+    }
+    Arrays.stream(columns)
+        .filter(jdbcColumn -> StringUtils.isNotEmpty(jdbcColumn.comment()))
+        .forEach(
+            jdbcColumn -> {
+              String escapedColComment = jdbcColumn.comment().replace("'", 
"''");
+              sqlBuilder
+                  .append(NEW_LINE)
+                  .append(
+                      String.format(
+                          "COMMENT ON COLUMN %s.%s IS '%s';",
+                          quoteIdentifier(tableName),
+                          quoteIdentifier(jdbcColumn.name()),
+                          escapedColComment));
+            });
+    // Return the generated SQL statement
+    String result = sqlBuilder.toString();
+
+    LOG.debug("Generated create table:{} sql: {}", tableName, result);
+    return result;
+  }
+
+  @VisibleForTesting
+  static void appendIndexesSql(Index[] indexes, StringBuilder sqlBuilder) {
+    for (Index index : indexes) {
+      String fieldStr = getIndexFieldStr(index.fieldNames());
+      sqlBuilder.append(String.format(",%s", NEW_LINE));
+      switch (index.type()) {
+        case PRIMARY_KEY:
+          sqlBuilder.append(String.format("PRIMARY KEY (%s)", fieldStr));
+          break;
+        default:
+          throw new IllegalArgumentException(
+              "Hologres only supports PRIMARY_KEY index, but got: " + 
index.type());
+      }
+    }
+  }
+
+  protected static String getIndexFieldStr(String[][] fieldNames) {
+    return Arrays.stream(fieldNames)
+        .map(
+            colNames -> {
+              if (colNames.length > 1) {
+                throw new IllegalArgumentException(
+                    "Index does not support complex fields in Hologres");
+              }
+              return "\"" + colNames[0] + "\"";
+            })
+        .collect(Collectors.joining(", "));
+  }
+
+  /**
+   * Append the partitioning clause to the CREATE TABLE SQL.
+   *
+   * <p>Hologres supports two types of partition tables:
+   *
+   * <ul>
+   *   <li>Physical partition table: uses {@code PARTITION BY LIST(column)} 
syntax
+   *   <li>Logical partition table (V3.1+): uses {@code LOGICAL PARTITION BY 
LIST(column1[,
+   *       column2])} syntax
+   * </ul>
+   *
+   * @param partitioning the partition transforms (only LIST partitioning is 
supported)
+   * @param isLogicalPartition whether to create a logical partition table
+   * @param sqlBuilder the SQL builder to append to
+   */
+  @VisibleForTesting
+  static void appendPartitioningSql(
+      Transform[] partitioning, boolean isLogicalPartition, StringBuilder 
sqlBuilder) {
+    Preconditions.checkArgument(
+        partitioning.length == 1,
+        "Hologres only supports single partition transform, but got %s",
+        partitioning.length);
+    Preconditions.checkArgument(
+        partitioning[0] instanceof Transforms.ListTransform,
+        "Hologres only supports LIST partitioning, but got %s",
+        partitioning[0].getClass().getSimpleName());
+
+    Transforms.ListTransform listTransform = (Transforms.ListTransform) 
partitioning[0];
+    String[][] fieldNames = listTransform.fieldNames();
+
+    Preconditions.checkArgument(fieldNames.length > 0, "Partition columns must 
not be empty");
+
+    if (isLogicalPartition) {
+      Preconditions.checkArgument(
+          fieldNames.length <= 2,
+          "Logical partition table supports at most 2 partition columns, but 
got: %s",
+          fieldNames.length);
+    } else {
+      Preconditions.checkArgument(
+          fieldNames.length == 1,
+          "Physical partition table supports exactly 1 partition column, but 
got: %s",
+          fieldNames.length);
+    }
+
+    String partitionColumns =
+        Arrays.stream(fieldNames)
+            .map(
+                colNames -> {
+                  Preconditions.checkArgument(
+                      colNames.length == 1,
+                      "Hologres partition does not support nested field 
names");
+                  return "\"" + colNames[0] + "\"";
+                })
+            .collect(Collectors.joining(", "));
+
+    sqlBuilder.append(NEW_LINE);
+    if (isLogicalPartition) {
+      sqlBuilder.append(String.format("LOGICAL PARTITION BY LIST(%s)", 
partitionColumns));
+    } else {
+      sqlBuilder.append(String.format("PARTITION BY LIST(%s)", 
partitionColumns));
+    }
+  }
+
+  private void appendColumnDefinition(JdbcColumn column, StringBuilder 
sqlBuilder) {
+    // Add data type
+    
sqlBuilder.append(SPACE).append(typeConverter.fromGravitino(column.dataType())).append(SPACE);
+
+    // Hologres does not support auto-increment columns via Gravitino
+    if (column.autoIncrement()) {
+      throw new IllegalArgumentException(
+          "Hologres does not support creating auto-increment columns via 
Gravitino, column: "
+              + column.name());
+    }
+
+    // Add NULL / NOT NULL constraint
+    if (column.nullable()) {
+      sqlBuilder.append("NULL ");
+    } else {
+      sqlBuilder.append("NOT NULL ");
+    }
+    // Add DEFAULT value if specified
+    appendDefaultValue(column, sqlBuilder);
   }
 
   @Override
-  protected String generateAlterTableSql(
-      String schemaName, String tableName, TableChange... changes) {
-    throw new UnsupportedOperationException(
-        "Hologres table alteration will be implemented in a follow-up PR.");
+  protected String generateRenameTableSql(String oldTableName, String 
newTableName) {
+    return String.format(
+        "%s%s RENAME TO %s",
+        ALTER_TABLE, quoteIdentifier(oldTableName), 
quoteIdentifier(newTableName));
+  }
+
+  @Override
+  protected String generateDropTableSql(String tableName) {
+    return String.format("DROP TABLE %s", quoteIdentifier(tableName));
   }
 
   @Override
@@ -72,4 +428,610 @@ public class HologresTableOperations extends 
JdbcTableOperations
     throw new UnsupportedOperationException(
         "Hologres does not support purge table in Gravitino, please use drop 
table");
   }
+
+  @Override
+  protected String generateAlterTableSql(
+      String schemaName, String tableName, TableChange... changes) {
+    // Not all operations require the original table information, so lazy 
loading is used here
+    JdbcTable lazyLoadTable = null;
+    List<String> alterSql = new ArrayList<>();
+    for (TableChange change : changes) {
+      if (change instanceof TableChange.UpdateComment) {
+        lazyLoadTable = getOrCreateTable(schemaName, tableName, lazyLoadTable);
+        alterSql.add(updateCommentDefinition((TableChange.UpdateComment) 
change, lazyLoadTable));
+      } else if (change instanceof TableChange.SetProperty) {
+        throw new IllegalArgumentException("Set property is not supported 
yet");
+      } else if (change instanceof TableChange.RemoveProperty) {
+        throw new IllegalArgumentException("Remove property is not supported 
yet");
+      } else if (change instanceof TableChange.AddColumn) {
+        TableChange.AddColumn addColumn = (TableChange.AddColumn) change;
+        lazyLoadTable = getOrCreateTable(schemaName, tableName, lazyLoadTable);
+        alterSql.addAll(addColumnFieldDefinition(addColumn, lazyLoadTable));
+      } else if (change instanceof TableChange.RenameColumn) {
+        TableChange.RenameColumn renameColumn = (TableChange.RenameColumn) 
change;
+        alterSql.add(renameColumnFieldDefinition(renameColumn, tableName));
+      } else if (change instanceof TableChange.UpdateColumnDefaultValue) {
+        throw new IllegalArgumentException(
+            "Hologres does not support altering column default value via ALTER 
TABLE.");
+      } else if (change instanceof TableChange.UpdateColumnType) {
+        throw new IllegalArgumentException(
+            "Hologres does not support altering column type via ALTER TABLE.");
+      } else if (change instanceof TableChange.UpdateColumnComment) {
+        alterSql.add(
+            updateColumnCommentFieldDefinition(
+                (TableChange.UpdateColumnComment) change, tableName));
+      } else if (change instanceof TableChange.UpdateColumnPosition) {
+        throw new IllegalArgumentException("Hologres does not support column 
position.");
+      } else if (change instanceof TableChange.DeleteColumn) {
+        lazyLoadTable = getOrCreateTable(schemaName, tableName, lazyLoadTable);
+        TableChange.DeleteColumn deleteColumn = (TableChange.DeleteColumn) 
change;
+        String deleteColSql = deleteColumnFieldDefinition(deleteColumn, 
lazyLoadTable);
+        if (StringUtils.isNotEmpty(deleteColSql)) {
+          alterSql.add(deleteColSql);
+        }
+      } else if (change instanceof TableChange.UpdateColumnNullability) {
+        throw new IllegalArgumentException(
+            "Hologres does not support altering column nullability via ALTER 
TABLE.");
+      } else if (change instanceof TableChange.AddIndex) {
+        throw new IllegalArgumentException(
+            "Hologres does not support adding index via ALTER TABLE.");
+      } else if (change instanceof TableChange.DeleteIndex) {
+        throw new IllegalArgumentException(
+            "Hologres does not support deleting index via ALTER TABLE.");
+      } else if (change instanceof TableChange.UpdateColumnAutoIncrement) {
+        throw new IllegalArgumentException(
+            "Hologres does not support altering column auto-increment via 
ALTER TABLE.");
+      } else {
+        throw new IllegalArgumentException(
+            "Unsupported table change type: " + change.getClass().getName());
+      }
+    }
+
+    // Filter out empty strings and check if there are any actual changes
+    alterSql.removeIf(String::isEmpty);
+    if (alterSql.isEmpty()) {
+      return "";
+    }
+
+    // Return the generated SQL statement
+    String result = String.join("\n", alterSql);
+    LOG.debug("Generated alter table:{}.{} sql: {}", schemaName, tableName, 
result);
+    return result;
+  }
+
+  private String updateCommentDefinition(
+      TableChange.UpdateComment updateComment, JdbcTable jdbcTable) {
+    String newComment = updateComment.getNewComment();
+    if (null == StringIdentifier.fromComment(newComment)) {
+      // Detect and add Gravitino id.
+      if (StringUtils.isNotEmpty(jdbcTable.comment())) {
+        StringIdentifier identifier = 
StringIdentifier.fromComment(jdbcTable.comment());
+        if (null != identifier) {
+          newComment = StringIdentifier.addToComment(identifier, newComment);
+        }
+      }
+    }
+    return String.format(
+        "COMMENT ON TABLE %s IS '%s';",
+        quoteIdentifier(jdbcTable.name()), newComment.replace("'", "''"));
+  }
+
+  private String deleteColumnFieldDefinition(
+      TableChange.DeleteColumn deleteColumn, JdbcTable table) {
+    if (deleteColumn.fieldName().length > 1) {
+      throw new 
UnsupportedOperationException(HOLOGRES_NOT_SUPPORT_NESTED_COLUMN_MSG);
+    }
+    String col = deleteColumn.fieldName()[0];
+    boolean colExists =
+        Arrays.stream(table.columns()).anyMatch(s -> StringUtils.equals(col, 
s.name()));
+    if (!colExists) {
+      if (BooleanUtils.isTrue(deleteColumn.getIfExists())) {
+        return "";
+      } else {
+        throw new IllegalArgumentException("Delete column does not exist: " + 
col);
+      }
+    }
+    return String.format(
+        "%s%s DROP COLUMN %s;",
+        ALTER_TABLE, quoteIdentifier(table.name()), 
quoteIdentifier(deleteColumn.fieldName()[0]));
+  }
+
+  private String renameColumnFieldDefinition(
+      TableChange.RenameColumn renameColumn, String tableName) {
+    if (renameColumn.fieldName().length > 1) {
+      throw new 
UnsupportedOperationException(HOLOGRES_NOT_SUPPORT_NESTED_COLUMN_MSG);
+    }
+    return String.format(
+        "%s%s RENAME COLUMN %s TO %s;",
+        ALTER_TABLE,
+        quoteIdentifier(tableName),
+        quoteIdentifier(renameColumn.fieldName()[0]),
+        quoteIdentifier(renameColumn.getNewName()));
+  }
+
+  private List<String> addColumnFieldDefinition(
+      TableChange.AddColumn addColumn, JdbcTable lazyLoadTable) {
+    if (addColumn.fieldName().length > 1) {
+      throw new 
UnsupportedOperationException(HOLOGRES_NOT_SUPPORT_NESTED_COLUMN_MSG);
+    }
+
+    // Hologres does not support setting nullable, default value, or 
auto-increment via ADD COLUMN
+    if (!addColumn.isNullable()) {
+      throw new IllegalArgumentException(
+          "Hologres does not support setting NOT NULL constraint when adding a 
column via ALTER TABLE.");
+    }
+    if (!Column.DEFAULT_VALUE_NOT_SET.equals(addColumn.getDefaultValue())) {
+      throw new IllegalArgumentException(
+          "Hologres does not support setting default value when adding a 
column via ALTER TABLE.");
+    }
+    if (addColumn.isAutoIncrement()) {
+      throw new IllegalArgumentException(
+          "Hologres does not support setting auto-increment when adding a 
column via ALTER TABLE.");
+    }
+
+    List<String> result = new ArrayList<>();
+    String col = addColumn.fieldName()[0];
+
+    String columnDefinition =
+        String.format(
+            "%s%s ADD COLUMN %s %s",
+            ALTER_TABLE,
+            quoteIdentifier(lazyLoadTable.name()),
+            quoteIdentifier(col),
+            typeConverter.fromGravitino(addColumn.getDataType()));
+
+    // Append position if available
+    if (!(addColumn.getPosition() instanceof TableChange.Default)) {
+      throw new IllegalArgumentException("Hologres does not support column 
position in Gravitino.");
+    }
+    result.add(columnDefinition + ";");
+
+    // Append comment if available
+    if (StringUtils.isNotEmpty(addColumn.getComment())) {
+      String escapedComment = addColumn.getComment().replace("'", "''");
+      result.add(
+          String.format(
+              "COMMENT ON COLUMN %s.%s IS '%s';",
+              quoteIdentifier(lazyLoadTable.name()), quoteIdentifier(col), 
escapedComment));
+    }
+    return result;
+  }
+
+  private String updateColumnCommentFieldDefinition(
+      TableChange.UpdateColumnComment updateColumnComment, String tableName) {
+    String newComment = updateColumnComment.getNewComment();
+    if (updateColumnComment.fieldName().length > 1) {
+      throw new 
UnsupportedOperationException(HOLOGRES_NOT_SUPPORT_NESTED_COLUMN_MSG);
+    }
+    String col = updateColumnComment.fieldName()[0];
+    return String.format(
+        "COMMENT ON COLUMN %s.%s IS '%s';",
+        quoteIdentifier(tableName), quoteIdentifier(col), 
newComment.replace("'", "''"));
+  }
+
+  @Override
+  protected ResultSet getIndexInfo(String schemaName, String tableName, 
DatabaseMetaData metaData)
+      throws SQLException {
+    return metaData.getIndexInfo(database, schemaName, tableName, false, 
false);
+  }
+
+  @Override
+  protected ResultSet getPrimaryKeys(String schemaName, String tableName, 
DatabaseMetaData metaData)
+      throws SQLException {
+    return metaData.getPrimaryKeys(database, schemaName, tableName);
+  }
+
+  @Override
+  protected Connection getConnection(String schema) throws SQLException {
+    Connection connection = dataSource.getConnection();
+    connection.setCatalog(database);
+    connection.setSchema(schema);
+    return connection;
+  }
+
+  /**
+   * Get tables from the database including regular tables and partitioned 
parent tables.
+   *
+   * <p>In Hologres (PostgreSQL-compatible):
+   *
+   * <ul>
+   *   <li>Regular tables and partition child tables have TABLE_TYPE = "TABLE"
+   *   <li>Partitioned parent tables have TABLE_TYPE = "PARTITIONED TABLE"
+   *   <li>Views have TABLE_TYPE = "VIEW" (excluded from listing)
+   *   <li>Foreign tables have TABLE_TYPE = "FOREIGN TABLE" (excluded from 
listing)
+   * </ul>
+   *
+   * <p>This method overrides the parent to include regular tables and 
partition parent tables, but
+   * excludes views and foreign tables from the table list.
+   *
+   * @param connection the database connection
+   * @return ResultSet containing table metadata
+   * @throws SQLException if a database access error occurs
+   */
+  @Override
+  protected ResultSet getTables(Connection connection) throws SQLException {
+    DatabaseMetaData metaData = connection.getMetaData();
+    String catalogName = connection.getCatalog();
+    String schemaName = connection.getSchema();
+    // Include "TABLE" (regular tables and partition children),
+    // and "PARTITIONED TABLE" (partition parent tables)
+    // Exclude "VIEW" and "FOREIGN TABLE" to hide views and foreign tables 
from Gravitino
+    return metaData.getTables(
+        catalogName, schemaName, null, new String[] {"TABLE", "PARTITIONED 
TABLE"});
+  }
+
+  @Override
+  protected ResultSet getTable(Connection connection, String schema, String 
tableName)
+      throws SQLException {
+    DatabaseMetaData metaData = connection.getMetaData();
+    // Include TABLE and PARTITIONED TABLE types
+    // Exclude VIEW and FOREIGN TABLE to hide views and foreign tables from 
Gravitino
+    return metaData.getTables(
+        database, schema, tableName, new String[] {"TABLE", "PARTITIONED 
TABLE"});
+  }
+
+  @Override
+  protected ResultSet getColumns(Connection connection, String schema, String 
tableName)
+      throws SQLException {
+    DatabaseMetaData metaData = connection.getMetaData();
+    return metaData.getColumns(database, schema, tableName, null);
+  }
+
+  /**
+   * Get distribution information from Hologres system table 
hologres.hg_table_properties.
+   *
+   * <p>In Hologres, distribution_key is stored as a table property with the 
property_key
+   * "distribution_key" and property_value as comma-separated column names 
(e.g., "col1,col2").
+   *
+   * <p>This method queries the system table and returns a HASH distribution 
with the specified
+   * columns. Hologres only supports HASH distribution strategy.
+   *
+   * @param connection the database connection
+   * @param databaseName the schema name
+   * @param tableName the table name
+   * @return the distribution info, or {@link Distributions#NONE} if no 
distribution_key is set
+   * @throws SQLException if a database access error occurs
+   */
+  @Override
+  protected Distribution getDistributionInfo(
+      Connection connection, String databaseName, String tableName) throws 
SQLException {
+    String schemaName = connection.getSchema();
+    String distributionSql =
+        "SELECT property_value "
+            + "FROM hologres.hg_table_properties "
+            + "WHERE table_namespace = ? AND table_name = ? AND property_key = 
'distribution_key'";
+
+    try (PreparedStatement statement = 
connection.prepareStatement(distributionSql)) {
+      statement.setString(1, schemaName);
+      statement.setString(2, tableName);
+
+      try (ResultSet resultSet = statement.executeQuery()) {
+        if (resultSet.next()) {
+          String distributionKey = resultSet.getString("property_value");
+          if (StringUtils.isNotEmpty(distributionKey)) {
+            NamedReference[] columns =
+                Arrays.stream(distributionKey.split(","))
+                    .map(String::trim)
+                    .filter(StringUtils::isNotEmpty)
+                    .map(NamedReference::field)
+                    .toArray(NamedReference[]::new);
+            if (columns.length > 0) {
+              return Distributions.hash(0, columns);
+            }
+          }
+        }
+      }
+    }
+    return Distributions.NONE;
+  }
+
+  /**
+   * Validate the distribution for Hologres.
+   *
+   * <p>Hologres only supports HASH distribution strategy.
+   *
+   * @param distribution the distribution to validate
+   */
+  private void validateDistribution(Distribution distribution) {
+    Preconditions.checkArgument(
+        distribution.strategy() == Strategy.HASH,
+        "Hologres only supports HASH distribution strategy, but got: %s",
+        distribution.strategy());
+    Preconditions.checkArgument(
+        distribution.expressions().length > 0,
+        "Hologres HASH distribution requires at least one distribution 
column");
+  }
+
+  @Override
+  public Integer calculateDatetimePrecision(String typeName, int columnSize, 
int scale) {
+    String upperTypeName = typeName.toUpperCase();
+    switch (upperTypeName) {
+      case "TIME":
+      case "TIMETZ":
+      case "TIMESTAMP":
+      case "TIMESTAMPTZ":
+        return Math.max(scale, 0);
+      default:
+        return null;
+    }
+  }
+
+  /**
+   * Get table partitioning information from PostgreSQL system tables.
+   *
+   * <p>Hologres (PostgreSQL-compatible) only supports LIST partitioning. This 
method queries
+   * pg_partitioned_table and pg_attribute system tables to determine if a 
table is partitioned, and
+   * if so, returns the partition column names as a LIST transform.
+   *
+   * <p>The SQL queries follow the same approach used by Holo Client's 
ConnectionUtil:
+   *
+   * <ol>
+   *   <li>Query pg_partitioned_table to check if the table is partitioned and 
get partition column
+   *       attribute numbers (partattrs)
+   *   <li>Query pg_attribute to resolve attribute numbers to column names
+   * </ol>
+   *
+   * @param connection the database connection
+   * @param databaseName the schema name
+   * @param tableName the table name
+   * @return the partition transforms, or empty if the table is not partitioned
+   * @throws SQLException if a database access error occurs
+   */
+  @Override
+  protected Transform[] getTablePartitioning(
+      Connection connection, String databaseName, String tableName) throws 
SQLException {
+
+    // First, check if this is a logical partitioned table by querying table 
properties.
+    // Logical partition tables in Hologres (V3.1+) have the property
+    // "is_logical_partitioned_table" set to "true", and partition columns are 
stored in
+    // the "logical_partition_columns" property.
+    Transform[] logicalPartitioning = getLogicalPartitioning(connection, 
tableName);
+    if (logicalPartitioning.length > 0) {
+      return logicalPartitioning;
+    }
+
+    // Fall back to physical partition table check via pg_partitioned_table 
system table
+    return getPhysicalPartitioning(connection, databaseName, tableName);
+  }
+
+  /**
+   * Get logical partition information from Hologres table properties.
+   *
+   * <p>Hologres V3.1+ supports logical partition tables where the parent 
table is a physical table
+   * and child partitions are logical concepts. A logical partition table is 
identified by the
+   * property "is_logical_partitioned_table" = "true" in 
hologres.hg_table_properties, and its
+   * partition columns are stored in the "logical_partition_columns" property.
+   *
+   * @param connection the database connection
+   * @param tableName the table name
+   * @return the partition transforms, or empty if the table is not a logical 
partition table
+   * @throws SQLException if a database access error occurs
+   */
+  private Transform[] getLogicalPartitioning(Connection connection, String 
tableName)
+      throws SQLException {
+    String schemaName = connection.getSchema();
+    String logicalPartitionSql =
+        "SELECT property_key, property_value "
+            + "FROM hologres.hg_table_properties "
+            + "WHERE table_namespace = ? AND table_name = ? "
+            + "AND property_key IN ('is_logical_partitioned_table', 
'logical_partition_columns')";
+
+    String isLogicalPartitioned = null;
+    String logicalPartitionColumns = null;
+
+    try (PreparedStatement statement = 
connection.prepareStatement(logicalPartitionSql)) {
+      statement.setString(1, schemaName);
+      statement.setString(2, tableName);
+
+      try (ResultSet resultSet = statement.executeQuery()) {
+        while (resultSet.next()) {
+          String key = resultSet.getString("property_key");
+          String value = resultSet.getString("property_value");
+          if ("is_logical_partitioned_table".equals(key)) {
+            isLogicalPartitioned = value;
+          } else if ("logical_partition_columns".equals(key)) {
+            logicalPartitionColumns = value;
+          }
+        }
+      }
+    }
+
+    if (!"true".equalsIgnoreCase(isLogicalPartitioned)
+        || StringUtils.isEmpty(logicalPartitionColumns)) {
+      return Transforms.EMPTY_TRANSFORM;
+    }
+
+    // Parse partition column names (comma-separated, e.g., "col1" or 
"col1,col2")
+    String[][] fieldNames =
+        Arrays.stream(logicalPartitionColumns.split(","))
+            .map(String::trim)
+            .filter(StringUtils::isNotEmpty)
+            .map(col -> new String[] {col})
+            .toArray(String[][]::new);
+
+    if (fieldNames.length == 0) {
+      return Transforms.EMPTY_TRANSFORM;
+    }
+
+    return new Transform[] {Transforms.list(fieldNames)};
+  }
+
+  /**
+   * Get physical partition information from PostgreSQL system tables.
+   *
+   * <p>Hologres (PostgreSQL-compatible) only supports LIST partitioning for 
physical partitions.
+   * This method queries pg_partitioned_table and pg_attribute system tables 
to determine if a table
+   * is partitioned, and if so, returns the partition column names as a LIST 
transform.
+   *
+   * @param connection the database connection
+   * @param databaseName the schema name
+   * @param tableName the table name
+   * @return the partition transforms, or empty if the table is not a physical 
partition table
+   * @throws SQLException if a database access error occurs
+   */
+  private Transform[] getPhysicalPartitioning(
+      Connection connection, String databaseName, String tableName) throws 
SQLException {
+
+    // Query pg_partitioned_table to get partition strategy and column 
attribute numbers
+    String partitionSql =
+        "SELECT part.partstrat, part.partnatts, part.partattrs "
+            + "FROM pg_catalog.pg_class c "
+            + "JOIN pg_catalog.pg_namespace n ON n.oid = c.relnamespace "
+            + "JOIN pg_catalog.pg_partitioned_table part ON c.oid = 
part.partrelid "
+            + "WHERE n.nspname = ? AND c.relname = ? "
+            + "LIMIT 1";
+
+    String partStrategy = null;
+    String partAttrs = null;
+
+    try (PreparedStatement statement = 
connection.prepareStatement(partitionSql)) {
+      statement.setString(1, databaseName);
+      statement.setString(2, tableName);
+
+      try (ResultSet resultSet = statement.executeQuery()) {
+        if (resultSet.next()) {
+          partStrategy = resultSet.getString("partstrat");
+          partAttrs = resultSet.getString("partattrs");
+        }
+      }
+    }
+
+    // Not a partitioned table
+    if (partStrategy == null || partAttrs == null) {
+      return Transforms.EMPTY_TRANSFORM;
+    }
+
+    // Parse partition attribute numbers (e.g., "1" or "1 2")
+    String[] attrNums = partAttrs.trim().split("\\s+");
+    List<String[]> partitionColumnNames = new ArrayList<>();
+
+    // Resolve attribute numbers to column names using a single batch query
+    String placeholders = Arrays.stream(attrNums).map(a -> 
"?").collect(Collectors.joining(","));
+    String attrSql =
+        "SELECT attnum, attname FROM pg_catalog.pg_attribute "
+            + "WHERE attrelid = (SELECT c.oid FROM pg_catalog.pg_class c "
+            + "  JOIN pg_catalog.pg_namespace n ON n.oid = c.relnamespace "
+            + "  WHERE n.nspname = ? AND c.relname = ?) "
+            + "AND attnum IN ("
+            + placeholders
+            + ") "
+            + "ORDER BY attnum";
+
+    try (PreparedStatement statement = connection.prepareStatement(attrSql)) {
+      statement.setString(1, databaseName);
+      statement.setString(2, tableName);
+      for (int i = 0; i < attrNums.length; i++) {
+        statement.setInt(3 + i, Integer.parseInt(attrNums[i]));
+      }
+
+      try (ResultSet resultSet = statement.executeQuery()) {
+        while (resultSet.next()) {
+          partitionColumnNames.add(new String[] 
{resultSet.getString("attname")});
+        }
+      }
+    }
+
+    if (partitionColumnNames.isEmpty()) {
+      return Transforms.EMPTY_TRANSFORM;
+    }
+
+    // Hologres only supports LIST partitioning (partstrat = 'l')
+    // Return a LIST transform with the partition column names
+    String[][] fieldNames = partitionColumnNames.toArray(new String[0][]);
+    return new Transform[] {Transforms.list(fieldNames)};
+  }
+
+  /**
+   * Get table properties from Hologres system table 
hologres.hg_table_properties.
+   *
+   * <p>This method queries the Hologres system table to retrieve table 
properties such as:
+   *
+   * <ul>
+   *   <li>orientation: storage format (row/column/row,column)
+   *   <li>clustering_key: clustering key columns
+   *   <li>segment_key: event time column (segment key)
+   *   <li>bitmap_columns: bitmap index columns
+   *   <li>dictionary_encoding_columns: dictionary encoding columns
+   *   <li>primary_key: primary key columns
+   *   <li>time_to_live_in_seconds: TTL setting
+   *   <li>table_group: table group name
+   * </ul>
+   *
+   * @param connection the database connection
+   * @param tableName the name of the table
+   * @return a map of table properties
+   * @throws SQLException if a database access error occurs
+   */
+  @Override
+  protected Map<String, String> getTableProperties(Connection connection, 
String tableName)
+      throws SQLException {
+    Map<String, String> properties = new HashMap<>();
+    String schemaName = connection.getSchema();
+
+    // Query table properties from hologres.hg_table_properties system table
+    // The system table stores each property as a separate row with 
property_key and property_value
+    String propertiesSql =
+        "SELECT property_key, property_value "
+            + "FROM hologres.hg_table_properties "
+            + "WHERE table_namespace = ? AND table_name = ?";
+
+    try (PreparedStatement statement = 
connection.prepareStatement(propertiesSql)) {
+      statement.setString(1, schemaName);
+      statement.setString(2, tableName);
+
+      try (ResultSet resultSet = statement.executeQuery()) {
+        while (resultSet.next()) {
+          String propertyKey = resultSet.getString("property_key");
+          String propertyValue = resultSet.getString("property_value");
+
+          // Only include meaningful properties that users care about
+          if (StringUtils.isNotEmpty(propertyValue) && 
isUserRelevantProperty(propertyKey)) {
+            // Convert JDBC property keys to DDL-compatible keys
+            // Hologres system table stores "binlog.level" and "binlog.ttl",
+            // but CREATE TABLE WITH clause uses "binlog_level" and 
"binlog_ttl"
+            String normalizedKey = convertFromJdbcPropertyKey(propertyKey);
+            properties.put(normalizedKey, propertyValue);
+          }
+        }
+      }
+    }
+
+    LOG.debug("Loaded table properties for {}.{}: {}", schemaName, tableName, 
properties);
+    return properties;
+  }
+
+  /**
+   * Convert JDBC property key to DDL-compatible property key.
+   *
+   * <p>Hologres system table {@code hologres.hg_table_properties} stores some 
property keys with
+   * dots (e.g., "binlog.level", "binlog.ttl"), but the CREATE TABLE WITH 
clause uses underscores
+   * (e.g., "binlog_level", "binlog_ttl"). This method converts from the JDBC 
format to the DDL
+   * format so that properties can be round-tripped correctly.
+   *
+   * @param jdbcKey the property key from the JDBC query
+   * @return the DDL-compatible property key
+   */
+  private String convertFromJdbcPropertyKey(String jdbcKey) {
+    switch (jdbcKey) {
+      case "binlog.level":
+        return "binlog_level";
+      case "binlog.ttl":
+        return "binlog_ttl";
+      default:
+        return jdbcKey;
+    }
+  }
+
  /**
   * Check if a property key is relevant for users to see.
   *
   * <p>This filters out internal system properties and only returns properties that are meaningful
   * for users. The allow-list is defined by the class-level {@code USER_RELEVANT_PROPERTIES}
   * constant.
   *
   * @param propertyKey the property key to check
   * @return true if the property is relevant for users
   */
  private boolean isUserRelevantProperty(String propertyKey) {
    return USER_RELEVANT_PROPERTIES.contains(propertyKey);
  }
 }
diff --git 
a/catalogs-contrib/catalog-jdbc-hologres/src/test/java/org/apache/gravitino/catalog/hologres/operation/TestHologresTableOperations.java
 
b/catalogs-contrib/catalog-jdbc-hologres/src/test/java/org/apache/gravitino/catalog/hologres/operation/TestHologresTableOperations.java
new file mode 100644
index 0000000000..f39cb4d61e
--- /dev/null
+++ 
b/catalogs-contrib/catalog-jdbc-hologres/src/test/java/org/apache/gravitino/catalog/hologres/operation/TestHologresTableOperations.java
@@ -0,0 +1,1084 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.catalog.hologres.operation;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import 
org.apache.gravitino.catalog.hologres.converter.HologresColumnDefaultValueConverter;
+import 
org.apache.gravitino.catalog.hologres.converter.HologresExceptionConverter;
+import org.apache.gravitino.catalog.hologres.converter.HologresTypeConverter;
+import org.apache.gravitino.catalog.jdbc.JdbcColumn;
+import org.apache.gravitino.catalog.jdbc.JdbcTable;
+import org.apache.gravitino.rel.TableChange;
+import org.apache.gravitino.rel.expressions.NamedReference;
+import org.apache.gravitino.rel.expressions.UnparsedExpression;
+import org.apache.gravitino.rel.expressions.distributions.Distribution;
+import org.apache.gravitino.rel.expressions.distributions.Distributions;
+import org.apache.gravitino.rel.expressions.literals.Literals;
+import org.apache.gravitino.rel.expressions.transforms.Transform;
+import org.apache.gravitino.rel.expressions.transforms.Transforms;
+import org.apache.gravitino.rel.indexes.Index;
+import org.apache.gravitino.rel.indexes.Indexes;
+import org.apache.gravitino.rel.types.Types;
+import org.junit.jupiter.api.Test;
+
+/** Unit tests for {@link HologresTableOperations}. */
+public class TestHologresTableOperations {
+
  // ==================== Helper inner class for SQL generation tests ====================

  /**
   * Test-only subclass that exposes the protected SQL-generation methods of
   * HologresTableOperations as public helpers so they can be exercised without a live database.
   */
  private static class TestableHologresTableOperations extends HologresTableOperations {
    public TestableHologresTableOperations() {
      // Wire up the converters that the JDBC catalog framework would normally inject.
      super.exceptionMapper = new HologresExceptionConverter();
      super.typeConverter = new HologresTypeConverter();
      super.columnDefaultValueConverter = new HologresColumnDefaultValueConverter();
    }

    public String createTableSql(
        String tableName,
        JdbcColumn[] columns,
        String comment,
        Map<String, String> properties,
        Transform[] partitioning,
        Distribution distribution,
        Index[] indexes) {
      return generateCreateTableSql(
          tableName, columns, comment, properties, partitioning, distribution, indexes);
    }

    public String renameTableSql(String oldName, String newName) {
      return generateRenameTableSql(oldName, newName);
    }

    public String dropTableSql(String tableName) {
      return generateDropTableSql(tableName);
    }

    public String purgeTableSql(String tableName) {
      return generatePurgeTableSql(tableName);
    }

    public String alterTableSql(String schemaName, String tableName, TableChange... changes) {
      return generateAlterTableSql(schemaName, tableName, changes);
    }

    // Builds a minimal in-memory JdbcTable for ALTER-related tests; no database is touched.
    public JdbcTable buildFakeTable(String tableName, JdbcColumn... columns) {
      return JdbcTable.builder()
          .withName(tableName)
          .withColumns(columns)
          .withComment("test table")
          .withProperties(Collections.emptyMap())
          .build();
    }
  }

  // Shared operations instance used by every test; it holds no per-test state.
  private final TestableHologresTableOperations ops = new TestableHologresTableOperations();
+
  // ==================== appendPartitioningSql tests ====================

  // Physical partitioning emits plain "PARTITION BY LIST(...)" without the LOGICAL keyword.
  @Test
  void testAppendPartitioningSqlPhysicalPartition() {
    Transform[] partitioning = {Transforms.list(new String[][] {{"ds"}})};
    StringBuilder sqlBuilder = new StringBuilder();
    HologresTableOperations.appendPartitioningSql(partitioning, false, sqlBuilder);
    String result = sqlBuilder.toString();
    assertTrue(result.contains("PARTITION BY LIST(\"ds\")"));
    assertFalse(result.contains("LOGICAL"));
  }

  // Logical partitioning adds the LOGICAL keyword before PARTITION BY.
  @Test
  void testAppendPartitioningSqlLogicalPartitionSingleColumn() {
    Transform[] partitioning = {Transforms.list(new String[][] {{"ds"}})};
    StringBuilder sqlBuilder = new StringBuilder();
    HologresTableOperations.appendPartitioningSql(partitioning, true, sqlBuilder);
    String result = sqlBuilder.toString();
    assertTrue(result.contains("LOGICAL PARTITION BY LIST(\"ds\")"));
  }

  // Logical partitioning supports up to two columns, joined with ", ".
  @Test
  void testAppendPartitioningSqlLogicalPartitionTwoColumns() {
    Transform[] partitioning = {Transforms.list(new String[][] {{"region"}, {"ds"}})};
    StringBuilder sqlBuilder = new StringBuilder();
    HologresTableOperations.appendPartitioningSql(partitioning, true, sqlBuilder);
    String result = sqlBuilder.toString();
    assertTrue(result.contains("LOGICAL PARTITION BY LIST(\"region\", \"ds\")"));
  }

  // Hologres only supports LIST partitioning; any other transform is rejected.
  @Test
  void testAppendPartitioningSqlRejectsNonListTransform() {
    Transform[] partitioning = {Transforms.identity("col1")};
    StringBuilder sqlBuilder = new StringBuilder();
    assertThrows(
        IllegalArgumentException.class,
        () -> HologresTableOperations.appendPartitioningSql(partitioning, false, sqlBuilder));
  }

  // Physical partitioning allows only a single partition column.
  @Test
  void testAppendPartitioningSqlPhysicalRejectsMultipleColumns() {
    Transform[] partitioning = {Transforms.list(new String[][] {{"col1"}, {"col2"}})};
    StringBuilder sqlBuilder = new StringBuilder();
    assertThrows(
        IllegalArgumentException.class,
        () -> HologresTableOperations.appendPartitioningSql(partitioning, false, sqlBuilder));
  }

  // Logical partitioning allows at most two partition columns.
  @Test
  void testAppendPartitioningSqlLogicalRejectsMoreThanTwoColumns() {
    Transform[] partitioning = {Transforms.list(new String[][] {{"col1"}, {"col2"}, {"col3"}})};
    StringBuilder sqlBuilder = new StringBuilder();
    assertThrows(
        IllegalArgumentException.class,
        () -> HologresTableOperations.appendPartitioningSql(partitioning, true, sqlBuilder));
  }

  // A LIST transform with no field names is invalid.
  @Test
  void testAppendPartitioningSqlRejectsEmptyPartitions() {
    Transform[] partitioning = {Transforms.list(new String[][] {})};
    StringBuilder sqlBuilder = new StringBuilder();
    assertThrows(
        IllegalArgumentException.class,
        () -> HologresTableOperations.appendPartitioningSql(partitioning, false, sqlBuilder));
  }

  // Only a single partitioning transform may be supplied.
  @Test
  void testAppendPartitioningSqlRejectsMultipleTransforms() {
    Transform[] partitioning = {
      Transforms.list(new String[][] {{"col1"}}), Transforms.list(new String[][] {{"col2"}})
    };
    StringBuilder sqlBuilder = new StringBuilder();
    assertThrows(
        IllegalArgumentException.class,
        () -> HologresTableOperations.appendPartitioningSql(partitioning, false, sqlBuilder));
  }

  // Partition columns must be top-level; nested field paths are rejected.
  @Test
  void testAppendPartitioningSqlRejectsNestedFieldNames() {
    Transform[] partitioning = {Transforms.list(new String[][] {{"schema", "col1"}})};
    StringBuilder sqlBuilder = new StringBuilder();
    assertThrows(
        IllegalArgumentException.class,
        () -> HologresTableOperations.appendPartitioningSql(partitioning, false, sqlBuilder));
  }
+
  // ==================== appendIndexesSql tests ====================

  // No indexes should produce no SQL at all.
  @Test
  void testAppendIndexesSqlEmpty() {
    StringBuilder sqlBuilder = new StringBuilder();
    HologresTableOperations.appendIndexesSql(new Index[0], sqlBuilder);
    assertTrue(sqlBuilder.toString().isEmpty());
  }

  @Test
  void testAppendIndexesSqlPrimaryKey() {
    // Hologres does not support custom constraint names, so the name is ignored
    Index pk = Indexes.primary("pk_test", new String[][] {{"id"}});
    StringBuilder sqlBuilder = new StringBuilder();
    HologresTableOperations.appendIndexesSql(new Index[] {pk}, sqlBuilder);
    String result = sqlBuilder.toString();
    assertFalse(result.contains("CONSTRAINT"));
    assertTrue(result.contains("PRIMARY KEY (\"id\")"));
  }

  // A null index name produces the same output as a named primary key.
  @Test
  void testAppendIndexesSqlPrimaryKeyWithoutName() {
    Index pk = Indexes.primary(null, new String[][] {{"id"}});
    StringBuilder sqlBuilder = new StringBuilder();
    HologresTableOperations.appendIndexesSql(new Index[] {pk}, sqlBuilder);
    String result = sqlBuilder.toString();
    assertFalse(result.contains("CONSTRAINT"));
    assertTrue(result.contains("PRIMARY KEY (\"id\")"));
  }

  @Test
  void testAppendIndexesSqlUniqueKeyThrows() {
    // Hologres does not support UNIQUE KEY index separately
    Index uk = Indexes.unique("uk_name", new String[][] {{"name"}});
    StringBuilder sqlBuilder = new StringBuilder();
    assertThrows(
        IllegalArgumentException.class,
        () -> HologresTableOperations.appendIndexesSql(new Index[] {uk}, sqlBuilder));
  }

  // Composite primary keys list all columns in declaration order.
  @Test
  void testAppendIndexesSqlCompositeKey() {
    Index pk = Indexes.primary("pk_comp", new String[][] {{"id"}, {"ds"}});
    StringBuilder sqlBuilder = new StringBuilder();
    HologresTableOperations.appendIndexesSql(new Index[] {pk}, sqlBuilder);
    String result = sqlBuilder.toString();
    assertTrue(result.contains("PRIMARY KEY (\"id\", \"ds\")"));
  }

  @Test
  void testAppendIndexesSqlMultipleIndexesWithUniqueKeyThrows() {
    // Hologres does not support UNIQUE KEY, so mixing PK and UK should throw
    Index pk = Indexes.primary("pk_id", new String[][] {{"id"}});
    Index uk = Indexes.unique("uk_email", new String[][] {{"email"}});
    StringBuilder sqlBuilder = new StringBuilder();
    assertThrows(
        IllegalArgumentException.class,
        () -> HologresTableOperations.appendIndexesSql(new Index[] {pk, uk}, sqlBuilder));
  }
+
  // ==================== generateCreateTableSql tests ====================

  // Minimal table: one NOT NULL int column, no comment/properties/partitioning/indexes.
  @Test
  void testCreateTableBasic() {
    JdbcColumn col =
        JdbcColumn.builder()
            .withName("id")
            .withType(Types.IntegerType.get())
            .withNullable(false)
            .build();
    String sql =
        ops.createTableSql(
            "test_table",
            new JdbcColumn[] {col},
            null,
            Collections.emptyMap(),
            Transforms.EMPTY_TRANSFORM,
            Distributions.NONE,
            Indexes.EMPTY_INDEXES);
    assertTrue(sql.contains("CREATE TABLE \"test_table\""));
    assertTrue(sql.contains("\"id\" int4"));
    assertTrue(sql.contains("NOT NULL"));
  }

  // Table comments are emitted as a separate COMMENT ON TABLE statement (PostgreSQL style).
  @Test
  void testCreateTableWithComment() {
    JdbcColumn col =
        JdbcColumn.builder()
            .withName("id")
            .withType(Types.IntegerType.get())
            .withNullable(false)
            .build();
    String sql =
        ops.createTableSql(
            "test_table",
            new JdbcColumn[] {col},
            "This is a test table",
            Collections.emptyMap(),
            Transforms.EMPTY_TRANSFORM,
            Distributions.NONE,
            Indexes.EMPTY_INDEXES);
    assertTrue(sql.contains("COMMENT ON TABLE \"test_table\" IS 'This is a test table'"));
  }

  // Column comments are emitted as separate COMMENT ON COLUMN statements.
  @Test
  void testCreateTableWithColumnComment() {
    JdbcColumn col =
        JdbcColumn.builder()
            .withName("id")
            .withType(Types.IntegerType.get())
            .withNullable(false)
            .withComment("Primary key column")
            .build();
    String sql =
        ops.createTableSql(
            "test_table",
            new JdbcColumn[] {col},
            null,
            Collections.emptyMap(),
            Transforms.EMPTY_TRANSFORM,
            Distributions.NONE,
            Indexes.EMPTY_INDEXES);
    assertTrue(sql.contains("COMMENT ON COLUMN \"test_table\".\"id\" IS 'Primary key column'"));
  }

  // A HASH distribution becomes a distribution_key entry inside the WITH (...) clause.
  @Test
  void testCreateTableWithDistribution() {
    JdbcColumn col =
        JdbcColumn.builder()
            .withName("id")
            .withType(Types.IntegerType.get())
            .withNullable(false)
            .build();
    Distribution dist = Distributions.hash(0, NamedReference.field("id"));
    String sql =
        ops.createTableSql(
            "test_table",
            new JdbcColumn[] {col},
            null,
            Collections.emptyMap(),
            Transforms.EMPTY_TRANSFORM,
            dist,
            Indexes.EMPTY_INDEXES);
    assertTrue(sql.contains("distribution_key = 'id'"));
    assertTrue(sql.contains("WITH ("));
  }

  // Arbitrary table properties are forwarded into the WITH (...) clause as key = 'value'.
  @Test
  void testCreateTableWithProperties() {
    JdbcColumn col =
        JdbcColumn.builder()
            .withName("id")
            .withType(Types.IntegerType.get())
            .withNullable(false)
            .build();
    Map<String, String> properties = new HashMap<>();
    properties.put("orientation", "column");
    properties.put("time_to_live_in_seconds", "3600");
    String sql =
        ops.createTableSql(
            "test_table",
            new JdbcColumn[] {col},
            null,
            properties,
            Transforms.EMPTY_TRANSFORM,
            Distributions.NONE,
            Indexes.EMPTY_INDEXES);
    assertTrue(sql.contains("orientation = 'column'"));
    assertTrue(sql.contains("time_to_live_in_seconds = '3600'"));
  }

  // The Distribution argument wins over any stray distribution_key property.
  @Test
  void testCreateTableDistributionKeyFilteredFromProperties() {
    JdbcColumn col =
        JdbcColumn.builder()
            .withName("id")
            .withType(Types.IntegerType.get())
            .withNullable(false)
            .build();
    Map<String, String> properties = new HashMap<>();
    properties.put("distribution_key", "should_be_ignored");
    properties.put("orientation", "column");
    Distribution dist = Distributions.hash(0, NamedReference.field("id"));
    String sql =
        ops.createTableSql(
            "test_table",
            new JdbcColumn[] {col},
            null,
            properties,
            Transforms.EMPTY_TRANSFORM,
            dist,
            Indexes.EMPTY_INDEXES);
    // Should use the Distribution parameter, not the property
    assertTrue(sql.contains("distribution_key = 'id'"));
    assertFalse(sql.contains("should_be_ignored"));
  }

  // The logical-partition marker property controls DDL shape and must not leak into WITH (...).
  @Test
  void testCreateTableIsLogicalPartitionedFilteredFromProperties() {
    JdbcColumn col =
        JdbcColumn.builder()
            .withName("id")
            .withType(Types.IntegerType.get())
            .withNullable(false)
            .build();
    Map<String, String> properties = new HashMap<>();
    properties.put("is_logical_partitioned_table", "true");
    properties.put("orientation", "column");
    String sql =
        ops.createTableSql(
            "test_table",
            new JdbcColumn[] {col},
            null,
            properties,
            Transforms.EMPTY_TRANSFORM,
            Distributions.NONE,
            Indexes.EMPTY_INDEXES);
    // is_logical_partitioned_table should NOT appear in WITH clause
    assertFalse(sql.contains("is_logical_partitioned_table"));
    assertTrue(sql.contains("orientation = 'column'"));
  }

  // The primary key comes from the Index argument, never from a primary_key property.
  @Test
  void testCreateTablePrimaryKeyFilteredFromProperties() {
    JdbcColumn col =
        JdbcColumn.builder()
            .withName("id")
            .withType(Types.IntegerType.get())
            .withNullable(false)
            .build();
    Map<String, String> properties = new HashMap<>();
    properties.put("primary_key", "id");
    properties.put("orientation", "column");
    Index pk = Indexes.primary("pk_id", new String[][] {{"id"}});
    String sql =
        ops.createTableSql(
            "test_table",
            new JdbcColumn[] {col},
            null,
            properties,
            Transforms.EMPTY_TRANSFORM,
            Distributions.NONE,
            new Index[] {pk});
    // primary_key should NOT appear in WITH clause (it is defined via PRIMARY KEY constraint)
    assertFalse(sql.contains("primary_key"));
    assertTrue(sql.contains("orientation = 'column'"));
    assertTrue(sql.contains("PRIMARY KEY"));
  }

  // Without the logical-partition property, partitioning is emitted as a physical PARTITION BY.
  @Test
  void testCreateTableWithPhysicalPartition() {
    JdbcColumn col =
        JdbcColumn.builder()
            .withName("ds")
            .withType(Types.DateType.get())
            .withNullable(false)
            .build();
    Transform[] partitioning = {Transforms.list(new String[][] {{"ds"}})};
    String sql =
        ops.createTableSql(
            "test_table",
            new JdbcColumn[] {col},
            null,
            Collections.emptyMap(),
            partitioning,
            Distributions.NONE,
            Indexes.EMPTY_INDEXES);
    assertTrue(sql.contains("PARTITION BY LIST(\"ds\")"));
    assertFalse(sql.contains("LOGICAL"));
  }
+
+  @Test
+  void testCreateTableWithLogicalPartition() {
+    JdbcColumn col =
+        JdbcColumn.builder()
+            .withName("ds")
+            .withType(Types.DateType.get())
+            .withNullable(false)
+            .build();
+    Map<String, String> properties = new HashMap<>();
+    properties.put("is_logical_partitioned_table", "true");
+    Transform[] partitioning = {Transforms.list(new String[][] {{"ds"}})};
+    String sql =
+        ops.createTableSql(
+            "test_table",
+            new JdbcColumn[] {col},
+            null,
+            properties,
+            partitioning,
+            Distributions.NONE,
+            Indexes.EMPTY_INDEXES);
+    assertTrue(sql.contains("LOGICAL PARTITION BY LIST(\"ds\")"));
+  }
+
+  @Test
+  void testCreateTableWithLogicalPartitionTwoColumns() {
+    JdbcColumn col1 =
+        JdbcColumn.builder()
+            .withName("region")
+            .withType(Types.StringType.get())
+            .withNullable(false)
+            .build();
+    JdbcColumn col2 =
+        JdbcColumn.builder()
+            .withName("ds")
+            .withType(Types.DateType.get())
+            .withNullable(false)
+            .build();
+    Map<String, String> properties = new HashMap<>();
+    properties.put("is_logical_partitioned_table", "true");
+    Transform[] partitioning = {Transforms.list(new String[][] {{"region"}, 
{"ds"}})};
+    String sql =
+        ops.createTableSql(
+            "test_table",
+            new JdbcColumn[] {col1, col2},
+            null,
+            properties,
+            partitioning,
+            Distributions.NONE,
+            Indexes.EMPTY_INDEXES);
+    assertTrue(sql.contains("LOGICAL PARTITION BY LIST(\"region\", \"ds\")"));
+  }
+
+  @Test
+  void testCreateTableWithPrimaryKey() {
+    JdbcColumn col =
+        JdbcColumn.builder()
+            .withName("id")
+            .withType(Types.LongType.get())
+            .withNullable(false)
+            .build();
+    Index pk = Indexes.primary("pk_id", new String[][] {{"id"}});
+    String sql =
+        ops.createTableSql(
+            "test_table",
+            new JdbcColumn[] {col},
+            null,
+            Collections.emptyMap(),
+            Transforms.EMPTY_TRANSFORM,
+            Distributions.NONE,
+            new Index[] {pk});
+    assertTrue(sql.contains("PRIMARY KEY (\"id\")"));
+    // Hologres does not support custom constraint names, so CONSTRAINT should 
not appear
+    assertFalse(sql.contains("CONSTRAINT"));
+  }
+
+  @Test
+  void testCreateTableWithAutoIncrementThrows() {
+    JdbcColumn col =
+        JdbcColumn.builder()
+            .withName("id")
+            .withType(Types.IntegerType.get())
+            .withNullable(false)
+            .withAutoIncrement(true)
+            .build();
+    IllegalArgumentException ex =
+        assertThrows(
+            IllegalArgumentException.class,
+            () ->
+                ops.createTableSql(
+                    "test_table",
+                    new JdbcColumn[] {col},
+                    null,
+                    Collections.emptyMap(),
+                    Transforms.EMPTY_TRANSFORM,
+                    Distributions.NONE,
+                    Indexes.EMPTY_INDEXES));
+    assertTrue(ex.getMessage().contains("auto-increment"));
+  }
+
+  @Test
+  void testCreateTableWithNullableColumn() {
+    JdbcColumn col =
+        JdbcColumn.builder()
+            .withName("name")
+            .withType(Types.StringType.get())
+            .withNullable(true)
+            .build();
+    String sql =
+        ops.createTableSql(
+            "test_table",
+            new JdbcColumn[] {col},
+            null,
+            Collections.emptyMap(),
+            Transforms.EMPTY_TRANSFORM,
+            Distributions.NONE,
+            Indexes.EMPTY_INDEXES);
+    assertTrue(sql.contains("NULL"));
+    assertFalse(sql.contains("NOT NULL"));
+  }
+
+  @Test
+  void testCreateTableWithDefaultValue() {
+    JdbcColumn col =
+        JdbcColumn.builder()
+            .withName("status")
+            .withType(Types.IntegerType.get())
+            .withNullable(true)
+            .withDefaultValue(Literals.integerLiteral(0))
+            .build();
+    String sql =
+        ops.createTableSql(
+            "test_table",
+            new JdbcColumn[] {col},
+            null,
+            Collections.emptyMap(),
+            Transforms.EMPTY_TRANSFORM,
+            Distributions.NONE,
+            Indexes.EMPTY_INDEXES);
+    assertTrue(sql.contains("DEFAULT 0"));
+  }
+
+  @Test
+  void testCreateTableMultipleColumns() {
+    JdbcColumn col1 =
+        JdbcColumn.builder()
+            .withName("id")
+            .withType(Types.LongType.get())
+            .withNullable(false)
+            .build();
+    JdbcColumn col2 =
+        JdbcColumn.builder()
+            .withName("name")
+            .withType(Types.StringType.get())
+            .withNullable(true)
+            .build();
+    JdbcColumn col3 =
+        JdbcColumn.builder()
+            .withName("amount")
+            .withType(Types.DecimalType.of(10, 2))
+            .withNullable(true)
+            .withDefaultValue(
+                
Literals.decimalLiteral(org.apache.gravitino.rel.types.Decimal.of("0.00", 10, 
2)))
+            .build();
+    String sql =
+        ops.createTableSql(
+            "orders",
+            new JdbcColumn[] {col1, col2, col3},
+            "Order table",
+            Collections.emptyMap(),
+            Transforms.EMPTY_TRANSFORM,
+            Distributions.NONE,
+            Indexes.EMPTY_INDEXES);
+    assertTrue(sql.contains("\"id\" int8"));
+    assertTrue(sql.contains("\"name\" text"));
+    assertTrue(sql.contains("\"amount\" numeric(10,2)"));
+    assertTrue(sql.contains("COMMENT ON TABLE \"orders\" IS 'Order table'"));
+  }
+
+  @Test
+  void testCreateTableCommentWithSingleQuotes() {
+    JdbcColumn col =
+        JdbcColumn.builder()
+            .withName("flag")
+            .withType(Types.StringType.get())
+            .withNullable(false)
+            .withComment("退货标志('R'=已退货, 'A'=未退货)")
+            .build();
+    String sql =
+        ops.createTableSql(
+            "test_table",
+            new JdbcColumn[] {col},
+            "It's a test table",
+            Collections.emptyMap(),
+            Transforms.EMPTY_TRANSFORM,
+            Distributions.NONE,
+            Indexes.EMPTY_INDEXES);
+    // Single quotes in table comment should be escaped
+    assertTrue(sql.contains("IS 'It''s a test table'"));
+    // Single quotes in column comment should be escaped
+    assertTrue(sql.contains("IS '退货标志(''R''=已退货, ''A''=未退货)'"));
+    // Unescaped single quotes should NOT appear
+    assertFalse(sql.contains("IS 'It's"));
+  }
+
+  @Test
+  void testCreateTableFullFeatured() {
+    JdbcColumn col1 =
+        JdbcColumn.builder()
+            .withName("order_id")
+            .withType(Types.LongType.get())
+            .withNullable(false)
+            .build();
+    JdbcColumn col2 =
+        JdbcColumn.builder()
+            .withName("ds")
+            .withType(Types.DateType.get())
+            .withNullable(false)
+            .build();
+    Map<String, String> properties = new HashMap<>();
+    properties.put("is_logical_partitioned_table", "true");
+    properties.put("orientation", "column");
+    Distribution dist = Distributions.hash(0, 
NamedReference.field("order_id"));
+    Transform[] partitioning = {Transforms.list(new String[][] {{"ds"}})};
+    Index pk = Indexes.primary("pk_order", new String[][] {{"order_id"}, 
{"ds"}});
+    String sql =
+        ops.createTableSql(
+            "orders",
+            new JdbcColumn[] {col1, col2},
+            "Order table",
+            properties,
+            partitioning,
+            dist,
+            new Index[] {pk});
+    assertTrue(sql.contains("CREATE TABLE \"orders\""));
+    assertTrue(sql.contains("LOGICAL PARTITION BY LIST(\"ds\")"));
+    assertTrue(sql.contains("distribution_key = 'order_id'"));
+    assertTrue(sql.contains("orientation = 'column'"));
+    assertTrue(sql.contains("PRIMARY KEY (\"order_id\", \"ds\")"));
+    assertTrue(sql.contains("COMMENT ON TABLE \"orders\" IS 'Order table'"));
+    assertFalse(sql.contains("is_logical_partitioned_table"));
+  }
+
+  // ==================== generateRenameTableSql tests ====================
+
+  @Test
+  void testRenameTableSql() {
+    String sql = ops.renameTableSql("old_table", "new_table");
+    assertEquals("ALTER TABLE \"old_table\" RENAME TO \"new_table\"", sql);
+  }
+
+  // ==================== generateDropTableSql tests ====================
+
+  @Test
+  void testDropTableSql() {
+    String sql = ops.dropTableSql("my_table");
+    assertEquals("DROP TABLE \"my_table\"", sql);
+  }
+
+  // ==================== generatePurgeTableSql tests ====================
+
+  @Test
+  void testPurgeTableSqlThrowsException() {
+    assertThrows(UnsupportedOperationException.class, () -> 
ops.purgeTableSql("my_table"));
+  }
+
  // ==================== updateColumnAutoIncrement not supported ====================
  // Note: Hologres does not support altering column auto-increment via ALTER TABLE.
  // The UpdateColumnAutoIncrement change type is rejected in generateAlterTableSql().
  // CREATE TABLE with auto-increment (GENERATED BY DEFAULT AS IDENTITY) is also not supported
  // and will throw an exception — see testCreateTableWithAutoIncrementThrows() above.
+
  // ==================== addIndex / deleteIndex not supported ====================
  // Note: Hologres does not support adding or deleting indexes via ALTER TABLE.
  // The AddIndex and DeleteIndex change types are rejected in generateAlterTableSql().
  // CREATE TABLE with PRIMARY KEY and UNIQUE KEY constraints is still supported
  // — see testCreateTableWithPrimaryKey() above.
+
+  // ==================== getIndexFieldStr tests ====================
+
+  @Test
+  void testGetIndexFieldStrSingleColumn() {
+    String result = HologresTableOperations.getIndexFieldStr(new String[][] 
{{"id"}});
+    assertEquals("\"id\"", result);
+  }
+
+  @Test
+  void testGetIndexFieldStrMultipleColumns() {
+    String result = HologresTableOperations.getIndexFieldStr(new String[][] 
{{"id"}, {"name"}});
+    assertEquals("\"id\", \"name\"", result);
+  }
+
+  @Test
+  void testGetIndexFieldStrRejectsNestedColumn() {
+    assertThrows(
+        IllegalArgumentException.class,
+        () -> HologresTableOperations.getIndexFieldStr(new String[][] 
{{"schema", "column"}}));
+  }
+
+  // ==================== calculateDatetimePrecision tests ====================
+
+  @Test
+  void testCalculateDatetimePrecisionTimestamp() {
+    Integer result = ops.calculateDatetimePrecision("TIMESTAMP", 0, 6);
+    assertEquals(6, result);
+  }
+
+  @Test
+  void testCalculateDatetimePrecisionTimestamptz() {
+    Integer result = ops.calculateDatetimePrecision("TIMESTAMPTZ", 0, 3);
+    assertEquals(3, result);
+  }
+
+  @Test
+  void testCalculateDatetimePrecisionTime() {
+    Integer result = ops.calculateDatetimePrecision("TIME", 0, 0);
+    assertEquals(0, result);
+  }
+
+  @Test
+  void testCalculateDatetimePrecisionTimetz() {
+    Integer result = ops.calculateDatetimePrecision("TIMETZ", 0, 6);
+    assertEquals(6, result);
+  }
+
+  @Test
+  void testCalculateDatetimePrecisionNonDatetime() {
+    Integer result = ops.calculateDatetimePrecision("INT4", 0, 0);
+    assertEquals(null, result);
+  }
+
+  @Test
+  void testCalculateDatetimePrecisionNegativeScaleReturnsZero() {
+    Integer result = ops.calculateDatetimePrecision("TIMESTAMP", 0, -1);
+    assertEquals(0, result);
+  }
+
+  @Test
+  void testCreateTableWithUnparsedExpressionDefault() {
+    JdbcColumn col1 =
+        JdbcColumn.builder()
+            .withName("order_time")
+            .withType(Types.TimestampType.withoutTimeZone())
+            .withNullable(false)
+            .build();
+    JdbcColumn col2 =
+        JdbcColumn.builder()
+            .withName("ds")
+            .withType(Types.TimestampType.withoutTimeZone())
+            .withNullable(false)
+            .withDefaultValue(UnparsedExpression.of("date_trunc('day'::text, 
order_time)"))
+            .build();
+    String sql =
+        ops.createTableSql(
+            "test_default_expr",
+            new JdbcColumn[] {col1, col2},
+            null,
+            Collections.emptyMap(),
+            Transforms.EMPTY_TRANSFORM,
+            Distributions.NONE,
+            Indexes.EMPTY_INDEXES);
+    // UnparsedExpression should produce DEFAULT, not GENERATED ALWAYS AS
+    assertTrue(sql.contains("DEFAULT date_trunc('day'::text, order_time)"));
+    assertFalse(sql.contains("GENERATED ALWAYS AS"));
+    assertTrue(sql.contains("NOT NULL"));
+  }
+
+  @Test
+  void testCreateTableWithUnparsedExpressionDefaultNullable() {
+    JdbcColumn col1 =
+        JdbcColumn.builder()
+            .withName("val")
+            .withType(Types.IntegerType.get())
+            .withNullable(false)
+            .build();
+    JdbcColumn col2 =
+        JdbcColumn.builder()
+            .withName("computed")
+            .withType(Types.IntegerType.get())
+            .withNullable(true)
+            .withDefaultValue(UnparsedExpression.of("val * 2"))
+            .build();
+    String sql =
+        ops.createTableSql(
+            "test_default_nullable",
+            new JdbcColumn[] {col1, col2},
+            null,
+            Collections.emptyMap(),
+            Transforms.EMPTY_TRANSFORM,
+            Distributions.NONE,
+            Indexes.EMPTY_INDEXES);
+    // UnparsedExpression should produce DEFAULT, not GENERATED ALWAYS AS
+    assertTrue(sql.contains("DEFAULT val * 2"));
+    assertFalse(sql.contains("GENERATED ALWAYS AS"));
+    // The nullable column should have NULL
+    assertTrue(sql.contains("NULL DEFAULT val * 2"));
+  }
+
+  // ==================== generateAlterTableSql tests ====================
+
+  @Test
+  void testAlterTableUpdateComment() {
+    // Note: updateComment needs a JdbcTable with StringIdentifier, which 
requires
+    // getOrCreateTable. Since we can't mock the DB connection, we test the 
SQL format
+    // via the other alter operations that don't require table loading.
+  }
+
+  @Test
+  void testAlterTableRenameColumn() {
+    String sql =
+        ops.alterTableSql(
+            "public", "test_table", TableChange.renameColumn(new String[] 
{"old_col"}, "new_col"));
+    assertTrue(sql.contains("ALTER TABLE \"test_table\" RENAME COLUMN 
\"old_col\" TO \"new_col\""));
+  }
+
+  @Test
+  void testAlterTableUpdateColumnComment() {
+    String sql =
+        ops.alterTableSql(
+            "public",
+            "test_table",
+            TableChange.updateColumnComment(new String[] {"col1"}, "new 
comment"));
+    assertTrue(sql.contains("COMMENT ON COLUMN \"test_table\".\"col1\" IS 'new 
comment'"));
+  }
+
+  @Test
+  void testAlterTableUpdateColumnCommentWithSingleQuotes() {
+    String sql =
+        ops.alterTableSql(
+            "public",
+            "test_table",
+            TableChange.updateColumnComment(new String[] {"col1"}, "it's a 
test"));
+    assertTrue(sql.contains("IS 'it''s a test'"));
+  }
+
+  @Test
+  void testAlterTableRenameColumnRejectsNestedField() {
+    assertThrows(
+        UnsupportedOperationException.class,
+        () ->
+            ops.alterTableSql(
+                "public",
+                "test_table",
+                TableChange.renameColumn(new String[] {"schema", "col"}, 
"new_col")));
+  }
+
+  @Test
+  void testAlterTableSetPropertyThrows() {
+    assertThrows(
+        IllegalArgumentException.class,
+        () -> ops.alterTableSql("public", "test_table", 
TableChange.setProperty("key", "value")));
+  }
+
+  @Test
+  void testAlterTableRemovePropertyThrows() {
+    assertThrows(
+        IllegalArgumentException.class,
+        () -> ops.alterTableSql("public", "test_table", 
TableChange.removeProperty("key")));
+  }
+
+  @Test
+  void testAlterTableUpdateColumnDefaultValueThrows() {
+    assertThrows(
+        IllegalArgumentException.class,
+        () ->
+            ops.alterTableSql(
+                "public",
+                "test_table",
+                TableChange.updateColumnDefaultValue(
+                    new String[] {"col1"}, Literals.integerLiteral(0))));
+  }
+
+  @Test
+  void testAlterTableUpdateColumnTypeThrows() {
+    assertThrows(
+        IllegalArgumentException.class,
+        () ->
+            ops.alterTableSql(
+                "public",
+                "test_table",
+                TableChange.updateColumnType(new String[] {"col1"}, 
Types.StringType.get())));
+  }
+
+  @Test
+  void testAlterTableUpdateColumnPositionThrows() {
+    assertThrows(
+        IllegalArgumentException.class,
+        () ->
+            ops.alterTableSql(
+                "public",
+                "test_table",
+                TableChange.updateColumnPosition(
+                    new String[] {"col1"}, 
TableChange.ColumnPosition.first())));
+  }
+
+  @Test
+  void testAlterTableUpdateColumnNullabilityThrows() {
+    assertThrows(
+        IllegalArgumentException.class,
+        () ->
+            ops.alterTableSql(
+                "public",
+                "test_table",
+                TableChange.updateColumnNullability(new String[] {"col1"}, 
true)));
+  }
+
+  @Test
+  void testAlterTableAddIndexThrows() {
+    assertThrows(
+        IllegalArgumentException.class,
+        () ->
+            ops.alterTableSql(
+                "public",
+                "test_table",
+                TableChange.addIndex(
+                    Index.IndexType.PRIMARY_KEY, "pk_test", new String[][] 
{{"col1"}})));
+  }
+
+  @Test
+  void testAlterTableDeleteIndexThrows() {
+    assertThrows(
+        IllegalArgumentException.class,
+        () -> ops.alterTableSql("public", "test_table", 
TableChange.deleteIndex("pk_test", false)));
+  }
+
+  @Test
+  void testAlterTableUpdateColumnAutoIncrementThrows() {
+    assertThrows(
+        IllegalArgumentException.class,
+        () ->
+            ops.alterTableSql(
+                "public",
+                "test_table",
+                TableChange.updateColumnAutoIncrement(new String[] {"col1"}, 
true)));
+  }
+
+  // ==================== quoteIdentifier tests ====================
+
+  @Test
+  void testQuoteIdentifierEscapesEmbeddedDoubleQuotes() {
+    // Table name with embedded double quote should be escaped by doubling
+    JdbcColumn col =
+        JdbcColumn.builder()
+            .withName("id")
+            .withType(Types.IntegerType.get())
+            .withNullable(false)
+            .build();
+    String sql =
+        ops.createTableSql(
+            "table\"name",
+            new JdbcColumn[] {col},
+            null,
+            Collections.emptyMap(),
+            Transforms.EMPTY_TRANSFORM,
+            Distributions.NONE,
+            Indexes.EMPTY_INDEXES);
+    assertTrue(sql.contains("CREATE TABLE \"table\"\"name\""));
+  }
+
  // ==================== Property key/value sanitization tests ====================
+
+  @Test
+  void testCreateTablePropertyValueEscapesSingleQuotes() {
+    JdbcColumn col =
+        JdbcColumn.builder()
+            .withName("id")
+            .withType(Types.IntegerType.get())
+            .withNullable(false)
+            .build();
+    Map<String, String> properties = new HashMap<>();
+    properties.put("clustering_key", "col1:asc,col2's:desc");
+    String sql =
+        ops.createTableSql(
+            "test_table",
+            new JdbcColumn[] {col},
+            null,
+            properties,
+            Transforms.EMPTY_TRANSFORM,
+            Distributions.NONE,
+            Indexes.EMPTY_INDEXES);
+    // Single quote in value should be escaped
+    assertTrue(sql.contains("clustering_key = 'col1:asc,col2''s:desc'"));
+  }
+
+  @Test
+  void testCreateTableInvalidPropertyKeyThrows() {
+    JdbcColumn col =
+        JdbcColumn.builder()
+            .withName("id")
+            .withType(Types.IntegerType.get())
+            .withNullable(false)
+            .build();
+    Map<String, String> properties = new HashMap<>();
+    properties.put("invalid-key!", "value");
+    assertThrows(
+        IllegalArgumentException.class,
+        () ->
+            ops.createTableSql(
+                "test_table",
+                new JdbcColumn[] {col},
+                null,
+                properties,
+                Transforms.EMPTY_TRANSFORM,
+                Distributions.NONE,
+                Indexes.EMPTY_INDEXES));
+  }
+
+  @Test
+  void testCreateTableSqlInjectionPropertyKeyThrows() {
+    JdbcColumn col =
+        JdbcColumn.builder()
+            .withName("id")
+            .withType(Types.IntegerType.get())
+            .withNullable(false)
+            .build();
+    Map<String, String> properties = new HashMap<>();
+    properties.put("key'); DROP TABLE users; --", "value");
+    assertThrows(
+        IllegalArgumentException.class,
+        () ->
+            ops.createTableSql(
+                "test_table",
+                new JdbcColumn[] {col},
+                null,
+                properties,
+                Transforms.EMPTY_TRANSFORM,
+                Distributions.NONE,
+                Indexes.EMPTY_INDEXES));
+  }
+}

Reply via email to