Copilot commented on code in PR #9826:
URL: https://github.com/apache/gravitino/pull/9826#discussion_r2766877960


##########
catalogs-contrib/catalog-jdbc-clickhouse/src/main/java/org/apache/gravitino/catalog/clickhouse/operations/ClickHouseTableOperations.java:
##########
@@ -327,8 +355,371 @@ protected String generatePurgeTableSql(String tableName) {
   @Override
   protected String generateAlterTableSql(
       String databaseName, String tableName, TableChange... changes) {
-    throw new UnsupportedOperationException(
-        "ClickHouseTableOperations.generateAlterTableSql is not implemented 
yet.");
+    // Not all operations require the original table information, so lazy 
loading is used here
+    JdbcTable lazyLoadTable = null;
+    TableChange.UpdateComment updateComment = null;
+    List<TableChange.SetProperty> setProperties = new ArrayList<>();
+    List<String> alterSql = new ArrayList<>();
+
+    for (TableChange change : changes) {
+      if (change instanceof TableChange.UpdateComment) {
+        updateComment = (TableChange.UpdateComment) change;
+
+      } else if (change instanceof TableChange.SetProperty setProperty) {
+        // The set attribute needs to be added at the end.
+        setProperties.add(setProperty);
+
+      } else if (change instanceof TableChange.RemoveProperty) {
+        // Clickhouse does not support deleting table attributes, it can be 
replaced by Set Property
+        throw new IllegalArgumentException("Remove property is not supported 
yet");
+
+      } else if (change instanceof TableChange.AddColumn addColumn) {
+        lazyLoadTable = getOrCreateTable(databaseName, tableName, 
lazyLoadTable);
+        alterSql.add(addColumnFieldDefinition(addColumn));
+
+      } else if (change instanceof TableChange.RenameColumn renameColumn) {
+        lazyLoadTable = getOrCreateTable(databaseName, tableName, 
lazyLoadTable);
+        alterSql.add(renameColumnFieldDefinition(renameColumn));
+
+      } else if (change instanceof TableChange.UpdateColumnDefaultValue 
updateColumnDefaultValue) {
+        lazyLoadTable = getOrCreateTable(databaseName, tableName, 
lazyLoadTable);
+        alterSql.add(
+            updateColumnDefaultValueFieldDefinition(updateColumnDefaultValue, 
lazyLoadTable));
+
+      } else if (change instanceof TableChange.UpdateColumnType 
updateColumnType) {
+        lazyLoadTable = getOrCreateTable(databaseName, tableName, 
lazyLoadTable);
+        alterSql.add(updateColumnTypeFieldDefinition(updateColumnType, 
lazyLoadTable));
+
+      } else if (change instanceof TableChange.UpdateColumnComment 
updateColumnComment) {
+        lazyLoadTable = getOrCreateTable(databaseName, tableName, 
lazyLoadTable);
+        alterSql.add(updateColumnCommentFieldDefinition(updateColumnComment, 
lazyLoadTable));
+
+      } else if (change instanceof TableChange.UpdateColumnPosition 
updateColumnPosition) {
+        lazyLoadTable = getOrCreateTable(databaseName, tableName, 
lazyLoadTable);
+        alterSql.add(updateColumnPositionFieldDefinition(updateColumnPosition, 
lazyLoadTable));
+
+      } else if (change instanceof TableChange.DeleteColumn deleteColumn) {
+        lazyLoadTable = getOrCreateTable(databaseName, tableName, 
lazyLoadTable);
+        String deleteColSql = deleteColumnFieldDefinition(deleteColumn, 
lazyLoadTable);
+
+        if (StringUtils.isNotEmpty(deleteColSql)) {
+          alterSql.add(deleteColSql);
+        }
+
+      } else if (change instanceof TableChange.UpdateColumnNullability) {
+        lazyLoadTable = getOrCreateTable(databaseName, tableName, 
lazyLoadTable);
+        alterSql.add(
+            updateColumnNullabilityDefinition(
+                (TableChange.UpdateColumnNullability) change, lazyLoadTable));
+
+      } else if (change instanceof TableChange.DeleteIndex) {
+        lazyLoadTable = getOrCreateTable(databaseName, tableName, 
lazyLoadTable);
+        alterSql.add(deleteIndexDefinition(lazyLoadTable, 
(TableChange.DeleteIndex) change));
+
+      } else if (change instanceof TableChange.UpdateColumnAutoIncrement) {
+        lazyLoadTable = getOrCreateTable(databaseName, tableName, 
lazyLoadTable);
+        alterSql.add(
+            updateColumnAutoIncrementDefinition(
+                lazyLoadTable, (TableChange.UpdateColumnAutoIncrement) 
change));
+
+      } else {
+        throw new IllegalArgumentException(
+            "Unsupported table change type: " + change.getClass().getName());
+      }
+    }
+
+    if (!setProperties.isEmpty()) {
+      alterSql.add(generateAlterTableProperties(setProperties));
+    }
+
+    // Last modified comment
+    if (null != updateComment) {
+      String newComment = updateComment.getNewComment();
+      if (null == StringIdentifier.fromComment(newComment)) {
+        // Detect and add Gravitino id.
+        JdbcTable jdbcTable = getOrCreateTable(databaseName, tableName, 
lazyLoadTable);
+        StringIdentifier identifier = 
StringIdentifier.fromComment(jdbcTable.comment());
+        if (null != identifier) {
+          newComment = StringIdentifier.addToComment(identifier, newComment);
+        }
+      }
+      alterSql.add(" MODIFY COMMENT '%s'".formatted(newComment));
+    }
+
+    if (!setProperties.isEmpty()) {
+      alterSql.add(generateAlterTableProperties(setProperties));
+    }
+
+    // Remove all empty SQL statements
+    List<String> nonEmptySQLs =
+        
alterSql.stream().filter(StringUtils::isNotEmpty).collect(Collectors.toList());
+    if (CollectionUtils.isEmpty(nonEmptySQLs)) {
+      return "";
+    }
+
+    // Return the generated SQL statement
+    String result =
+        "ALTER TABLE %s \n%s;"
+            .formatted(quoteIdentifier(tableName), String.join(",\n", 
nonEmptySQLs));
+    LOG.info("Generated alter table:{} sql: {}", databaseName + "." + 
tableName, result);
+    return result;
+  }
+
+  private String updateColumnAutoIncrementDefinition(
+      JdbcTable table, TableChange.UpdateColumnAutoIncrement change) {
+    if (change.fieldName().length > 1) {
+      throw new UnsupportedOperationException("Nested column names are not 
supported");
+    }
+
+    String col = change.fieldName()[0];
+    JdbcColumn column = getJdbcColumnFromTable(table, col);
+    if (change.isAutoIncrement()) {
+      Preconditions.checkArgument(
+          Types.allowAutoIncrement(column.dataType()),
+          "Auto increment is not allowed, type: " + column.dataType());
+    }
+
+    JdbcColumn updateColumn =
+        JdbcColumn.builder()
+            .withName(col)
+            .withDefaultValue(column.defaultValue())
+            .withNullable(column.nullable())
+            .withType(column.dataType())
+            .withComment(column.comment())
+            .withAutoIncrement(change.isAutoIncrement())
+            .build();
+
+    return MODIFY_COLUMN
+        + quoteIdentifier(col)
+        + appendColumnDefinition(updateColumn, new StringBuilder());
+  }
+
+  @VisibleForTesting
+  private String deleteIndexDefinition(
+      JdbcTable lazyLoadTable, TableChange.DeleteIndex deleteIndex) {
+    boolean indexExists =
+        Arrays.stream(lazyLoadTable.index())
+            .anyMatch(index -> index.name().equals(deleteIndex.getName()));
+
+    // Index does not exist
+    if (!indexExists) {
+      // If ifExists is true, return empty string to skip the operation
+      if (deleteIndex.isIfExists()) {
+        return "";
+      } else {
+        throw new IllegalArgumentException(
+            "Index '%s' does not exist".formatted(deleteIndex.getName()));
+      }
+    }
+
+    return String.format("DROP INDEX %s 
".formatted(quoteIdentifier(deleteIndex.getName())));
+  }
+
+  private String updateColumnNullabilityDefinition(
+      TableChange.UpdateColumnNullability change, JdbcTable table) {
+    validateUpdateColumnNullable(change, table);
+
+    String col = change.fieldName()[0];
+    JdbcColumn column = getJdbcColumnFromTable(table, col);
+    JdbcColumn updateColumn =
+        JdbcColumn.builder()
+            .withName(col)
+            .withDefaultValue(column.defaultValue())
+            .withNullable(change.nullable())
+            .withType(column.dataType())
+            .withComment(column.comment())
+            .withAutoIncrement(column.autoIncrement())
+            .build();
+
+    return MODIFY_COLUMN
+        + quoteIdentifier(col)
+        + appendColumnDefinition(updateColumn, new StringBuilder());
+  }
+
+  private String generateAlterTableProperties(List<TableChange.SetProperty> 
setProperties) {
+    if (CollectionUtils.isNotEmpty(setProperties)) {
+      throw new UnsupportedOperationException(
+          "Alter table properties in ClickHouse is not supported");
+    }
+
+    return "";
+  }
+
+  private String updateColumnCommentFieldDefinition(
+      TableChange.UpdateColumnComment updateColumnComment, JdbcTable 
jdbcTable) {
+    String newComment = updateColumnComment.getNewComment();
+    if (updateColumnComment.fieldName().length > 1) {
+      throw new 
UnsupportedOperationException(CLICKHOUSE_NOT_SUPPORT_NESTED_COLUMN_MSG);
+    }
+
+    String col = updateColumnComment.fieldName()[0];
+    JdbcColumn column = getJdbcColumnFromTable(jdbcTable, col);
+    JdbcColumn updateColumn =
+        JdbcColumn.builder()
+            .withName(col)
+            .withDefaultValue(column.defaultValue())
+            .withNullable(column.nullable())
+            .withType(column.dataType())
+            .withComment(newComment)
+            .withAutoIncrement(column.autoIncrement())
+            .build();
+
+    return MODIFY_COLUMN
+        + quoteIdentifier(col)
+        + appendColumnDefinition(updateColumn, new StringBuilder());
+  }
+
+  private String addColumnFieldDefinition(TableChange.AddColumn addColumn) {
+    String dataType = typeConverter.fromGravitino(addColumn.getDataType());
+    if (addColumn.fieldName().length > 1) {
+      throw new 
UnsupportedOperationException(CLICKHOUSE_NOT_SUPPORT_NESTED_COLUMN_MSG);
+    }
+
+    String col = addColumn.fieldName()[0];
+    StringBuilder columnDefinition = new StringBuilder();
+    //  [IF NOT EXISTS] name [type] [default_expr] [codec] [AFTER name_after | 
FIRST]
+    if (addColumn.isNullable()) {
+      columnDefinition.append(
+          "ADD COLUMN %s Nullable(%s) ".formatted(quoteIdentifier(col), 
dataType));
+    } else {
+      columnDefinition.append("ADD COLUMN %s %s 
".formatted(quoteIdentifier(col), dataType));
+    }
+
+    if (addColumn.isAutoIncrement()) {
+      throw new UnsupportedOperationException(
+          "ClickHouse does not support adding auto increment column");
+    }
+
+    // Append default value if available
+    if (!Column.DEFAULT_VALUE_NOT_SET.equals(addColumn.getDefaultValue())) {
+      columnDefinition.append(
+          "DEFAULT %s "
+              
.formatted(columnDefaultValueConverter.fromGravitino(addColumn.getDefaultValue())));
+    }
+
+    // Append comment if available after default value
+    if (StringUtils.isNotEmpty(addColumn.getComment())) {
+      String escapedComment = StringUtils.replace(addColumn.getComment(), "'", 
"''");
+      columnDefinition.append("COMMENT '%s'".formatted(escapedComment));
+    }
+
+    // Append position if available
+    if (addColumn.getPosition() instanceof TableChange.First) {
+      columnDefinition.append("FIRST");

Review Comment:
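   Missing space before the `FIRST` keyword. When a column comment is present,
   the preceding `COMMENT '%s'` clause is appended without a trailing space, so
   the generated SQL runs together as `COMMENT '...'FIRST`. Append a leading
   space to keep the SQL valid: `columnDefinition.append(" FIRST");`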
   ```suggestion
         columnDefinition.append(" FIRST");
   ```



##########
catalogs-contrib/catalog-jdbc-clickhouse/src/main/java/org/apache/gravitino/catalog/clickhouse/operations/ClickHouseTableOperations.java:
##########
@@ -327,8 +355,371 @@ protected String generatePurgeTableSql(String tableName) {
   @Override
   protected String generateAlterTableSql(
       String databaseName, String tableName, TableChange... changes) {
-    throw new UnsupportedOperationException(
-        "ClickHouseTableOperations.generateAlterTableSql is not implemented 
yet.");
+    // Not all operations require the original table information, so lazy 
loading is used here
+    JdbcTable lazyLoadTable = null;
+    TableChange.UpdateComment updateComment = null;
+    List<TableChange.SetProperty> setProperties = new ArrayList<>();
+    List<String> alterSql = new ArrayList<>();
+
+    for (TableChange change : changes) {
+      if (change instanceof TableChange.UpdateComment) {
+        updateComment = (TableChange.UpdateComment) change;
+
+      } else if (change instanceof TableChange.SetProperty setProperty) {
+        // The set attribute needs to be added at the end.
+        setProperties.add(setProperty);
+
+      } else if (change instanceof TableChange.RemoveProperty) {
+        // Clickhouse does not support deleting table attributes, it can be 
replaced by Set Property
+        throw new IllegalArgumentException("Remove property is not supported 
yet");
+
+      } else if (change instanceof TableChange.AddColumn addColumn) {
+        lazyLoadTable = getOrCreateTable(databaseName, tableName, 
lazyLoadTable);
+        alterSql.add(addColumnFieldDefinition(addColumn));
+
+      } else if (change instanceof TableChange.RenameColumn renameColumn) {
+        lazyLoadTable = getOrCreateTable(databaseName, tableName, 
lazyLoadTable);
+        alterSql.add(renameColumnFieldDefinition(renameColumn));
+
+      } else if (change instanceof TableChange.UpdateColumnDefaultValue 
updateColumnDefaultValue) {
+        lazyLoadTable = getOrCreateTable(databaseName, tableName, 
lazyLoadTable);
+        alterSql.add(
+            updateColumnDefaultValueFieldDefinition(updateColumnDefaultValue, 
lazyLoadTable));
+
+      } else if (change instanceof TableChange.UpdateColumnType 
updateColumnType) {
+        lazyLoadTable = getOrCreateTable(databaseName, tableName, 
lazyLoadTable);
+        alterSql.add(updateColumnTypeFieldDefinition(updateColumnType, 
lazyLoadTable));
+
+      } else if (change instanceof TableChange.UpdateColumnComment 
updateColumnComment) {
+        lazyLoadTable = getOrCreateTable(databaseName, tableName, 
lazyLoadTable);
+        alterSql.add(updateColumnCommentFieldDefinition(updateColumnComment, 
lazyLoadTable));
+
+      } else if (change instanceof TableChange.UpdateColumnPosition 
updateColumnPosition) {
+        lazyLoadTable = getOrCreateTable(databaseName, tableName, 
lazyLoadTable);
+        alterSql.add(updateColumnPositionFieldDefinition(updateColumnPosition, 
lazyLoadTable));
+
+      } else if (change instanceof TableChange.DeleteColumn deleteColumn) {
+        lazyLoadTable = getOrCreateTable(databaseName, tableName, 
lazyLoadTable);
+        String deleteColSql = deleteColumnFieldDefinition(deleteColumn, 
lazyLoadTable);
+
+        if (StringUtils.isNotEmpty(deleteColSql)) {
+          alterSql.add(deleteColSql);
+        }
+
+      } else if (change instanceof TableChange.UpdateColumnNullability) {
+        lazyLoadTable = getOrCreateTable(databaseName, tableName, 
lazyLoadTable);
+        alterSql.add(
+            updateColumnNullabilityDefinition(
+                (TableChange.UpdateColumnNullability) change, lazyLoadTable));
+
+      } else if (change instanceof TableChange.DeleteIndex) {
+        lazyLoadTable = getOrCreateTable(databaseName, tableName, 
lazyLoadTable);
+        alterSql.add(deleteIndexDefinition(lazyLoadTable, 
(TableChange.DeleteIndex) change));
+
+      } else if (change instanceof TableChange.UpdateColumnAutoIncrement) {
+        lazyLoadTable = getOrCreateTable(databaseName, tableName, 
lazyLoadTable);
+        alterSql.add(
+            updateColumnAutoIncrementDefinition(
+                lazyLoadTable, (TableChange.UpdateColumnAutoIncrement) 
change));
+
+      } else {
+        throw new IllegalArgumentException(
+            "Unsupported table change type: " + change.getClass().getName());
+      }
+    }
+
+    if (!setProperties.isEmpty()) {
+      alterSql.add(generateAlterTableProperties(setProperties));
+    }
+
+    // Last modified comment
+    if (null != updateComment) {
+      String newComment = updateComment.getNewComment();
+      if (null == StringIdentifier.fromComment(newComment)) {
+        // Detect and add Gravitino id.
+        JdbcTable jdbcTable = getOrCreateTable(databaseName, tableName, 
lazyLoadTable);
+        StringIdentifier identifier = 
StringIdentifier.fromComment(jdbcTable.comment());
+        if (null != identifier) {
+          newComment = StringIdentifier.addToComment(identifier, newComment);
+        }
+      }
+      alterSql.add(" MODIFY COMMENT '%s'".formatted(newComment));
+    }
+
+    if (!setProperties.isEmpty()) {
+      alterSql.add(generateAlterTableProperties(setProperties));
+    }
+
+    // Remove all empty SQL statements
+    List<String> nonEmptySQLs =
+        
alterSql.stream().filter(StringUtils::isNotEmpty).collect(Collectors.toList());
+    if (CollectionUtils.isEmpty(nonEmptySQLs)) {
+      return "";
+    }
+
+    // Return the generated SQL statement
+    String result =
+        "ALTER TABLE %s \n%s;"
+            .formatted(quoteIdentifier(tableName), String.join(",\n", 
nonEmptySQLs));
+    LOG.info("Generated alter table:{} sql: {}", databaseName + "." + 
tableName, result);
+    return result;
+  }
+
+  private String updateColumnAutoIncrementDefinition(
+      JdbcTable table, TableChange.UpdateColumnAutoIncrement change) {
+    if (change.fieldName().length > 1) {
+      throw new UnsupportedOperationException("Nested column names are not 
supported");
+    }
+
+    String col = change.fieldName()[0];
+    JdbcColumn column = getJdbcColumnFromTable(table, col);
+    if (change.isAutoIncrement()) {
+      Preconditions.checkArgument(
+          Types.allowAutoIncrement(column.dataType()),
+          "Auto increment is not allowed, type: " + column.dataType());
+    }
+
+    JdbcColumn updateColumn =
+        JdbcColumn.builder()
+            .withName(col)
+            .withDefaultValue(column.defaultValue())
+            .withNullable(column.nullable())
+            .withType(column.dataType())
+            .withComment(column.comment())
+            .withAutoIncrement(change.isAutoIncrement())
+            .build();
+
+    return MODIFY_COLUMN
+        + quoteIdentifier(col)
+        + appendColumnDefinition(updateColumn, new StringBuilder());
+  }
+
+  @VisibleForTesting
+  private String deleteIndexDefinition(
+      JdbcTable lazyLoadTable, TableChange.DeleteIndex deleteIndex) {
+    boolean indexExists =
+        Arrays.stream(lazyLoadTable.index())
+            .anyMatch(index -> index.name().equals(deleteIndex.getName()));
+
+    // Index does not exist
+    if (!indexExists) {
+      // If ifExists is true, return empty string to skip the operation
+      if (deleteIndex.isIfExists()) {
+        return "";
+      } else {
+        throw new IllegalArgumentException(
+            "Index '%s' does not exist".formatted(deleteIndex.getName()));
+      }
+    }
+
+    return String.format("DROP INDEX %s 
".formatted(quoteIdentifier(deleteIndex.getName())));
+  }
+
+  private String updateColumnNullabilityDefinition(
+      TableChange.UpdateColumnNullability change, JdbcTable table) {
+    validateUpdateColumnNullable(change, table);
+
+    String col = change.fieldName()[0];
+    JdbcColumn column = getJdbcColumnFromTable(table, col);
+    JdbcColumn updateColumn =
+        JdbcColumn.builder()
+            .withName(col)
+            .withDefaultValue(column.defaultValue())
+            .withNullable(change.nullable())
+            .withType(column.dataType())
+            .withComment(column.comment())
+            .withAutoIncrement(column.autoIncrement())
+            .build();
+
+    return MODIFY_COLUMN
+        + quoteIdentifier(col)
+        + appendColumnDefinition(updateColumn, new StringBuilder());
+  }
+
+  private String generateAlterTableProperties(List<TableChange.SetProperty> 
setProperties) {
+    if (CollectionUtils.isNotEmpty(setProperties)) {
+      throw new UnsupportedOperationException(
+          "Alter table properties in ClickHouse is not supported");
+    }
+
+    return "";
+  }
+
+  private String updateColumnCommentFieldDefinition(
+      TableChange.UpdateColumnComment updateColumnComment, JdbcTable 
jdbcTable) {
+    String newComment = updateColumnComment.getNewComment();
+    if (updateColumnComment.fieldName().length > 1) {
+      throw new 
UnsupportedOperationException(CLICKHOUSE_NOT_SUPPORT_NESTED_COLUMN_MSG);
+    }
+
+    String col = updateColumnComment.fieldName()[0];
+    JdbcColumn column = getJdbcColumnFromTable(jdbcTable, col);
+    JdbcColumn updateColumn =
+        JdbcColumn.builder()
+            .withName(col)
+            .withDefaultValue(column.defaultValue())
+            .withNullable(column.nullable())
+            .withType(column.dataType())
+            .withComment(newComment)
+            .withAutoIncrement(column.autoIncrement())
+            .build();
+
+    return MODIFY_COLUMN
+        + quoteIdentifier(col)
+        + appendColumnDefinition(updateColumn, new StringBuilder());
+  }
+
+  private String addColumnFieldDefinition(TableChange.AddColumn addColumn) {
+    String dataType = typeConverter.fromGravitino(addColumn.getDataType());
+    if (addColumn.fieldName().length > 1) {
+      throw new 
UnsupportedOperationException(CLICKHOUSE_NOT_SUPPORT_NESTED_COLUMN_MSG);
+    }
+
+    String col = addColumn.fieldName()[0];
+    StringBuilder columnDefinition = new StringBuilder();
+    //  [IF NOT EXISTS] name [type] [default_expr] [codec] [AFTER name_after | 
FIRST]
+    if (addColumn.isNullable()) {
+      columnDefinition.append(
+          "ADD COLUMN %s Nullable(%s) ".formatted(quoteIdentifier(col), 
dataType));
+    } else {
+      columnDefinition.append("ADD COLUMN %s %s 
".formatted(quoteIdentifier(col), dataType));
+    }
+
+    if (addColumn.isAutoIncrement()) {
+      throw new UnsupportedOperationException(
+          "ClickHouse does not support adding auto increment column");
+    }
+
+    // Append default value if available
+    if (!Column.DEFAULT_VALUE_NOT_SET.equals(addColumn.getDefaultValue())) {
+      columnDefinition.append(
+          "DEFAULT %s "
+              
.formatted(columnDefaultValueConverter.fromGravitino(addColumn.getDefaultValue())));
+    }
+
+    // Append comment if available after default value
+    if (StringUtils.isNotEmpty(addColumn.getComment())) {
+      String escapedComment = StringUtils.replace(addColumn.getComment(), "'", 
"''");
+      columnDefinition.append("COMMENT '%s'".formatted(escapedComment));
+    }
+
+    // Append position if available
+    if (addColumn.getPosition() instanceof TableChange.First) {
+      columnDefinition.append("FIRST");
+    } else if (addColumn.getPosition() instanceof TableChange.After 
afterPosition) {
+      columnDefinition.append("AFTER %s 
".formatted(quoteIdentifier(afterPosition.getColumn())));
+    } else if (addColumn.getPosition() instanceof TableChange.Default) {
+      // Do nothing, follow the default behavior of clickhouse
+    } else {
+      throw new IllegalArgumentException("Invalid column position.");
+    }
+
+    return columnDefinition.toString();
+  }
+
+  private String renameColumnFieldDefinition(TableChange.RenameColumn 
renameColumn) {
+    if (renameColumn.fieldName().length > 1) {
+      throw new 
UnsupportedOperationException(CLICKHOUSE_NOT_SUPPORT_NESTED_COLUMN_MSG);
+    }
+
+    String oldColumnName = renameColumn.fieldName()[0];
+    String newColumnName = renameColumn.getNewName();
+
+    return "RENAME COLUMN %s TO %s"
+        .formatted(quoteIdentifier(oldColumnName), 
quoteIdentifier(newColumnName));
+  }
+
+  private String updateColumnPositionFieldDefinition(
+      TableChange.UpdateColumnPosition updateColumnPosition, JdbcTable 
jdbcTable) {
+    if (updateColumnPosition.fieldName().length > 1) {
+      throw new 
UnsupportedOperationException(CLICKHOUSE_NOT_SUPPORT_NESTED_COLUMN_MSG);
+    }
+
+    String col = updateColumnPosition.fieldName()[0];
+    JdbcColumn column = getJdbcColumnFromTable(jdbcTable, col);
+
+    StringBuilder columnDefinition = new StringBuilder();
+    columnDefinition.append("%s %s ".formatted(MODIFY_COLUMN, 
quoteIdentifier(col)));
+    appendColumnDefinition(column, columnDefinition);
+
+    if (updateColumnPosition.getPosition() instanceof TableChange.First) {
+      columnDefinition.append("FIRST");
+    } else if (updateColumnPosition.getPosition() instanceof TableChange.After 
afterPosition) {
+      columnDefinition.append("%s %s".formatted(AFTER, 
quoteIdentifier(afterPosition.getColumn())));
+    } else {
+      Arrays.stream(jdbcTable.columns())
+          .reduce((column1, column2) -> column2)
+          .map(Column::name)
+          .ifPresent(s -> columnDefinition.append(AFTER).append(s));

Review Comment:
   Missing identifier quoting (and a separating space) for the column name. The
   column name obtained from the reduce operation should be passed through
   `quoteIdentifier()`, as the `TableChange.After` branch already does, to
   ensure proper SQL syntax. The code should be:
   `ifPresent(s -> columnDefinition.append(AFTER).append(" ").append(quoteIdentifier(s)));`
   ```suggestion
             .ifPresent(s -> columnDefinition.append(AFTER).append(" ").append(quoteIdentifier(s)));
   ```



##########
catalogs-contrib/catalog-jdbc-clickhouse/src/test/java/org/apache/gravitino/catalog/clickhouse/operations/TestClickHouseTableOperations.java:
##########
@@ -59,6 +63,381 @@
 public class TestClickHouseTableOperations extends TestClickHouse {
   private static final Type STRING = Types.StringType.get();
   private static final Type INT = Types.IntegerType.get();
+  private static final Type LONG = Types.LongType.get();
+
+  @Test
+  public void testCreateAndAlterTable() {
+    String tableName = RandomStringUtils.randomAlphabetic(16) + "_op_table";
+    String tableComment = "test_comment";
+    List<JdbcColumn> columns = new ArrayList<>();
+    columns.add(
+        JdbcColumn.builder()
+            .withName("col_1")
+            .withType(STRING)
+            .withComment("test_comment")
+            .withNullable(true)
+            .build());
+    columns.add(
+        JdbcColumn.builder()
+            .withName("col_2")
+            .withType(INT)
+            .withNullable(false)
+            .withComment("set primary key")
+            .build());
+    columns.add(
+        JdbcColumn.builder()
+            .withName("col_3")
+            .withType(INT)
+            .withNullable(true)
+            .withDefaultValue(Literals.NULL)
+            .build());
+    columns.add(
+        JdbcColumn.builder()
+            .withName("col_4")
+            .withType(STRING)
+            .withDefaultValue(Literals.of("hello world", STRING))
+            .withNullable(false)
+            .build());
+    Map<String, String> properties = new HashMap<>();
+
+    Index[] indexes = new Index[] {};
+    // create table
+    TABLE_OPERATIONS.create(
+        TEST_DB_NAME.toString(),
+        tableName,
+        columns.toArray(new JdbcColumn[0]),
+        tableComment,
+        properties,
+        null,
+        Distributions.NONE,
+        indexes,
+        getSortOrders("col_2"));
+
+    // list table
+    List<String> tables = TABLE_OPERATIONS.listTables(TEST_DB_NAME.toString());
+    Assertions.assertTrue(tables.contains(tableName));
+
+    // load table
+    JdbcTable load = TABLE_OPERATIONS.load(TEST_DB_NAME.toString(), tableName);
+    assertionsTableInfo(
+        tableName, tableComment, columns, properties, indexes, 
Transforms.EMPTY_TRANSFORM, load);
+
+    // rename table
+    String newName = "new_table";
+    Assertions.assertDoesNotThrow(
+        () -> TABLE_OPERATIONS.rename(TEST_DB_NAME.toString(), tableName, 
newName));
+    Assertions.assertDoesNotThrow(() -> 
TABLE_OPERATIONS.load(TEST_DB_NAME.toString(), newName));
+
+    // alter table
+    JdbcColumn newColumn =
+        JdbcColumn.builder()
+            .withName("col_5")
+            .withType(STRING)
+            .withComment("new_add")
+            .withNullable(false) //
+            //            .withDefaultValue(Literals.of("hello test", STRING))
+            .build();
+    TABLE_OPERATIONS.alterTable(
+        TEST_DB_NAME.toString(),
+        newName,
+        TableChange.addColumn(
+            new String[] {newColumn.name()},
+            newColumn.dataType(),
+            newColumn.comment(),
+            TableChange.ColumnPosition.after("col_1"),
+            newColumn.nullable(),
+            newColumn.autoIncrement(),
+            newColumn.defaultValue())
+        //        ,
+        //        TableChange.setProperty(CLICKHOUSE_ENGINE_KEY, "InnoDB"));
+        //    properties.put(CLICKHOUSE_ENGINE_KEY, "InnoDB"
+        );
+    load = TABLE_OPERATIONS.load(TEST_DB_NAME.toString(), newName);
+    List<JdbcColumn> alterColumns =
+        new ArrayList<JdbcColumn>() {
+          {
+            add(columns.get(0));
+            add(newColumn);
+            add(columns.get(1));
+            add(columns.get(2));
+            add(columns.get(3));
+          }
+        };
+    assertionsTableInfo(
+        newName, tableComment, alterColumns, properties, indexes, 
Transforms.EMPTY_TRANSFORM, load);
+
+    // Detect unsupported properties
+    TableChange setProperty = TableChange.setProperty(CLICKHOUSE_ENGINE_KEY, 
"ABC");
+    UnsupportedOperationException gravitinoRuntimeException =
+        Assertions.assertThrows(
+            UnsupportedOperationException.class,
+            () -> TABLE_OPERATIONS.alterTable(TEST_DB_NAME.toString(), 
newName, setProperty));
+    Assertions.assertTrue(
+        StringUtils.contains(
+            gravitinoRuntimeException.getMessage(),
+            "Alter table properties in ClickHouse is not supported"));
+
+    // delete column
+    TABLE_OPERATIONS.alterTable(
+        TEST_DB_NAME.toString(),
+        newName,
+        TableChange.deleteColumn(new String[] {newColumn.name()}, true));
+    load = TABLE_OPERATIONS.load(TEST_DB_NAME.toString(), newName);
+    assertionsTableInfo(
+        newName, tableComment, columns, properties, indexes, 
Transforms.EMPTY_TRANSFORM, load);
+
+    TableChange deleteColumn = TableChange.deleteColumn(new String[] 
{newColumn.name()}, false);
+    IllegalArgumentException illegalArgumentException =
+        Assertions.assertThrows(
+            IllegalArgumentException.class,
+            () -> TABLE_OPERATIONS.alterTable(TEST_DB_NAME.toString(), 
newName, deleteColumn));
+    Assertions.assertEquals(
+        "Delete column '%s' does not exist.".formatted(newColumn.name()),
+        illegalArgumentException.getMessage());
+    Assertions.assertDoesNotThrow(
+        () ->
+            TABLE_OPERATIONS.alterTable(
+                TEST_DB_NAME.toString(),
+                newName,
+                TableChange.deleteColumn(new String[] {newColumn.name()}, 
true)));
+
+    TABLE_OPERATIONS.alterTable(
+        TEST_DB_NAME.toString(),
+        newName,
+        TableChange.deleteColumn(new String[] {newColumn.name()}, true));
+    Assertions.assertTrue(
+        TABLE_OPERATIONS.drop(TEST_DB_NAME.toString(), newName), "table should 
be dropped");
+
+    GravitinoRuntimeException exception =
+        Assertions.assertThrows(
+            GravitinoRuntimeException.class,
+            () -> TABLE_OPERATIONS.drop(TEST_DB_NAME.toString(), newName));
+    Assertions.assertTrue(StringUtils.contains(exception.getMessage(), "does 
not exist"));
+  }
+
+  @Test
+  public void testAlterTable() {
+    String tableName = RandomStringUtils.randomAlphabetic(16) + "_al_table";
+    String tableComment = "test_comment";
+    List<JdbcColumn> columns = new ArrayList<>();
+    JdbcColumn col_1 =
+        JdbcColumn.builder()
+            .withName("col_1")
+            .withType(INT)
+            .withComment("id")
+            .withNullable(false)
+            .withDefaultValue(Literals.integerLiteral(0))
+            .build();
+    columns.add(col_1);
+    JdbcColumn col_2 =
+        JdbcColumn.builder()
+            .withName("col_2")
+            .withType(STRING)
+            .withComment("name")
+            .withDefaultValue(Literals.of("hello world", STRING))
+            .withNullable(false)
+            .build();
+    columns.add(col_2);
+    JdbcColumn col_3 =
+        JdbcColumn.builder()
+            .withName("col_3")
+            .withType(STRING)
+            .withComment("name")
+            .withDefaultValue(Literals.NULL)
+            .build();
+    // `col_1` int NOT NULL COMMENT 'id' ,
+    // `col_2` STRING(255) NOT NULL DEFAULT 'hello world' COMMENT 'name' ,
+    // `col_3` STRING(255) NULL DEFAULT NULL COMMENT 'name' ,
+    columns.add(col_3);
+    Map<String, String> properties = new HashMap<>();
+
+    // create table
+    TABLE_OPERATIONS.create(
+        TEST_DB_NAME.toString(),
+        tableName,
+        columns.toArray(new JdbcColumn[0]),
+        tableComment,
+        properties,
+        null,
+        Distributions.NONE,
+        null,
+        getSortOrders("col_2"));
+    JdbcTable load = TABLE_OPERATIONS.load(TEST_DB_NAME.toString(), tableName);
+    assertionsTableInfo(
+        tableName, tableComment, columns, properties, null, 
Transforms.EMPTY_TRANSFORM, load);
+
+    TABLE_OPERATIONS.alterTable(
+        TEST_DB_NAME.toString(),
+        tableName,
+        TableChange.updateColumnType(new String[] {col_1.name()}, LONG));
+
+    load = TABLE_OPERATIONS.load(TEST_DB_NAME.toString(), tableName);
+
+    // After modifying the type, some attributes of the corresponding column 
are not
+    // supported.
+    columns.clear();
+    col_1 =
+        JdbcColumn.builder()
+            .withName(col_1.name())
+            .withType(LONG)
+            .withComment(col_1.comment())
+            .withNullable(col_1.nullable())
+            .withDefaultValue(Literals.longLiteral(0L))
+            .build();
+    columns.add(col_1);
+    columns.add(col_2);
+    columns.add(col_3);
+    assertionsTableInfo(
+        tableName, tableComment, columns, properties, null, 
Transforms.EMPTY_TRANSFORM, load);
+
+    String newComment = "new_comment";
+    // update table comment and column comment
+    // `col_1` int NOT NULL COMMENT 'id' ,
+    // `col_2` STRING(255) NOT NULL DEFAULT 'hello world' COMMENT 
'new_comment' ,
+    // `col_3` STRING(255) NULL DEFAULT NULL COMMENT 'name' ,
+    TABLE_OPERATIONS.alterTable(
+        TEST_DB_NAME.toString(),
+        tableName,
+        TableChange.updateColumnType(new String[] {col_1.name()}, INT),
+        TableChange.updateColumnComment(new String[] {col_2.name()}, 
newComment));
+    load = TABLE_OPERATIONS.load(TEST_DB_NAME.toString(), tableName);
+
+    columns.clear();
+    col_1 =
+        JdbcColumn.builder()
+            .withName(col_1.name())
+            .withType(INT)
+            .withComment(col_1.comment())
+            .withAutoIncrement(col_1.autoIncrement())
+            .withNullable(col_1.nullable())
+            .withDefaultValue(Literals.integerLiteral(0))
+            .build();
+    col_2 =
+        JdbcColumn.builder()
+            .withName(col_2.name())
+            .withType(col_2.dataType())
+            .withComment(newComment)
+            .withAutoIncrement(col_2.autoIncrement())
+            .withNullable(col_2.nullable())
+            .withDefaultValue(col_2.defaultValue())
+            .build();
+    columns.add(col_1);
+    columns.add(col_2);
+    columns.add(col_3);
+    assertionsTableInfo(
+        tableName, tableComment, columns, properties, null, 
Transforms.EMPTY_TRANSFORM, load);
+
+    String newColName_1 = "new_col_1";
+    // rename column
+    // update table comment and column comment
+    // `new_col_1` int NOT NULL COMMENT 'id' ,
+    // `new_col_2` STRING(255) NOT NULL DEFAULT 'hello world' COMMENT 
'new_comment'
+    // ,
+    // `col_3` STRING(255) NULL DEFAULT NULL COMMENT 'name' ,
+    TABLE_OPERATIONS.alterTable(
+        TEST_DB_NAME.toString(),
+        tableName,
+        TableChange.renameColumn(new String[] {col_1.name()}, newColName_1));
+
+    load = TABLE_OPERATIONS.load(TEST_DB_NAME.toString(), tableName);
+
+    columns.clear();
+    col_1 =
+        JdbcColumn.builder()
+            .withName(newColName_1)
+            .withType(col_1.dataType())
+            .withComment(col_1.comment())
+            .withAutoIncrement(col_1.autoIncrement())
+            .withNullable(col_1.nullable())
+            .withDefaultValue(col_1.defaultValue())
+            .build();
+    columns.add(col_1);
+    columns.add(col_2);
+    columns.add(col_3);
+    assertionsTableInfo(
+        tableName, tableComment, columns, properties, null, 
Transforms.EMPTY_TRANSFORM, load);
+  }
+
+  @Test
+  public void testAlterTableUpdateColumnDefaultValue() {
+    String tableName = RandomNameUtils.genRandomName("properties_table_");
+    String tableComment = "test_comment";
+    List<JdbcColumn> columns = new ArrayList<>();
+    columns.add(
+        JdbcColumn.builder()
+            .withName("col_1")
+            .withType(Types.DecimalType.of(10, 2))
+            .withComment("test_decimal")
+            .withNullable(false)
+            .withDefaultValue(Literals.decimalLiteral(Decimal.of("0.00", 10, 
2)))
+            .build());
+    columns.add(
+        JdbcColumn.builder()
+            .withName("col_2")
+            .withType(Types.LongType.get())
+            .withNullable(false)
+            .withDefaultValue(Literals.longLiteral(0L))
+            .withComment("long type")
+            .build());
+    columns.add(
+        JdbcColumn.builder()
+            .withName("col_3")
+            .withType(Types.TimestampType.withoutTimeZone(0))
+            .withNullable(false)
+            .withComment("timestamp")
+            
.withDefaultValue(Literals.timestampLiteral(LocalDateTime.parse("2013-01-01T00:00:00")))
+            .build());
+    columns.add(
+        JdbcColumn.builder()
+            .withName("col_4")
+            .withType(Types.StringType.get())
+            .withNullable(false)
+            .withComment("STRING")
+            .withDefaultValue(Literals.of("hello", Types.StringType.get()))
+            .build());
+    Map<String, String> properties = new HashMap<>();
+
+    Index[] indexes = new Index[0];
+    // create table
+    TABLE_OPERATIONS.create(
+        TEST_DB_NAME.toString(),
+        tableName,
+        columns.toArray(new JdbcColumn[0]),
+        tableComment,
+        properties,
+        null,
+        Distributions.NONE,
+        indexes,
+        getSortOrders("col_2"));
+
+    JdbcTable loaded = TABLE_OPERATIONS.load(TEST_DB_NAME.toString(), 
tableName);
+    assertionsTableInfo(
+        tableName, tableComment, columns, properties, indexes, 
Transforms.EMPTY_TRANSFORM, loaded);
+
+    TABLE_OPERATIONS.alterTable(
+        TEST_DB_NAME.toString(),
+        tableName,
+        TableChange.updateColumnDefaultValue(
+            new String[] {columns.get(0).name()},
+            Literals.decimalLiteral(Decimal.of("1.23", 10, 2))),
+        TableChange.updateColumnDefaultValue(
+            new String[] {columns.get(1).name()}, Literals.longLiteral(1L)),
+        TableChange.updateColumnDefaultValue(
+            new String[] {columns.get(2).name()},
+            
Literals.timestampLiteral(LocalDateTime.parse("2024-04-01T00:00:00"))),
+        TableChange.updateColumnDefaultValue(
+            new String[] {columns.get(3).name()}, Literals.of("world", 
Types.StringType.get())));
+
+    loaded = TABLE_OPERATIONS.load(TEST_DB_NAME.toString(), tableName);
+    Assertions.assertEquals(
+        Literals.decimalLiteral(Decimal.of("1.234", 10, 2)), 
loaded.columns()[0].defaultValue());

Review Comment:
   There's a mismatch in the expected decimal value. On line 422, the test 
updates the column default value to "1.23" (2 decimal places), but on line 433 
it asserts that the value should be "1.234" (3 decimal places). This appears to 
be a test bug - the expected value should match what was set: 
`Literals.decimalLiteral(Decimal.of("1.23", 10, 2))`
   ```suggestion
           Literals.decimalLiteral(Decimal.of("1.23", 10, 2)), 
loaded.columns()[0].defaultValue());
   ```



##########
catalogs-contrib/catalog-jdbc-clickhouse/src/main/java/org/apache/gravitino/catalog/clickhouse/operations/ClickHouseTableOperations.java:
##########
@@ -283,6 +292,25 @@ protected boolean getAutoIncrementInfo(ResultSet 
resultSet) throws SQLException
     return "YES".equalsIgnoreCase(resultSet.getString("IS_AUTOINCREMENT"));
   }
 
+  @Override
+  public void alterTable(String databaseName, String tableName, TableChange... 
changes)
+      throws NoSuchTableException {
+    LOG.info("Attempting to alter table {} from database {}", tableName, 
databaseName);
+    try (Connection connection = getConnection(databaseName)) {
+      for (TableChange change : changes) {
+        String sql = generateAlterTableSql(databaseName, tableName, change);
+        if (StringUtils.isEmpty(sql)) {
+          LOG.info("No changes to alter table {} from database {}", tableName, 
databaseName);
+          return;

Review Comment:
   The early return when SQL is empty (line 304) stops processing subsequent 
changes. Since the alterTable method loops through changes one at a time and 
calls generateAlterTableSql for each individual change, if one change produces 
an empty SQL (like a deleteColumn with ifExists=true for a non-existent 
column), the method returns early and skips all remaining changes. This should 
be changed to `continue` instead of `return` to process all changes.
   ```suggestion
             continue;
   ```



##########
catalogs-contrib/catalog-jdbc-clickhouse/src/main/java/org/apache/gravitino/catalog/clickhouse/operations/ClickHouseTableOperations.java:
##########
@@ -327,8 +355,371 @@ protected String generatePurgeTableSql(String tableName) {
   @Override
   protected String generateAlterTableSql(
       String databaseName, String tableName, TableChange... changes) {
-    throw new UnsupportedOperationException(
-        "ClickHouseTableOperations.generateAlterTableSql is not implemented 
yet.");
+    // Not all operations require the original table information, so lazy 
loading is used here
+    JdbcTable lazyLoadTable = null;
+    TableChange.UpdateComment updateComment = null;
+    List<TableChange.SetProperty> setProperties = new ArrayList<>();
+    List<String> alterSql = new ArrayList<>();
+
+    for (TableChange change : changes) {
+      if (change instanceof TableChange.UpdateComment) {
+        updateComment = (TableChange.UpdateComment) change;
+
+      } else if (change instanceof TableChange.SetProperty setProperty) {
+        // The set attribute needs to be added at the end.
+        setProperties.add(setProperty);
+
+      } else if (change instanceof TableChange.RemoveProperty) {
+        // Clickhouse does not support deleting table attributes, it can be 
replaced by Set Property
+        throw new IllegalArgumentException("Remove property is not supported 
yet");
+
+      } else if (change instanceof TableChange.AddColumn addColumn) {
+        lazyLoadTable = getOrCreateTable(databaseName, tableName, 
lazyLoadTable);
+        alterSql.add(addColumnFieldDefinition(addColumn));
+
+      } else if (change instanceof TableChange.RenameColumn renameColumn) {
+        lazyLoadTable = getOrCreateTable(databaseName, tableName, 
lazyLoadTable);
+        alterSql.add(renameColumnFieldDefinition(renameColumn));
+
+      } else if (change instanceof TableChange.UpdateColumnDefaultValue 
updateColumnDefaultValue) {
+        lazyLoadTable = getOrCreateTable(databaseName, tableName, 
lazyLoadTable);
+        alterSql.add(
+            updateColumnDefaultValueFieldDefinition(updateColumnDefaultValue, 
lazyLoadTable));
+
+      } else if (change instanceof TableChange.UpdateColumnType 
updateColumnType) {
+        lazyLoadTable = getOrCreateTable(databaseName, tableName, 
lazyLoadTable);
+        alterSql.add(updateColumnTypeFieldDefinition(updateColumnType, 
lazyLoadTable));
+
+      } else if (change instanceof TableChange.UpdateColumnComment 
updateColumnComment) {
+        lazyLoadTable = getOrCreateTable(databaseName, tableName, 
lazyLoadTable);
+        alterSql.add(updateColumnCommentFieldDefinition(updateColumnComment, 
lazyLoadTable));
+
+      } else if (change instanceof TableChange.UpdateColumnPosition 
updateColumnPosition) {
+        lazyLoadTable = getOrCreateTable(databaseName, tableName, 
lazyLoadTable);
+        alterSql.add(updateColumnPositionFieldDefinition(updateColumnPosition, 
lazyLoadTable));
+
+      } else if (change instanceof TableChange.DeleteColumn deleteColumn) {
+        lazyLoadTable = getOrCreateTable(databaseName, tableName, 
lazyLoadTable);
+        String deleteColSql = deleteColumnFieldDefinition(deleteColumn, 
lazyLoadTable);
+
+        if (StringUtils.isNotEmpty(deleteColSql)) {
+          alterSql.add(deleteColSql);
+        }
+
+      } else if (change instanceof TableChange.UpdateColumnNullability) {
+        lazyLoadTable = getOrCreateTable(databaseName, tableName, 
lazyLoadTable);
+        alterSql.add(
+            updateColumnNullabilityDefinition(
+                (TableChange.UpdateColumnNullability) change, lazyLoadTable));
+
+      } else if (change instanceof TableChange.DeleteIndex) {
+        lazyLoadTable = getOrCreateTable(databaseName, tableName, 
lazyLoadTable);
+        alterSql.add(deleteIndexDefinition(lazyLoadTable, 
(TableChange.DeleteIndex) change));
+
+      } else if (change instanceof TableChange.UpdateColumnAutoIncrement) {
+        lazyLoadTable = getOrCreateTable(databaseName, tableName, 
lazyLoadTable);
+        alterSql.add(
+            updateColumnAutoIncrementDefinition(
+                lazyLoadTable, (TableChange.UpdateColumnAutoIncrement) 
change));
+
+      } else {
+        throw new IllegalArgumentException(
+            "Unsupported table change type: " + change.getClass().getName());
+      }
+    }
+
+    if (!setProperties.isEmpty()) {
+      alterSql.add(generateAlterTableProperties(setProperties));
+    }
+
+    // Last modified comment
+    if (null != updateComment) {
+      String newComment = updateComment.getNewComment();
+      if (null == StringIdentifier.fromComment(newComment)) {
+        // Detect and add Gravitino id.
+        JdbcTable jdbcTable = getOrCreateTable(databaseName, tableName, 
lazyLoadTable);
+        StringIdentifier identifier = 
StringIdentifier.fromComment(jdbcTable.comment());
+        if (null != identifier) {
+          newComment = StringIdentifier.addToComment(identifier, newComment);
+        }
+      }
+      alterSql.add(" MODIFY COMMENT '%s'".formatted(newComment));
+    }
+
+    if (!setProperties.isEmpty()) {
+      alterSql.add(generateAlterTableProperties(setProperties));
+    }
+
+    // Remove all empty SQL statements
+    List<String> nonEmptySQLs =
+        
alterSql.stream().filter(StringUtils::isNotEmpty).collect(Collectors.toList());
+    if (CollectionUtils.isEmpty(nonEmptySQLs)) {
+      return "";
+    }
+
+    // Return the generated SQL statement
+    String result =
+        "ALTER TABLE %s \n%s;"
+            .formatted(quoteIdentifier(tableName), String.join(",\n", 
nonEmptySQLs));
+    LOG.info("Generated alter table:{} sql: {}", databaseName + "." + 
tableName, result);
+    return result;
+  }
+
+  private String updateColumnAutoIncrementDefinition(
+      JdbcTable table, TableChange.UpdateColumnAutoIncrement change) {
+    if (change.fieldName().length > 1) {
+      throw new UnsupportedOperationException("Nested column names are not 
supported");
+    }
+
+    String col = change.fieldName()[0];
+    JdbcColumn column = getJdbcColumnFromTable(table, col);
+    if (change.isAutoIncrement()) {
+      Preconditions.checkArgument(
+          Types.allowAutoIncrement(column.dataType()),
+          "Auto increment is not allowed, type: " + column.dataType());
+    }
+
+    JdbcColumn updateColumn =
+        JdbcColumn.builder()
+            .withName(col)
+            .withDefaultValue(column.defaultValue())
+            .withNullable(column.nullable())
+            .withType(column.dataType())
+            .withComment(column.comment())
+            .withAutoIncrement(change.isAutoIncrement())
+            .build();
+
+    return MODIFY_COLUMN
+        + quoteIdentifier(col)
+        + appendColumnDefinition(updateColumn, new StringBuilder());
+  }
+
+  @VisibleForTesting
+  private String deleteIndexDefinition(
+      JdbcTable lazyLoadTable, TableChange.DeleteIndex deleteIndex) {
+    boolean indexExists =
+        Arrays.stream(lazyLoadTable.index())
+            .anyMatch(index -> index.name().equals(deleteIndex.getName()));
+
+    // Index does not exist
+    if (!indexExists) {
+      // If ifExists is true, return empty string to skip the operation
+      if (deleteIndex.isIfExists()) {
+        return "";
+      } else {
+        throw new IllegalArgumentException(
+            "Index '%s' does not exist".formatted(deleteIndex.getName()));
+      }
+    }
+
+    return String.format("DROP INDEX %s 
".formatted(quoteIdentifier(deleteIndex.getName())));

Review Comment:
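   The DROP INDEX statement is formatted twice: the result of 
`"...".formatted(...)` is passed to `String.format` as a format string. This is 
redundant, and it is also fragile: the already-formatted text (including the 
index name) is re-interpreted as a format string, so a name containing `%` 
would be treated as a format specifier. The line should be simplified to:
   `return "DROP INDEX %s".formatted(quoteIdentifier(deleteIndex.getName()));`
   Also, there's a trailing space that should be removed for consistency.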
   ```suggestion
       return "DROP INDEX %s".formatted(quoteIdentifier(deleteIndex.getName()));
   ```



##########
catalogs-contrib/catalog-jdbc-clickhouse/src/main/java/org/apache/gravitino/catalog/clickhouse/operations/ClickHouseTableOperations.java:
##########
@@ -327,8 +355,371 @@ protected String generatePurgeTableSql(String tableName) {
   @Override
   protected String generateAlterTableSql(
       String databaseName, String tableName, TableChange... changes) {
-    throw new UnsupportedOperationException(
-        "ClickHouseTableOperations.generateAlterTableSql is not implemented 
yet.");
+    // Not all operations require the original table information, so lazy 
loading is used here
+    JdbcTable lazyLoadTable = null;
+    TableChange.UpdateComment updateComment = null;
+    List<TableChange.SetProperty> setProperties = new ArrayList<>();
+    List<String> alterSql = new ArrayList<>();
+
+    for (TableChange change : changes) {
+      if (change instanceof TableChange.UpdateComment) {
+        updateComment = (TableChange.UpdateComment) change;
+
+      } else if (change instanceof TableChange.SetProperty setProperty) {
+        // The set attribute needs to be added at the end.
+        setProperties.add(setProperty);
+
+      } else if (change instanceof TableChange.RemoveProperty) {
+        // Clickhouse does not support deleting table attributes, it can be 
replaced by Set Property
+        throw new IllegalArgumentException("Remove property is not supported 
yet");
+
+      } else if (change instanceof TableChange.AddColumn addColumn) {
+        lazyLoadTable = getOrCreateTable(databaseName, tableName, 
lazyLoadTable);
+        alterSql.add(addColumnFieldDefinition(addColumn));
+
+      } else if (change instanceof TableChange.RenameColumn renameColumn) {
+        lazyLoadTable = getOrCreateTable(databaseName, tableName, 
lazyLoadTable);
+        alterSql.add(renameColumnFieldDefinition(renameColumn));
+
+      } else if (change instanceof TableChange.UpdateColumnDefaultValue 
updateColumnDefaultValue) {
+        lazyLoadTable = getOrCreateTable(databaseName, tableName, 
lazyLoadTable);
+        alterSql.add(
+            updateColumnDefaultValueFieldDefinition(updateColumnDefaultValue, 
lazyLoadTable));
+
+      } else if (change instanceof TableChange.UpdateColumnType 
updateColumnType) {
+        lazyLoadTable = getOrCreateTable(databaseName, tableName, 
lazyLoadTable);
+        alterSql.add(updateColumnTypeFieldDefinition(updateColumnType, 
lazyLoadTable));
+
+      } else if (change instanceof TableChange.UpdateColumnComment 
updateColumnComment) {
+        lazyLoadTable = getOrCreateTable(databaseName, tableName, 
lazyLoadTable);
+        alterSql.add(updateColumnCommentFieldDefinition(updateColumnComment, 
lazyLoadTable));
+
+      } else if (change instanceof TableChange.UpdateColumnPosition 
updateColumnPosition) {
+        lazyLoadTable = getOrCreateTable(databaseName, tableName, 
lazyLoadTable);
+        alterSql.add(updateColumnPositionFieldDefinition(updateColumnPosition, 
lazyLoadTable));
+
+      } else if (change instanceof TableChange.DeleteColumn deleteColumn) {
+        lazyLoadTable = getOrCreateTable(databaseName, tableName, 
lazyLoadTable);
+        String deleteColSql = deleteColumnFieldDefinition(deleteColumn, 
lazyLoadTable);
+
+        if (StringUtils.isNotEmpty(deleteColSql)) {
+          alterSql.add(deleteColSql);
+        }
+
+      } else if (change instanceof TableChange.UpdateColumnNullability) {
+        lazyLoadTable = getOrCreateTable(databaseName, tableName, 
lazyLoadTable);
+        alterSql.add(
+            updateColumnNullabilityDefinition(
+                (TableChange.UpdateColumnNullability) change, lazyLoadTable));
+
+      } else if (change instanceof TableChange.DeleteIndex) {
+        lazyLoadTable = getOrCreateTable(databaseName, tableName, 
lazyLoadTable);
+        alterSql.add(deleteIndexDefinition(lazyLoadTable, 
(TableChange.DeleteIndex) change));
+
+      } else if (change instanceof TableChange.UpdateColumnAutoIncrement) {
+        lazyLoadTable = getOrCreateTable(databaseName, tableName, 
lazyLoadTable);
+        alterSql.add(
+            updateColumnAutoIncrementDefinition(
+                lazyLoadTable, (TableChange.UpdateColumnAutoIncrement) 
change));
+
+      } else {
+        throw new IllegalArgumentException(
+            "Unsupported table change type: " + change.getClass().getName());
+      }
+    }
+
+    if (!setProperties.isEmpty()) {
+      alterSql.add(generateAlterTableProperties(setProperties));
+    }
+
+    // Last modified comment
+    if (null != updateComment) {
+      String newComment = updateComment.getNewComment();
+      if (null == StringIdentifier.fromComment(newComment)) {
+        // Detect and add Gravitino id.
+        JdbcTable jdbcTable = getOrCreateTable(databaseName, tableName, 
lazyLoadTable);
+        StringIdentifier identifier = 
StringIdentifier.fromComment(jdbcTable.comment());
+        if (null != identifier) {
+          newComment = StringIdentifier.addToComment(identifier, newComment);
+        }
+      }
+      alterSql.add(" MODIFY COMMENT '%s'".formatted(newComment));
+    }
+
+    if (!setProperties.isEmpty()) {
+      alterSql.add(generateAlterTableProperties(setProperties));
+    }
+
+    // Remove all empty SQL statements
+    List<String> nonEmptySQLs =
+        
alterSql.stream().filter(StringUtils::isNotEmpty).collect(Collectors.toList());
+    if (CollectionUtils.isEmpty(nonEmptySQLs)) {
+      return "";
+    }
+
+    // Return the generated SQL statement
+    String result =
+        "ALTER TABLE %s \n%s;"
+            .formatted(quoteIdentifier(tableName), String.join(",\n", 
nonEmptySQLs));
+    LOG.info("Generated alter table:{} sql: {}", databaseName + "." + 
tableName, result);
+    return result;
+  }
+
+  private String updateColumnAutoIncrementDefinition(
+      JdbcTable table, TableChange.UpdateColumnAutoIncrement change) {
+    if (change.fieldName().length > 1) {
+      throw new UnsupportedOperationException("Nested column names are not 
supported");
+    }
+
+    String col = change.fieldName()[0];
+    JdbcColumn column = getJdbcColumnFromTable(table, col);
+    if (change.isAutoIncrement()) {
+      Preconditions.checkArgument(
+          Types.allowAutoIncrement(column.dataType()),
+          "Auto increment is not allowed, type: " + column.dataType());
+    }
+
+    JdbcColumn updateColumn =
+        JdbcColumn.builder()
+            .withName(col)
+            .withDefaultValue(column.defaultValue())
+            .withNullable(column.nullable())
+            .withType(column.dataType())
+            .withComment(column.comment())
+            .withAutoIncrement(change.isAutoIncrement())
+            .build();
+
+    return MODIFY_COLUMN
+        + quoteIdentifier(col)
+        + appendColumnDefinition(updateColumn, new StringBuilder());
+  }
+
+  @VisibleForTesting
+  private String deleteIndexDefinition(
+      JdbcTable lazyLoadTable, TableChange.DeleteIndex deleteIndex) {
+    boolean indexExists =
+        Arrays.stream(lazyLoadTable.index())
+            .anyMatch(index -> index.name().equals(deleteIndex.getName()));
+
+    // Index does not exist
+    if (!indexExists) {
+      // If ifExists is true, return empty string to skip the operation
+      if (deleteIndex.isIfExists()) {
+        return "";
+      } else {
+        throw new IllegalArgumentException(
+            "Index '%s' does not exist".formatted(deleteIndex.getName()));
+      }
+    }
+
+    return String.format("DROP INDEX %s 
".formatted(quoteIdentifier(deleteIndex.getName())));
+  }
+
+  private String updateColumnNullabilityDefinition(
+      TableChange.UpdateColumnNullability change, JdbcTable table) {
+    validateUpdateColumnNullable(change, table);
+
+    String col = change.fieldName()[0];
+    JdbcColumn column = getJdbcColumnFromTable(table, col);
+    JdbcColumn updateColumn =
+        JdbcColumn.builder()
+            .withName(col)
+            .withDefaultValue(column.defaultValue())
+            .withNullable(change.nullable())
+            .withType(column.dataType())
+            .withComment(column.comment())
+            .withAutoIncrement(column.autoIncrement())
+            .build();
+
+    return MODIFY_COLUMN
+        + quoteIdentifier(col)
+        + appendColumnDefinition(updateColumn, new StringBuilder());
+  }
+
+  private String generateAlterTableProperties(List<TableChange.SetProperty> 
setProperties) {
+    if (CollectionUtils.isNotEmpty(setProperties)) {
+      throw new UnsupportedOperationException(
+          "Alter table properties in ClickHouse is not supported");
+    }
+
+    return "";
+  }
+
+  private String updateColumnCommentFieldDefinition(
+      TableChange.UpdateColumnComment updateColumnComment, JdbcTable 
jdbcTable) {
+    String newComment = updateColumnComment.getNewComment();
+    if (updateColumnComment.fieldName().length > 1) {
+      throw new 
UnsupportedOperationException(CLICKHOUSE_NOT_SUPPORT_NESTED_COLUMN_MSG);
+    }
+
+    String col = updateColumnComment.fieldName()[0];
+    JdbcColumn column = getJdbcColumnFromTable(jdbcTable, col);
+    JdbcColumn updateColumn =
+        JdbcColumn.builder()
+            .withName(col)
+            .withDefaultValue(column.defaultValue())
+            .withNullable(column.nullable())
+            .withType(column.dataType())
+            .withComment(newComment)
+            .withAutoIncrement(column.autoIncrement())
+            .build();
+
+    return MODIFY_COLUMN
+        + quoteIdentifier(col)
+        + appendColumnDefinition(updateColumn, new StringBuilder());
+  }
+
+  private String addColumnFieldDefinition(TableChange.AddColumn addColumn) {
+    String dataType = typeConverter.fromGravitino(addColumn.getDataType());
+    if (addColumn.fieldName().length > 1) {
+      throw new 
UnsupportedOperationException(CLICKHOUSE_NOT_SUPPORT_NESTED_COLUMN_MSG);
+    }
+
+    String col = addColumn.fieldName()[0];
+    StringBuilder columnDefinition = new StringBuilder();
+    //  [IF NOT EXISTS] name [type] [default_expr] [codec] [AFTER name_after | 
FIRST]
+    if (addColumn.isNullable()) {
+      columnDefinition.append(
+          "ADD COLUMN %s Nullable(%s) ".formatted(quoteIdentifier(col), 
dataType));
+    } else {
+      columnDefinition.append("ADD COLUMN %s %s 
".formatted(quoteIdentifier(col), dataType));
+    }
+
+    if (addColumn.isAutoIncrement()) {
+      throw new UnsupportedOperationException(
+          "ClickHouse does not support adding auto increment column");
+    }
+
+    // Append default value if available
+    if (!Column.DEFAULT_VALUE_NOT_SET.equals(addColumn.getDefaultValue())) {
+      columnDefinition.append(
+          "DEFAULT %s "
+              
.formatted(columnDefaultValueConverter.fromGravitino(addColumn.getDefaultValue())));
+    }
+
+    // Append comment if available after default value
+    if (StringUtils.isNotEmpty(addColumn.getComment())) {
+      String escapedComment = StringUtils.replace(addColumn.getComment(), "'", 
"''");
+      columnDefinition.append("COMMENT '%s'".formatted(escapedComment));

Review Comment:
   Missing space before COMMENT keyword in the SQL statement. On line 603, the 
COMMENT clause is appended without a preceding space, which could lead to SQL 
syntax errors. The line should include a space before COMMENT, like: 
`columnDefinition.append(" COMMENT '%s'".formatted(escapedComment));`
   ```suggestion
         columnDefinition.append(" COMMENT '%s'".formatted(escapedComment));
   ```



##########
catalogs-contrib/catalog-jdbc-clickhouse/src/main/java/org/apache/gravitino/catalog/clickhouse/operations/ClickHouseTableOperations.java:
##########
@@ -327,8 +355,371 @@ protected String generatePurgeTableSql(String tableName) {
   @Override
   protected String generateAlterTableSql(
       String databaseName, String tableName, TableChange... changes) {
-    throw new UnsupportedOperationException(
-        "ClickHouseTableOperations.generateAlterTableSql is not implemented 
yet.");
+    // Not all operations require the original table information, so lazy 
loading is used here
+    JdbcTable lazyLoadTable = null;
+    TableChange.UpdateComment updateComment = null;
+    List<TableChange.SetProperty> setProperties = new ArrayList<>();
+    List<String> alterSql = new ArrayList<>();
+
+    for (TableChange change : changes) {
+      if (change instanceof TableChange.UpdateComment) {
+        updateComment = (TableChange.UpdateComment) change;
+
+      } else if (change instanceof TableChange.SetProperty setProperty) {
+        // The set attribute needs to be added at the end.
+        setProperties.add(setProperty);
+
+      } else if (change instanceof TableChange.RemoveProperty) {
+        // Clickhouse does not support deleting table attributes, it can be 
replaced by Set Property
+        throw new IllegalArgumentException("Remove property is not supported 
yet");
+
+      } else if (change instanceof TableChange.AddColumn addColumn) {
+        lazyLoadTable = getOrCreateTable(databaseName, tableName, 
lazyLoadTable);
+        alterSql.add(addColumnFieldDefinition(addColumn));
+
+      } else if (change instanceof TableChange.RenameColumn renameColumn) {
+        lazyLoadTable = getOrCreateTable(databaseName, tableName, 
lazyLoadTable);
+        alterSql.add(renameColumnFieldDefinition(renameColumn));
+
+      } else if (change instanceof TableChange.UpdateColumnDefaultValue 
updateColumnDefaultValue) {
+        lazyLoadTable = getOrCreateTable(databaseName, tableName, 
lazyLoadTable);
+        alterSql.add(
+            updateColumnDefaultValueFieldDefinition(updateColumnDefaultValue, 
lazyLoadTable));
+
+      } else if (change instanceof TableChange.UpdateColumnType 
updateColumnType) {
+        lazyLoadTable = getOrCreateTable(databaseName, tableName, 
lazyLoadTable);
+        alterSql.add(updateColumnTypeFieldDefinition(updateColumnType, 
lazyLoadTable));
+
+      } else if (change instanceof TableChange.UpdateColumnComment 
updateColumnComment) {
+        lazyLoadTable = getOrCreateTable(databaseName, tableName, 
lazyLoadTable);
+        alterSql.add(updateColumnCommentFieldDefinition(updateColumnComment, 
lazyLoadTable));
+
+      } else if (change instanceof TableChange.UpdateColumnPosition 
updateColumnPosition) {
+        lazyLoadTable = getOrCreateTable(databaseName, tableName, 
lazyLoadTable);
+        alterSql.add(updateColumnPositionFieldDefinition(updateColumnPosition, 
lazyLoadTable));
+
+      } else if (change instanceof TableChange.DeleteColumn deleteColumn) {
+        lazyLoadTable = getOrCreateTable(databaseName, tableName, 
lazyLoadTable);
+        String deleteColSql = deleteColumnFieldDefinition(deleteColumn, 
lazyLoadTable);
+
+        if (StringUtils.isNotEmpty(deleteColSql)) {
+          alterSql.add(deleteColSql);
+        }
+
+      } else if (change instanceof TableChange.UpdateColumnNullability) {
+        lazyLoadTable = getOrCreateTable(databaseName, tableName, 
lazyLoadTable);
+        alterSql.add(
+            updateColumnNullabilityDefinition(
+                (TableChange.UpdateColumnNullability) change, lazyLoadTable));
+
+      } else if (change instanceof TableChange.DeleteIndex) {
+        lazyLoadTable = getOrCreateTable(databaseName, tableName, 
lazyLoadTable);
+        alterSql.add(deleteIndexDefinition(lazyLoadTable, 
(TableChange.DeleteIndex) change));
+
+      } else if (change instanceof TableChange.UpdateColumnAutoIncrement) {
+        lazyLoadTable = getOrCreateTable(databaseName, tableName, 
lazyLoadTable);
+        alterSql.add(
+            updateColumnAutoIncrementDefinition(
+                lazyLoadTable, (TableChange.UpdateColumnAutoIncrement) 
change));
+
+      } else {
+        throw new IllegalArgumentException(
+            "Unsupported table change type: " + change.getClass().getName());
+      }
+    }
+
+    if (!setProperties.isEmpty()) {
+      alterSql.add(generateAlterTableProperties(setProperties));
+    }
+

Review Comment:
   The code on lines 431-433 is duplicated on lines 449-451. The 
`generateAlterTableProperties(setProperties)` method is being called and added 
to `alterSql` twice. This is redundant and should be removed. Keep only one 
occurrence - the one on lines 449-451 is better positioned after the comment 
handling.
   ```suggestion
   
   ```



##########
catalogs-contrib/catalog-jdbc-clickhouse/src/main/java/org/apache/gravitino/catalog/clickhouse/operations/ClickHouseTableOperations.java:
##########
@@ -327,8 +355,371 @@ protected String generatePurgeTableSql(String tableName) {
   @Override
   protected String generateAlterTableSql(
       String databaseName, String tableName, TableChange... changes) {
-    throw new UnsupportedOperationException(
-        "ClickHouseTableOperations.generateAlterTableSql is not implemented 
yet.");
+    // Not all operations require the original table information, so lazy 
loading is used here
+    JdbcTable lazyLoadTable = null;
+    TableChange.UpdateComment updateComment = null;
+    List<TableChange.SetProperty> setProperties = new ArrayList<>();
+    List<String> alterSql = new ArrayList<>();
+
+    for (TableChange change : changes) {
+      if (change instanceof TableChange.UpdateComment) {
+        updateComment = (TableChange.UpdateComment) change;
+
+      } else if (change instanceof TableChange.SetProperty setProperty) {
+        // The set attribute needs to be added at the end.
+        setProperties.add(setProperty);
+
+      } else if (change instanceof TableChange.RemoveProperty) {
+        // Clickhouse does not support deleting table attributes, it can be 
replaced by Set Property
+        throw new IllegalArgumentException("Remove property is not supported 
yet");
+
+      } else if (change instanceof TableChange.AddColumn addColumn) {
+        lazyLoadTable = getOrCreateTable(databaseName, tableName, 
lazyLoadTable);
+        alterSql.add(addColumnFieldDefinition(addColumn));
+
+      } else if (change instanceof TableChange.RenameColumn renameColumn) {
+        lazyLoadTable = getOrCreateTable(databaseName, tableName, 
lazyLoadTable);
+        alterSql.add(renameColumnFieldDefinition(renameColumn));
+
+      } else if (change instanceof TableChange.UpdateColumnDefaultValue 
updateColumnDefaultValue) {
+        lazyLoadTable = getOrCreateTable(databaseName, tableName, 
lazyLoadTable);
+        alterSql.add(
+            updateColumnDefaultValueFieldDefinition(updateColumnDefaultValue, 
lazyLoadTable));
+
+      } else if (change instanceof TableChange.UpdateColumnType 
updateColumnType) {
+        lazyLoadTable = getOrCreateTable(databaseName, tableName, 
lazyLoadTable);
+        alterSql.add(updateColumnTypeFieldDefinition(updateColumnType, 
lazyLoadTable));
+
+      } else if (change instanceof TableChange.UpdateColumnComment 
updateColumnComment) {
+        lazyLoadTable = getOrCreateTable(databaseName, tableName, 
lazyLoadTable);
+        alterSql.add(updateColumnCommentFieldDefinition(updateColumnComment, 
lazyLoadTable));
+
+      } else if (change instanceof TableChange.UpdateColumnPosition 
updateColumnPosition) {
+        lazyLoadTable = getOrCreateTable(databaseName, tableName, 
lazyLoadTable);
+        alterSql.add(updateColumnPositionFieldDefinition(updateColumnPosition, 
lazyLoadTable));
+
+      } else if (change instanceof TableChange.DeleteColumn deleteColumn) {
+        lazyLoadTable = getOrCreateTable(databaseName, tableName, 
lazyLoadTable);
+        String deleteColSql = deleteColumnFieldDefinition(deleteColumn, 
lazyLoadTable);
+
+        if (StringUtils.isNotEmpty(deleteColSql)) {
+          alterSql.add(deleteColSql);
+        }
+
+      } else if (change instanceof TableChange.UpdateColumnNullability) {
+        lazyLoadTable = getOrCreateTable(databaseName, tableName, 
lazyLoadTable);
+        alterSql.add(
+            updateColumnNullabilityDefinition(
+                (TableChange.UpdateColumnNullability) change, lazyLoadTable));
+
+      } else if (change instanceof TableChange.DeleteIndex) {
+        lazyLoadTable = getOrCreateTable(databaseName, tableName, 
lazyLoadTable);
+        alterSql.add(deleteIndexDefinition(lazyLoadTable, 
(TableChange.DeleteIndex) change));
+
+      } else if (change instanceof TableChange.UpdateColumnAutoIncrement) {
+        lazyLoadTable = getOrCreateTable(databaseName, tableName, 
lazyLoadTable);
+        alterSql.add(
+            updateColumnAutoIncrementDefinition(
+                lazyLoadTable, (TableChange.UpdateColumnAutoIncrement) 
change));
+
+      } else {
+        throw new IllegalArgumentException(
+            "Unsupported table change type: " + change.getClass().getName());
+      }
+    }
+
+    if (!setProperties.isEmpty()) {
+      alterSql.add(generateAlterTableProperties(setProperties));
+    }
+
+    // Last modified comment
+    if (null != updateComment) {
+      String newComment = updateComment.getNewComment();
+      if (null == StringIdentifier.fromComment(newComment)) {
+        // Detect and add Gravitino id.
+        JdbcTable jdbcTable = getOrCreateTable(databaseName, tableName, 
lazyLoadTable);
+        StringIdentifier identifier = 
StringIdentifier.fromComment(jdbcTable.comment());
+        if (null != identifier) {
+          newComment = StringIdentifier.addToComment(identifier, newComment);
+        }
+      }
+      alterSql.add(" MODIFY COMMENT '%s'".formatted(newComment));
+    }
+
+    if (!setProperties.isEmpty()) {
+      alterSql.add(generateAlterTableProperties(setProperties));
+    }
+
+    // Remove all empty SQL statements
+    List<String> nonEmptySQLs =
+        
alterSql.stream().filter(StringUtils::isNotEmpty).collect(Collectors.toList());
+    if (CollectionUtils.isEmpty(nonEmptySQLs)) {
+      return "";
+    }
+
+    // Return the generated SQL statement
+    String result =
+        "ALTER TABLE %s \n%s;"
+            .formatted(quoteIdentifier(tableName), String.join(",\n", 
nonEmptySQLs));
+    LOG.info("Generated alter table:{} sql: {}", databaseName + "." + 
tableName, result);
+    return result;
+  }
+
+  private String updateColumnAutoIncrementDefinition(
+      JdbcTable table, TableChange.UpdateColumnAutoIncrement change) {
+    if (change.fieldName().length > 1) {
+      throw new UnsupportedOperationException("Nested column names are not 
supported");
+    }
+
+    String col = change.fieldName()[0];
+    JdbcColumn column = getJdbcColumnFromTable(table, col);
+    if (change.isAutoIncrement()) {
+      Preconditions.checkArgument(
+          Types.allowAutoIncrement(column.dataType()),
+          "Auto increment is not allowed, type: " + column.dataType());
+    }
+
+    JdbcColumn updateColumn =
+        JdbcColumn.builder()
+            .withName(col)
+            .withDefaultValue(column.defaultValue())
+            .withNullable(column.nullable())
+            .withType(column.dataType())
+            .withComment(column.comment())
+            .withAutoIncrement(change.isAutoIncrement())
+            .build();
+
+    return MODIFY_COLUMN
+        + quoteIdentifier(col)
+        + appendColumnDefinition(updateColumn, new StringBuilder());
+  }
+
+  @VisibleForTesting
+  private String deleteIndexDefinition(
+      JdbcTable lazyLoadTable, TableChange.DeleteIndex deleteIndex) {
+    boolean indexExists =
+        Arrays.stream(lazyLoadTable.index())
+            .anyMatch(index -> index.name().equals(deleteIndex.getName()));
+
+    // Index does not exist
+    if (!indexExists) {
+      // If ifExists is true, return empty string to skip the operation
+      if (deleteIndex.isIfExists()) {
+        return "";
+      } else {
+        throw new IllegalArgumentException(
+            "Index '%s' does not exist".formatted(deleteIndex.getName()));
+      }
+    }
+
+    return String.format("DROP INDEX %s 
".formatted(quoteIdentifier(deleteIndex.getName())));
+  }
+
+  private String updateColumnNullabilityDefinition(
+      TableChange.UpdateColumnNullability change, JdbcTable table) {
+    validateUpdateColumnNullable(change, table);
+
+    String col = change.fieldName()[0];
+    JdbcColumn column = getJdbcColumnFromTable(table, col);
+    JdbcColumn updateColumn =
+        JdbcColumn.builder()
+            .withName(col)
+            .withDefaultValue(column.defaultValue())
+            .withNullable(change.nullable())
+            .withType(column.dataType())
+            .withComment(column.comment())
+            .withAutoIncrement(column.autoIncrement())
+            .build();
+
+    return MODIFY_COLUMN
+        + quoteIdentifier(col)
+        + appendColumnDefinition(updateColumn, new StringBuilder());
+  }
+
+  private String generateAlterTableProperties(List<TableChange.SetProperty> 
setProperties) {
+    if (CollectionUtils.isNotEmpty(setProperties)) {
+      throw new UnsupportedOperationException(
+          "Alter table properties in ClickHouse is not supported");
+    }
+
+    return "";
+  }
+
+  private String updateColumnCommentFieldDefinition(
+      TableChange.UpdateColumnComment updateColumnComment, JdbcTable 
jdbcTable) {
+    String newComment = updateColumnComment.getNewComment();
+    if (updateColumnComment.fieldName().length > 1) {
+      throw new 
UnsupportedOperationException(CLICKHOUSE_NOT_SUPPORT_NESTED_COLUMN_MSG);
+    }
+
+    String col = updateColumnComment.fieldName()[0];
+    JdbcColumn column = getJdbcColumnFromTable(jdbcTable, col);
+    JdbcColumn updateColumn =
+        JdbcColumn.builder()
+            .withName(col)
+            .withDefaultValue(column.defaultValue())
+            .withNullable(column.nullable())
+            .withType(column.dataType())
+            .withComment(newComment)
+            .withAutoIncrement(column.autoIncrement())
+            .build();
+
+    return MODIFY_COLUMN
+        + quoteIdentifier(col)
+        + appendColumnDefinition(updateColumn, new StringBuilder());
+  }
+
+  private String addColumnFieldDefinition(TableChange.AddColumn addColumn) {
+    String dataType = typeConverter.fromGravitino(addColumn.getDataType());
+    if (addColumn.fieldName().length > 1) {
+      throw new 
UnsupportedOperationException(CLICKHOUSE_NOT_SUPPORT_NESTED_COLUMN_MSG);
+    }
+
+    String col = addColumn.fieldName()[0];
+    StringBuilder columnDefinition = new StringBuilder();
+    //  [IF NOT EXISTS] name [type] [default_expr] [codec] [AFTER name_after | 
FIRST]
+    if (addColumn.isNullable()) {
+      columnDefinition.append(
+          "ADD COLUMN %s Nullable(%s) ".formatted(quoteIdentifier(col), 
dataType));
+    } else {
+      columnDefinition.append("ADD COLUMN %s %s 
".formatted(quoteIdentifier(col), dataType));
+    }
+
+    if (addColumn.isAutoIncrement()) {
+      throw new UnsupportedOperationException(
+          "ClickHouse does not support adding auto increment column");
+    }
+
+    // Append default value if available
+    if (!Column.DEFAULT_VALUE_NOT_SET.equals(addColumn.getDefaultValue())) {
+      columnDefinition.append(
+          "DEFAULT %s "
+              
.formatted(columnDefaultValueConverter.fromGravitino(addColumn.getDefaultValue())));
+    }
+
+    // Append comment if available after default value
+    if (StringUtils.isNotEmpty(addColumn.getComment())) {
+      String escapedComment = StringUtils.replace(addColumn.getComment(), "'", 
"''");
+      columnDefinition.append("COMMENT '%s'".formatted(escapedComment));
+    }
+
+    // Append position if available
+    if (addColumn.getPosition() instanceof TableChange.First) {
+      columnDefinition.append("FIRST");
+    } else if (addColumn.getPosition() instanceof TableChange.After 
afterPosition) {
+      columnDefinition.append("AFTER %s 
".formatted(quoteIdentifier(afterPosition.getColumn())));
+    } else if (addColumn.getPosition() instanceof TableChange.Default) {
+      // Do nothing, follow the default behavior of clickhouse
+    } else {
+      throw new IllegalArgumentException("Invalid column position.");
+    }
+
+    return columnDefinition.toString();
+  }
+
+  private String renameColumnFieldDefinition(TableChange.RenameColumn 
renameColumn) {
+    if (renameColumn.fieldName().length > 1) {
+      throw new 
UnsupportedOperationException(CLICKHOUSE_NOT_SUPPORT_NESTED_COLUMN_MSG);
+    }
+
+    String oldColumnName = renameColumn.fieldName()[0];
+    String newColumnName = renameColumn.getNewName();
+
+    return "RENAME COLUMN %s TO %s"
+        .formatted(quoteIdentifier(oldColumnName), 
quoteIdentifier(newColumnName));
+  }
+
+  private String updateColumnPositionFieldDefinition(
+      TableChange.UpdateColumnPosition updateColumnPosition, JdbcTable 
jdbcTable) {
+    if (updateColumnPosition.fieldName().length > 1) {
+      throw new 
UnsupportedOperationException(CLICKHOUSE_NOT_SUPPORT_NESTED_COLUMN_MSG);
+    }
+
+    String col = updateColumnPosition.fieldName()[0];
+    JdbcColumn column = getJdbcColumnFromTable(jdbcTable, col);
+
+    StringBuilder columnDefinition = new StringBuilder();
+    columnDefinition.append("%s %s ".formatted(MODIFY_COLUMN, 
quoteIdentifier(col)));
+    appendColumnDefinition(column, columnDefinition);
+
+    if (updateColumnPosition.getPosition() instanceof TableChange.First) {
+      columnDefinition.append("FIRST");

Review Comment:
   Missing space before the `FIRST` keyword. The line should append a leading 
space before `FIRST` to ensure proper SQL syntax: `columnDefinition.append(" FIRST");`
   ```suggestion
         columnDefinition.append(" FIRST");
   ```



##########
catalogs-contrib/catalog-jdbc-clickhouse/src/main/java/org/apache/gravitino/catalog/clickhouse/operations/ClickHouseTableOperations.java:
##########
@@ -327,8 +355,371 @@ protected String generatePurgeTableSql(String tableName) {
   @Override
   protected String generateAlterTableSql(
       String databaseName, String tableName, TableChange... changes) {
-    throw new UnsupportedOperationException(
-        "ClickHouseTableOperations.generateAlterTableSql is not implemented 
yet.");
+    // Not all operations require the original table information, so lazy 
loading is used here
+    JdbcTable lazyLoadTable = null;
+    TableChange.UpdateComment updateComment = null;
+    List<TableChange.SetProperty> setProperties = new ArrayList<>();
+    List<String> alterSql = new ArrayList<>();
+
+    for (TableChange change : changes) {
+      if (change instanceof TableChange.UpdateComment) {
+        updateComment = (TableChange.UpdateComment) change;
+
+      } else if (change instanceof TableChange.SetProperty setProperty) {
+        // The set attribute needs to be added at the end.
+        setProperties.add(setProperty);
+
+      } else if (change instanceof TableChange.RemoveProperty) {
+        // Clickhouse does not support deleting table attributes, it can be 
replaced by Set Property
+        throw new IllegalArgumentException("Remove property is not supported 
yet");
+
+      } else if (change instanceof TableChange.AddColumn addColumn) {
+        lazyLoadTable = getOrCreateTable(databaseName, tableName, 
lazyLoadTable);
+        alterSql.add(addColumnFieldDefinition(addColumn));
+
+      } else if (change instanceof TableChange.RenameColumn renameColumn) {
+        lazyLoadTable = getOrCreateTable(databaseName, tableName, 
lazyLoadTable);
+        alterSql.add(renameColumnFieldDefinition(renameColumn));
+
+      } else if (change instanceof TableChange.UpdateColumnDefaultValue 
updateColumnDefaultValue) {
+        lazyLoadTable = getOrCreateTable(databaseName, tableName, 
lazyLoadTable);
+        alterSql.add(
+            updateColumnDefaultValueFieldDefinition(updateColumnDefaultValue, 
lazyLoadTable));
+
+      } else if (change instanceof TableChange.UpdateColumnType 
updateColumnType) {
+        lazyLoadTable = getOrCreateTable(databaseName, tableName, 
lazyLoadTable);
+        alterSql.add(updateColumnTypeFieldDefinition(updateColumnType, 
lazyLoadTable));
+
+      } else if (change instanceof TableChange.UpdateColumnComment 
updateColumnComment) {
+        lazyLoadTable = getOrCreateTable(databaseName, tableName, 
lazyLoadTable);
+        alterSql.add(updateColumnCommentFieldDefinition(updateColumnComment, 
lazyLoadTable));
+
+      } else if (change instanceof TableChange.UpdateColumnPosition 
updateColumnPosition) {
+        lazyLoadTable = getOrCreateTable(databaseName, tableName, 
lazyLoadTable);
+        alterSql.add(updateColumnPositionFieldDefinition(updateColumnPosition, 
lazyLoadTable));
+
+      } else if (change instanceof TableChange.DeleteColumn deleteColumn) {
+        lazyLoadTable = getOrCreateTable(databaseName, tableName, 
lazyLoadTable);
+        String deleteColSql = deleteColumnFieldDefinition(deleteColumn, 
lazyLoadTable);
+
+        if (StringUtils.isNotEmpty(deleteColSql)) {
+          alterSql.add(deleteColSql);
+        }
+
+      } else if (change instanceof TableChange.UpdateColumnNullability) {
+        lazyLoadTable = getOrCreateTable(databaseName, tableName, 
lazyLoadTable);
+        alterSql.add(
+            updateColumnNullabilityDefinition(
+                (TableChange.UpdateColumnNullability) change, lazyLoadTable));
+
+      } else if (change instanceof TableChange.DeleteIndex) {
+        lazyLoadTable = getOrCreateTable(databaseName, tableName, 
lazyLoadTable);
+        alterSql.add(deleteIndexDefinition(lazyLoadTable, 
(TableChange.DeleteIndex) change));
+
+      } else if (change instanceof TableChange.UpdateColumnAutoIncrement) {
+        lazyLoadTable = getOrCreateTable(databaseName, tableName, 
lazyLoadTable);
+        alterSql.add(
+            updateColumnAutoIncrementDefinition(
+                lazyLoadTable, (TableChange.UpdateColumnAutoIncrement) 
change));
+
+      } else {
+        throw new IllegalArgumentException(
+            "Unsupported table change type: " + change.getClass().getName());
+      }
+    }
+
+    if (!setProperties.isEmpty()) {
+      alterSql.add(generateAlterTableProperties(setProperties));
+    }
+
+    // Last modified comment
+    if (null != updateComment) {
+      String newComment = updateComment.getNewComment();
+      if (null == StringIdentifier.fromComment(newComment)) {
+        // Detect and add Gravitino id.
+        JdbcTable jdbcTable = getOrCreateTable(databaseName, tableName, 
lazyLoadTable);
+        StringIdentifier identifier = 
StringIdentifier.fromComment(jdbcTable.comment());
+        if (null != identifier) {
+          newComment = StringIdentifier.addToComment(identifier, newComment);
+        }
+      }
+      alterSql.add(" MODIFY COMMENT '%s'".formatted(newComment));
+    }
+
+    if (!setProperties.isEmpty()) {
+      alterSql.add(generateAlterTableProperties(setProperties));
+    }
+
+    // Remove all empty SQL statements
+    List<String> nonEmptySQLs =
+        
alterSql.stream().filter(StringUtils::isNotEmpty).collect(Collectors.toList());
+    if (CollectionUtils.isEmpty(nonEmptySQLs)) {
+      return "";
+    }
+
+    // Return the generated SQL statement
+    String result =
+        "ALTER TABLE %s \n%s;"
+            .formatted(quoteIdentifier(tableName), String.join(",\n", 
nonEmptySQLs));
+    LOG.info("Generated alter table:{} sql: {}", databaseName + "." + 
tableName, result);
+    return result;
+  }
+
+  private String updateColumnAutoIncrementDefinition(
+      JdbcTable table, TableChange.UpdateColumnAutoIncrement change) {
+    if (change.fieldName().length > 1) {
+      throw new UnsupportedOperationException("Nested column names are not 
supported");
+    }
+
+    String col = change.fieldName()[0];
+    JdbcColumn column = getJdbcColumnFromTable(table, col);
+    if (change.isAutoIncrement()) {
+      Preconditions.checkArgument(
+          Types.allowAutoIncrement(column.dataType()),
+          "Auto increment is not allowed, type: " + column.dataType());
+    }
+
+    JdbcColumn updateColumn =
+        JdbcColumn.builder()
+            .withName(col)
+            .withDefaultValue(column.defaultValue())
+            .withNullable(column.nullable())
+            .withType(column.dataType())
+            .withComment(column.comment())
+            .withAutoIncrement(change.isAutoIncrement())
+            .build();
+
+    return MODIFY_COLUMN
+        + quoteIdentifier(col)
+        + appendColumnDefinition(updateColumn, new StringBuilder());
+  }
+
+  @VisibleForTesting
+  private String deleteIndexDefinition(
+      JdbcTable lazyLoadTable, TableChange.DeleteIndex deleteIndex) {
+    boolean indexExists =
+        Arrays.stream(lazyLoadTable.index())
+            .anyMatch(index -> index.name().equals(deleteIndex.getName()));
+
+    // Index does not exist
+    if (!indexExists) {
+      // If ifExists is true, return empty string to skip the operation
+      if (deleteIndex.isIfExists()) {
+        return "";
+      } else {
+        throw new IllegalArgumentException(
+            "Index '%s' does not exist".formatted(deleteIndex.getName()));
+      }
+    }
+
+    return String.format("DROP INDEX %s 
".formatted(quoteIdentifier(deleteIndex.getName())));
+  }
+
+  private String updateColumnNullabilityDefinition(
+      TableChange.UpdateColumnNullability change, JdbcTable table) {
+    validateUpdateColumnNullable(change, table);
+
+    String col = change.fieldName()[0];
+    JdbcColumn column = getJdbcColumnFromTable(table, col);
+    JdbcColumn updateColumn =
+        JdbcColumn.builder()
+            .withName(col)
+            .withDefaultValue(column.defaultValue())
+            .withNullable(change.nullable())
+            .withType(column.dataType())
+            .withComment(column.comment())
+            .withAutoIncrement(column.autoIncrement())
+            .build();
+
+    return MODIFY_COLUMN
+        + quoteIdentifier(col)
+        + appendColumnDefinition(updateColumn, new StringBuilder());
+  }
+
+  private String generateAlterTableProperties(List<TableChange.SetProperty> 
setProperties) {
+    if (CollectionUtils.isNotEmpty(setProperties)) {
+      throw new UnsupportedOperationException(
+          "Alter table properties in ClickHouse is not supported");
+    }
+
+    return "";
+  }
+
+  private String updateColumnCommentFieldDefinition(
+      TableChange.UpdateColumnComment updateColumnComment, JdbcTable 
jdbcTable) {
+    String newComment = updateColumnComment.getNewComment();
+    if (updateColumnComment.fieldName().length > 1) {
+      throw new 
UnsupportedOperationException(CLICKHOUSE_NOT_SUPPORT_NESTED_COLUMN_MSG);
+    }
+
+    String col = updateColumnComment.fieldName()[0];
+    JdbcColumn column = getJdbcColumnFromTable(jdbcTable, col);
+    JdbcColumn updateColumn =
+        JdbcColumn.builder()
+            .withName(col)
+            .withDefaultValue(column.defaultValue())
+            .withNullable(column.nullable())
+            .withType(column.dataType())
+            .withComment(newComment)
+            .withAutoIncrement(column.autoIncrement())
+            .build();
+
+    return MODIFY_COLUMN
+        + quoteIdentifier(col)
+        + appendColumnDefinition(updateColumn, new StringBuilder());
+  }
+
+  private String addColumnFieldDefinition(TableChange.AddColumn addColumn) {
+    String dataType = typeConverter.fromGravitino(addColumn.getDataType());
+    if (addColumn.fieldName().length > 1) {
+      throw new 
UnsupportedOperationException(CLICKHOUSE_NOT_SUPPORT_NESTED_COLUMN_MSG);
+    }
+
+    String col = addColumn.fieldName()[0];
+    StringBuilder columnDefinition = new StringBuilder();
+    //  [IF NOT EXISTS] name [type] [default_expr] [codec] [AFTER name_after | 
FIRST]
+    if (addColumn.isNullable()) {
+      columnDefinition.append(
+          "ADD COLUMN %s Nullable(%s) ".formatted(quoteIdentifier(col), 
dataType));
+    } else {
+      columnDefinition.append("ADD COLUMN %s %s 
".formatted(quoteIdentifier(col), dataType));
+    }
+
+    if (addColumn.isAutoIncrement()) {
+      throw new UnsupportedOperationException(
+          "ClickHouse does not support adding auto increment column");
+    }
+
+    // Append default value if available
+    if (!Column.DEFAULT_VALUE_NOT_SET.equals(addColumn.getDefaultValue())) {
+      columnDefinition.append(
+          "DEFAULT %s "
+              
.formatted(columnDefaultValueConverter.fromGravitino(addColumn.getDefaultValue())));
+    }
+
+    // Append comment if available after default value
+    if (StringUtils.isNotEmpty(addColumn.getComment())) {
+      String escapedComment = StringUtils.replace(addColumn.getComment(), "'", 
"''");
+      columnDefinition.append("COMMENT '%s'".formatted(escapedComment));
+    }
+
+    // Append position if available
+    if (addColumn.getPosition() instanceof TableChange.First) {
+      columnDefinition.append("FIRST");
+    } else if (addColumn.getPosition() instanceof TableChange.After 
afterPosition) {
+      columnDefinition.append("AFTER %s 
".formatted(quoteIdentifier(afterPosition.getColumn())));
+    } else if (addColumn.getPosition() instanceof TableChange.Default) {
+      // Do nothing, follow the default behavior of clickhouse
+    } else {
+      throw new IllegalArgumentException("Invalid column position.");
+    }
+
+    return columnDefinition.toString();
+  }
+
+  private String renameColumnFieldDefinition(TableChange.RenameColumn 
renameColumn) {
+    if (renameColumn.fieldName().length > 1) {
+      throw new 
UnsupportedOperationException(CLICKHOUSE_NOT_SUPPORT_NESTED_COLUMN_MSG);
+    }
+
+    String oldColumnName = renameColumn.fieldName()[0];
+    String newColumnName = renameColumn.getNewName();
+
+    return "RENAME COLUMN %s TO %s"
+        .formatted(quoteIdentifier(oldColumnName), 
quoteIdentifier(newColumnName));
+  }
+
+  private String updateColumnPositionFieldDefinition(
+      TableChange.UpdateColumnPosition updateColumnPosition, JdbcTable 
jdbcTable) {
+    if (updateColumnPosition.fieldName().length > 1) {
+      throw new 
UnsupportedOperationException(CLICKHOUSE_NOT_SUPPORT_NESTED_COLUMN_MSG);
+    }
+
+    String col = updateColumnPosition.fieldName()[0];
+    JdbcColumn column = getJdbcColumnFromTable(jdbcTable, col);
+
+    StringBuilder columnDefinition = new StringBuilder();
+    columnDefinition.append("%s %s ".formatted(MODIFY_COLUMN, 
quoteIdentifier(col)));
+    appendColumnDefinition(column, columnDefinition);
+
+    if (updateColumnPosition.getPosition() instanceof TableChange.First) {
+      columnDefinition.append("FIRST");
+    } else if (updateColumnPosition.getPosition() instanceof TableChange.After 
afterPosition) {
+      columnDefinition.append("%s %s".formatted(AFTER, 
quoteIdentifier(afterPosition.getColumn())));
+    } else {
+      Arrays.stream(jdbcTable.columns())
+          .reduce((column1, column2) -> column2)
+          .map(Column::name)
+          .ifPresent(s -> columnDefinition.append(AFTER).append(s));

Review Comment:
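   Missing space before the column name in the AFTER clause: the fallback 
branch, `columnDefinition.append(AFTER).append(s)`, appends the column name 
directly after `AFTER`. Both AFTER branches should be formatted the same way, 
e.g. `columnDefinition.append("%s %s ".formatted(AFTER, 
quoteIdentifier(afterPosition.getColumn())));`, to ensure proper spacing in the 
generated SQL.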
   ```suggestion
         columnDefinition.append(
             "%s %s ".formatted(AFTER, 
quoteIdentifier(afterPosition.getColumn())));
       } else {
         Arrays.stream(jdbcTable.columns())
             .reduce((column1, column2) -> column2)
             .map(Column::name)
             .ifPresent(s -> columnDefinition.append("%s %s ".formatted(AFTER, 
s)));
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to