diqiu50 commented on code in PR #9826:
URL: https://github.com/apache/gravitino/pull/9826#discussion_r2767704960


##########
catalogs-contrib/catalog-jdbc-clickhouse/src/main/java/org/apache/gravitino/catalog/clickhouse/operations/ClickHouseTableOperations.java:
##########
@@ -327,8 +353,368 @@ protected String generatePurgeTableSql(String tableName) {
   @Override
   protected String generateAlterTableSql(
       String databaseName, String tableName, TableChange... changes) {
-    throw new UnsupportedOperationException(
-        "ClickHouseTableOperations.generateAlterTableSql is not implemented 
yet.");
+    // Not all operations require the original table information, so lazy 
loading is used here
+    JdbcTable lazyLoadTable = null;
+    TableChange.UpdateComment updateComment = null;
+    List<TableChange.SetProperty> setProperties = new ArrayList<>();
+    List<String> alterSql = new ArrayList<>();
+
+    for (TableChange change : changes) {
+      if (change instanceof TableChange.UpdateComment) {
+        updateComment = (TableChange.UpdateComment) change;
+
+      } else if (change instanceof TableChange.SetProperty setProperty) {
+        // The set attribute needs to be added at the end.
+        setProperties.add(setProperty);
+
+      } else if (change instanceof TableChange.RemoveProperty) {
+        // Clickhouse does not support deleting table attributes, it can be 
replaced by Set Property
+        throw new IllegalArgumentException("Remove property is not supported 
yet");
+
+      } else if (change instanceof TableChange.AddColumn addColumn) {
+        lazyLoadTable = getOrCreateTable(databaseName, tableName, 
lazyLoadTable);
+        alterSql.add(addColumnFieldDefinition(addColumn));
+
+      } else if (change instanceof TableChange.RenameColumn renameColumn) {
+        lazyLoadTable = getOrCreateTable(databaseName, tableName, 
lazyLoadTable);
+        alterSql.add(renameColumnFieldDefinition(renameColumn));
+
+      } else if (change instanceof TableChange.UpdateColumnDefaultValue 
updateColumnDefaultValue) {
+        lazyLoadTable = getOrCreateTable(databaseName, tableName, 
lazyLoadTable);
+        alterSql.add(
+            updateColumnDefaultValueFieldDefinition(updateColumnDefaultValue, 
lazyLoadTable));
+
+      } else if (change instanceof TableChange.UpdateColumnType 
updateColumnType) {
+        lazyLoadTable = getOrCreateTable(databaseName, tableName, 
lazyLoadTable);
+        alterSql.add(updateColumnTypeFieldDefinition(updateColumnType, 
lazyLoadTable));
+
+      } else if (change instanceof TableChange.UpdateColumnComment 
updateColumnComment) {
+        lazyLoadTable = getOrCreateTable(databaseName, tableName, 
lazyLoadTable);
+        alterSql.add(updateColumnCommentFieldDefinition(updateColumnComment, 
lazyLoadTable));
+
+      } else if (change instanceof TableChange.UpdateColumnPosition 
updateColumnPosition) {
+        lazyLoadTable = getOrCreateTable(databaseName, tableName, 
lazyLoadTable);
+        alterSql.add(updateColumnPositionFieldDefinition(updateColumnPosition, 
lazyLoadTable));
+
+      } else if (change instanceof TableChange.DeleteColumn deleteColumn) {
+        lazyLoadTable = getOrCreateTable(databaseName, tableName, 
lazyLoadTable);
+        String deleteColSql = deleteColumnFieldDefinition(deleteColumn, 
lazyLoadTable);
+
+        if (StringUtils.isNotEmpty(deleteColSql)) {
+          alterSql.add(deleteColSql);
+        }
+
+      } else if (change instanceof TableChange.UpdateColumnNullability) {
+        lazyLoadTable = getOrCreateTable(databaseName, tableName, 
lazyLoadTable);
+        alterSql.add(
+            updateColumnNullabilityDefinition(
+                (TableChange.UpdateColumnNullability) change, lazyLoadTable));
+
+      } else if (change instanceof TableChange.DeleteIndex) {
+        lazyLoadTable = getOrCreateTable(databaseName, tableName, 
lazyLoadTable);
+        alterSql.add(deleteIndexDefinition(lazyLoadTable, 
(TableChange.DeleteIndex) change));
+
+      } else if (change instanceof TableChange.UpdateColumnAutoIncrement) {
+        lazyLoadTable = getOrCreateTable(databaseName, tableName, 
lazyLoadTable);
+        alterSql.add(
+            updateColumnAutoIncrementDefinition(
+                lazyLoadTable, (TableChange.UpdateColumnAutoIncrement) 
change));
+
+      } else {
+        throw new IllegalArgumentException(
+            "Unsupported table change type: " + change.getClass().getName());
+      }
+    }
+
+    // Last modified comment
+    if (null != updateComment) {
+      String newComment = updateComment.getNewComment();
+      if (null == StringIdentifier.fromComment(newComment)) {
+        // Detect and add Gravitino id.
+        JdbcTable jdbcTable = getOrCreateTable(databaseName, tableName, 
lazyLoadTable);
+        StringIdentifier identifier = 
StringIdentifier.fromComment(jdbcTable.comment());
+        if (null != identifier) {
+          newComment = StringIdentifier.addToComment(identifier, newComment);
+        }
+      }

Review Comment:
   **Critical: SQL Injection Risk**
   
   The table comment is not escaped before being inserted into SQL. If 
`newComment` contains single quotes, it will break the SQL statement.
   
   **Fix required:**
   ```java
   String escapedComment = StringUtils.replace(newComment, "'", "''");
   alterSql.add(" MODIFY COMMENT '%s'".formatted(escapedComment));
   ```
   
   This follows the same pattern used in `addColumnFieldDefinition` at line 595.



##########
catalogs-contrib/catalog-jdbc-clickhouse/src/main/java/org/apache/gravitino/catalog/clickhouse/operations/ClickHouseTableOperations.java:
##########
@@ -327,8 +353,368 @@ protected String generatePurgeTableSql(String tableName) {
   @Override
   protected String generateAlterTableSql(
       String databaseName, String tableName, TableChange... changes) {
-    throw new UnsupportedOperationException(
-        "ClickHouseTableOperations.generateAlterTableSql is not implemented 
yet.");
+    // Not all operations require the original table information, so lazy 
loading is used here
+    JdbcTable lazyLoadTable = null;
+    TableChange.UpdateComment updateComment = null;
+    List<TableChange.SetProperty> setProperties = new ArrayList<>();
+    List<String> alterSql = new ArrayList<>();
+
+    for (TableChange change : changes) {
+      if (change instanceof TableChange.UpdateComment) {
+        updateComment = (TableChange.UpdateComment) change;
+
+      } else if (change instanceof TableChange.SetProperty setProperty) {
+        // The set attribute needs to be added at the end.
+        setProperties.add(setProperty);
+
+      } else if (change instanceof TableChange.RemoveProperty) {
+        // Clickhouse does not support deleting table attributes, it can be 
replaced by Set Property

Review Comment:
   **Inconsistent Exception Types**
   
   Mix of `IllegalArgumentException` and `UnsupportedOperationException` for 
similar "not supported" scenarios. For consistency, use 
`UnsupportedOperationException` for all operations that ClickHouse doesn't 
support.
   
   **Suggestion:**
   ```java
   throw new UnsupportedOperationException("Remove property is not supported in 
ClickHouse");
   ```



##########
catalogs-contrib/catalog-jdbc-clickhouse/src/test/java/org/apache/gravitino/catalog/clickhouse/operations/TestClickHouseTableOperations.java:
##########
@@ -59,6 +63,381 @@
 public class TestClickHouseTableOperations extends TestClickHouse {
   private static final Type STRING = Types.StringType.get();
   private static final Type INT = Types.IntegerType.get();
+  private static final Type LONG = Types.LongType.get();
+
+  @Test
+  public void testCreateAndAlterTable() {
+    String tableName = RandomStringUtils.randomAlphabetic(16) + "_op_table";
+    String tableComment = "test_comment";
+    List<JdbcColumn> columns = new ArrayList<>();
+    columns.add(
+        JdbcColumn.builder()
+            .withName("col_1")
+            .withType(STRING)
+            .withComment("test_comment")
+            .withNullable(true)
+            .build());
+    columns.add(
+        JdbcColumn.builder()
+            .withName("col_2")
+            .withType(INT)
+            .withNullable(false)
+            .withComment("set primary key")
+            .build());
+    columns.add(
+        JdbcColumn.builder()
+            .withName("col_3")
+            .withType(INT)
+            .withNullable(true)
+            .withDefaultValue(Literals.NULL)
+            .build());
+    columns.add(
+        JdbcColumn.builder()
+            .withName("col_4")
+            .withType(STRING)
+            .withDefaultValue(Literals.of("hello world", STRING))
+            .withNullable(false)
+            .build());
+    Map<String, String> properties = new HashMap<>();
+
+    Index[] indexes = new Index[] {};
+    // create table
+    TABLE_OPERATIONS.create(
+        TEST_DB_NAME.toString(),
+        tableName,
+        columns.toArray(new JdbcColumn[0]),
+        tableComment,
+        properties,
+        null,
+        Distributions.NONE,
+        indexes,
+        getSortOrders("col_2"));
+
+    // list table
+    List<String> tables = TABLE_OPERATIONS.listTables(TEST_DB_NAME.toString());
+    Assertions.assertTrue(tables.contains(tableName));
+
+    // load table
+    JdbcTable load = TABLE_OPERATIONS.load(TEST_DB_NAME.toString(), tableName);
+    assertionsTableInfo(
+        tableName, tableComment, columns, properties, indexes, 
Transforms.EMPTY_TRANSFORM, load);
+
+    // rename table
+    String newName = "new_table";
+    Assertions.assertDoesNotThrow(
+        () -> TABLE_OPERATIONS.rename(TEST_DB_NAME.toString(), tableName, 
newName));
+    Assertions.assertDoesNotThrow(() -> 
TABLE_OPERATIONS.load(TEST_DB_NAME.toString(), newName));
+
+    // alter table
+    JdbcColumn newColumn =
+        JdbcColumn.builder()
+            .withName("col_5")
+            .withType(STRING)
+            .withComment("new_add")

Review Comment:
   **Remove Commented-Out Code**
   
   Please remove all commented-out code before merging. If this code is needed 
for future reference, it can be found in git history.
   
   Per project guidelines in `.github/copilot-instructions.md`: "No wildcard 
imports; close resources (try-with-resources); do not leave TODO/FIXME without 
an issue reference."



##########
catalogs-contrib/catalog-jdbc-clickhouse/src/main/java/org/apache/gravitino/catalog/clickhouse/operations/ClickHouseTableOperations.java:
##########
@@ -327,8 +353,368 @@ protected String generatePurgeTableSql(String tableName) {
   @Override
   protected String generateAlterTableSql(
       String databaseName, String tableName, TableChange... changes) {
-    throw new UnsupportedOperationException(
-        "ClickHouseTableOperations.generateAlterTableSql is not implemented 
yet.");
+    // Not all operations require the original table information, so lazy 
loading is used here
+    JdbcTable lazyLoadTable = null;
+    TableChange.UpdateComment updateComment = null;
+    List<TableChange.SetProperty> setProperties = new ArrayList<>();
+    List<String> alterSql = new ArrayList<>();
+
+    for (TableChange change : changes) {
+      if (change instanceof TableChange.UpdateComment) {
+        updateComment = (TableChange.UpdateComment) change;
+
+      } else if (change instanceof TableChange.SetProperty setProperty) {
+        // The set attribute needs to be added at the end.
+        setProperties.add(setProperty);
+
+      } else if (change instanceof TableChange.RemoveProperty) {
+        // Clickhouse does not support deleting table attributes, it can be 
replaced by Set Property
+        throw new IllegalArgumentException("Remove property is not supported 
yet");
+
+      } else if (change instanceof TableChange.AddColumn addColumn) {
+        lazyLoadTable = getOrCreateTable(databaseName, tableName, 
lazyLoadTable);
+        alterSql.add(addColumnFieldDefinition(addColumn));
+
+      } else if (change instanceof TableChange.RenameColumn renameColumn) {
+        lazyLoadTable = getOrCreateTable(databaseName, tableName, 
lazyLoadTable);
+        alterSql.add(renameColumnFieldDefinition(renameColumn));
+
+      } else if (change instanceof TableChange.UpdateColumnDefaultValue 
updateColumnDefaultValue) {
+        lazyLoadTable = getOrCreateTable(databaseName, tableName, 
lazyLoadTable);
+        alterSql.add(
+            updateColumnDefaultValueFieldDefinition(updateColumnDefaultValue, 
lazyLoadTable));
+
+      } else if (change instanceof TableChange.UpdateColumnType 
updateColumnType) {
+        lazyLoadTable = getOrCreateTable(databaseName, tableName, 
lazyLoadTable);
+        alterSql.add(updateColumnTypeFieldDefinition(updateColumnType, 
lazyLoadTable));
+
+      } else if (change instanceof TableChange.UpdateColumnComment 
updateColumnComment) {
+        lazyLoadTable = getOrCreateTable(databaseName, tableName, 
lazyLoadTable);
+        alterSql.add(updateColumnCommentFieldDefinition(updateColumnComment, 
lazyLoadTable));
+
+      } else if (change instanceof TableChange.UpdateColumnPosition 
updateColumnPosition) {
+        lazyLoadTable = getOrCreateTable(databaseName, tableName, 
lazyLoadTable);
+        alterSql.add(updateColumnPositionFieldDefinition(updateColumnPosition, 
lazyLoadTable));
+
+      } else if (change instanceof TableChange.DeleteColumn deleteColumn) {
+        lazyLoadTable = getOrCreateTable(databaseName, tableName, 
lazyLoadTable);
+        String deleteColSql = deleteColumnFieldDefinition(deleteColumn, 
lazyLoadTable);
+
+        if (StringUtils.isNotEmpty(deleteColSql)) {
+          alterSql.add(deleteColSql);
+        }
+
+      } else if (change instanceof TableChange.UpdateColumnNullability) {
+        lazyLoadTable = getOrCreateTable(databaseName, tableName, 
lazyLoadTable);
+        alterSql.add(
+            updateColumnNullabilityDefinition(
+                (TableChange.UpdateColumnNullability) change, lazyLoadTable));
+
+      } else if (change instanceof TableChange.DeleteIndex) {
+        lazyLoadTable = getOrCreateTable(databaseName, tableName, 
lazyLoadTable);
+        alterSql.add(deleteIndexDefinition(lazyLoadTable, 
(TableChange.DeleteIndex) change));
+
+      } else if (change instanceof TableChange.UpdateColumnAutoIncrement) {
+        lazyLoadTable = getOrCreateTable(databaseName, tableName, 
lazyLoadTable);
+        alterSql.add(
+            updateColumnAutoIncrementDefinition(
+                lazyLoadTable, (TableChange.UpdateColumnAutoIncrement) 
change));
+
+      } else {
+        throw new IllegalArgumentException(
+            "Unsupported table change type: " + change.getClass().getName());
+      }
+    }
+
+    // Last modified comment
+    if (null != updateComment) {
+      String newComment = updateComment.getNewComment();
+      if (null == StringIdentifier.fromComment(newComment)) {
+        // Detect and add Gravitino id.
+        JdbcTable jdbcTable = getOrCreateTable(databaseName, tableName, 
lazyLoadTable);
+        StringIdentifier identifier = 
StringIdentifier.fromComment(jdbcTable.comment());
+        if (null != identifier) {
+          newComment = StringIdentifier.addToComment(identifier, newComment);
+        }
+      }
+      alterSql.add(" MODIFY COMMENT '%s'".formatted(newComment));
+    }
+
+    if (!setProperties.isEmpty()) {
+      alterSql.add(generateAlterTableProperties(setProperties));
+    }
+
+    // Remove all empty SQL statements
+    List<String> nonEmptySQLs =
+        
alterSql.stream().filter(StringUtils::isNotEmpty).collect(Collectors.toList());
+    if (CollectionUtils.isEmpty(nonEmptySQLs)) {
+      return "";
+    }
+
+    // Return the generated SQL statement
+    String result =
+        "ALTER TABLE %s \n%s;"
+            .formatted(quoteIdentifier(tableName), String.join(",\n", 
nonEmptySQLs));
+    LOG.info("Generated alter table:{} sql: {}", databaseName + "." + 
tableName, result);
+    return result;
+  }
+
+  private String updateColumnAutoIncrementDefinition(
+      JdbcTable table, TableChange.UpdateColumnAutoIncrement change) {
+    if (change.fieldName().length > 1) {
+      throw new UnsupportedOperationException("Nested column names are not 
supported");
+    }
+
+    String col = change.fieldName()[0];
+    JdbcColumn column = getJdbcColumnFromTable(table, col);
+    if (change.isAutoIncrement()) {
+      Preconditions.checkArgument(
+          Types.allowAutoIncrement(column.dataType()),
+          "Auto increment is not allowed, type: " + column.dataType());
+    }
+
+    JdbcColumn updateColumn =
+        JdbcColumn.builder()
+            .withName(col)
+            .withDefaultValue(column.defaultValue())
+            .withNullable(column.nullable())
+            .withType(column.dataType())
+            .withComment(column.comment())
+            .withAutoIncrement(change.isAutoIncrement())
+            .build();
+
+    return MODIFY_COLUMN
+        + quoteIdentifier(col)
+        + appendColumnDefinition(updateColumn, new StringBuilder());
+  }
+
+  @VisibleForTesting
+  private String deleteIndexDefinition(
+      JdbcTable lazyLoadTable, TableChange.DeleteIndex deleteIndex) {
+    boolean indexExists =
+        Arrays.stream(lazyLoadTable.index())
+            .anyMatch(index -> index.name().equals(deleteIndex.getName()));
+
+    // Index does not exist
+    if (!indexExists) {
+      // If ifExists is true, return empty string to skip the operation
+      if (deleteIndex.isIfExists()) {
+        return "";
+      } else {
+        throw new IllegalArgumentException(
+            "Index '%s' does not exist".formatted(deleteIndex.getName()));
+      }
+    }
+
+    return "DROP INDEX %s ".formatted(quoteIdentifier(deleteIndex.getName()));
+  }
+
+  private String updateColumnNullabilityDefinition(
+      TableChange.UpdateColumnNullability change, JdbcTable table) {
+    validateUpdateColumnNullable(change, table);
+
+    String col = change.fieldName()[0];
+    JdbcColumn column = getJdbcColumnFromTable(table, col);
+    JdbcColumn updateColumn =
+        JdbcColumn.builder()
+            .withName(col)
+            .withDefaultValue(column.defaultValue())
+            .withNullable(change.nullable())
+            .withType(column.dataType())
+            .withComment(column.comment())
+            .withAutoIncrement(column.autoIncrement())
+            .build();
+
+    return MODIFY_COLUMN
+        + quoteIdentifier(col)
+        + appendColumnDefinition(updateColumn, new StringBuilder());
+  }
+
+  private String generateAlterTableProperties(List<TableChange.SetProperty> 
setProperties) {
+    if (CollectionUtils.isNotEmpty(setProperties)) {
+      throw new UnsupportedOperationException(
+          "Alter table properties in ClickHouse is not supported");
+    }
+
+    return "";
+  }
+
+  private String updateColumnCommentFieldDefinition(
+      TableChange.UpdateColumnComment updateColumnComment, JdbcTable 
jdbcTable) {
+    String newComment = updateColumnComment.getNewComment();
+    if (updateColumnComment.fieldName().length > 1) {
+      throw new 
UnsupportedOperationException(CLICKHOUSE_NOT_SUPPORT_NESTED_COLUMN_MSG);
+    }
+
+    String col = updateColumnComment.fieldName()[0];
+    JdbcColumn column = getJdbcColumnFromTable(jdbcTable, col);
+    JdbcColumn updateColumn =
+        JdbcColumn.builder()
+            .withName(col)
+            .withDefaultValue(column.defaultValue())
+            .withNullable(column.nullable())
+            .withType(column.dataType())
+            .withComment(newComment)
+            .withAutoIncrement(column.autoIncrement())
+            .build();
+
+    return MODIFY_COLUMN
+        + quoteIdentifier(col)
+        + appendColumnDefinition(updateColumn, new StringBuilder());
+  }
+
+  private String addColumnFieldDefinition(TableChange.AddColumn addColumn) {
+    String dataType = typeConverter.fromGravitino(addColumn.getDataType());
+    if (addColumn.fieldName().length > 1) {
+      throw new 
UnsupportedOperationException(CLICKHOUSE_NOT_SUPPORT_NESTED_COLUMN_MSG);
+    }
+
+    String col = addColumn.fieldName()[0];
+    StringBuilder columnDefinition = new StringBuilder();
+    //  [IF NOT EXISTS] name [type] [default_expr] [codec] [AFTER name_after | 
FIRST]
+    if (addColumn.isNullable()) {
+      columnDefinition.append(
+          "ADD COLUMN %s Nullable(%s) ".formatted(quoteIdentifier(col), 
dataType));
+    } else {
+      columnDefinition.append("ADD COLUMN %s %s 
".formatted(quoteIdentifier(col), dataType));
+    }
+
+    if (addColumn.isAutoIncrement()) {
+      throw new UnsupportedOperationException(
+          "ClickHouse does not support adding auto increment column");
+    }
+
+    // Append default value if available
+    if (!Column.DEFAULT_VALUE_NOT_SET.equals(addColumn.getDefaultValue())) {
+      columnDefinition.append(
+          "DEFAULT %s "
+              
.formatted(columnDefaultValueConverter.fromGravitino(addColumn.getDefaultValue())));
+    }
+
+    // Append comment if available after default value
+    if (StringUtils.isNotEmpty(addColumn.getComment())) {
+      String escapedComment = StringUtils.replace(addColumn.getComment(), "'", 
"''");
+      columnDefinition.append(" COMMENT '%s' ".formatted(escapedComment));
+    }
+
+    // Append position if available
+    if (addColumn.getPosition() instanceof TableChange.First) {
+      columnDefinition.append(" FIRST ");
+    } else if (addColumn.getPosition() instanceof TableChange.After 
afterPosition) {
+      columnDefinition.append(" AFTER %s 
".formatted(quoteIdentifier(afterPosition.getColumn())));
+    } else if (addColumn.getPosition() instanceof TableChange.Default) {
+      // Do nothing, follow the default behavior of clickhouse
+    } else {
+      throw new IllegalArgumentException("Invalid column position.");
+    }
+
+    return columnDefinition.toString();
+  }
+
+  private String renameColumnFieldDefinition(TableChange.RenameColumn 
renameColumn) {
+    if (renameColumn.fieldName().length > 1) {
+      throw new 
UnsupportedOperationException(CLICKHOUSE_NOT_SUPPORT_NESTED_COLUMN_MSG);
+    }
+
+    String oldColumnName = renameColumn.fieldName()[0];
+    String newColumnName = renameColumn.getNewName();
+
+    return "RENAME COLUMN %s TO %s"
+        .formatted(quoteIdentifier(oldColumnName), 
quoteIdentifier(newColumnName));
+  }
+
+  private String updateColumnPositionFieldDefinition(
+      TableChange.UpdateColumnPosition updateColumnPosition, JdbcTable 
jdbcTable) {
+    if (updateColumnPosition.fieldName().length > 1) {
+      throw new 
UnsupportedOperationException(CLICKHOUSE_NOT_SUPPORT_NESTED_COLUMN_MSG);
+    }
+
+    String col = updateColumnPosition.fieldName()[0];
+    JdbcColumn column = getJdbcColumnFromTable(jdbcTable, col);
+
+    StringBuilder columnDefinition = new StringBuilder();
+    columnDefinition.append(" %s %s ".formatted(MODIFY_COLUMN, 
quoteIdentifier(col)));
+    appendColumnDefinition(column, columnDefinition);
+
+    if (updateColumnPosition.getPosition() instanceof TableChange.First) {
+      columnDefinition.append(" FIRST ");
+    } else if (updateColumnPosition.getPosition() instanceof TableChange.After 
afterPosition) {
+      columnDefinition.append(
+          " %s %s ".formatted(AFTER, 
quoteIdentifier(afterPosition.getColumn())));
+    } else {
+      Arrays.stream(jdbcTable.columns())
+          .reduce((column1, column2) -> column2)
+          .map(Column::name)
+          .ifPresent(s -> columnDefinition.append(" %s %s ".formatted(AFTER, 
quoteIdentifier(s))));
+    }
+    return columnDefinition.toString();
+  }
+
+  private String deleteColumnFieldDefinition(
+      TableChange.DeleteColumn deleteColumn, JdbcTable jdbcTable) {
+    if (deleteColumn.fieldName().length > 1) {
+      throw new 
UnsupportedOperationException(CLICKHOUSE_NOT_SUPPORT_NESTED_COLUMN_MSG);
+    }
+
+    String col = deleteColumn.fieldName()[0];
+    boolean colExists = true;
+    try {
+      getJdbcColumnFromTable(jdbcTable, col);
+    } catch (NoSuchColumnException noSuchColumnException) {
+      colExists = false;
+    }
+
+    if (!colExists) {
+      if (BooleanUtils.isTrue(deleteColumn.getIfExists())) {
+        return "";
+      } else {
+        throw new IllegalArgumentException("Delete column '%s' does not 
exist.".formatted(col));
+      }
+    }
+
+    return "DROP COLUMN %s".formatted(quoteIdentifier(col));
+  }
+

Review Comment:
   **Exception-Based Control Flow Anti-Pattern**
   
   Using exceptions for control flow is generally an anti-pattern and can 
impact performance.
   
   **Suggestion:** Add a helper method to check column existence:
   ```java
   private boolean columnExists(JdbcTable table, String columnName) {
     return Arrays.stream(table.columns())
         .anyMatch(col -> col.name().equals(columnName));
   }
   ```
   
   Then use:
   ```java
   boolean colExists = columnExists(jdbcTable, col);
   ```



##########
catalogs-contrib/catalog-jdbc-clickhouse/src/main/java/org/apache/gravitino/catalog/clickhouse/operations/ClickHouseTableOperations.java:
##########
@@ -327,8 +353,368 @@ protected String generatePurgeTableSql(String tableName) {
   @Override
   protected String generateAlterTableSql(
       String databaseName, String tableName, TableChange... changes) {
-    throw new UnsupportedOperationException(
-        "ClickHouseTableOperations.generateAlterTableSql is not implemented 
yet.");
+    // Not all operations require the original table information, so lazy 
loading is used here
+    JdbcTable lazyLoadTable = null;
+    TableChange.UpdateComment updateComment = null;
+    List<TableChange.SetProperty> setProperties = new ArrayList<>();
+    List<String> alterSql = new ArrayList<>();
+
+    for (TableChange change : changes) {
+      if (change instanceof TableChange.UpdateComment) {
+        updateComment = (TableChange.UpdateComment) change;
+
+      } else if (change instanceof TableChange.SetProperty setProperty) {
+        // The set attribute needs to be added at the end.
+        setProperties.add(setProperty);
+
+      } else if (change instanceof TableChange.RemoveProperty) {
+        // Clickhouse does not support deleting table attributes, it can be 
replaced by Set Property
+        throw new IllegalArgumentException("Remove property is not supported 
yet");
+
+      } else if (change instanceof TableChange.AddColumn addColumn) {
+        lazyLoadTable = getOrCreateTable(databaseName, tableName, 
lazyLoadTable);
+        alterSql.add(addColumnFieldDefinition(addColumn));
+
+      } else if (change instanceof TableChange.RenameColumn renameColumn) {
+        lazyLoadTable = getOrCreateTable(databaseName, tableName, 
lazyLoadTable);
+        alterSql.add(renameColumnFieldDefinition(renameColumn));
+
+      } else if (change instanceof TableChange.UpdateColumnDefaultValue 
updateColumnDefaultValue) {
+        lazyLoadTable = getOrCreateTable(databaseName, tableName, 
lazyLoadTable);
+        alterSql.add(
+            updateColumnDefaultValueFieldDefinition(updateColumnDefaultValue, 
lazyLoadTable));
+
+      } else if (change instanceof TableChange.UpdateColumnType 
updateColumnType) {
+        lazyLoadTable = getOrCreateTable(databaseName, tableName, 
lazyLoadTable);
+        alterSql.add(updateColumnTypeFieldDefinition(updateColumnType, 
lazyLoadTable));
+
+      } else if (change instanceof TableChange.UpdateColumnComment 
updateColumnComment) {
+        lazyLoadTable = getOrCreateTable(databaseName, tableName, 
lazyLoadTable);
+        alterSql.add(updateColumnCommentFieldDefinition(updateColumnComment, 
lazyLoadTable));
+
+      } else if (change instanceof TableChange.UpdateColumnPosition 
updateColumnPosition) {
+        lazyLoadTable = getOrCreateTable(databaseName, tableName, 
lazyLoadTable);
+        alterSql.add(updateColumnPositionFieldDefinition(updateColumnPosition, 
lazyLoadTable));
+
+      } else if (change instanceof TableChange.DeleteColumn deleteColumn) {
+        lazyLoadTable = getOrCreateTable(databaseName, tableName, 
lazyLoadTable);
+        String deleteColSql = deleteColumnFieldDefinition(deleteColumn, 
lazyLoadTable);
+
+        if (StringUtils.isNotEmpty(deleteColSql)) {
+          alterSql.add(deleteColSql);
+        }
+
+      } else if (change instanceof TableChange.UpdateColumnNullability) {
+        lazyLoadTable = getOrCreateTable(databaseName, tableName, 
lazyLoadTable);
+        alterSql.add(
+            updateColumnNullabilityDefinition(
+                (TableChange.UpdateColumnNullability) change, lazyLoadTable));
+
+      } else if (change instanceof TableChange.DeleteIndex) {
+        lazyLoadTable = getOrCreateTable(databaseName, tableName, 
lazyLoadTable);
+        alterSql.add(deleteIndexDefinition(lazyLoadTable, 
(TableChange.DeleteIndex) change));
+
+      } else if (change instanceof TableChange.UpdateColumnAutoIncrement) {
+        lazyLoadTable = getOrCreateTable(databaseName, tableName, 
lazyLoadTable);
+        alterSql.add(
+            updateColumnAutoIncrementDefinition(
+                lazyLoadTable, (TableChange.UpdateColumnAutoIncrement) 
change));
+
+      } else {
+        throw new IllegalArgumentException(
+            "Unsupported table change type: " + change.getClass().getName());
+      }
+    }
+
+    // Last modified comment
+    if (null != updateComment) {
+      String newComment = updateComment.getNewComment();
+      if (null == StringIdentifier.fromComment(newComment)) {
+        // Detect and add Gravitino id.
+        JdbcTable jdbcTable = getOrCreateTable(databaseName, tableName, 
lazyLoadTable);
+        StringIdentifier identifier = 
StringIdentifier.fromComment(jdbcTable.comment());
+        if (null != identifier) {
+          newComment = StringIdentifier.addToComment(identifier, newComment);
+        }
+      }
+      alterSql.add(" MODIFY COMMENT '%s'".formatted(newComment));
+    }
+
+    if (!setProperties.isEmpty()) {
+      alterSql.add(generateAlterTableProperties(setProperties));
+    }
+
+    // Remove all empty SQL statements
+    List<String> nonEmptySQLs =
+        
alterSql.stream().filter(StringUtils::isNotEmpty).collect(Collectors.toList());
+    if (CollectionUtils.isEmpty(nonEmptySQLs)) {
+      return "";
+    }
+
+    // Return the generated SQL statement
+    String result =
+        "ALTER TABLE %s \n%s;"
+            .formatted(quoteIdentifier(tableName), String.join(",\n", 
nonEmptySQLs));
+    LOG.info("Generated alter table:{} sql: {}", databaseName + "." + 
tableName, result);
+    return result;
+  }
+
+  private String updateColumnAutoIncrementDefinition(
+      JdbcTable table, TableChange.UpdateColumnAutoIncrement change) {
+    if (change.fieldName().length > 1) {
+      throw new UnsupportedOperationException("Nested column names are not 
supported");
+    }
+
+    String col = change.fieldName()[0];
+    JdbcColumn column = getJdbcColumnFromTable(table, col);
+    if (change.isAutoIncrement()) {
+      Preconditions.checkArgument(
+          Types.allowAutoIncrement(column.dataType()),
+          "Auto increment is not allowed, type: " + column.dataType());
+    }
+
+    JdbcColumn updateColumn =
+        JdbcColumn.builder()
+            .withName(col)
+            .withDefaultValue(column.defaultValue())
+            .withNullable(column.nullable())
+            .withType(column.dataType())
+            .withComment(column.comment())
+            .withAutoIncrement(change.isAutoIncrement())
+            .build();
+
+    return MODIFY_COLUMN
+        + quoteIdentifier(col)
+        + appendColumnDefinition(updateColumn, new StringBuilder());
+  }
+
+  @VisibleForTesting
+  private String deleteIndexDefinition(
+      JdbcTable lazyLoadTable, TableChange.DeleteIndex deleteIndex) {
+    boolean indexExists =
+        Arrays.stream(lazyLoadTable.index())
+            .anyMatch(index -> index.name().equals(deleteIndex.getName()));
+
+    // Index does not exist
+    if (!indexExists) {
+      // If ifExists is true, return empty string to skip the operation
+      if (deleteIndex.isIfExists()) {
+        return "";
+      } else {
+        throw new IllegalArgumentException(
+            "Index '%s' does not exist".formatted(deleteIndex.getName()));
+      }
+    }
+
+    return "DROP INDEX %s ".formatted(quoteIdentifier(deleteIndex.getName()));
+  }
+
+  private String updateColumnNullabilityDefinition(
+      TableChange.UpdateColumnNullability change, JdbcTable table) {
+    validateUpdateColumnNullable(change, table);
+
+    String col = change.fieldName()[0];
+    JdbcColumn column = getJdbcColumnFromTable(table, col);
+    JdbcColumn updateColumn =
+        JdbcColumn.builder()
+            .withName(col)
+            .withDefaultValue(column.defaultValue())
+            .withNullable(change.nullable())
+            .withType(column.dataType())
+            .withComment(column.comment())
+            .withAutoIncrement(column.autoIncrement())
+            .build();
+
+    return MODIFY_COLUMN
+        + quoteIdentifier(col)
+        + appendColumnDefinition(updateColumn, new StringBuilder());
+  }
+
+  private String generateAlterTableProperties(List<TableChange.SetProperty> 
setProperties) {
+    if (CollectionUtils.isNotEmpty(setProperties)) {
+      throw new UnsupportedOperationException(
+          "Alter table properties in ClickHouse is not supported");
+    }
+
+    return "";
+  }
+
+  private String updateColumnCommentFieldDefinition(
+      TableChange.UpdateColumnComment updateColumnComment, JdbcTable 
jdbcTable) {
+    String newComment = updateColumnComment.getNewComment();
+    if (updateColumnComment.fieldName().length > 1) {
+      throw new 
UnsupportedOperationException(CLICKHOUSE_NOT_SUPPORT_NESTED_COLUMN_MSG);
+    }
+
+    String col = updateColumnComment.fieldName()[0];
+    JdbcColumn column = getJdbcColumnFromTable(jdbcTable, col);
+    JdbcColumn updateColumn =
+        JdbcColumn.builder()
+            .withName(col)
+            .withDefaultValue(column.defaultValue())
+            .withNullable(column.nullable())
+            .withType(column.dataType())
+            .withComment(newComment)
+            .withAutoIncrement(column.autoIncrement())
+            .build();
+
+    return MODIFY_COLUMN
+        + quoteIdentifier(col)
+        + appendColumnDefinition(updateColumn, new StringBuilder());
+  }
+
+  private String addColumnFieldDefinition(TableChange.AddColumn addColumn) {
+    String dataType = typeConverter.fromGravitino(addColumn.getDataType());
+    if (addColumn.fieldName().length > 1) {
+      throw new 
UnsupportedOperationException(CLICKHOUSE_NOT_SUPPORT_NESTED_COLUMN_MSG);
+    }
+
+    String col = addColumn.fieldName()[0];
+    StringBuilder columnDefinition = new StringBuilder();
+    //  [IF NOT EXISTS] name [type] [default_expr] [codec] [AFTER name_after | 
FIRST]
+    if (addColumn.isNullable()) {
+      columnDefinition.append(
+          "ADD COLUMN %s Nullable(%s) ".formatted(quoteIdentifier(col), 
dataType));
+    } else {
+      columnDefinition.append("ADD COLUMN %s %s 
".formatted(quoteIdentifier(col), dataType));
+    }
+
+    if (addColumn.isAutoIncrement()) {
+      throw new UnsupportedOperationException(
+          "ClickHouse does not support adding auto increment column");
+    }
+
+    // Append default value if available
+    if (!Column.DEFAULT_VALUE_NOT_SET.equals(addColumn.getDefaultValue())) {
+      columnDefinition.append(
+          "DEFAULT %s "
+              
.formatted(columnDefaultValueConverter.fromGravitino(addColumn.getDefaultValue())));
+    }
+
+    // Append comment if available after default value
+    if (StringUtils.isNotEmpty(addColumn.getComment())) {
+      String escapedComment = StringUtils.replace(addColumn.getComment(), "'", 
"''");
+      columnDefinition.append(" COMMENT '%s' ".formatted(escapedComment));
+    }
+
+    // Append position if available
+    if (addColumn.getPosition() instanceof TableChange.First) {
+      columnDefinition.append(" FIRST ");
+    } else if (addColumn.getPosition() instanceof TableChange.After 
afterPosition) {
+      columnDefinition.append(" AFTER %s 
".formatted(quoteIdentifier(afterPosition.getColumn())));
+    } else if (addColumn.getPosition() instanceof TableChange.Default) {
+      // Do nothing, follow the default behavior of clickhouse
+    } else {
+      throw new IllegalArgumentException("Invalid column position.");
+    }
+
+    return columnDefinition.toString();
+  }
+
+  private String renameColumnFieldDefinition(TableChange.RenameColumn 
renameColumn) {
+    if (renameColumn.fieldName().length > 1) {
+      throw new 
UnsupportedOperationException(CLICKHOUSE_NOT_SUPPORT_NESTED_COLUMN_MSG);
+    }
+
+    String oldColumnName = renameColumn.fieldName()[0];
+    String newColumnName = renameColumn.getNewName();
+
+    return "RENAME COLUMN %s TO %s"
+        .formatted(quoteIdentifier(oldColumnName), 
quoteIdentifier(newColumnName));
+  }
+
+  private String updateColumnPositionFieldDefinition(
+      TableChange.UpdateColumnPosition updateColumnPosition, JdbcTable 
jdbcTable) {
+    if (updateColumnPosition.fieldName().length > 1) {
+      throw new 
UnsupportedOperationException(CLICKHOUSE_NOT_SUPPORT_NESTED_COLUMN_MSG);
+    }
+
+    String col = updateColumnPosition.fieldName()[0];
+    JdbcColumn column = getJdbcColumnFromTable(jdbcTable, col);
+
+    StringBuilder columnDefinition = new StringBuilder();
+    columnDefinition.append(" %s %s ".formatted(MODIFY_COLUMN, 
quoteIdentifier(col)));
+    appendColumnDefinition(column, columnDefinition);
+
+    if (updateColumnPosition.getPosition() instanceof TableChange.First) {
+      columnDefinition.append(" FIRST ");
+    } else if (updateColumnPosition.getPosition() instanceof TableChange.After 
afterPosition) {
+      columnDefinition.append(
+          " %s %s ".formatted(AFTER, 
quoteIdentifier(afterPosition.getColumn())));
+    } else {
+      Arrays.stream(jdbcTable.columns())
+          .reduce((column1, column2) -> column2)
+          .map(Column::name)
+          .ifPresent(s -> columnDefinition.append(" %s %s ".formatted(AFTER, 
quoteIdentifier(s))));
+    }
+    return columnDefinition.toString();
+  }
+
+  private String deleteColumnFieldDefinition(
+      TableChange.DeleteColumn deleteColumn, JdbcTable jdbcTable) {
+    if (deleteColumn.fieldName().length > 1) {
+      throw new 
UnsupportedOperationException(CLICKHOUSE_NOT_SUPPORT_NESTED_COLUMN_MSG);

Review Comment:
   **Unnecessary Logic for Default Position**
   
   This code places the column after the last column when position is 
`Default`. However, ClickHouse's default behavior already appends columns to 
the end. This extra logic is redundant and can be removed.
   
   **Suggestion:**
   ```java
   } else {
     // ClickHouse default behavior: append to end
   }
   ```



##########
catalogs-contrib/catalog-jdbc-clickhouse/src/main/java/org/apache/gravitino/catalog/clickhouse/operations/ClickHouseTableOperations.java:
##########
@@ -22,27 +22,36 @@
 import static 
org.apache.gravitino.catalog.clickhouse.ClickHouseTablePropertiesMetadata.ENGINE_PROPERTY_ENTRY;
 import static org.apache.gravitino.rel.Column.DEFAULT_VALUE_NOT_SET;
 
+import com.google.common.annotations.VisibleForTesting;

Review Comment:
   **Code Style: Run spotlessApply**
   
   Multiple new imports have been added. Please run `./gradlew spotlessApply` 
to ensure proper import ordering and formatting according to Google Java Style 
guidelines.
   
   Per `CLAUDE.md`: "**Style**: Follow rigid Google Java Style. Run `./gradlew 
spotlessApply` to format."



##########
catalogs-contrib/catalog-jdbc-clickhouse/src/main/java/org/apache/gravitino/catalog/clickhouse/operations/ClickHouseTableOperations.java:
##########
@@ -327,8 +353,368 @@ protected String generatePurgeTableSql(String tableName) {
   @Override
   protected String generateAlterTableSql(
       String databaseName, String tableName, TableChange... changes) {
-    throw new UnsupportedOperationException(
-        "ClickHouseTableOperations.generateAlterTableSql is not implemented 
yet.");
+    // Not all operations require the original table information, so lazy 
loading is used here
+    JdbcTable lazyLoadTable = null;
+    TableChange.UpdateComment updateComment = null;
+    List<TableChange.SetProperty> setProperties = new ArrayList<>();
+    List<String> alterSql = new ArrayList<>();
+
+    for (TableChange change : changes) {
+      if (change instanceof TableChange.UpdateComment) {
+        updateComment = (TableChange.UpdateComment) change;
+
+      } else if (change instanceof TableChange.SetProperty setProperty) {
+        // The set attribute needs to be added at the end.
+        setProperties.add(setProperty);
+
+      } else if (change instanceof TableChange.RemoveProperty) {
+        // Clickhouse does not support deleting table attributes, it can be 
replaced by Set Property
+        throw new IllegalArgumentException("Remove property is not supported 
yet");
+
+      } else if (change instanceof TableChange.AddColumn addColumn) {
+        lazyLoadTable = getOrCreateTable(databaseName, tableName, 
lazyLoadTable);
+        alterSql.add(addColumnFieldDefinition(addColumn));
+
+      } else if (change instanceof TableChange.RenameColumn renameColumn) {
+        lazyLoadTable = getOrCreateTable(databaseName, tableName, 
lazyLoadTable);
+        alterSql.add(renameColumnFieldDefinition(renameColumn));
+
+      } else if (change instanceof TableChange.UpdateColumnDefaultValue 
updateColumnDefaultValue) {
+        lazyLoadTable = getOrCreateTable(databaseName, tableName, 
lazyLoadTable);
+        alterSql.add(
+            updateColumnDefaultValueFieldDefinition(updateColumnDefaultValue, 
lazyLoadTable));
+
+      } else if (change instanceof TableChange.UpdateColumnType 
updateColumnType) {
+        lazyLoadTable = getOrCreateTable(databaseName, tableName, 
lazyLoadTable);
+        alterSql.add(updateColumnTypeFieldDefinition(updateColumnType, 
lazyLoadTable));
+
+      } else if (change instanceof TableChange.UpdateColumnComment 
updateColumnComment) {
+        lazyLoadTable = getOrCreateTable(databaseName, tableName, 
lazyLoadTable);
+        alterSql.add(updateColumnCommentFieldDefinition(updateColumnComment, 
lazyLoadTable));
+
+      } else if (change instanceof TableChange.UpdateColumnPosition 
updateColumnPosition) {
+        lazyLoadTable = getOrCreateTable(databaseName, tableName, 
lazyLoadTable);
+        alterSql.add(updateColumnPositionFieldDefinition(updateColumnPosition, 
lazyLoadTable));
+
+      } else if (change instanceof TableChange.DeleteColumn deleteColumn) {
+        lazyLoadTable = getOrCreateTable(databaseName, tableName, 
lazyLoadTable);
+        String deleteColSql = deleteColumnFieldDefinition(deleteColumn, 
lazyLoadTable);
+
+        if (StringUtils.isNotEmpty(deleteColSql)) {
+          alterSql.add(deleteColSql);
+        }
+
+      } else if (change instanceof TableChange.UpdateColumnNullability) {
+        lazyLoadTable = getOrCreateTable(databaseName, tableName, 
lazyLoadTable);
+        alterSql.add(
+            updateColumnNullabilityDefinition(
+                (TableChange.UpdateColumnNullability) change, lazyLoadTable));
+
+      } else if (change instanceof TableChange.DeleteIndex) {
+        lazyLoadTable = getOrCreateTable(databaseName, tableName, 
lazyLoadTable);
+        alterSql.add(deleteIndexDefinition(lazyLoadTable, 
(TableChange.DeleteIndex) change));
+
+      } else if (change instanceof TableChange.UpdateColumnAutoIncrement) {
+        lazyLoadTable = getOrCreateTable(databaseName, tableName, 
lazyLoadTable);
+        alterSql.add(
+            updateColumnAutoIncrementDefinition(
+                lazyLoadTable, (TableChange.UpdateColumnAutoIncrement) 
change));
+
+      } else {
+        throw new IllegalArgumentException(
+            "Unsupported table change type: " + change.getClass().getName());
+      }
+    }
+
+    // Last modified comment
+    if (null != updateComment) {
+      String newComment = updateComment.getNewComment();
+      if (null == StringIdentifier.fromComment(newComment)) {
+        // Detect and add Gravitino id.
+        JdbcTable jdbcTable = getOrCreateTable(databaseName, tableName, 
lazyLoadTable);
+        StringIdentifier identifier = 
StringIdentifier.fromComment(jdbcTable.comment());
+        if (null != identifier) {
+          newComment = StringIdentifier.addToComment(identifier, newComment);
+        }
+      }
+      alterSql.add(" MODIFY COMMENT '%s'".formatted(newComment));
+    }
+
+    if (!setProperties.isEmpty()) {
+      alterSql.add(generateAlterTableProperties(setProperties));
+    }
+
+    // Remove all empty SQL statements
+    List<String> nonEmptySQLs =
+        
alterSql.stream().filter(StringUtils::isNotEmpty).collect(Collectors.toList());
+    if (CollectionUtils.isEmpty(nonEmptySQLs)) {
+      return "";
+    }
+
+    // Return the generated SQL statement
+    String result =
+        "ALTER TABLE %s \n%s;"
+            .formatted(quoteIdentifier(tableName), String.join(",\n", 
nonEmptySQLs));
+    LOG.info("Generated alter table:{} sql: {}", databaseName + "." + 
tableName, result);
+    return result;
+  }
+
+  private String updateColumnAutoIncrementDefinition(
+      JdbcTable table, TableChange.UpdateColumnAutoIncrement change) {
+    if (change.fieldName().length > 1) {
+      throw new UnsupportedOperationException("Nested column names are not 
supported");
+    }
+
+    String col = change.fieldName()[0];
+    JdbcColumn column = getJdbcColumnFromTable(table, col);
+    if (change.isAutoIncrement()) {
+      Preconditions.checkArgument(
+          Types.allowAutoIncrement(column.dataType()),
+          "Auto increment is not allowed, type: " + column.dataType());
+    }
+
+    JdbcColumn updateColumn =
+        JdbcColumn.builder()
+            .withName(col)
+            .withDefaultValue(column.defaultValue())
+            .withNullable(column.nullable())
+            .withType(column.dataType())
+            .withComment(column.comment())
+            .withAutoIncrement(change.isAutoIncrement())
+            .build();
+
+    return MODIFY_COLUMN
+        + quoteIdentifier(col)
+        + appendColumnDefinition(updateColumn, new StringBuilder());
+  }
+
+  @VisibleForTesting
+  private String deleteIndexDefinition(
+      JdbcTable lazyLoadTable, TableChange.DeleteIndex deleteIndex) {
+    boolean indexExists =
+        Arrays.stream(lazyLoadTable.index())
+            .anyMatch(index -> index.name().equals(deleteIndex.getName()));
+
+    // Index does not exist
+    if (!indexExists) {
+      // If ifExists is true, return empty string to skip the operation
+      if (deleteIndex.isIfExists()) {
+        return "";
+      } else {
+        throw new IllegalArgumentException(
+            "Index '%s' does not exist".formatted(deleteIndex.getName()));
+      }
+    }
+
+    return "DROP INDEX %s ".formatted(quoteIdentifier(deleteIndex.getName()));
+  }
+
+  private String updateColumnNullabilityDefinition(
+      TableChange.UpdateColumnNullability change, JdbcTable table) {
+    validateUpdateColumnNullable(change, table);
+
+    String col = change.fieldName()[0];
+    JdbcColumn column = getJdbcColumnFromTable(table, col);
+    JdbcColumn updateColumn =
+        JdbcColumn.builder()
+            .withName(col)
+            .withDefaultValue(column.defaultValue())
+            .withNullable(change.nullable())
+            .withType(column.dataType())
+            .withComment(column.comment())
+            .withAutoIncrement(column.autoIncrement())
+            .build();
+
+    return MODIFY_COLUMN
+        + quoteIdentifier(col)
+        + appendColumnDefinition(updateColumn, new StringBuilder());
+  }
+
+  private String generateAlterTableProperties(List<TableChange.SetProperty> 
setProperties) {
+    if (CollectionUtils.isNotEmpty(setProperties)) {
+      throw new UnsupportedOperationException(
+          "Alter table properties in ClickHouse is not supported");
+    }
+
+    return "";
+  }
+
+  private String updateColumnCommentFieldDefinition(
+      TableChange.UpdateColumnComment updateColumnComment, JdbcTable 
jdbcTable) {
+    String newComment = updateColumnComment.getNewComment();
+    if (updateColumnComment.fieldName().length > 1) {
+      throw new 
UnsupportedOperationException(CLICKHOUSE_NOT_SUPPORT_NESTED_COLUMN_MSG);
+    }
+
+    String col = updateColumnComment.fieldName()[0];
+    JdbcColumn column = getJdbcColumnFromTable(jdbcTable, col);
+    JdbcColumn updateColumn =
+        JdbcColumn.builder()
+            .withName(col)
+            .withDefaultValue(column.defaultValue())
+            .withNullable(column.nullable())
+            .withType(column.dataType())
+            .withComment(newComment)
+            .withAutoIncrement(column.autoIncrement())
+            .build();
+
+    return MODIFY_COLUMN
+        + quoteIdentifier(col)
+        + appendColumnDefinition(updateColumn, new StringBuilder());
+  }
+
+  private String addColumnFieldDefinition(TableChange.AddColumn addColumn) {
+    String dataType = typeConverter.fromGravitino(addColumn.getDataType());
+    if (addColumn.fieldName().length > 1) {
+      throw new 
UnsupportedOperationException(CLICKHOUSE_NOT_SUPPORT_NESTED_COLUMN_MSG);
+    }
+
+    String col = addColumn.fieldName()[0];
+    StringBuilder columnDefinition = new StringBuilder();
+    //  [IF NOT EXISTS] name [type] [default_expr] [codec] [AFTER name_after | 
FIRST]
+    if (addColumn.isNullable()) {
+      columnDefinition.append(
+          "ADD COLUMN %s Nullable(%s) ".formatted(quoteIdentifier(col), 
dataType));
+    } else {
+      columnDefinition.append("ADD COLUMN %s %s 
".formatted(quoteIdentifier(col), dataType));
+    }
+
+    if (addColumn.isAutoIncrement()) {
+      throw new UnsupportedOperationException(
+          "ClickHouse does not support adding auto increment column");
+    }
+
+    // Append default value if available
+    if (!Column.DEFAULT_VALUE_NOT_SET.equals(addColumn.getDefaultValue())) {
+      columnDefinition.append(
+          "DEFAULT %s "
+              
.formatted(columnDefaultValueConverter.fromGravitino(addColumn.getDefaultValue())));
+    }
+
+    // Append comment if available after default value
+    if (StringUtils.isNotEmpty(addColumn.getComment())) {
+      String escapedComment = StringUtils.replace(addColumn.getComment(), "'", 
"''");
+      columnDefinition.append(" COMMENT '%s' ".formatted(escapedComment));
+    }
+
+    // Append position if available
+    if (addColumn.getPosition() instanceof TableChange.First) {
+      columnDefinition.append(" FIRST ");
+    } else if (addColumn.getPosition() instanceof TableChange.After 
afterPosition) {
+      columnDefinition.append(" AFTER %s 
".formatted(quoteIdentifier(afterPosition.getColumn())));
+    } else if (addColumn.getPosition() instanceof TableChange.Default) {
+      // Do nothing, follow the default behavior of clickhouse
+    } else {
+      throw new IllegalArgumentException("Invalid column position.");
+    }
+
+    return columnDefinition.toString();
+  }
+
+  private String renameColumnFieldDefinition(TableChange.RenameColumn 
renameColumn) {
+    if (renameColumn.fieldName().length > 1) {
+      throw new 
UnsupportedOperationException(CLICKHOUSE_NOT_SUPPORT_NESTED_COLUMN_MSG);
+    }
+
+    String oldColumnName = renameColumn.fieldName()[0];
+    String newColumnName = renameColumn.getNewName();
+
+    return "RENAME COLUMN %s TO %s"
+        .formatted(quoteIdentifier(oldColumnName), 
quoteIdentifier(newColumnName));
+  }
+
+  private String updateColumnPositionFieldDefinition(
+      TableChange.UpdateColumnPosition updateColumnPosition, JdbcTable 
jdbcTable) {
+    if (updateColumnPosition.fieldName().length > 1) {
+      throw new 
UnsupportedOperationException(CLICKHOUSE_NOT_SUPPORT_NESTED_COLUMN_MSG);
+    }
+
+    String col = updateColumnPosition.fieldName()[0];
+    JdbcColumn column = getJdbcColumnFromTable(jdbcTable, col);
+
+    StringBuilder columnDefinition = new StringBuilder();
+    columnDefinition.append(" %s %s ".formatted(MODIFY_COLUMN, 
quoteIdentifier(col)));
+    appendColumnDefinition(column, columnDefinition);
+
+    if (updateColumnPosition.getPosition() instanceof TableChange.First) {
+      columnDefinition.append(" FIRST ");
+    } else if (updateColumnPosition.getPosition() instanceof TableChange.After 
afterPosition) {
+      columnDefinition.append(
+          " %s %s ".formatted(AFTER, 
quoteIdentifier(afterPosition.getColumn())));
+    } else {
+      Arrays.stream(jdbcTable.columns())
+          .reduce((column1, column2) -> column2)
+          .map(Column::name)
+          .ifPresent(s -> columnDefinition.append(" %s %s ".formatted(AFTER, 
quoteIdentifier(s))));
+    }
+    return columnDefinition.toString();
+  }
+
+  private String deleteColumnFieldDefinition(
+      TableChange.DeleteColumn deleteColumn, JdbcTable jdbcTable) {
+    if (deleteColumn.fieldName().length > 1) {
+      throw new 
UnsupportedOperationException(CLICKHOUSE_NOT_SUPPORT_NESTED_COLUMN_MSG);
+    }
+
+    String col = deleteColumn.fieldName()[0];
+    boolean colExists = true;
+    try {
+      getJdbcColumnFromTable(jdbcTable, col);
+    } catch (NoSuchColumnException noSuchColumnException) {
+      colExists = false;
+    }
+
+    if (!colExists) {
+      if (BooleanUtils.isTrue(deleteColumn.getIfExists())) {
+        return "";
+      } else {
+        throw new IllegalArgumentException("Delete column '%s' does not 
exist.".formatted(col));
+      }
+    }
+
+    return "DROP COLUMN %s".formatted(quoteIdentifier(col));
+  }
+
+  private String updateColumnDefaultValueFieldDefinition(
+      TableChange.UpdateColumnDefaultValue updateColumnDefaultValue, JdbcTable 
jdbcTable) {
+    if (updateColumnDefaultValue.fieldName().length > 1) {
+      throw new 
UnsupportedOperationException(CLICKHOUSE_NOT_SUPPORT_NESTED_COLUMN_MSG);
+    }
+
+    String col = updateColumnDefaultValue.fieldName()[0];
+    JdbcColumn column = getJdbcColumnFromTable(jdbcTable, col);
+    StringBuilder sqlBuilder = new StringBuilder(MODIFY_COLUMN + 
quoteIdentifier(col));
+    JdbcColumn newColumn =
+        JdbcColumn.builder()
+            .withName(col)
+            .withType(column.dataType())
+            .withNullable(column.nullable())
+            .withComment(column.comment())
+            .withDefaultValue(updateColumnDefaultValue.getNewDefaultValue())
+            .build();
+
+    return appendColumnDefinition(newColumn, sqlBuilder).toString();
+  }
+
+  private String updateColumnTypeFieldDefinition(
+      TableChange.UpdateColumnType updateColumnType, JdbcTable jdbcTable) {
+    if (updateColumnType.fieldName().length > 1) {
+      throw new 
UnsupportedOperationException(CLICKHOUSE_NOT_SUPPORT_NESTED_COLUMN_MSG);
+    }
+
+    String col = updateColumnType.fieldName()[0];
+    JdbcColumn column = getJdbcColumnFromTable(jdbcTable, col);
+    StringBuilder sqlBuilder = new StringBuilder(MODIFY_COLUMN + 
quoteIdentifier(col));

Review Comment:
   **Potential Data Loss: Default Value Removed**
   
   When changing column type, the default value is explicitly set to 
`DEFAULT_VALUE_NOT_SET`, which will **remove** any existing default value. This 
could cause unexpected behavior.
   
   **Options:**
   1. Preserve the existing default value: 
`.withDefaultValue(column.defaultValue())`
   2. Document this behavior clearly in the method JavaDoc
   3. Add a warning log when removing a default value
   
   Which approach aligns with ClickHouse's ALTER TABLE behavior?



##########
catalogs-contrib/catalog-jdbc-clickhouse/src/main/java/org/apache/gravitino/catalog/clickhouse/operations/ClickHouseTableOperations.java:
##########
@@ -327,8 +353,368 @@ protected String generatePurgeTableSql(String tableName) {
   @Override
   protected String generateAlterTableSql(
       String databaseName, String tableName, TableChange... changes) {
-    throw new UnsupportedOperationException(
-        "ClickHouseTableOperations.generateAlterTableSql is not implemented 
yet.");
+    // Not all operations require the original table information, so lazy 
loading is used here
+    JdbcTable lazyLoadTable = null;
+    TableChange.UpdateComment updateComment = null;
+    List<TableChange.SetProperty> setProperties = new ArrayList<>();
+    List<String> alterSql = new ArrayList<>();
+
+    for (TableChange change : changes) {
+      if (change instanceof TableChange.UpdateComment) {
+        updateComment = (TableChange.UpdateComment) change;
+
+      } else if (change instanceof TableChange.SetProperty setProperty) {
+        // The set attribute needs to be added at the end.
+        setProperties.add(setProperty);
+
+      } else if (change instanceof TableChange.RemoveProperty) {
+        // Clickhouse does not support deleting table attributes, it can be 
replaced by Set Property
+        throw new IllegalArgumentException("Remove property is not supported 
yet");
+
+      } else if (change instanceof TableChange.AddColumn addColumn) {
+        lazyLoadTable = getOrCreateTable(databaseName, tableName, 
lazyLoadTable);
+        alterSql.add(addColumnFieldDefinition(addColumn));
+
+      } else if (change instanceof TableChange.RenameColumn renameColumn) {
+        lazyLoadTable = getOrCreateTable(databaseName, tableName, 
lazyLoadTable);
+        alterSql.add(renameColumnFieldDefinition(renameColumn));
+
+      } else if (change instanceof TableChange.UpdateColumnDefaultValue 
updateColumnDefaultValue) {
+        lazyLoadTable = getOrCreateTable(databaseName, tableName, 
lazyLoadTable);
+        alterSql.add(
+            updateColumnDefaultValueFieldDefinition(updateColumnDefaultValue, 
lazyLoadTable));
+
+      } else if (change instanceof TableChange.UpdateColumnType 
updateColumnType) {
+        lazyLoadTable = getOrCreateTable(databaseName, tableName, 
lazyLoadTable);
+        alterSql.add(updateColumnTypeFieldDefinition(updateColumnType, 
lazyLoadTable));
+
+      } else if (change instanceof TableChange.UpdateColumnComment 
updateColumnComment) {
+        lazyLoadTable = getOrCreateTable(databaseName, tableName, 
lazyLoadTable);
+        alterSql.add(updateColumnCommentFieldDefinition(updateColumnComment, 
lazyLoadTable));
+
+      } else if (change instanceof TableChange.UpdateColumnPosition 
updateColumnPosition) {
+        lazyLoadTable = getOrCreateTable(databaseName, tableName, 
lazyLoadTable);
+        alterSql.add(updateColumnPositionFieldDefinition(updateColumnPosition, 
lazyLoadTable));
+
+      } else if (change instanceof TableChange.DeleteColumn deleteColumn) {
+        lazyLoadTable = getOrCreateTable(databaseName, tableName, 
lazyLoadTable);
+        String deleteColSql = deleteColumnFieldDefinition(deleteColumn, 
lazyLoadTable);
+
+        if (StringUtils.isNotEmpty(deleteColSql)) {
+          alterSql.add(deleteColSql);
+        }
+
+      } else if (change instanceof TableChange.UpdateColumnNullability) {
+        lazyLoadTable = getOrCreateTable(databaseName, tableName, 
lazyLoadTable);
+        alterSql.add(
+            updateColumnNullabilityDefinition(
+                (TableChange.UpdateColumnNullability) change, lazyLoadTable));
+
+      } else if (change instanceof TableChange.DeleteIndex) {
+        lazyLoadTable = getOrCreateTable(databaseName, tableName, 
lazyLoadTable);
+        alterSql.add(deleteIndexDefinition(lazyLoadTable, 
(TableChange.DeleteIndex) change));
+
+      } else if (change instanceof TableChange.UpdateColumnAutoIncrement) {
+        lazyLoadTable = getOrCreateTable(databaseName, tableName, 
lazyLoadTable);
+        alterSql.add(
+            updateColumnAutoIncrementDefinition(
+                lazyLoadTable, (TableChange.UpdateColumnAutoIncrement) 
change));
+
+      } else {
+        throw new IllegalArgumentException(
+            "Unsupported table change type: " + change.getClass().getName());
+      }
+    }
+
+    // Last modified comment
+    if (null != updateComment) {
+      String newComment = updateComment.getNewComment();
+      if (null == StringIdentifier.fromComment(newComment)) {
+        // Detect and add Gravitino id.
+        JdbcTable jdbcTable = getOrCreateTable(databaseName, tableName, 
lazyLoadTable);
+        StringIdentifier identifier = 
StringIdentifier.fromComment(jdbcTable.comment());
+        if (null != identifier) {
+          newComment = StringIdentifier.addToComment(identifier, newComment);
+        }
+      }
+      alterSql.add(" MODIFY COMMENT '%s'".formatted(newComment));
+    }
+
+    if (!setProperties.isEmpty()) {
+      alterSql.add(generateAlterTableProperties(setProperties));
+    }
+
+    // Remove all empty SQL statements
+    List<String> nonEmptySQLs =
+        
alterSql.stream().filter(StringUtils::isNotEmpty).collect(Collectors.toList());
+    if (CollectionUtils.isEmpty(nonEmptySQLs)) {
+      return "";
+    }
+
+    // Return the generated SQL statement
+    String result =
+        "ALTER TABLE %s \n%s;"
+            .formatted(quoteIdentifier(tableName), String.join(",\n", 
nonEmptySQLs));
+    LOG.info("Generated alter table:{} sql: {}", databaseName + "." + 
tableName, result);
+    return result;
+  }
+
+  private String updateColumnAutoIncrementDefinition(
+      JdbcTable table, TableChange.UpdateColumnAutoIncrement change) {
+    if (change.fieldName().length > 1) {
+      throw new UnsupportedOperationException("Nested column names are not 
supported");
+    }
+
+    String col = change.fieldName()[0];

Review Comment:
   **Incomplete Validation for Auto-Increment**
   
   The validation checks if the type allows auto-increment, but ClickHouse has 
additional requirements:
   - Auto-increment columns must be part of the primary key or sorting key
   - Only one auto-increment column is allowed per table
   
   Consider adding these validations to prevent runtime errors.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to