Copilot commented on code in PR #9826:
URL: https://github.com/apache/gravitino/pull/9826#discussion_r2740501341


##########
catalogs-contrib/catalog-jdbc-clickhouse/src/main/java/org/apache/gravitino/catalog/clickhouse/operations/ClickHouseTableOperations.java:
##########
@@ -18,16 +18,95 @@
  */
 package org.apache.gravitino.catalog.clickhouse.operations;
 
+import static 
org.apache.gravitino.catalog.clickhouse.ClickHouseConstants.SETTINGS_PREFIX;
+import static 
org.apache.gravitino.catalog.clickhouse.ClickHouseTablePropertiesMetadata.CLICKHOUSE_ENGINE_KEY;
+import static 
org.apache.gravitino.catalog.clickhouse.ClickHouseTablePropertiesMetadata.ENGINE_PROPERTY_ENTRY;
+import static org.apache.gravitino.rel.Column.DEFAULT_VALUE_NOT_SET;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import java.sql.Connection;
+import java.sql.DatabaseMetaData;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
+import java.util.Objects;
+import java.util.stream.Collectors;
+import org.apache.commons.collections4.CollectionUtils;
+import org.apache.commons.collections4.MapUtils;
+import org.apache.commons.lang3.ArrayUtils;
+import org.apache.commons.lang3.BooleanUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.gravitino.StringIdentifier;
 import org.apache.gravitino.catalog.jdbc.JdbcColumn;
+import org.apache.gravitino.catalog.jdbc.JdbcTable;
 import org.apache.gravitino.catalog.jdbc.operation.JdbcTableOperations;
+import org.apache.gravitino.catalog.jdbc.utils.JdbcConnectorUtils;
+import org.apache.gravitino.exceptions.NoSuchColumnException;
+import org.apache.gravitino.exceptions.NoSuchTableException;
+import org.apache.gravitino.rel.Column;
 import org.apache.gravitino.rel.TableChange;
 import org.apache.gravitino.rel.expressions.distributions.Distribution;
+import org.apache.gravitino.rel.expressions.distributions.Distributions;
+import org.apache.gravitino.rel.expressions.sorts.SortOrder;
 import org.apache.gravitino.rel.expressions.transforms.Transform;
 import org.apache.gravitino.rel.indexes.Index;
+import org.apache.gravitino.rel.indexes.Indexes;
+import org.apache.gravitino.rel.types.Types;
 
 public class ClickHouseTableOperations extends JdbcTableOperations {
 
+  private static final String BACK_QUOTE = "`";
+
+  private static final String CLICKHOUSE_AUTO_INCREMENT = "AUTO_INCREMENT";
+
+  private static final String CLICKHOUSE_NOT_SUPPORT_NESTED_COLUMN_MSG =
+      "Clickhouse does not support nested column names.";
+
+  @Override
+  protected List<Index> getIndexes(Connection connection, String databaseName, 
String tableName) {
+    // cause clickhouse not impl getPrimaryKeys yet, ref:
+    // https://github.com/ClickHouse/clickhouse-java/issues/1625
+    String sql =
+        "SELECT NULL AS TABLE_CAT, "
+            + "system.tables.database AS TABLE_SCHEM, "
+            + "system.tables.name AS TABLE_NAME, "
+            + "trim(c.1) AS COLUMN_NAME, "
+            + "c.2 AS KEY_SEQ, "
+            + "'PRIMARY' AS PK_NAME "
+            + "FROM system.tables "
+            + "ARRAY JOIN arrayZip(splitByChar(',', primary_key), 
arrayEnumerate(splitByChar(',', primary_key))) as c "
+            + "WHERE system.tables.primary_key <> '' "
+            + "AND system.tables.database = '"
+            + databaseName
+            + "' "
+            + "AND system.tables.name = '"
+            + tableName
+            + "' "
+            + "ORDER BY COLUMN_NAME";
+
+    try (PreparedStatement preparedStatement = 
connection.prepareStatement(sql);
+        ResultSet resultSet = preparedStatement.executeQuery()) {
+
+      List<Index> indexes = new ArrayList<>();
+      while (resultSet.next()) {
+        String indexName = resultSet.getString("PK_NAME");
+        String columnName = resultSet.getString("COLUMN_NAME");
+        indexes.add(
+            Indexes.of(Index.IndexType.PRIMARY_KEY, indexName, new String[][] 
{{columnName}}));
+      }
+      return indexes;

Review Comment:
   SQL injection vulnerability: The database name in the SQL query is directly 
concatenated without using a prepared statement parameter. This could allow SQL 
injection if the database name contains malicious SQL code. Use prepared 
statement parameters instead of string concatenation.
   ```suggestion
               + "AND system.tables.database = ? "
               + "AND system.tables.name = ? "
               + "ORDER BY COLUMN_NAME";
   
       try (PreparedStatement preparedStatement = 
connection.prepareStatement(sql)) {
         preparedStatement.setString(1, databaseName);
         preparedStatement.setString(2, tableName);
         try (ResultSet resultSet = preparedStatement.executeQuery()) {
           List<Index> indexes = new ArrayList<>();
           while (resultSet.next()) {
             String indexName = resultSet.getString("PK_NAME");
             String columnName = resultSet.getString("COLUMN_NAME");
             indexes.add(
                 Indexes.of(Index.IndexType.PRIMARY_KEY, indexName, new 
String[][] {{columnName}}));
           }
           return indexes;
         }
   ```



##########
catalogs-contrib/catalog-jdbc-clickhouse/src/main/java/org/apache/gravitino/catalog/clickhouse/operations/ClickHouseTableOperations.java:
##########
@@ -38,19 +117,607 @@ protected String generateCreateTableSql(
       Distribution distribution,
       Index[] indexes) {
     throw new UnsupportedOperationException(
-        "ClickHouseTableOperations.generateCreateTableSql is not implemented 
yet.");
+        "generateCreateTableSql with out sortOrders in clickhouse is not 
supported");
+  }
+
+  @Override
+  protected String generateCreateTableSql(
+      String tableName,
+      JdbcColumn[] columns,
+      String comment,
+      Map<String, String> properties,
+      Transform[] partitioning,
+      Distribution distribution,
+      Index[] indexes,
+      SortOrder[] sortOrders) {
+
+    // These two are not yet supported in Gravitino now and will be supported 
in the future.
+    if (ArrayUtils.isNotEmpty(partitioning)) {
+      throw new UnsupportedOperationException(
+          "Currently we do not support Partitioning in clickhouse");
+    }
+    Preconditions.checkArgument(
+        Distributions.NONE.equals(distribution), "ClickHouse does not support 
distribution");
+
+    validateIncrementCol(columns, indexes);
+
+    // First build the CREATE TABLE statement
+    StringBuilder sqlBuilder = new StringBuilder();
+    sqlBuilder
+        .append("CREATE TABLE ")
+        .append(BACK_QUOTE)
+        .append(tableName)
+        .append(BACK_QUOTE)
+        .append(" (\n");
+
+    // Add columns
+    for (int i = 0; i < columns.length; i++) {
+      JdbcColumn column = columns[i];
+      sqlBuilder
+          .append(SPACE)
+          .append(SPACE)
+          .append(BACK_QUOTE)
+          .append(column.name())
+          .append(BACK_QUOTE);
+
+      appendColumnDefinition(column, sqlBuilder);
+      // Add a comma for the next column, unless it's the last one
+      if (i < columns.length - 1) {
+        sqlBuilder.append(",\n");
+      }
+    }
+
+    // Index definition, we only support primary index now, secondary index 
will be supported in
+    // future
+    appendIndexesSql(indexes, sqlBuilder);
+
+    sqlBuilder.append("\n)");
+
+    // Extract engine from properties
+    String engine = ENGINE_PROPERTY_ENTRY.getDefaultValue().getValue();
+    if (MapUtils.isNotEmpty(properties)) {
+      String userSetEngine = properties.remove(CLICKHOUSE_ENGINE_KEY);
+      if (StringUtils.isNotEmpty(userSetEngine)) {
+        engine = userSetEngine;
+      }
+    }
+    sqlBuilder.append("\n ENGINE = ").append(engine);
+
+    // Omit partition by clause as it will be supported in the next PR
+    // TODO: Add partition by clause support
+
+    // Add order by clause
+    if (ArrayUtils.isNotEmpty(sortOrders)) {
+      if (sortOrders.length > 1) {
+        throw new UnsupportedOperationException(
+            "Currently ClickHouse does not support sortOrders with more than 1 
element");
+      } else if (sortOrders[0].nullOrdering() != null || 
sortOrders[0].direction() != null) {
+        // If no value is set earlier, some default values will be set.
+        // It is difficult to determine whether the user has set a value.
+        LOG.warn(
+            "ClickHouse currently does not support nullOrdering: {} and 
direction: {} of sortOrders,and will ignore them",
+            sortOrders[0].nullOrdering(),
+            sortOrders[0].direction());
+      }
+      sqlBuilder.append(
+          " \n ORDER BY " + BACK_QUOTE + sortOrders[0].expression() + 
BACK_QUOTE + " \n");
+    }
+
+    // Add table comment if specified
+    if (StringUtils.isNotEmpty(comment)) {
+      sqlBuilder.append(" COMMENT '").append(comment).append("'");
+    }
+
+    // Add setting clause if specified, clickhouse only supports predefine 
settings
+    if (MapUtils.isNotEmpty(properties)) {
+      String settings =
+          properties.entrySet().stream()
+              .filter(entry -> entry.getKey().startsWith(SETTINGS_PREFIX))
+              .map(
+                  entry ->
+                      entry.getKey().substring(SETTINGS_PREFIX.length()) + " = 
" + entry.getValue())
+              .collect(Collectors.joining(",\n ", " \n SETTINGS ", ""));
+      sqlBuilder.append(settings);
+    }
+
+    // Return the generated SQL statement
+    String result = sqlBuilder.toString();
+
+    LOG.info("Generated create table:{} sql: {}", tableName, result);
+    return result;
+  }
+
+  public static void appendIndexesSql(Index[] indexes, StringBuilder 
sqlBuilder) {
+    if (indexes == null) {
+      return;
+    }
+
+    for (Index index : indexes) {
+      String fieldStr = getIndexFieldStr(index.fieldNames());
+      sqlBuilder.append(",\n");
+      switch (index.type()) {
+        case PRIMARY_KEY:
+          if (null != index.name()
+              && !StringUtils.equalsIgnoreCase(
+                  index.name(), Indexes.DEFAULT_CLICKHOUSE_PRIMARY_KEY_NAME)) {
+            LOG.warn(
+                "Primary key name must be PRIMARY in ClickHouse, the name {} 
will be ignored.",
+                index.name());
+          }
+          sqlBuilder.append(" PRIMARY KEY (").append(fieldStr).append(")");
+          break;
+        case UNIQUE_KEY:
+          throw new IllegalArgumentException(
+              "Gravitino clickHouse doesn't support index : " + index.type());
+        default:
+          throw new IllegalArgumentException(
+              "Gravitino Clickhouse doesn't support index : " + index.type());
+      }
+    }
+  }
+
+  @Override
+  protected boolean getAutoIncrementInfo(ResultSet resultSet) throws 
SQLException {
+    return "YES".equalsIgnoreCase(resultSet.getString("IS_AUTOINCREMENT"));
+  }
+
+  @Override
+  public void alterTable(String databaseName, String tableName, TableChange... 
changes)
+      throws NoSuchTableException {
+    LOG.info("Attempting to alter table {} from database {}", tableName, 
databaseName);
+    try (Connection connection = getConnection(databaseName)) {
+      for (TableChange change : changes) {
+        String sql = generateAlterTableSql(databaseName, tableName, change);
+        if (StringUtils.isEmpty(sql)) {
+          LOG.info("No changes to alter table {} from database {}", tableName, 
databaseName);
+          return;
+        }
+        JdbcConnectorUtils.executeUpdate(connection, sql);
+      }
+      LOG.info("Alter table {} from database {}", tableName, databaseName);
+    } catch (final SQLException se) {
+      throw this.exceptionMapper.toGravitinoException(se);
+    }
+  }
+
+  @Override
+  protected Map<String, String> getTableProperties(Connection connection, 
String tableName)
+      throws SQLException {
+    try (PreparedStatement statement =
+        connection.prepareStatement("select * from system.tables where name = 
? ")) {
+      statement.setString(1, tableName);
+      try (ResultSet resultSet = statement.executeQuery()) {
+        while (resultSet.next()) {
+          String name = resultSet.getString("name");
+          if (Objects.equals(name, tableName)) {
+            return Collections.unmodifiableMap(
+                new HashMap<String, String>() {
+                  {
+                    put(COMMENT, resultSet.getString(COMMENT));
+                    put(CLICKHOUSE_ENGINE_KEY, 
resultSet.getString(CLICKHOUSE_ENGINE_KEY));
+                  }
+                });

Review Comment:
   Anonymous inner class can be replaced with Collections.singletonMap or 
Map.of for better readability and performance. The current implementation 
creates an anonymous HashMap subclass which is less efficient and harder to 
read.
   ```suggestion
               Map<String, String> properties = new HashMap<>();
               properties.put(COMMENT, resultSet.getString(COMMENT));
               properties.put(CLICKHOUSE_ENGINE_KEY, 
resultSet.getString(CLICKHOUSE_ENGINE_KEY));
               return Collections.unmodifiableMap(properties);
   ```



##########
catalogs-contrib/catalog-jdbc-clickhouse/src/test/java/org/apache/gravitino/catalog/clickhouse/operations/TestClickHouseTableOperations.java:
##########
@@ -0,0 +1,717 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.catalog.clickhouse.operations;
+
+import static 
org.apache.gravitino.catalog.clickhouse.ClickHouseTablePropertiesMetadata.CLICKHOUSE_ENGINE_KEY;
+import static 
org.apache.gravitino.catalog.clickhouse.ClickHouseUtils.getSortOrders;
+import static org.apache.gravitino.rel.Column.DEFAULT_VALUE_NOT_SET;
+
+import java.time.LocalDateTime;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.commons.lang3.RandomStringUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.gravitino.catalog.jdbc.JdbcColumn;
+import org.apache.gravitino.catalog.jdbc.JdbcTable;
+import org.apache.gravitino.exceptions.GravitinoRuntimeException;
+import org.apache.gravitino.rel.TableChange;
+import org.apache.gravitino.rel.expressions.distributions.Distributions;
+import org.apache.gravitino.rel.expressions.literals.Literals;
+import org.apache.gravitino.rel.expressions.transforms.Transforms;
+import org.apache.gravitino.rel.indexes.Index;
+import org.apache.gravitino.rel.indexes.Indexes;
+import org.apache.gravitino.rel.types.Decimal;
+import org.apache.gravitino.rel.types.Type;
+import org.apache.gravitino.rel.types.Types;
+import org.apache.gravitino.utils.RandomNameUtils;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.api.Test;
+
+@Tag("gravitino-docker-test")
+public class TestClickHouseTableOperations extends TestClickHouse {
+  private static final Type STRING = Types.StringType.get();
+  private static final Type INT = Types.IntegerType.get();
+  private static final Type LONG = Types.LongType.get();
+
+  @Test
+  public void testOperationTable() {
+    String tableName = RandomStringUtils.randomAlphabetic(16) + "_op_table";
+    String tableComment = "test_comment";
+    List<JdbcColumn> columns = new ArrayList<>();
+    columns.add(
+        JdbcColumn.builder()
+            .withName("col_1")
+            .withType(STRING)
+            .withComment("test_comment")
+            .withNullable(true)
+            .build());
+    columns.add(
+        JdbcColumn.builder()
+            .withName("col_2")
+            .withType(INT)
+            .withNullable(false)
+            .withComment("set primary key")
+            .build());
+    columns.add(
+        JdbcColumn.builder()
+            .withName("col_3")
+            .withType(INT)
+            .withNullable(true)
+            .withDefaultValue(Literals.NULL)
+            .build());
+    columns.add(
+        JdbcColumn.builder()
+            .withName("col_4")
+            .withType(STRING)
+            .withDefaultValue(Literals.of("hello world", STRING))
+            .withNullable(false)
+            .build());
+    Map<String, String> properties = new HashMap<>();
+
+    Index[] indexes = new Index[] {};
+    // create table
+    TABLE_OPERATIONS.create(
+        TEST_DB_NAME.toString(),
+        tableName,
+        columns.toArray(new JdbcColumn[0]),
+        tableComment,
+        properties,
+        null,
+        Distributions.NONE,
+        indexes,
+        getSortOrders("col_2"));
+
+    // list table
+    List<String> tables = TABLE_OPERATIONS.listTables(TEST_DB_NAME.toString());
+    Assertions.assertTrue(tables.contains(tableName));
+
+    // load table
+    JdbcTable load = TABLE_OPERATIONS.load(TEST_DB_NAME.toString(), tableName);
+    assertionsTableInfo(
+        tableName, tableComment, columns, properties, indexes, 
Transforms.EMPTY_TRANSFORM, load);
+
+    // rename table
+    String newName = "new_table";
+    Assertions.assertDoesNotThrow(
+        () -> TABLE_OPERATIONS.rename(TEST_DB_NAME.toString(), tableName, 
newName));
+    Assertions.assertDoesNotThrow(() -> 
TABLE_OPERATIONS.load(TEST_DB_NAME.toString(), newName));
+
+    // alter table
+    JdbcColumn newColumn =
+        JdbcColumn.builder()
+            .withName("col_5")
+            .withType(STRING)
+            .withComment("new_add")
+            .withNullable(false) //
+            //            .withDefaultValue(Literals.of("hello test", STRING))
+            .build();
+    TABLE_OPERATIONS.alterTable(
+        TEST_DB_NAME.toString(),
+        newName,
+        TableChange.addColumn(
+            new String[] {newColumn.name()},
+            newColumn.dataType(),
+            newColumn.comment(),
+            TableChange.ColumnPosition.after("col_1"),
+            newColumn.nullable(),
+            newColumn.autoIncrement(),
+            newColumn.defaultValue())
+        //        ,
+        //        TableChange.setProperty(CLICKHOUSE_ENGINE_KEY, "InnoDB"));
+        //    properties.put(CLICKHOUSE_ENGINE_KEY, "InnoDB"
+        );
+    load = TABLE_OPERATIONS.load(TEST_DB_NAME.toString(), newName);
+    List<JdbcColumn> alterColumns =
+        new ArrayList<JdbcColumn>() {
+          {
+            add(columns.get(0));
+            add(newColumn);
+            add(columns.get(1));
+            add(columns.get(2));
+            add(columns.get(3));
+          }
+        };

Review Comment:
   Anonymous inner class usage in test: Lines 146-154 use an anonymous 
ArrayList subclass with an instance initializer block. While functional, this 
style is outdated and less readable than modern Java alternatives like 
Arrays.asList() or List.of(). Consider refactoring to use a more modern and 
readable approach.



##########
catalogs-contrib/catalog-jdbc-clickhouse/src/main/java/org/apache/gravitino/catalog/clickhouse/operations/ClickHouseTableOperations.java:
##########
@@ -38,19 +117,607 @@ protected String generateCreateTableSql(
       Distribution distribution,
       Index[] indexes) {
     throw new UnsupportedOperationException(
-        "ClickHouseTableOperations.generateCreateTableSql is not implemented 
yet.");
+        "generateCreateTableSql with out sortOrders in clickhouse is not 
supported");
+  }
+
+  @Override
+  protected String generateCreateTableSql(
+      String tableName,
+      JdbcColumn[] columns,
+      String comment,
+      Map<String, String> properties,
+      Transform[] partitioning,
+      Distribution distribution,
+      Index[] indexes,
+      SortOrder[] sortOrders) {
+
+    // These two are not yet supported in Gravitino now and will be supported 
in the future.
+    if (ArrayUtils.isNotEmpty(partitioning)) {
+      throw new UnsupportedOperationException(
+          "Currently we do not support Partitioning in clickhouse");
+    }
+    Preconditions.checkArgument(
+        Distributions.NONE.equals(distribution), "ClickHouse does not support 
distribution");
+
+    validateIncrementCol(columns, indexes);
+
+    // First build the CREATE TABLE statement
+    StringBuilder sqlBuilder = new StringBuilder();
+    sqlBuilder
+        .append("CREATE TABLE ")
+        .append(BACK_QUOTE)
+        .append(tableName)
+        .append(BACK_QUOTE)
+        .append(" (\n");
+
+    // Add columns
+    for (int i = 0; i < columns.length; i++) {
+      JdbcColumn column = columns[i];
+      sqlBuilder
+          .append(SPACE)
+          .append(SPACE)
+          .append(BACK_QUOTE)
+          .append(column.name())
+          .append(BACK_QUOTE);
+
+      appendColumnDefinition(column, sqlBuilder);
+      // Add a comma for the next column, unless it's the last one
+      if (i < columns.length - 1) {
+        sqlBuilder.append(",\n");
+      }
+    }
+
+    // Index definition, we only support primary index now, secondary index 
will be supported in
+    // future
+    appendIndexesSql(indexes, sqlBuilder);
+
+    sqlBuilder.append("\n)");
+
+    // Extract engine from properties
+    String engine = ENGINE_PROPERTY_ENTRY.getDefaultValue().getValue();
+    if (MapUtils.isNotEmpty(properties)) {
+      String userSetEngine = properties.remove(CLICKHOUSE_ENGINE_KEY);
+      if (StringUtils.isNotEmpty(userSetEngine)) {
+        engine = userSetEngine;
+      }
+    }
+    sqlBuilder.append("\n ENGINE = ").append(engine);
+
+    // Omit partition by clause as it will be supported in the next PR
+    // TODO: Add partition by clause support
+
+    // Add order by clause
+    if (ArrayUtils.isNotEmpty(sortOrders)) {
+      if (sortOrders.length > 1) {
+        throw new UnsupportedOperationException(
+            "Currently ClickHouse does not support sortOrders with more than 1 
element");
+      } else if (sortOrders[0].nullOrdering() != null || 
sortOrders[0].direction() != null) {
+        // If no value is set earlier, some default values will be set.
+        // It is difficult to determine whether the user has set a value.
+        LOG.warn(
+            "ClickHouse currently does not support nullOrdering: {} and 
direction: {} of sortOrders,and will ignore them",
+            sortOrders[0].nullOrdering(),
+            sortOrders[0].direction());
+      }
+      sqlBuilder.append(
+          " \n ORDER BY " + BACK_QUOTE + sortOrders[0].expression() + 
BACK_QUOTE + " \n");
+    }
+
+    // Add table comment if specified
+    if (StringUtils.isNotEmpty(comment)) {
+      sqlBuilder.append(" COMMENT '").append(comment).append("'");
+    }
+
+    // Add setting clause if specified, clickhouse only supports predefine 
settings
+    if (MapUtils.isNotEmpty(properties)) {
+      String settings =
+          properties.entrySet().stream()
+              .filter(entry -> entry.getKey().startsWith(SETTINGS_PREFIX))
+              .map(
+                  entry ->
+                      entry.getKey().substring(SETTINGS_PREFIX.length()) + " = 
" + entry.getValue())
+              .collect(Collectors.joining(",\n ", " \n SETTINGS ", ""));
+      sqlBuilder.append(settings);
+    }
+
+    // Return the generated SQL statement
+    String result = sqlBuilder.toString();
+
+    LOG.info("Generated create table:{} sql: {}", tableName, result);
+    return result;
+  }
+
+  public static void appendIndexesSql(Index[] indexes, StringBuilder 
sqlBuilder) {
+    if (indexes == null) {
+      return;
+    }
+
+    for (Index index : indexes) {
+      String fieldStr = getIndexFieldStr(index.fieldNames());
+      sqlBuilder.append(",\n");
+      switch (index.type()) {
+        case PRIMARY_KEY:
+          if (null != index.name()
+              && !StringUtils.equalsIgnoreCase(
+                  index.name(), Indexes.DEFAULT_CLICKHOUSE_PRIMARY_KEY_NAME)) {
+            LOG.warn(
+                "Primary key name must be PRIMARY in ClickHouse, the name {} 
will be ignored.",
+                index.name());
+          }
+          sqlBuilder.append(" PRIMARY KEY (").append(fieldStr).append(")");
+          break;
+        case UNIQUE_KEY:
+          throw new IllegalArgumentException(
+              "Gravitino clickHouse doesn't support index : " + index.type());
+        default:
+          throw new IllegalArgumentException(
+              "Gravitino Clickhouse doesn't support index : " + index.type());
+      }
+    }
+  }
+
+  @Override
+  protected boolean getAutoIncrementInfo(ResultSet resultSet) throws 
SQLException {
+    return "YES".equalsIgnoreCase(resultSet.getString("IS_AUTOINCREMENT"));
+  }
+
+  @Override
+  public void alterTable(String databaseName, String tableName, TableChange... 
changes)
+      throws NoSuchTableException {
+    LOG.info("Attempting to alter table {} from database {}", tableName, 
databaseName);
+    try (Connection connection = getConnection(databaseName)) {
+      for (TableChange change : changes) {
+        String sql = generateAlterTableSql(databaseName, tableName, change);
+        if (StringUtils.isEmpty(sql)) {
+          LOG.info("No changes to alter table {} from database {}", tableName, 
databaseName);
+          return;
+        }
+        JdbcConnectorUtils.executeUpdate(connection, sql);
+      }
+      LOG.info("Alter table {} from database {}", tableName, databaseName);
+    } catch (final SQLException se) {
+      throw this.exceptionMapper.toGravitinoException(se);
+    }
+  }
+
+  @Override
+  protected Map<String, String> getTableProperties(Connection connection, 
String tableName)
+      throws SQLException {
+    try (PreparedStatement statement =
+        connection.prepareStatement("select * from system.tables where name = 
? ")) {
+      statement.setString(1, tableName);
+      try (ResultSet resultSet = statement.executeQuery()) {
+        while (resultSet.next()) {
+          String name = resultSet.getString("name");
+          if (Objects.equals(name, tableName)) {
+            return Collections.unmodifiableMap(
+                new HashMap<String, String>() {
+                  {
+                    put(COMMENT, resultSet.getString(COMMENT));
+                    put(CLICKHOUSE_ENGINE_KEY, 
resultSet.getString(CLICKHOUSE_ENGINE_KEY));
+                  }
+                });
+          }
+        }
+
+        throw new NoSuchTableException(
+            "Table %s does not exist in %s.", tableName, 
connection.getCatalog());
+      }
+    }
+  }
+
+  protected ResultSet getTables(Connection connection) throws SQLException {
+    final DatabaseMetaData metaData = connection.getMetaData();
+    String catalogName = connection.getCatalog();
+    String schemaName = connection.getSchema();
+    // CK tables include : DICTIONARY", "LOG TABLE", "MEMORY TABLE",
+    // "REMOTE TABLE", "TABLE", "VIEW", "SYSTEM TABLE", "TEMPORARY TABLE
+    return metaData.getTables(catalogName, schemaName, null, new String[] 
{"TABLE"});
   }
 
   @Override
   protected String generatePurgeTableSql(String tableName) {
     throw new UnsupportedOperationException(
-        "ClickHouseTableOperations.generatePurgeTableSql is not implemented 
yet.");
+        "ClickHouse does not support purge table in Gravitino, please use drop 
table");
   }
 
   @Override
   protected String generateAlterTableSql(
       String databaseName, String tableName, TableChange... changes) {
-    throw new UnsupportedOperationException(
-        "ClickHouseTableOperations.generateAlterTableSql is not implemented 
yet.");
+    // Not all operations require the original table information, so lazy 
loading is used here
+    JdbcTable lazyLoadTable = null;
+    TableChange.UpdateComment updateComment = null;
+    List<TableChange.SetProperty> setProperties = new ArrayList<>();
+    List<String> alterSql = new ArrayList<>();
+    for (int i = 0; i < changes.length; i++) {
+      TableChange change = changes[i];
+      if (change instanceof TableChange.UpdateComment) {
+        updateComment = (TableChange.UpdateComment) change;
+      } else if (change instanceof TableChange.SetProperty) {
+        // The set attribute needs to be added at the end.
+        setProperties.add(((TableChange.SetProperty) change));
+      } else if (change instanceof TableChange.RemoveProperty) {
+        // clickhouse does not support deleting table attributes, it can be 
replaced by Set Property
+        throw new IllegalArgumentException("Remove property is not supported 
yet");
+      } else if (change instanceof TableChange.AddColumn) {
+        TableChange.AddColumn addColumn = (TableChange.AddColumn) change;
+        lazyLoadTable = getOrCreateTable(databaseName, tableName, 
lazyLoadTable);
+        alterSql.add(addColumnFieldDefinition(addColumn));
+      } else if (change instanceof TableChange.RenameColumn) {
+        lazyLoadTable = getOrCreateTable(databaseName, tableName, 
lazyLoadTable);
+        TableChange.RenameColumn renameColumn = (TableChange.RenameColumn) 
change;
+        alterSql.add(renameColumnFieldDefinition(renameColumn));
+      } else if (change instanceof TableChange.UpdateColumnDefaultValue) {
+        lazyLoadTable = getOrCreateTable(databaseName, tableName, 
lazyLoadTable);
+        TableChange.UpdateColumnDefaultValue updateColumnDefaultValue =
+            (TableChange.UpdateColumnDefaultValue) change;
+        alterSql.add(
+            updateColumnDefaultValueFieldDefinition(updateColumnDefaultValue, 
lazyLoadTable));
+      } else if (change instanceof TableChange.UpdateColumnType) {
+        lazyLoadTable = getOrCreateTable(databaseName, tableName, 
lazyLoadTable);
+        TableChange.UpdateColumnType updateColumnType = 
(TableChange.UpdateColumnType) change;
+        alterSql.add(updateColumnTypeFieldDefinition(updateColumnType, 
lazyLoadTable));
+      } else if (change instanceof TableChange.UpdateColumnComment) {
+        lazyLoadTable = getOrCreateTable(databaseName, tableName, 
lazyLoadTable);
+        TableChange.UpdateColumnComment updateColumnComment =
+            (TableChange.UpdateColumnComment) change;
+        alterSql.add(updateColumnCommentFieldDefinition(updateColumnComment, 
lazyLoadTable));
+      } else if (change instanceof TableChange.UpdateColumnPosition) {
+        lazyLoadTable = getOrCreateTable(databaseName, tableName, 
lazyLoadTable);
+        TableChange.UpdateColumnPosition updateColumnPosition =
+            (TableChange.UpdateColumnPosition) change;
+        alterSql.add(updateColumnPositionFieldDefinition(updateColumnPosition, 
lazyLoadTable));
+      } else if (change instanceof TableChange.DeleteColumn) {
+        TableChange.DeleteColumn deleteColumn = (TableChange.DeleteColumn) 
change;
+        lazyLoadTable = getOrCreateTable(databaseName, tableName, 
lazyLoadTable);
+        String deleteColSql = deleteColumnFieldDefinition(deleteColumn, 
lazyLoadTable);
+        if (StringUtils.isNotEmpty(deleteColSql)) {
+          alterSql.add(deleteColSql);
+        }
+      } else if (change instanceof TableChange.UpdateColumnNullability) {
+        lazyLoadTable = getOrCreateTable(databaseName, tableName, 
lazyLoadTable);
+        alterSql.add(
+            updateColumnNullabilityDefinition(
+                (TableChange.UpdateColumnNullability) change, lazyLoadTable));
+      } else if (change instanceof TableChange.DeleteIndex) {
+        lazyLoadTable = getOrCreateTable(databaseName, tableName, 
lazyLoadTable);
+        alterSql.add(deleteIndexDefinition(lazyLoadTable, 
(TableChange.DeleteIndex) change));
+      } else if (change instanceof TableChange.UpdateColumnAutoIncrement) {
+        lazyLoadTable = getOrCreateTable(databaseName, tableName, 
lazyLoadTable);
+        alterSql.add(
+            updateColumnAutoIncrementDefinition(
+                lazyLoadTable, (TableChange.UpdateColumnAutoIncrement) 
change));
+      } else {
+        throw new IllegalArgumentException(
+            "Unsupported table change type: " + change.getClass().getName());
+      }
+    }
+    if (!setProperties.isEmpty()) {
+      alterSql.add(generateAlterTableProperties(setProperties));
+    }
+
+    // Last modified comment
+    if (null != updateComment) {
+      String newComment = updateComment.getNewComment();
+      if (null == StringIdentifier.fromComment(newComment)) {
+        // Detect and add Gravitino id.
+        JdbcTable jdbcTable = getOrCreateTable(databaseName, tableName, 
lazyLoadTable);
+        StringIdentifier identifier = 
StringIdentifier.fromComment(jdbcTable.comment());
+        if (null != identifier) {
+          newComment = StringIdentifier.addToComment(identifier, newComment);
+        }
+      }
+      alterSql.add(" MODIFY COMMENT '" + newComment + "'");
+    }
+
+    if (!setProperties.isEmpty()) {
+      alterSql.add(generateAlterTableProperties(setProperties));
+    }
+
+    if (CollectionUtils.isEmpty(alterSql)) {
+      return "";
+    }
+    // Return the generated SQL statement
+    String result = "ALTER TABLE `" + tableName + "`\n" + String.join(",\n", 
alterSql) + ";";
+    LOG.info("Generated alter table:{} sql: {}", databaseName + "." + 
tableName, result);
+    return result;
+  }
+
+  private String updateColumnAutoIncrementDefinition(
+      JdbcTable table, TableChange.UpdateColumnAutoIncrement change) {
+    if (change.fieldName().length > 1) {
+      throw new UnsupportedOperationException("Nested column names are not 
supported");
+    }
+    String col = change.fieldName()[0];
+    JdbcColumn column = getJdbcColumnFromTable(table, col);
+    if (change.isAutoIncrement()) {
+      Preconditions.checkArgument(
+          Types.allowAutoIncrement(column.dataType()),
+          "Auto increment is not allowed, type: " + column.dataType());
+    }
+    JdbcColumn updateColumn =
+        JdbcColumn.builder()
+            .withName(col)
+            .withDefaultValue(column.defaultValue())
+            .withNullable(column.nullable())
+            .withType(column.dataType())
+            .withComment(column.comment())
+            .withAutoIncrement(change.isAutoIncrement())
+            .build();
+    return MODIFY_COLUMN
+        + BACK_QUOTE
+        + col
+        + BACK_QUOTE
+        + appendColumnDefinition(updateColumn, new StringBuilder());
+  }
+
+  @VisibleForTesting
+  static String deleteIndexDefinition(
+      JdbcTable lazyLoadTable, TableChange.DeleteIndex deleteIndex) {
+    if (deleteIndex.isIfExists()) {
+      if (Arrays.stream(lazyLoadTable.index())
+          .anyMatch(index -> index.name().equals(deleteIndex.getName()))) {
+        throw new IllegalArgumentException("Index does not exist");

Review Comment:
   Incorrect exception message in validation: The exception message says "Index 
does not exist" but the condition checks if the index EXISTS (anyMatch returns 
true). This message should be "Index already exists" or the logic should be 
negated with .noneMatch().
   ```suggestion
           throw new IllegalArgumentException("Index already exists");
   ```



##########
integration-test-common/src/test/java/org/apache/gravitino/integration/test/container/ClickHouseContainer.java:
##########
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.integration.test.container;
+
+import static java.lang.String.format;
+
+import com.google.common.collect.ImmutableSet;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.gravitino.integration.test.util.TestDatabaseName;
+import org.rnorth.ducttape.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.Network;
+
+public class ClickHouseContainer extends BaseContainer {
+  public static final Logger LOG = 
LoggerFactory.getLogger(ClickHouseContainer.class);
+
+  public static final String DEFAULT_IMAGE = "clickhouse:24.8.14";
+  public static final String HOST_NAME = "gravitino-ci-clickhouse";
+  public static final int CLICKHOUSE_PORT = 8123;
+  public static final String USER_NAME = "default";
+  public static final String PASSWORD = "default";
+
+  public static Builder builder() {
+    return new Builder();
+  }
+
+  protected ClickHouseContainer(
+      String image,
+      String hostName,
+      Set<Integer> ports,
+      Map<String, String> extraHosts,
+      Map<String, String> filesToMount,
+      Map<String, String> envVars,
+      Optional<Network> network) {
+    super(image, hostName, ports, extraHosts, filesToMount, envVars, network);
+  }
+
+  @Override
+  protected void setupContainer() {
+    super.setupContainer();
+    withLogConsumer(new PrintingContainerLog(format("%-14s| ", 
"clickHouseContainer")));
+  }
+
+  @Override
+  public void start() {
+    super.start();
+    Preconditions.check("clickHouse container startup failed!", 
checkContainerStatus(5));
+  }
+
+  @Override
+  protected boolean checkContainerStatus(int retryLimit) {
+    for (int attempt = 1; attempt <= retryLimit; attempt++) {
+      try (Connection connection =
+              DriverManager.getConnection(getJdbcUrl(), USER_NAME, 
getPassword());
+          Statement statement = connection.createStatement()) {
+        // Simple health check query; ClickHouse should respond if it is ready.
+        statement.execute("SELECT 1");
+        LOG.info("clickHouse container is healthy after {} attempt(s)", 
attempt);
+        return true;
+      } catch (SQLException e) {
+        LOG.warn(
+            "Failed to connect to clickHouse container on attempt {}/{}: {}",
+            attempt,
+            retryLimit,
+            e.getMessage(),
+            e);
+        if (attempt < retryLimit) {
+          try {
+            Thread.sleep(1000L);
+          } catch (InterruptedException interruptedException) {
+            Thread.currentThread().interrupt();
+            LOG.error(
+                "Interrupted while waiting to retry clickHouse container 
health check",
+                interruptedException);
+            break;
+          }
+        }
+      }
+    }
+    LOG.error("clickHouse container failed health checks after {} attempt(s)", 
retryLimit);
+    return false;
+  }
+
+  public void createDatabase(TestDatabaseName testDatabaseName) {
+    String clickHouseJdbcUrl =
+        StringUtils.substring(
+            getJdbcUrl(testDatabaseName), 0, 
getJdbcUrl(testDatabaseName).lastIndexOf("/"));
+
+    // Use the default username and password to connect and create the 
database;
+    // any non-default password must be configured via Gravitino catalog 
properties.
+    try (Connection connection =
+            DriverManager.getConnection(clickHouseJdbcUrl, USER_NAME, 
getPassword());
+        Statement statement = connection.createStatement()) {
+
+      String query = String.format("CREATE DATABASE IF NOT EXISTS %s;", 
testDatabaseName);
+      // FIXME: String, which is used in SQL, can be unsafe

Review Comment:
   Hardcoded FIXME comment indicates known security issue: Line 120 contains a 
FIXME comment acknowledging that the String used in SQL can be unsafe. This SQL 
injection vulnerability should be addressed before merging by using prepared 
statements or proper escaping rather than leaving it as a known issue.



##########
catalogs-contrib/catalog-jdbc-clickhouse/src/main/java/org/apache/gravitino/catalog/clickhouse/ClickHouseCatalogCapability.java:
##########
@@ -1,26 +1,51 @@
 /*
- *  Licensed to the Apache Software Foundation (ASF) under one
- *  or more contributor license agreements.  See the NOTICE file
- *  distributed with this work for additional information
- *  regarding copyright ownership.  The ASF licenses this file
- *  to you under the Apache License, Version 2.0 (the
- *  "License"); you may not use this file except in compliance
- *  with the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
  *
- *      http://www.apache.org/licenses/LICENSE-2.0
+ *  http://www.apache.org/licenses/LICENSE-2.0
  *
- *  Unless required by applicable law or agreed to in writing,
- *  software distributed under the License is distributed on an
- *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- *  KIND, either express or implied.  See the License for the
- *  specific language governing permissions and limitations
- *  under the License.
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
  */
-
 package org.apache.gravitino.catalog.clickhouse;
 
-import org.apache.gravitino.catalog.jdbc.JdbcCatalogCapability;
+import org.apache.gravitino.connector.capability.Capability;
+import org.apache.gravitino.connector.capability.CapabilityResult;
+
+public class ClickHouseCatalogCapability implements Capability {
+  /**
+   * Regular expression explanation: ^[\w\p{L}-$/=]{1,64}$
+   *
+   * <p>^ - Start of the string
+   *
+   * <p>[\w\p{L}-$/=]{1,64} - Consist of 1 to 64 characters of letters (both 
cases), digits,
+   * underscores, any kind of letter from any language, hyphens, dollar signs, 
slashes or equal
+   * signs
+   *
+   * <p>\w - matches [a-zA-Z0-9_]
+   *
+   * <p>\p{L} - matches any kind of letter from any language
+   *
+   * <p>$ - End of the string
+   */
+  public static final String CLICKHOUSE_NAME_PATTERN = 
"^[\\w\\p{L}-$/=]{1,64}$";
 
-public class ClickHouseCatalogCapability extends JdbcCatalogCapability {
-  // No additional capabilities for ClickHouse at this time
+  @Override
+  public CapabilityResult specificationOnName(Scope scope, String name) {
+    // TODO(yuqi): Validate the name against reserved words

Review Comment:
   TODO comment without tracking issue: Line 44 has a TODO about validating 
names against reserved words. Since this is a capability specification that 
affects the public API, this validation should either be implemented or tracked 
with a proper issue reference.



##########
catalogs/catalog-jdbc-common/src/main/java/org/apache/gravitino/catalog/jdbc/converter/JdbcColumnDefaultValueConverter.java:
##########
@@ -34,11 +36,18 @@ public class JdbcColumnDefaultValueConverter {
   protected static final String CURRENT_TIMESTAMP = "CURRENT_TIMESTAMP";
   protected static final String NULL = "NULL";
   protected static final DateTimeFormatter DATE_TIME_FORMATTER =
-      DateTimeFormatter.ofPattern("yyyy-MM-dd 
HH:mm:ss[.SSSSSS][.SSSSS][.SSSS][.SSS][.SS][.S]");
+      new DateTimeFormatterBuilder()
+          .appendPattern("yyyy-MM-dd HH:mm:ss")
+          .appendFraction(ChronoField.NANO_OF_SECOND, 0, 6, true)
+          .toFormatter();

Review Comment:
   Inconsistent formatting in DATE_TIME_FORMATTER: The formatter pattern is 
changed from using multiple optional patterns 
[.SSSSSS][.SSSSS][.SSSS][.SSS][.SS][.S] to using appendFraction with parameters 
(0, 6, true). While this is functionally equivalent and potentially cleaner, 
ensure that this change maintains the same parsing/formatting behavior for all 
edge cases, particularly with varying fractional second lengths.



##########
catalogs-contrib/catalog-jdbc-clickhouse/src/test/java/org/apache/gravitino/catalog/clickhouse/operations/TestClickHouseTableOperations.java:
##########
@@ -0,0 +1,717 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.catalog.clickhouse.operations;
+
+import static 
org.apache.gravitino.catalog.clickhouse.ClickHouseTablePropertiesMetadata.CLICKHOUSE_ENGINE_KEY;
+import static 
org.apache.gravitino.catalog.clickhouse.ClickHouseUtils.getSortOrders;
+import static org.apache.gravitino.rel.Column.DEFAULT_VALUE_NOT_SET;
+
+import java.time.LocalDateTime;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.commons.lang3.RandomStringUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.gravitino.catalog.jdbc.JdbcColumn;
+import org.apache.gravitino.catalog.jdbc.JdbcTable;
+import org.apache.gravitino.exceptions.GravitinoRuntimeException;
+import org.apache.gravitino.rel.TableChange;
+import org.apache.gravitino.rel.expressions.distributions.Distributions;
+import org.apache.gravitino.rel.expressions.literals.Literals;
+import org.apache.gravitino.rel.expressions.transforms.Transforms;
+import org.apache.gravitino.rel.indexes.Index;
+import org.apache.gravitino.rel.indexes.Indexes;
+import org.apache.gravitino.rel.types.Decimal;
+import org.apache.gravitino.rel.types.Type;
+import org.apache.gravitino.rel.types.Types;
+import org.apache.gravitino.utils.RandomNameUtils;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.api.Test;
+
+@Tag("gravitino-docker-test")
+public class TestClickHouseTableOperations extends TestClickHouse {
+  private static final Type STRING = Types.StringType.get();
+  private static final Type INT = Types.IntegerType.get();
+  private static final Type LONG = Types.LongType.get();
+
+  @Test
+  public void testOperationTable() {
+    String tableName = RandomStringUtils.randomAlphabetic(16) + "_op_table";
+    String tableComment = "test_comment";
+    List<JdbcColumn> columns = new ArrayList<>();
+    columns.add(
+        JdbcColumn.builder()
+            .withName("col_1")
+            .withType(STRING)
+            .withComment("test_comment")
+            .withNullable(true)
+            .build());
+    columns.add(
+        JdbcColumn.builder()
+            .withName("col_2")
+            .withType(INT)
+            .withNullable(false)
+            .withComment("set primary key")
+            .build());
+    columns.add(
+        JdbcColumn.builder()
+            .withName("col_3")
+            .withType(INT)
+            .withNullable(true)
+            .withDefaultValue(Literals.NULL)
+            .build());
+    columns.add(
+        JdbcColumn.builder()
+            .withName("col_4")
+            .withType(STRING)
+            .withDefaultValue(Literals.of("hello world", STRING))
+            .withNullable(false)
+            .build());
+    Map<String, String> properties = new HashMap<>();
+
+    Index[] indexes = new Index[] {};
+    // create table
+    TABLE_OPERATIONS.create(
+        TEST_DB_NAME.toString(),
+        tableName,
+        columns.toArray(new JdbcColumn[0]),
+        tableComment,
+        properties,
+        null,
+        Distributions.NONE,
+        indexes,
+        getSortOrders("col_2"));
+
+    // list table
+    List<String> tables = TABLE_OPERATIONS.listTables(TEST_DB_NAME.toString());
+    Assertions.assertTrue(tables.contains(tableName));
+
+    // load table
+    JdbcTable load = TABLE_OPERATIONS.load(TEST_DB_NAME.toString(), tableName);
+    assertionsTableInfo(
+        tableName, tableComment, columns, properties, indexes, 
Transforms.EMPTY_TRANSFORM, load);
+
+    // rename table
+    String newName = "new_table";
+    Assertions.assertDoesNotThrow(
+        () -> TABLE_OPERATIONS.rename(TEST_DB_NAME.toString(), tableName, 
newName));
+    Assertions.assertDoesNotThrow(() -> 
TABLE_OPERATIONS.load(TEST_DB_NAME.toString(), newName));
+
+    // alter table
+    JdbcColumn newColumn =
+        JdbcColumn.builder()
+            .withName("col_5")
+            .withType(STRING)
+            .withComment("new_add")
+            .withNullable(false) //
+            //            .withDefaultValue(Literals.of("hello test", STRING))
+            .build();
+    TABLE_OPERATIONS.alterTable(
+        TEST_DB_NAME.toString(),
+        newName,
+        TableChange.addColumn(
+            new String[] {newColumn.name()},
+            newColumn.dataType(),
+            newColumn.comment(),
+            TableChange.ColumnPosition.after("col_1"),
+            newColumn.nullable(),
+            newColumn.autoIncrement(),
+            newColumn.defaultValue())
+        //        ,
+        //        TableChange.setProperty(CLICKHOUSE_ENGINE_KEY, "InnoDB"));
+        //    properties.put(CLICKHOUSE_ENGINE_KEY, "InnoDB"
+        );
+    load = TABLE_OPERATIONS.load(TEST_DB_NAME.toString(), newName);
+    List<JdbcColumn> alterColumns =
+        new ArrayList<JdbcColumn>() {
+          {
+            add(columns.get(0));
+            add(newColumn);
+            add(columns.get(1));
+            add(columns.get(2));
+            add(columns.get(3));
+          }
+        };
+    assertionsTableInfo(
+        newName, tableComment, alterColumns, properties, indexes, 
Transforms.EMPTY_TRANSFORM, load);
+
+    // Detect unsupported properties
+    TableChange setProperty = TableChange.setProperty(CLICKHOUSE_ENGINE_KEY, 
"ABC");
+    UnsupportedOperationException gravitinoRuntimeException =
+        Assertions.assertThrows(
+            UnsupportedOperationException.class,
+            () -> TABLE_OPERATIONS.alterTable(TEST_DB_NAME.toString(), 
newName, setProperty));
+    Assertions.assertTrue(
+        StringUtils.contains(
+            gravitinoRuntimeException.getMessage(),
+            "alter table properties in ck is not supported"));
+
+    // delete column
+    TABLE_OPERATIONS.alterTable(
+        TEST_DB_NAME.toString(),
+        newName,
+        TableChange.deleteColumn(new String[] {newColumn.name()}, true));
+    load = TABLE_OPERATIONS.load(TEST_DB_NAME.toString(), newName);
+    assertionsTableInfo(
+        newName, tableComment, columns, properties, indexes, 
Transforms.EMPTY_TRANSFORM, load);
+
+    TableChange deleteColumn = TableChange.deleteColumn(new String[] 
{newColumn.name()}, false);
+    IllegalArgumentException illegalArgumentException =
+        Assertions.assertThrows(
+            IllegalArgumentException.class,
+            () -> TABLE_OPERATIONS.alterTable(TEST_DB_NAME.toString(), 
newName, deleteColumn));
+    Assertions.assertEquals(
+        "Delete column does not exist: " + newColumn.name(), 
illegalArgumentException.getMessage());
+    Assertions.assertDoesNotThrow(
+        () ->
+            TABLE_OPERATIONS.alterTable(
+                TEST_DB_NAME.toString(),
+                newName,
+                TableChange.deleteColumn(new String[] {newColumn.name()}, 
true)));
+
+    TABLE_OPERATIONS.alterTable(
+        TEST_DB_NAME.toString(),
+        newName,
+        TableChange.deleteColumn(new String[] {newColumn.name()}, true));
+    Assertions.assertTrue(
+        TABLE_OPERATIONS.drop(TEST_DB_NAME.toString(), newName), "table should 
be dropped");
+
+    GravitinoRuntimeException exception =
+        Assertions.assertThrows(
+            GravitinoRuntimeException.class,
+            () -> TABLE_OPERATIONS.drop(TEST_DB_NAME.toString(), newName));
+    Assertions.assertTrue(StringUtils.contains(exception.getMessage(), "does 
not exist"));
+  }
+
+  @Test
+  public void testAlterTable() {
+    String tableName = RandomStringUtils.randomAlphabetic(16) + "_al_table";
+    String tableComment = "test_comment";
+    List<JdbcColumn> columns = new ArrayList<>();
+    JdbcColumn col_1 =
+        JdbcColumn.builder()
+            .withName("col_1")
+            .withType(INT)
+            .withComment("id")
+            .withNullable(false)
+            .withDefaultValue(Literals.integerLiteral(0))
+            .build();
+    columns.add(col_1);
+    JdbcColumn col_2 =
+        JdbcColumn.builder()
+            .withName("col_2")
+            .withType(STRING)
+            .withComment("name")
+            .withDefaultValue(Literals.of("hello world", STRING))
+            .withNullable(false)
+            .build();
+    columns.add(col_2);
+    JdbcColumn col_3 =
+        JdbcColumn.builder()
+            .withName("col_3")
+            .withType(STRING)
+            .withComment("name")
+            .withDefaultValue(Literals.NULL)
+            .build();
+    // `col_1` int NOT NULL COMMENT 'id' ,
+    // `col_2` STRING(255) NOT NULL DEFAULT 'hello world' COMMENT 'name' ,
+    // `col_3` STRING(255) NULL DEFAULT NULL COMMENT 'name' ,
+    columns.add(col_3);
+    Map<String, String> properties = new HashMap<>();
+
+    // create table
+    TABLE_OPERATIONS.create(
+        TEST_DB_NAME.toString(),
+        tableName,
+        columns.toArray(new JdbcColumn[0]),
+        tableComment,
+        properties,
+        null,
+        Distributions.NONE,
+        null,
+        getSortOrders("col_2"));
+    JdbcTable load = TABLE_OPERATIONS.load(TEST_DB_NAME.toString(), tableName);
+    assertionsTableInfo(
+        tableName, tableComment, columns, properties, null, 
Transforms.EMPTY_TRANSFORM, load);
+
+    TABLE_OPERATIONS.alterTable(
+        TEST_DB_NAME.toString(),
+        tableName,
+        TableChange.updateColumnType(new String[] {col_1.name()}, LONG));
+
+    load = TABLE_OPERATIONS.load(TEST_DB_NAME.toString(), tableName);
+
+    // After modifying the type, some attributes of the corresponding column 
are not
+    // supported.
+    columns.clear();
+    col_1 =
+        JdbcColumn.builder()
+            .withName(col_1.name())
+            .withType(LONG)
+            .withComment(col_1.comment())
+            .withNullable(col_1.nullable())
+            .withDefaultValue(Literals.longLiteral(0L))
+            .build();
+    columns.add(col_1);
+    columns.add(col_2);
+    columns.add(col_3);
+    assertionsTableInfo(
+        tableName, tableComment, columns, properties, null, 
Transforms.EMPTY_TRANSFORM, load);
+
+    String newComment = "new_comment";
+    // update table comment and column comment
+    // `col_1` int NOT NULL COMMENT 'id' ,
+    // `col_2` STRING(255) NOT NULL DEFAULT 'hello world' COMMENT 
'new_comment' ,
+    // `col_3` STRING(255) NULL DEFAULT NULL COMMENT 'name' ,
+    TABLE_OPERATIONS.alterTable(
+        TEST_DB_NAME.toString(),
+        tableName,
+        TableChange.updateColumnType(new String[] {col_1.name()}, INT),
+        TableChange.updateColumnComment(new String[] {col_2.name()}, 
newComment));
+    load = TABLE_OPERATIONS.load(TEST_DB_NAME.toString(), tableName);
+
+    columns.clear();
+    col_1 =
+        JdbcColumn.builder()
+            .withName(col_1.name())
+            .withType(INT)
+            .withComment(col_1.comment())
+            .withAutoIncrement(col_1.autoIncrement())
+            .withNullable(col_1.nullable())
+            .withDefaultValue(Literals.integerLiteral(0))
+            .build();
+    col_2 =
+        JdbcColumn.builder()
+            .withName(col_2.name())
+            .withType(col_2.dataType())
+            .withComment(newComment)
+            .withAutoIncrement(col_2.autoIncrement())
+            .withNullable(col_2.nullable())
+            .withDefaultValue(col_2.defaultValue())
+            .build();
+    columns.add(col_1);
+    columns.add(col_2);
+    columns.add(col_3);
+    assertionsTableInfo(
+        tableName, tableComment, columns, properties, null, 
Transforms.EMPTY_TRANSFORM, load);
+
+    String newColName_1 = "new_col_1";
+    // rename column
+    // update table comment and column comment
+    // `new_col_1` int NOT NULL COMMENT 'id' ,
+    // `new_col_2` STRING(255) NOT NULL DEFAULT 'hello world' COMMENT 
'new_comment'
+    // ,
+    // `col_3` STRING(255) NULL DEFAULT NULL COMMENT 'name' ,
+    TABLE_OPERATIONS.alterTable(
+        TEST_DB_NAME.toString(),
+        tableName,
+        TableChange.renameColumn(new String[] {col_1.name()}, newColName_1));
+
+    load = TABLE_OPERATIONS.load(TEST_DB_NAME.toString(), tableName);
+
+    columns.clear();
+    col_1 =
+        JdbcColumn.builder()
+            .withName(newColName_1)
+            .withType(col_1.dataType())
+            .withComment(col_1.comment())
+            .withAutoIncrement(col_1.autoIncrement())
+            .withNullable(col_1.nullable())
+            .withDefaultValue(col_1.defaultValue())
+            .build();
+    columns.add(col_1);
+    columns.add(col_2);
+    columns.add(col_3);
+    assertionsTableInfo(
+        tableName, tableComment, columns, properties, null, 
Transforms.EMPTY_TRANSFORM, load);
+  }
+
+  @Test
+  public void testAlterTableUpdateColumnDefaultValue() {
+    String tableName = RandomNameUtils.genRandomName("properties_table_");
+    String tableComment = "test_comment";
+    List<JdbcColumn> columns = new ArrayList<>();
+    columns.add(
+        JdbcColumn.builder()
+            .withName("col_1")
+            .withType(Types.DecimalType.of(10, 2))
+            .withComment("test_decimal")
+            .withNullable(false)
+            .withDefaultValue(Literals.decimalLiteral(Decimal.of("0.00", 10, 
2)))
+            .build());
+    columns.add(
+        JdbcColumn.builder()
+            .withName("col_2")
+            .withType(Types.LongType.get())
+            .withNullable(false)
+            .withDefaultValue(Literals.longLiteral(0L))
+            .withComment("long type")
+            .build());
+    columns.add(
+        JdbcColumn.builder()
+            .withName("col_3")
+            .withType(Types.TimestampType.withoutTimeZone())
+            .withNullable(false)
+            .withComment("timestamp")
+            
.withDefaultValue(Literals.timestampLiteral(LocalDateTime.parse("2013-01-01T00:00:00")))
+            .build());
+    columns.add(
+        JdbcColumn.builder()
+            .withName("col_4")
+            .withType(Types.StringType.get())
+            .withNullable(false)
+            .withComment("STRING")
+            .withDefaultValue(Literals.of("hello", Types.StringType.get()))
+            .build());
+    Map<String, String> properties = new HashMap<>();
+
+    Index[] indexes = new Index[0];
+    // create table
+    TABLE_OPERATIONS.create(
+        TEST_DB_NAME.toString(),
+        tableName,
+        columns.toArray(new JdbcColumn[0]),
+        tableComment,
+        properties,
+        null,
+        Distributions.NONE,
+        indexes,
+        getSortOrders("col_2"));
+
+    JdbcTable loaded = TABLE_OPERATIONS.load(TEST_DB_NAME.toString(), 
tableName);
+    assertionsTableInfo(
+        tableName, tableComment, columns, properties, indexes, 
Transforms.EMPTY_TRANSFORM, loaded);
+
+    TABLE_OPERATIONS.alterTable(
+        TEST_DB_NAME.toString(),
+        tableName,
+        TableChange.updateColumnDefaultValue(
+            new String[] {columns.get(0).name()},
+            Literals.decimalLiteral(Decimal.of("1.23", 10, 2))),
+        TableChange.updateColumnDefaultValue(
+            new String[] {columns.get(1).name()}, Literals.longLiteral(1L)),
+        TableChange.updateColumnDefaultValue(
+            new String[] {columns.get(2).name()},
+            
Literals.timestampLiteral(LocalDateTime.parse("2024-04-01T00:00:00"))),
+        TableChange.updateColumnDefaultValue(
+            new String[] {columns.get(3).name()}, Literals.of("world", 
Types.StringType.get())));
+
+    loaded = TABLE_OPERATIONS.load(TEST_DB_NAME.toString(), tableName);
+    Assertions.assertEquals(
+        Literals.decimalLiteral(Decimal.of("1.234", 10, 2)), 
loaded.columns()[0].defaultValue());
+    Assertions.assertEquals(Literals.longLiteral(1L), 
loaded.columns()[1].defaultValue());
+    Assertions.assertEquals(
+        Literals.timestampLiteral(LocalDateTime.parse("2024-04-01T00:00:00")),
+        loaded.columns()[2].defaultValue());
+    Assertions.assertEquals(
+        Literals.of("world", Types.StringType.get()), 
loaded.columns()[3].defaultValue());
+  }
+
+  @Test
+  public void testCreateAndLoadTable() {
+    String tableName = RandomStringUtils.randomAlphabetic(16) + "_cl_table";
+    String tableComment = "test_comment";
+    List<JdbcColumn> columns = new ArrayList<>();
+    columns.add(
+        JdbcColumn.builder()
+            .withName("col_1")
+            .withType(Types.DecimalType.of(10, 2))
+            .withComment("test_decimal")
+            .withNullable(false)
+            .withDefaultValue(Literals.decimalLiteral(Decimal.of("0.00", 10, 
2)))
+            .build());
+    columns.add(
+        JdbcColumn.builder()
+            .withName("col_2")
+            .withType(Types.LongType.get())
+            .withNullable(false)
+            .withDefaultValue(Literals.longLiteral(0L))
+            .withComment("long type")
+            .build());
+    columns.add(
+        JdbcColumn.builder()
+            .withName("col_3")
+            .withType(Types.TimestampType.withoutTimeZone())
+            .withNullable(false)
+            .withComment("timestamp")
+            
.withDefaultValue(Literals.timestampLiteral(LocalDateTime.parse("2013-01-01T00:00:00")))
+            .build());
+    columns.add(
+        JdbcColumn.builder()
+            .withName("col_4")
+            .withType(Types.DateType.get())
+            .withNullable(true)
+            .withComment("date")
+            .withDefaultValue(DEFAULT_VALUE_NOT_SET)
+            .build());
+    Map<String, String> properties = new HashMap<>();
+
+    Index[] indexes = new Index[0];
+    // create table
+    TABLE_OPERATIONS.create(
+        TEST_DB_NAME.toString(),
+        tableName,
+        columns.toArray(new JdbcColumn[0]),
+        tableComment,
+        properties,
+        null,
+        Distributions.NONE,
+        indexes,
+        getSortOrders("col_1"));
+
+    JdbcTable loaded = TABLE_OPERATIONS.load(TEST_DB_NAME.toString(), 
tableName);
+    assertionsTableInfo(
+        tableName, tableComment, columns, properties, indexes, 
Transforms.EMPTY_TRANSFORM, loaded);
+  }
+
+  @Test
+  public void testCreateAllTypeTable() {
+    String tableName = RandomNameUtils.genRandomName("type_table_");
+    String tableComment = "test_comment";
+    List<JdbcColumn> columns = new ArrayList<>();
+    columns.add(
+        JdbcColumn.builder()
+            .withName("col_1")
+            .withType(Types.ByteType.get())
+            .withNullable(false)
+            .build());
+    columns.add(
+        JdbcColumn.builder()
+            .withName("col_2")
+            .withType(Types.ShortType.get())
+            .withNullable(true)
+            .build());
+    
columns.add(JdbcColumn.builder().withName("col_3").withType(INT).withNullable(false).build());
+    columns.add(
+        JdbcColumn.builder()
+            .withName("col_4")
+            .withType(Types.LongType.get())
+            .withNullable(false)
+            .build());
+    columns.add(
+        JdbcColumn.builder()
+            .withName("col_5")
+            .withType(Types.FloatType.get())
+            .withNullable(false)
+            .build());
+    columns.add(
+        JdbcColumn.builder()
+            .withName("col_6")
+            .withType(Types.DoubleType.get())
+            .withNullable(false)
+            .build());
+    columns.add(
+        JdbcColumn.builder()
+            .withName("col_7")
+            .withType(Types.DateType.get())
+            .withNullable(false)
+            .build());
+    //    columns.add(
+    //        JdbcColumn.builder()
+    //            .withName("col_8")
+    //            .withType(Types.TimeType.get())
+    //            .withNullable(false)
+    //            .build());
+    columns.add(
+        JdbcColumn.builder()
+            .withName("col_9")
+            .withType(Types.TimestampType.withoutTimeZone())
+            .withNullable(false)
+            .build());
+    columns.add(
+        
JdbcColumn.builder().withName("col_10").withType(Types.DecimalType.of(10, 
2)).build());
+    columns.add(
+        
JdbcColumn.builder().withName("col_11").withType(STRING).withNullable(false).build());
+    columns.add(
+        JdbcColumn.builder()
+            .withName("col_12")
+            .withType(Types.FixedCharType.of(10))
+            .withNullable(false)
+            .build());
+    columns.add(
+        JdbcColumn.builder()
+            .withName("col_13")
+            .withType(Types.StringType.get())
+            .withNullable(false)
+            .build());
+    //    columns.add(
+    //        JdbcColumn.builder()
+    //            .withName("col_14")
+    //            .withType(Types.BinaryType.get())
+    //            .withNullable(false)
+    //            .build());

Review Comment:
   Commented-out code should be removed: Lines 126-127, 140-142, and 529-534, 
557-562 contain commented-out code. According to best practices, commented-out 
code should be removed as it clutters the codebase and can be retrieved from 
version control if needed.



##########
catalogs-contrib/catalog-jdbc-clickhouse/src/main/java/org/apache/gravitino/catalog/clickhouse/operations/ClickHouseTableOperations.java:
##########
@@ -38,19 +117,607 @@ protected String generateCreateTableSql(
       Distribution distribution,
       Index[] indexes) {
     throw new UnsupportedOperationException(
-        "ClickHouseTableOperations.generateCreateTableSql is not implemented 
yet.");
+        "generateCreateTableSql with out sortOrders in clickhouse is not 
supported");
+  }
+
+  @Override
+  protected String generateCreateTableSql(
+      String tableName,
+      JdbcColumn[] columns,
+      String comment,
+      Map<String, String> properties,
+      Transform[] partitioning,
+      Distribution distribution,
+      Index[] indexes,
+      SortOrder[] sortOrders) {
+
+    // These two are not yet supported in Gravitino now and will be supported 
in the future.
+    if (ArrayUtils.isNotEmpty(partitioning)) {
+      throw new UnsupportedOperationException(
+          "Currently we do not support Partitioning in clickhouse");
+    }
+    Preconditions.checkArgument(
+        Distributions.NONE.equals(distribution), "ClickHouse does not support 
distribution");
+
+    validateIncrementCol(columns, indexes);
+
+    // First build the CREATE TABLE statement
+    StringBuilder sqlBuilder = new StringBuilder();
+    sqlBuilder
+        .append("CREATE TABLE ")
+        .append(BACK_QUOTE)
+        .append(tableName)
+        .append(BACK_QUOTE)
+        .append(" (\n");
+
+    // Add columns
+    for (int i = 0; i < columns.length; i++) {
+      JdbcColumn column = columns[i];
+      sqlBuilder
+          .append(SPACE)
+          .append(SPACE)
+          .append(BACK_QUOTE)
+          .append(column.name())
+          .append(BACK_QUOTE);
+
+      appendColumnDefinition(column, sqlBuilder);
+      // Add a comma for the next column, unless it's the last one
+      if (i < columns.length - 1) {
+        sqlBuilder.append(",\n");
+      }
+    }
+
+    // Index definition, we only support primary index now, secondary index 
will be supported in
+    // future
+    appendIndexesSql(indexes, sqlBuilder);
+
+    sqlBuilder.append("\n)");
+
+    // Extract engine from properties
+    String engine = ENGINE_PROPERTY_ENTRY.getDefaultValue().getValue();
+    if (MapUtils.isNotEmpty(properties)) {
+      String userSetEngine = properties.remove(CLICKHOUSE_ENGINE_KEY);
+      if (StringUtils.isNotEmpty(userSetEngine)) {
+        engine = userSetEngine;
+      }
+    }
+    sqlBuilder.append("\n ENGINE = ").append(engine);
+
+    // Omit partition by clause as it will be supported in the next PR
+    // TODO: Add partition by clause support
+
+    // Add order by clause
+    if (ArrayUtils.isNotEmpty(sortOrders)) {
+      if (sortOrders.length > 1) {
+        throw new UnsupportedOperationException(
+            "Currently ClickHouse does not support sortOrders with more than 1 
element");
+      } else if (sortOrders[0].nullOrdering() != null || 
sortOrders[0].direction() != null) {
+        // If no value is set earlier, some default values will be set.
+        // It is difficult to determine whether the user has set a value.
+        LOG.warn(
+            "ClickHouse currently does not support nullOrdering: {} and 
direction: {} of sortOrders,and will ignore them",
+            sortOrders[0].nullOrdering(),
+            sortOrders[0].direction());
+      }
+      sqlBuilder.append(
+          " \n ORDER BY " + BACK_QUOTE + sortOrders[0].expression() + 
BACK_QUOTE + " \n");
+    }
+
+    // Add table comment if specified
+    if (StringUtils.isNotEmpty(comment)) {
+      sqlBuilder.append(" COMMENT '").append(comment).append("'");
+    }
+
+    // Add setting clause if specified, clickhouse only supports predefine 
settings
+    if (MapUtils.isNotEmpty(properties)) {
+      String settings =
+          properties.entrySet().stream()
+              .filter(entry -> entry.getKey().startsWith(SETTINGS_PREFIX))
+              .map(
+                  entry ->
+                      entry.getKey().substring(SETTINGS_PREFIX.length()) + " = 
" + entry.getValue())
+              .collect(Collectors.joining(",\n ", " \n SETTINGS ", ""));
+      sqlBuilder.append(settings);
+    }
+
+    // Return the generated SQL statement
+    String result = sqlBuilder.toString();
+
+    LOG.info("Generated create table:{} sql: {}", tableName, result);
+    return result;
+  }
+
+  public static void appendIndexesSql(Index[] indexes, StringBuilder 
sqlBuilder) {
+    if (indexes == null) {
+      return;
+    }
+
+    for (Index index : indexes) {
+      String fieldStr = getIndexFieldStr(index.fieldNames());
+      sqlBuilder.append(",\n");
+      switch (index.type()) {
+        case PRIMARY_KEY:
+          if (null != index.name()
+              && !StringUtils.equalsIgnoreCase(
+                  index.name(), Indexes.DEFAULT_CLICKHOUSE_PRIMARY_KEY_NAME)) {
+            LOG.warn(
+                "Primary key name must be PRIMARY in ClickHouse, the name {} 
will be ignored.",
+                index.name());
+          }
+          sqlBuilder.append(" PRIMARY KEY (").append(fieldStr).append(")");
+          break;
+        case UNIQUE_KEY:
+          throw new IllegalArgumentException(
+              "Gravitino clickHouse doesn't support index : " + index.type());
+        default:
+          throw new IllegalArgumentException(
+              "Gravitino Clickhouse doesn't support index : " + index.type());
+      }
+    }
+  }
+
+  @Override
+  protected boolean getAutoIncrementInfo(ResultSet resultSet) throws 
SQLException {
+    return "YES".equalsIgnoreCase(resultSet.getString("IS_AUTOINCREMENT"));
+  }
+
+  @Override
+  public void alterTable(String databaseName, String tableName, TableChange... 
changes)
+      throws NoSuchTableException {
+    LOG.info("Attempting to alter table {} from database {}", tableName, 
databaseName);
+    try (Connection connection = getConnection(databaseName)) {
+      for (TableChange change : changes) {
+        String sql = generateAlterTableSql(databaseName, tableName, change);
+        if (StringUtils.isEmpty(sql)) {
+          LOG.info("No changes to alter table {} from database {}", tableName, 
databaseName);
+          return;
+        }
+        JdbcConnectorUtils.executeUpdate(connection, sql);
+      }
+      LOG.info("Alter table {} from database {}", tableName, databaseName);
+    } catch (final SQLException se) {
+      throw this.exceptionMapper.toGravitinoException(se);
+    }
+  }
+
+  @Override
+  protected Map<String, String> getTableProperties(Connection connection, 
String tableName)
+      throws SQLException {
+    try (PreparedStatement statement =
+        connection.prepareStatement("select * from system.tables where name = 
? ")) {
+      statement.setString(1, tableName);
+      try (ResultSet resultSet = statement.executeQuery()) {
+        while (resultSet.next()) {
+          String name = resultSet.getString("name");
+          if (Objects.equals(name, tableName)) {
+            return Collections.unmodifiableMap(
+                new HashMap<String, String>() {
+                  {
+                    put(COMMENT, resultSet.getString(COMMENT));
+                    put(CLICKHOUSE_ENGINE_KEY, 
resultSet.getString(CLICKHOUSE_ENGINE_KEY));
+                  }
+                });
+          }
+        }
+
+        throw new NoSuchTableException(
+            "Table %s does not exist in %s.", tableName, 
connection.getCatalog());
+      }
+    }
+  }
+
+  protected ResultSet getTables(Connection connection) throws SQLException {
+    final DatabaseMetaData metaData = connection.getMetaData();
+    String catalogName = connection.getCatalog();
+    String schemaName = connection.getSchema();
+    // CK tables include : DICTIONARY", "LOG TABLE", "MEMORY TABLE",
+    // "REMOTE TABLE", "TABLE", "VIEW", "SYSTEM TABLE", "TEMPORARY TABLE
+    return metaData.getTables(catalogName, schemaName, null, new String[] 
{"TABLE"});
   }
 
   @Override
   protected String generatePurgeTableSql(String tableName) {
     throw new UnsupportedOperationException(
-        "ClickHouseTableOperations.generatePurgeTableSql is not implemented 
yet.");
+        "ClickHouse does not support purge table in Gravitino, please use drop 
table");
   }
 
   @Override
   protected String generateAlterTableSql(
       String databaseName, String tableName, TableChange... changes) {
-    throw new UnsupportedOperationException(
-        "ClickHouseTableOperations.generateAlterTableSql is not implemented 
yet.");
+    // Not all operations require the original table information, so lazy 
loading is used here
+    JdbcTable lazyLoadTable = null;
+    TableChange.UpdateComment updateComment = null;
+    List<TableChange.SetProperty> setProperties = new ArrayList<>();
+    List<String> alterSql = new ArrayList<>();
+    for (int i = 0; i < changes.length; i++) {
+      TableChange change = changes[i];
+      if (change instanceof TableChange.UpdateComment) {
+        updateComment = (TableChange.UpdateComment) change;
+      } else if (change instanceof TableChange.SetProperty) {
+        // The set attribute needs to be added at the end.
+        setProperties.add(((TableChange.SetProperty) change));
+      } else if (change instanceof TableChange.RemoveProperty) {
+        // clickhouse does not support deleting table attributes, it can be 
replaced by Set Property
+        throw new IllegalArgumentException("Remove property is not supported 
yet");
+      } else if (change instanceof TableChange.AddColumn) {
+        TableChange.AddColumn addColumn = (TableChange.AddColumn) change;
+        lazyLoadTable = getOrCreateTable(databaseName, tableName, 
lazyLoadTable);
+        alterSql.add(addColumnFieldDefinition(addColumn));
+      } else if (change instanceof TableChange.RenameColumn) {
+        lazyLoadTable = getOrCreateTable(databaseName, tableName, 
lazyLoadTable);
+        TableChange.RenameColumn renameColumn = (TableChange.RenameColumn) 
change;
+        alterSql.add(renameColumnFieldDefinition(renameColumn));
+      } else if (change instanceof TableChange.UpdateColumnDefaultValue) {
+        lazyLoadTable = getOrCreateTable(databaseName, tableName, 
lazyLoadTable);
+        TableChange.UpdateColumnDefaultValue updateColumnDefaultValue =
+            (TableChange.UpdateColumnDefaultValue) change;
+        alterSql.add(
+            updateColumnDefaultValueFieldDefinition(updateColumnDefaultValue, 
lazyLoadTable));
+      } else if (change instanceof TableChange.UpdateColumnType) {
+        lazyLoadTable = getOrCreateTable(databaseName, tableName, 
lazyLoadTable);
+        TableChange.UpdateColumnType updateColumnType = 
(TableChange.UpdateColumnType) change;
+        alterSql.add(updateColumnTypeFieldDefinition(updateColumnType, 
lazyLoadTable));
+      } else if (change instanceof TableChange.UpdateColumnComment) {
+        lazyLoadTable = getOrCreateTable(databaseName, tableName, 
lazyLoadTable);
+        TableChange.UpdateColumnComment updateColumnComment =
+            (TableChange.UpdateColumnComment) change;
+        alterSql.add(updateColumnCommentFieldDefinition(updateColumnComment, 
lazyLoadTable));
+      } else if (change instanceof TableChange.UpdateColumnPosition) {
+        lazyLoadTable = getOrCreateTable(databaseName, tableName, 
lazyLoadTable);
+        TableChange.UpdateColumnPosition updateColumnPosition =
+            (TableChange.UpdateColumnPosition) change;
+        alterSql.add(updateColumnPositionFieldDefinition(updateColumnPosition, 
lazyLoadTable));
+      } else if (change instanceof TableChange.DeleteColumn) {
+        TableChange.DeleteColumn deleteColumn = (TableChange.DeleteColumn) 
change;
+        lazyLoadTable = getOrCreateTable(databaseName, tableName, 
lazyLoadTable);
+        String deleteColSql = deleteColumnFieldDefinition(deleteColumn, 
lazyLoadTable);
+        if (StringUtils.isNotEmpty(deleteColSql)) {
+          alterSql.add(deleteColSql);
+        }
+      } else if (change instanceof TableChange.UpdateColumnNullability) {
+        lazyLoadTable = getOrCreateTable(databaseName, tableName, 
lazyLoadTable);
+        alterSql.add(
+            updateColumnNullabilityDefinition(
+                (TableChange.UpdateColumnNullability) change, lazyLoadTable));
+      } else if (change instanceof TableChange.DeleteIndex) {
+        lazyLoadTable = getOrCreateTable(databaseName, tableName, 
lazyLoadTable);
+        alterSql.add(deleteIndexDefinition(lazyLoadTable, 
(TableChange.DeleteIndex) change));
+      } else if (change instanceof TableChange.UpdateColumnAutoIncrement) {
+        lazyLoadTable = getOrCreateTable(databaseName, tableName, 
lazyLoadTable);
+        alterSql.add(
+            updateColumnAutoIncrementDefinition(
+                lazyLoadTable, (TableChange.UpdateColumnAutoIncrement) 
change));
+      } else {
+        throw new IllegalArgumentException(
+            "Unsupported table change type: " + change.getClass().getName());
+      }
+    }
+    if (!setProperties.isEmpty()) {
+      alterSql.add(generateAlterTableProperties(setProperties));
+    }
+
+    // Last modified comment
+    if (null != updateComment) {
+      String newComment = updateComment.getNewComment();
+      if (null == StringIdentifier.fromComment(newComment)) {
+        // Detect and add Gravitino id.
+        JdbcTable jdbcTable = getOrCreateTable(databaseName, tableName, 
lazyLoadTable);
+        StringIdentifier identifier = 
StringIdentifier.fromComment(jdbcTable.comment());
+        if (null != identifier) {
+          newComment = StringIdentifier.addToComment(identifier, newComment);
+        }
+      }
+      alterSql.add(" MODIFY COMMENT '" + newComment + "'");
+    }
+
+    if (!setProperties.isEmpty()) {
+      alterSql.add(generateAlterTableProperties(setProperties));
+    }
+

Review Comment:
   Duplicate code: The generateAlterTableProperties method is called twice in 
the same method, once at line 396 and again at line 414. This appears to be a 
copy-paste error and the second call should be removed.
   ```suggestion
   
   ```



##########
catalogs-contrib/catalog-jdbc-clickhouse/src/main/java/org/apache/gravitino/catalog/clickhouse/operations/ClickHouseTableOperations.java:
##########
@@ -38,19 +117,607 @@ protected String generateCreateTableSql(
       Distribution distribution,
       Index[] indexes) {
     throw new UnsupportedOperationException(
-        "ClickHouseTableOperations.generateCreateTableSql is not implemented 
yet.");
+        "generateCreateTableSql with out sortOrders in clickhouse is not 
supported");
+  }
+
+  @Override
+  protected String generateCreateTableSql(
+      String tableName,
+      JdbcColumn[] columns,
+      String comment,
+      Map<String, String> properties,
+      Transform[] partitioning,
+      Distribution distribution,
+      Index[] indexes,
+      SortOrder[] sortOrders) {
+
+    // These two are not yet supported in Gravitino now and will be supported 
in the future.
+    if (ArrayUtils.isNotEmpty(partitioning)) {
+      throw new UnsupportedOperationException(
+          "Currently we do not support Partitioning in clickhouse");
+    }
+    Preconditions.checkArgument(
+        Distributions.NONE.equals(distribution), "ClickHouse does not support 
distribution");
+
+    validateIncrementCol(columns, indexes);
+
+    // First build the CREATE TABLE statement
+    StringBuilder sqlBuilder = new StringBuilder();
+    sqlBuilder
+        .append("CREATE TABLE ")
+        .append(BACK_QUOTE)
+        .append(tableName)
+        .append(BACK_QUOTE)
+        .append(" (\n");
+
+    // Add columns
+    for (int i = 0; i < columns.length; i++) {
+      JdbcColumn column = columns[i];
+      sqlBuilder
+          .append(SPACE)
+          .append(SPACE)
+          .append(BACK_QUOTE)
+          .append(column.name())
+          .append(BACK_QUOTE);
+
+      appendColumnDefinition(column, sqlBuilder);
+      // Add a comma for the next column, unless it's the last one
+      if (i < columns.length - 1) {
+        sqlBuilder.append(",\n");
+      }
+    }
+
+    // Index definition, we only support primary index now, secondary index 
will be supported in
+    // future
+    appendIndexesSql(indexes, sqlBuilder);
+
+    sqlBuilder.append("\n)");
+
+    // Extract engine from properties
+    String engine = ENGINE_PROPERTY_ENTRY.getDefaultValue().getValue();
+    if (MapUtils.isNotEmpty(properties)) {
+      String userSetEngine = properties.remove(CLICKHOUSE_ENGINE_KEY);
+      if (StringUtils.isNotEmpty(userSetEngine)) {
+        engine = userSetEngine;
+      }
+    }
+    sqlBuilder.append("\n ENGINE = ").append(engine);
+
+    // Omit partition by clause as it will be supported in the next PR
+    // TODO: Add partition by clause support

Review Comment:
   TODO comment indicates incomplete implementation: Line 187 has a TODO 
comment about adding partition by clause support. This suggests the feature is 
incomplete. Consider either implementing this before merging or creating a 
tracking issue and referencing it in the TODO.
   ```suggestion
       // Partition by clause is currently not supported and is intentionally 
omitted.
   ```



##########
integration-test-common/src/test/java/org/apache/gravitino/integration/test/container/ClickHouseContainer.java:
##########
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.integration.test.container;
+
+import static java.lang.String.format;
+
+import com.google.common.collect.ImmutableSet;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.gravitino.integration.test.util.TestDatabaseName;
+import org.rnorth.ducttape.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.Network;
+
+public class ClickHouseContainer extends BaseContainer {
+  public static final Logger LOG = 
LoggerFactory.getLogger(ClickHouseContainer.class);
+
+  public static final String DEFAULT_IMAGE = "clickhouse:24.8.14";
+  public static final String HOST_NAME = "gravitino-ci-clickhouse";
+  public static final int CLICKHOUSE_PORT = 8123;
+  public static final String USER_NAME = "default";
+  public static final String PASSWORD = "default";
+
+  public static Builder builder() {
+    return new Builder();
+  }
+
+  protected ClickHouseContainer(
+      String image,
+      String hostName,
+      Set<Integer> ports,
+      Map<String, String> extraHosts,
+      Map<String, String> filesToMount,
+      Map<String, String> envVars,
+      Optional<Network> network) {
+    super(image, hostName, ports, extraHosts, filesToMount, envVars, network);
+  }
+
+  @Override
+  protected void setupContainer() {
+    super.setupContainer();
+    withLogConsumer(new PrintingContainerLog(format("%-14s| ", 
"clickHouseContainer")));
+  }
+
+  @Override
+  public void start() {
+    super.start();
+    Preconditions.check("clickHouse container startup failed!", 
checkContainerStatus(5));
+  }
+
+  @Override
+  protected boolean checkContainerStatus(int retryLimit) {
+    for (int attempt = 1; attempt <= retryLimit; attempt++) {
+      try (Connection connection =
+              DriverManager.getConnection(getJdbcUrl(), USER_NAME, 
getPassword());
+          Statement statement = connection.createStatement()) {
+        // Simple health check query; ClickHouse should respond if it is ready.
+        statement.execute("SELECT 1");
+        LOG.info("clickHouse container is healthy after {} attempt(s)", 
attempt);
+        return true;
+      } catch (SQLException e) {
+        LOG.warn(
+            "Failed to connect to clickHouse container on attempt {}/{}: {}",
+            attempt,
+            retryLimit,
+            e.getMessage(),
+            e);
+        if (attempt < retryLimit) {
+          try {
+            Thread.sleep(1000L);
+          } catch (InterruptedException interruptedException) {
+            Thread.currentThread().interrupt();
+            LOG.error(
+                "Interrupted while waiting to retry clickHouse container 
health check",
+                interruptedException);
+            break;
+          }
+        }
+      }
+    }
+    LOG.error("clickHouse container failed health checks after {} attempt(s)", 
retryLimit);
+    return false;
+  }
+
+  public void createDatabase(TestDatabaseName testDatabaseName) {
+    String clickHouseJdbcUrl =
+        StringUtils.substring(
+            getJdbcUrl(testDatabaseName), 0, 
getJdbcUrl(testDatabaseName).lastIndexOf("/"));
+
+    // Use the default username and password to connect and create the 
database;
+    // any non-default password must be configured via Gravitino catalog 
properties.
+    try (Connection connection =
+            DriverManager.getConnection(clickHouseJdbcUrl, USER_NAME, 
getPassword());
+        Statement statement = connection.createStatement()) {
+
+      String query = String.format("CREATE DATABASE IF NOT EXISTS %s;", 
testDatabaseName);
+      // FIXME: String, which is used in SQL, can be unsafe
+      statement.execute(query);

Review Comment:
   SQL injection vulnerability: The createDatabase method uses String.format to 
construct a SQL query with the testDatabaseName parameter directly inserted 
into the query string. This is dangerous if testDatabaseName contains SQL 
injection payloads. Consider using prepared statements or proper SQL escaping.



##########
catalogs-contrib/catalog-jdbc-clickhouse/src/main/java/org/apache/gravitino/catalog/clickhouse/operations/ClickHouseTableOperations.java:
##########
@@ -38,19 +117,607 @@ protected String generateCreateTableSql(
       Distribution distribution,
       Index[] indexes) {
     throw new UnsupportedOperationException(
-        "ClickHouseTableOperations.generateCreateTableSql is not implemented 
yet.");
+        "generateCreateTableSql with out sortOrders in clickhouse is not 
supported");
+  }
+
+  @Override
+  protected String generateCreateTableSql(
+      String tableName,
+      JdbcColumn[] columns,
+      String comment,
+      Map<String, String> properties,
+      Transform[] partitioning,
+      Distribution distribution,
+      Index[] indexes,
+      SortOrder[] sortOrders) {
+
+    // These two are not yet supported in Gravitino now and will be supported 
in the future.
+    if (ArrayUtils.isNotEmpty(partitioning)) {
+      throw new UnsupportedOperationException(
+          "Currently we do not support Partitioning in clickhouse");
+    }
+    Preconditions.checkArgument(
+        Distributions.NONE.equals(distribution), "ClickHouse does not support 
distribution");
+
+    validateIncrementCol(columns, indexes);
+
+    // First build the CREATE TABLE statement
+    StringBuilder sqlBuilder = new StringBuilder();
+    sqlBuilder
+        .append("CREATE TABLE ")
+        .append(BACK_QUOTE)
+        .append(tableName)
+        .append(BACK_QUOTE)
+        .append(" (\n");
+
+    // Add columns
+    for (int i = 0; i < columns.length; i++) {
+      JdbcColumn column = columns[i];
+      sqlBuilder
+          .append(SPACE)
+          .append(SPACE)
+          .append(BACK_QUOTE)
+          .append(column.name())
+          .append(BACK_QUOTE);
+
+      appendColumnDefinition(column, sqlBuilder);
+      // Add a comma for the next column, unless it's the last one
+      if (i < columns.length - 1) {
+        sqlBuilder.append(",\n");
+      }
+    }
+
+    // Index definition, we only support primary index now, secondary index 
will be supported in
+    // future
+    appendIndexesSql(indexes, sqlBuilder);
+
+    sqlBuilder.append("\n)");
+
+    // Extract engine from properties
+    String engine = ENGINE_PROPERTY_ENTRY.getDefaultValue().getValue();
+    if (MapUtils.isNotEmpty(properties)) {
+      String userSetEngine = properties.remove(CLICKHOUSE_ENGINE_KEY);
+      if (StringUtils.isNotEmpty(userSetEngine)) {
+        engine = userSetEngine;
+      }
+    }
+    sqlBuilder.append("\n ENGINE = ").append(engine);
+
+    // Omit partition by clause as it will be supported in the next PR
+    // TODO: Add partition by clause support
+
+    // Add order by clause
+    if (ArrayUtils.isNotEmpty(sortOrders)) {
+      if (sortOrders.length > 1) {
+        throw new UnsupportedOperationException(
+            "Currently ClickHouse does not support sortOrders with more than 1 
element");
+      } else if (sortOrders[0].nullOrdering() != null || 
sortOrders[0].direction() != null) {
+        // If no value is set earlier, some default values will be set.
+        // It is difficult to determine whether the user has set a value.
+        LOG.warn(
+            "ClickHouse currently does not support nullOrdering: {} and 
direction: {} of sortOrders,and will ignore them",
+            sortOrders[0].nullOrdering(),
+            sortOrders[0].direction());
+      }
+      sqlBuilder.append(
+          " \n ORDER BY " + BACK_QUOTE + sortOrders[0].expression() + 
BACK_QUOTE + " \n");
+    }
+
+    // Add table comment if specified
+    if (StringUtils.isNotEmpty(comment)) {
+      sqlBuilder.append(" COMMENT '").append(comment).append("'");
+    }
+
+    // Add setting clause if specified, clickhouse only supports predefine 
settings
+    if (MapUtils.isNotEmpty(properties)) {
+      String settings =
+          properties.entrySet().stream()
+              .filter(entry -> entry.getKey().startsWith(SETTINGS_PREFIX))
+              .map(
+                  entry ->
+                      entry.getKey().substring(SETTINGS_PREFIX.length()) + " = 
" + entry.getValue())
+              .collect(Collectors.joining(",\n ", " \n SETTINGS ", ""));
+      sqlBuilder.append(settings);
+    }
+
+    // Return the generated SQL statement
+    String result = sqlBuilder.toString();
+
+    LOG.info("Generated create table:{} sql: {}", tableName, result);
+    return result;
+  }
+
+  public static void appendIndexesSql(Index[] indexes, StringBuilder 
sqlBuilder) {
+    if (indexes == null) {
+      return;
+    }
+
+    for (Index index : indexes) {
+      String fieldStr = getIndexFieldStr(index.fieldNames());
+      sqlBuilder.append(",\n");
+      switch (index.type()) {
+        case PRIMARY_KEY:
+          if (null != index.name()
+              && !StringUtils.equalsIgnoreCase(
+                  index.name(), Indexes.DEFAULT_CLICKHOUSE_PRIMARY_KEY_NAME)) {
+            LOG.warn(
+                "Primary key name must be PRIMARY in ClickHouse, the name {} 
will be ignored.",
+                index.name());
+          }
+          sqlBuilder.append(" PRIMARY KEY (").append(fieldStr).append(")");
+          break;
+        case UNIQUE_KEY:
+          throw new IllegalArgumentException(
+              "Gravitino clickHouse doesn't support index : " + index.type());
+        default:
+          throw new IllegalArgumentException(
+              "Gravitino Clickhouse doesn't support index : " + index.type());
+      }
+    }
+  }
+
+  @Override
+  protected boolean getAutoIncrementInfo(ResultSet resultSet) throws 
SQLException {
+    return "YES".equalsIgnoreCase(resultSet.getString("IS_AUTOINCREMENT"));
+  }
+
+  @Override
+  public void alterTable(String databaseName, String tableName, TableChange... 
changes)
+      throws NoSuchTableException {
+    LOG.info("Attempting to alter table {} from database {}", tableName, 
databaseName);
+    try (Connection connection = getConnection(databaseName)) {
+      for (TableChange change : changes) {
+        String sql = generateAlterTableSql(databaseName, tableName, change);
+        if (StringUtils.isEmpty(sql)) {
+          LOG.info("No changes to alter table {} from database {}", tableName, 
databaseName);
+          return;
+        }
+        JdbcConnectorUtils.executeUpdate(connection, sql);
+      }
+      LOG.info("Alter table {} from database {}", tableName, databaseName);
+    } catch (final SQLException se) {
+      throw this.exceptionMapper.toGravitinoException(se);
+    }
+  }
+
+  @Override
+  protected Map<String, String> getTableProperties(Connection connection, 
String tableName)
+      throws SQLException {
+    try (PreparedStatement statement =
+        connection.prepareStatement("select * from system.tables where name = 
? ")) {
+      statement.setString(1, tableName);
+      try (ResultSet resultSet = statement.executeQuery()) {
+        while (resultSet.next()) {
+          String name = resultSet.getString("name");
+          if (Objects.equals(name, tableName)) {
+            return Collections.unmodifiableMap(
+                new HashMap<String, String>() {
+                  {
+                    put(COMMENT, resultSet.getString(COMMENT));
+                    put(CLICKHOUSE_ENGINE_KEY, 
resultSet.getString(CLICKHOUSE_ENGINE_KEY));
+                  }
+                });
+          }
+        }
+
+        throw new NoSuchTableException(
+            "Table %s does not exist in %s.", tableName, 
connection.getCatalog());
+      }
+    }
+  }
+
+  protected ResultSet getTables(Connection connection) throws SQLException {
+    final DatabaseMetaData metaData = connection.getMetaData();
+    String catalogName = connection.getCatalog();
+    String schemaName = connection.getSchema();
+    // CK tables include : DICTIONARY", "LOG TABLE", "MEMORY TABLE",
+    // "REMOTE TABLE", "TABLE", "VIEW", "SYSTEM TABLE", "TEMPORARY TABLE
+    return metaData.getTables(catalogName, schemaName, null, new String[] 
{"TABLE"});
   }
 
   @Override
   protected String generatePurgeTableSql(String tableName) {
     throw new UnsupportedOperationException(
-        "ClickHouseTableOperations.generatePurgeTableSql is not implemented 
yet.");
+        "ClickHouse does not support purge table in Gravitino, please use drop 
table");
   }
 
   @Override
   protected String generateAlterTableSql(
       String databaseName, String tableName, TableChange... changes) {
-    throw new UnsupportedOperationException(
-        "ClickHouseTableOperations.generateAlterTableSql is not implemented 
yet.");
+    // Not all operations require the original table information, so lazy 
loading is used here
+    JdbcTable lazyLoadTable = null;
+    TableChange.UpdateComment updateComment = null;
+    List<TableChange.SetProperty> setProperties = new ArrayList<>();
+    List<String> alterSql = new ArrayList<>();
+    for (int i = 0; i < changes.length; i++) {
+      TableChange change = changes[i];
+      if (change instanceof TableChange.UpdateComment) {
+        updateComment = (TableChange.UpdateComment) change;
+      } else if (change instanceof TableChange.SetProperty) {
+        // The set attribute needs to be added at the end.
+        setProperties.add(((TableChange.SetProperty) change));
+      } else if (change instanceof TableChange.RemoveProperty) {
+        // clickhouse does not support deleting table attributes, it can be 
replaced by Set Property
+        throw new IllegalArgumentException("Remove property is not supported 
yet");
+      } else if (change instanceof TableChange.AddColumn) {
+        TableChange.AddColumn addColumn = (TableChange.AddColumn) change;
+        lazyLoadTable = getOrCreateTable(databaseName, tableName, 
lazyLoadTable);
+        alterSql.add(addColumnFieldDefinition(addColumn));
+      } else if (change instanceof TableChange.RenameColumn) {
+        lazyLoadTable = getOrCreateTable(databaseName, tableName, 
lazyLoadTable);
+        TableChange.RenameColumn renameColumn = (TableChange.RenameColumn) 
change;
+        alterSql.add(renameColumnFieldDefinition(renameColumn));
+      } else if (change instanceof TableChange.UpdateColumnDefaultValue) {
+        lazyLoadTable = getOrCreateTable(databaseName, tableName, 
lazyLoadTable);
+        TableChange.UpdateColumnDefaultValue updateColumnDefaultValue =
+            (TableChange.UpdateColumnDefaultValue) change;
+        alterSql.add(
+            updateColumnDefaultValueFieldDefinition(updateColumnDefaultValue, 
lazyLoadTable));
+      } else if (change instanceof TableChange.UpdateColumnType) {
+        lazyLoadTable = getOrCreateTable(databaseName, tableName, 
lazyLoadTable);
+        TableChange.UpdateColumnType updateColumnType = 
(TableChange.UpdateColumnType) change;
+        alterSql.add(updateColumnTypeFieldDefinition(updateColumnType, 
lazyLoadTable));
+      } else if (change instanceof TableChange.UpdateColumnComment) {
+        lazyLoadTable = getOrCreateTable(databaseName, tableName, 
lazyLoadTable);
+        TableChange.UpdateColumnComment updateColumnComment =
+            (TableChange.UpdateColumnComment) change;
+        alterSql.add(updateColumnCommentFieldDefinition(updateColumnComment, 
lazyLoadTable));
+      } else if (change instanceof TableChange.UpdateColumnPosition) {
+        lazyLoadTable = getOrCreateTable(databaseName, tableName, 
lazyLoadTable);
+        TableChange.UpdateColumnPosition updateColumnPosition =
+            (TableChange.UpdateColumnPosition) change;
+        alterSql.add(updateColumnPositionFieldDefinition(updateColumnPosition, 
lazyLoadTable));
+      } else if (change instanceof TableChange.DeleteColumn) {
+        TableChange.DeleteColumn deleteColumn = (TableChange.DeleteColumn) 
change;
+        lazyLoadTable = getOrCreateTable(databaseName, tableName, 
lazyLoadTable);
+        String deleteColSql = deleteColumnFieldDefinition(deleteColumn, 
lazyLoadTable);
+        if (StringUtils.isNotEmpty(deleteColSql)) {
+          alterSql.add(deleteColSql);
+        }
+      } else if (change instanceof TableChange.UpdateColumnNullability) {
+        lazyLoadTable = getOrCreateTable(databaseName, tableName, 
lazyLoadTable);
+        alterSql.add(
+            updateColumnNullabilityDefinition(
+                (TableChange.UpdateColumnNullability) change, lazyLoadTable));
+      } else if (change instanceof TableChange.DeleteIndex) {
+        lazyLoadTable = getOrCreateTable(databaseName, tableName, 
lazyLoadTable);
+        alterSql.add(deleteIndexDefinition(lazyLoadTable, 
(TableChange.DeleteIndex) change));
+      } else if (change instanceof TableChange.UpdateColumnAutoIncrement) {
+        lazyLoadTable = getOrCreateTable(databaseName, tableName, 
lazyLoadTable);
+        alterSql.add(
+            updateColumnAutoIncrementDefinition(
+                lazyLoadTable, (TableChange.UpdateColumnAutoIncrement) 
change));
+      } else {
+        throw new IllegalArgumentException(
+            "Unsupported table change type: " + change.getClass().getName());
+      }
+    }
+    if (!setProperties.isEmpty()) {
+      alterSql.add(generateAlterTableProperties(setProperties));
+    }
+
+    // Last modified comment
+    if (null != updateComment) {
+      String newComment = updateComment.getNewComment();
+      if (null == StringIdentifier.fromComment(newComment)) {
+        // Detect and add Gravitino id.
+        JdbcTable jdbcTable = getOrCreateTable(databaseName, tableName, 
lazyLoadTable);
+        StringIdentifier identifier = 
StringIdentifier.fromComment(jdbcTable.comment());
+        if (null != identifier) {
+          newComment = StringIdentifier.addToComment(identifier, newComment);
+        }
+      }
+      alterSql.add(" MODIFY COMMENT '" + newComment + "'");
+    }
+
+    if (!setProperties.isEmpty()) {
+      alterSql.add(generateAlterTableProperties(setProperties));
+    }
+
+    if (CollectionUtils.isEmpty(alterSql)) {
+      return "";
+    }
+    // Return the generated SQL statement
+    String result = "ALTER TABLE `" + tableName + "`\n" + String.join(",\n", 
alterSql) + ";";
+    LOG.info("Generated alter table:{} sql: {}", databaseName + "." + 
tableName, result);
+    return result;
+  }
+
+  private String updateColumnAutoIncrementDefinition(
+      JdbcTable table, TableChange.UpdateColumnAutoIncrement change) {
+    if (change.fieldName().length > 1) {
+      throw new UnsupportedOperationException("Nested column names are not 
supported");
+    }
+    String col = change.fieldName()[0];
+    JdbcColumn column = getJdbcColumnFromTable(table, col);
+    if (change.isAutoIncrement()) {
+      Preconditions.checkArgument(
+          Types.allowAutoIncrement(column.dataType()),
+          "Auto increment is not allowed, type: " + column.dataType());
+    }
+    JdbcColumn updateColumn =
+        JdbcColumn.builder()
+            .withName(col)
+            .withDefaultValue(column.defaultValue())
+            .withNullable(column.nullable())
+            .withType(column.dataType())
+            .withComment(column.comment())
+            .withAutoIncrement(change.isAutoIncrement())
+            .build();
+    return MODIFY_COLUMN
+        + BACK_QUOTE
+        + col
+        + BACK_QUOTE
+        + appendColumnDefinition(updateColumn, new StringBuilder());
+  }
+
+  @VisibleForTesting
+  static String deleteIndexDefinition(
+      JdbcTable lazyLoadTable, TableChange.DeleteIndex deleteIndex) {
+    if (deleteIndex.isIfExists()) {
+      if (Arrays.stream(lazyLoadTable.index())
+          .anyMatch(index -> index.name().equals(deleteIndex.getName()))) {
+        throw new IllegalArgumentException("Index does not exist");
+      }

Review Comment:
   Logic error in deleteIndexDefinition: The condition is inverted. When 
isIfExists() is true, the code should skip the check and NOT throw an 
exception. Currently, it throws an exception when the index does not exist AND 
isIfExists() is true, which is backwards. The condition should be: if 
(!deleteIndex.isIfExists() && 
!Arrays.stream(lazyLoadTable.index()).anyMatch(...))
   ```suggestion
       if (!deleteIndex.isIfExists()
           && Arrays.stream(lazyLoadTable.index())
               .noneMatch(index -> index.name().equals(deleteIndex.getName()))) 
{
         throw new IllegalArgumentException("Index does not exist");
   ```



##########
catalogs-contrib/catalog-jdbc-clickhouse/src/main/java/org/apache/gravitino/catalog/clickhouse/operations/ClickHouseDatabaseOperations.java:
##########
@@ -18,19 +18,106 @@
  */
 package org.apache.gravitino.catalog.clickhouse.operations;
 
-import com.google.common.collect.Sets;
+import static 
org.apache.gravitino.catalog.clickhouse.ClickHouseConfig.DEFAULT_CK_ON_CLUSTER;
+
+import com.google.common.collect.ImmutableSet;
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
 import java.util.Set;
+import javax.sql.DataSource;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.gravitino.StringIdentifier;
+import org.apache.gravitino.catalog.clickhouse.ClickHouseConfig;
+import org.apache.gravitino.catalog.jdbc.converter.JdbcExceptionConverter;
 import org.apache.gravitino.catalog.jdbc.operation.JdbcDatabaseOperations;
 
 public class ClickHouseDatabaseOperations extends JdbcDatabaseOperations {
 
+  // TODO: handle ClickHouse cluster properly when creating/dropping 
databases/tables
+  //  use https://github.com/apache/gravitino/issues/9820 to track it.
+  @SuppressWarnings("unused")
+  private boolean onCluster = false;
+
+  @SuppressWarnings("unused")
+  private String clusterName = null;

Review Comment:
   Unused or misused fields: The onCluster and clusterName fields are marked 
with @SuppressWarnings("unused") and have a TODO comment indicating they're not 
yet properly implemented. These fields are initialized but never used, 
suggesting incomplete feature implementation. Consider either implementing the 
cluster functionality or removing these fields until the feature is ready.



##########
catalogs-contrib/catalog-jdbc-clickhouse/src/test/java/org/apache/gravitino/catalog/clickhouse/operations/TestClickHouseTableOperations.java:
##########
@@ -0,0 +1,717 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.catalog.clickhouse.operations;
+
+import static 
org.apache.gravitino.catalog.clickhouse.ClickHouseTablePropertiesMetadata.CLICKHOUSE_ENGINE_KEY;
+import static 
org.apache.gravitino.catalog.clickhouse.ClickHouseUtils.getSortOrders;
+import static org.apache.gravitino.rel.Column.DEFAULT_VALUE_NOT_SET;
+
+import java.time.LocalDateTime;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.commons.lang3.RandomStringUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.gravitino.catalog.jdbc.JdbcColumn;
+import org.apache.gravitino.catalog.jdbc.JdbcTable;
+import org.apache.gravitino.exceptions.GravitinoRuntimeException;
+import org.apache.gravitino.rel.TableChange;
+import org.apache.gravitino.rel.expressions.distributions.Distributions;
+import org.apache.gravitino.rel.expressions.literals.Literals;
+import org.apache.gravitino.rel.expressions.transforms.Transforms;
+import org.apache.gravitino.rel.indexes.Index;
+import org.apache.gravitino.rel.indexes.Indexes;
+import org.apache.gravitino.rel.types.Decimal;
+import org.apache.gravitino.rel.types.Type;
+import org.apache.gravitino.rel.types.Types;
+import org.apache.gravitino.utils.RandomNameUtils;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.api.Test;
+
+@Tag("gravitino-docker-test")
+public class TestClickHouseTableOperations extends TestClickHouse {
+  private static final Type STRING = Types.StringType.get();
+  private static final Type INT = Types.IntegerType.get();
+  private static final Type LONG = Types.LongType.get();
+
+  @Test
+  public void testOperationTable() {
+    String tableName = RandomStringUtils.randomAlphabetic(16) + "_op_table";
+    String tableComment = "test_comment";
+    List<JdbcColumn> columns = new ArrayList<>();
+    columns.add(
+        JdbcColumn.builder()
+            .withName("col_1")
+            .withType(STRING)
+            .withComment("test_comment")
+            .withNullable(true)
+            .build());
+    columns.add(
+        JdbcColumn.builder()
+            .withName("col_2")
+            .withType(INT)
+            .withNullable(false)
+            .withComment("set primary key")
+            .build());
+    columns.add(
+        JdbcColumn.builder()
+            .withName("col_3")
+            .withType(INT)
+            .withNullable(true)
+            .withDefaultValue(Literals.NULL)
+            .build());
+    columns.add(
+        JdbcColumn.builder()
+            .withName("col_4")
+            .withType(STRING)
+            .withDefaultValue(Literals.of("hello world", STRING))
+            .withNullable(false)
+            .build());
+    Map<String, String> properties = new HashMap<>();
+
+    Index[] indexes = new Index[] {};
+    // create table
+    TABLE_OPERATIONS.create(
+        TEST_DB_NAME.toString(),
+        tableName,
+        columns.toArray(new JdbcColumn[0]),
+        tableComment,
+        properties,
+        null,
+        Distributions.NONE,
+        indexes,
+        getSortOrders("col_2"));
+
+    // list table
+    List<String> tables = TABLE_OPERATIONS.listTables(TEST_DB_NAME.toString());
+    Assertions.assertTrue(tables.contains(tableName));
+
+    // load table
+    JdbcTable load = TABLE_OPERATIONS.load(TEST_DB_NAME.toString(), tableName);
+    assertionsTableInfo(
+        tableName, tableComment, columns, properties, indexes, 
Transforms.EMPTY_TRANSFORM, load);
+
+    // rename table
+    String newName = "new_table";
+    Assertions.assertDoesNotThrow(
+        () -> TABLE_OPERATIONS.rename(TEST_DB_NAME.toString(), tableName, 
newName));
+    Assertions.assertDoesNotThrow(() -> 
TABLE_OPERATIONS.load(TEST_DB_NAME.toString(), newName));
+
+    // alter table
+    JdbcColumn newColumn =
+        JdbcColumn.builder()
+            .withName("col_5")
+            .withType(STRING)
+            .withComment("new_add")
+            .withNullable(false) //
+            //            .withDefaultValue(Literals.of("hello test", STRING))
+            .build();
+    TABLE_OPERATIONS.alterTable(
+        TEST_DB_NAME.toString(),
+        newName,
+        TableChange.addColumn(
+            new String[] {newColumn.name()},
+            newColumn.dataType(),
+            newColumn.comment(),
+            TableChange.ColumnPosition.after("col_1"),
+            newColumn.nullable(),
+            newColumn.autoIncrement(),
+            newColumn.defaultValue())
+        //        ,
+        //        TableChange.setProperty(CLICKHOUSE_ENGINE_KEY, "InnoDB"));
+        //    properties.put(CLICKHOUSE_ENGINE_KEY, "InnoDB"
+        );
+    load = TABLE_OPERATIONS.load(TEST_DB_NAME.toString(), newName);
+    List<JdbcColumn> alterColumns =
+        new ArrayList<JdbcColumn>() {
+          {
+            add(columns.get(0));
+            add(newColumn);
+            add(columns.get(1));
+            add(columns.get(2));
+            add(columns.get(3));
+          }
+        };
+    assertionsTableInfo(
+        newName, tableComment, alterColumns, properties, indexes, 
Transforms.EMPTY_TRANSFORM, load);
+
+    // Detect unsupported properties
+    TableChange setProperty = TableChange.setProperty(CLICKHOUSE_ENGINE_KEY, 
"ABC");
+    UnsupportedOperationException gravitinoRuntimeException =
+        Assertions.assertThrows(
+            UnsupportedOperationException.class,
+            () -> TABLE_OPERATIONS.alterTable(TEST_DB_NAME.toString(), 
newName, setProperty));
+    Assertions.assertTrue(
+        StringUtils.contains(
+            gravitinoRuntimeException.getMessage(),
+            "alter table properties in ck is not supported"));
+
+    // delete column
+    TABLE_OPERATIONS.alterTable(
+        TEST_DB_NAME.toString(),
+        newName,
+        TableChange.deleteColumn(new String[] {newColumn.name()}, true));
+    load = TABLE_OPERATIONS.load(TEST_DB_NAME.toString(), newName);
+    assertionsTableInfo(
+        newName, tableComment, columns, properties, indexes, 
Transforms.EMPTY_TRANSFORM, load);
+
+    TableChange deleteColumn = TableChange.deleteColumn(new String[] 
{newColumn.name()}, false);
+    IllegalArgumentException illegalArgumentException =
+        Assertions.assertThrows(
+            IllegalArgumentException.class,
+            () -> TABLE_OPERATIONS.alterTable(TEST_DB_NAME.toString(), 
newName, deleteColumn));
+    Assertions.assertEquals(
+        "Delete column does not exist: " + newColumn.name(), 
illegalArgumentException.getMessage());
+    Assertions.assertDoesNotThrow(
+        () ->
+            TABLE_OPERATIONS.alterTable(
+                TEST_DB_NAME.toString(),
+                newName,
+                TableChange.deleteColumn(new String[] {newColumn.name()}, 
true)));
+
+    TABLE_OPERATIONS.alterTable(
+        TEST_DB_NAME.toString(),
+        newName,
+        TableChange.deleteColumn(new String[] {newColumn.name()}, true));
+    Assertions.assertTrue(
+        TABLE_OPERATIONS.drop(TEST_DB_NAME.toString(), newName), "table should 
be dropped");
+
+    GravitinoRuntimeException exception =
+        Assertions.assertThrows(
+            GravitinoRuntimeException.class,
+            () -> TABLE_OPERATIONS.drop(TEST_DB_NAME.toString(), newName));
+    Assertions.assertTrue(StringUtils.contains(exception.getMessage(), "does 
not exist"));
+  }
+
+  @Test
+  public void testAlterTable() {
+    String tableName = RandomStringUtils.randomAlphabetic(16) + "_al_table";
+    String tableComment = "test_comment";
+    List<JdbcColumn> columns = new ArrayList<>();
+    JdbcColumn col_1 =
+        JdbcColumn.builder()
+            .withName("col_1")
+            .withType(INT)
+            .withComment("id")
+            .withNullable(false)
+            .withDefaultValue(Literals.integerLiteral(0))
+            .build();
+    columns.add(col_1);
+    JdbcColumn col_2 =
+        JdbcColumn.builder()
+            .withName("col_2")
+            .withType(STRING)
+            .withComment("name")
+            .withDefaultValue(Literals.of("hello world", STRING))
+            .withNullable(false)
+            .build();
+    columns.add(col_2);
+    JdbcColumn col_3 =
+        JdbcColumn.builder()
+            .withName("col_3")
+            .withType(STRING)
+            .withComment("name")
+            .withDefaultValue(Literals.NULL)
+            .build();
+    // `col_1` int NOT NULL COMMENT 'id' ,
+    // `col_2` STRING(255) NOT NULL DEFAULT 'hello world' COMMENT 'name' ,
+    // `col_3` STRING(255) NULL DEFAULT NULL COMMENT 'name' ,
+    columns.add(col_3);
+    Map<String, String> properties = new HashMap<>();
+
+    // create table
+    TABLE_OPERATIONS.create(
+        TEST_DB_NAME.toString(),
+        tableName,
+        columns.toArray(new JdbcColumn[0]),
+        tableComment,
+        properties,
+        null,
+        Distributions.NONE,
+        null,
+        getSortOrders("col_2"));
+    JdbcTable load = TABLE_OPERATIONS.load(TEST_DB_NAME.toString(), tableName);
+    assertionsTableInfo(
+        tableName, tableComment, columns, properties, null, 
Transforms.EMPTY_TRANSFORM, load);
+
+    TABLE_OPERATIONS.alterTable(
+        TEST_DB_NAME.toString(),
+        tableName,
+        TableChange.updateColumnType(new String[] {col_1.name()}, LONG));
+
+    load = TABLE_OPERATIONS.load(TEST_DB_NAME.toString(), tableName);
+
+    // After modifying the type, some attributes of the corresponding column 
are not
+    // supported.
+    columns.clear();
+    col_1 =
+        JdbcColumn.builder()
+            .withName(col_1.name())
+            .withType(LONG)
+            .withComment(col_1.comment())
+            .withNullable(col_1.nullable())
+            .withDefaultValue(Literals.longLiteral(0L))
+            .build();
+    columns.add(col_1);
+    columns.add(col_2);
+    columns.add(col_3);
+    assertionsTableInfo(
+        tableName, tableComment, columns, properties, null, 
Transforms.EMPTY_TRANSFORM, load);
+
+    String newComment = "new_comment";
+    // update table comment and column comment
+    // `col_1` int NOT NULL COMMENT 'id' ,
+    // `col_2` STRING(255) NOT NULL DEFAULT 'hello world' COMMENT 
'new_comment' ,
+    // `col_3` STRING(255) NULL DEFAULT NULL COMMENT 'name' ,
+    TABLE_OPERATIONS.alterTable(
+        TEST_DB_NAME.toString(),
+        tableName,
+        TableChange.updateColumnType(new String[] {col_1.name()}, INT),
+        TableChange.updateColumnComment(new String[] {col_2.name()}, 
newComment));
+    load = TABLE_OPERATIONS.load(TEST_DB_NAME.toString(), tableName);
+
+    columns.clear();
+    col_1 =
+        JdbcColumn.builder()
+            .withName(col_1.name())
+            .withType(INT)
+            .withComment(col_1.comment())
+            .withAutoIncrement(col_1.autoIncrement())
+            .withNullable(col_1.nullable())
+            .withDefaultValue(Literals.integerLiteral(0))
+            .build();
+    col_2 =
+        JdbcColumn.builder()
+            .withName(col_2.name())
+            .withType(col_2.dataType())
+            .withComment(newComment)
+            .withAutoIncrement(col_2.autoIncrement())
+            .withNullable(col_2.nullable())
+            .withDefaultValue(col_2.defaultValue())
+            .build();
+    columns.add(col_1);
+    columns.add(col_2);
+    columns.add(col_3);
+    assertionsTableInfo(
+        tableName, tableComment, columns, properties, null, 
Transforms.EMPTY_TRANSFORM, load);
+
+    String newColName_1 = "new_col_1";
+    // rename column
+    // update table comment and column comment
+    // `new_col_1` int NOT NULL COMMENT 'id' ,
+    // `new_col_2` STRING(255) NOT NULL DEFAULT 'hello world' COMMENT 
'new_comment'
+    // ,
+    // `col_3` STRING(255) NULL DEFAULT NULL COMMENT 'name' ,
+    TABLE_OPERATIONS.alterTable(
+        TEST_DB_NAME.toString(),
+        tableName,
+        TableChange.renameColumn(new String[] {col_1.name()}, newColName_1));
+
+    load = TABLE_OPERATIONS.load(TEST_DB_NAME.toString(), tableName);
+
+    columns.clear();
+    col_1 =
+        JdbcColumn.builder()
+            .withName(newColName_1)
+            .withType(col_1.dataType())
+            .withComment(col_1.comment())
+            .withAutoIncrement(col_1.autoIncrement())
+            .withNullable(col_1.nullable())
+            .withDefaultValue(col_1.defaultValue())
+            .build();
+    columns.add(col_1);
+    columns.add(col_2);
+    columns.add(col_3);
+    assertionsTableInfo(
+        tableName, tableComment, columns, properties, null, 
Transforms.EMPTY_TRANSFORM, load);
+  }
+
+  @Test
+  public void testAlterTableUpdateColumnDefaultValue() {
+    String tableName = RandomNameUtils.genRandomName("properties_table_");
+    String tableComment = "test_comment";
+    List<JdbcColumn> columns = new ArrayList<>();
+    columns.add(
+        JdbcColumn.builder()
+            .withName("col_1")
+            .withType(Types.DecimalType.of(10, 2))
+            .withComment("test_decimal")
+            .withNullable(false)
+            .withDefaultValue(Literals.decimalLiteral(Decimal.of("0.00", 10, 
2)))
+            .build());
+    columns.add(
+        JdbcColumn.builder()
+            .withName("col_2")
+            .withType(Types.LongType.get())
+            .withNullable(false)
+            .withDefaultValue(Literals.longLiteral(0L))
+            .withComment("long type")
+            .build());
+    columns.add(
+        JdbcColumn.builder()
+            .withName("col_3")
+            .withType(Types.TimestampType.withoutTimeZone())
+            .withNullable(false)
+            .withComment("timestamp")
+            
.withDefaultValue(Literals.timestampLiteral(LocalDateTime.parse("2013-01-01T00:00:00")))
+            .build());
+    columns.add(
+        JdbcColumn.builder()
+            .withName("col_4")
+            .withType(Types.StringType.get())
+            .withNullable(false)
+            .withComment("STRING")
+            .withDefaultValue(Literals.of("hello", Types.StringType.get()))
+            .build());
+    Map<String, String> properties = new HashMap<>();
+
+    Index[] indexes = new Index[0];
+    // create table
+    TABLE_OPERATIONS.create(
+        TEST_DB_NAME.toString(),
+        tableName,
+        columns.toArray(new JdbcColumn[0]),
+        tableComment,
+        properties,
+        null,
+        Distributions.NONE,
+        indexes,
+        getSortOrders("col_2"));
+
+    JdbcTable loaded = TABLE_OPERATIONS.load(TEST_DB_NAME.toString(), 
tableName);
+    assertionsTableInfo(
+        tableName, tableComment, columns, properties, indexes, 
Transforms.EMPTY_TRANSFORM, loaded);
+
+    TABLE_OPERATIONS.alterTable(
+        TEST_DB_NAME.toString(),
+        tableName,
+        TableChange.updateColumnDefaultValue(
+            new String[] {columns.get(0).name()},
+            Literals.decimalLiteral(Decimal.of("1.23", 10, 2))),
+        TableChange.updateColumnDefaultValue(
+            new String[] {columns.get(1).name()}, Literals.longLiteral(1L)),
+        TableChange.updateColumnDefaultValue(
+            new String[] {columns.get(2).name()},
+            
Literals.timestampLiteral(LocalDateTime.parse("2024-04-01T00:00:00"))),
+        TableChange.updateColumnDefaultValue(
+            new String[] {columns.get(3).name()}, Literals.of("world", 
Types.StringType.get())));
+
+    loaded = TABLE_OPERATIONS.load(TEST_DB_NAME.toString(), tableName);
+    Assertions.assertEquals(
+        Literals.decimalLiteral(Decimal.of("1.234", 10, 2)), 
loaded.columns()[0].defaultValue());

Review Comment:
   Test uses decimal value with wrong precision: Line 421 creates a decimal 
literal with value "1.234" which has 4 decimal places, but the column is 
defined with precision 10 and scale 2 (meaning only 2 decimal places). This 
will likely cause a test failure or unexpected behavior. The value should be 
"1.23" to match the defined scale of 2.
   ```suggestion
           Literals.decimalLiteral(Decimal.of("1.23", 10, 2)), 
loaded.columns()[0].defaultValue());
   ```



##########
catalogs-contrib/catalog-jdbc-clickhouse/src/main/java/org/apache/gravitino/catalog/clickhouse/converter/ClickHouseColumnDefaultValueConverter.java:
##########
@@ -1,25 +1,164 @@
 /*
- *  Licensed to the Apache Software Foundation (ASF) under one
- *  or more contributor license agreements.  See the NOTICE file
- *  distributed with this work for additional information
- *  regarding copyright ownership.  The ASF licenses this file
- *  to you under the Apache License, Version 2.0 (the
- *  "License"); you may not use this file except in compliance
- *  with the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
  *
- *      http://www.apache.org/licenses/LICENSE-2.0
+ *  http://www.apache.org/licenses/LICENSE-2.0
  *
- *  Unless required by applicable law or agreed to in writing,
- *  software distributed under the License is distributed on an
- *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- *  KIND, either express or implied.  See the License for the
- *  specific language governing permissions and limitations
- *  under the License.
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
  */
 package org.apache.gravitino.catalog.clickhouse.converter;
 
+import static org.apache.gravitino.rel.Column.DEFAULT_VALUE_NOT_SET;
+import static 
org.apache.gravitino.rel.Column.DEFAULT_VALUE_OF_CURRENT_TIMESTAMP;
+
+import java.time.LocalDate;
+import java.time.LocalDateTime;
 import 
org.apache.gravitino.catalog.jdbc.converter.JdbcColumnDefaultValueConverter;
+import org.apache.gravitino.catalog.jdbc.converter.JdbcTypeConverter;
+import org.apache.gravitino.rel.expressions.Expression;
+import org.apache.gravitino.rel.expressions.FunctionExpression;
+import org.apache.gravitino.rel.expressions.UnparsedExpression;
+import org.apache.gravitino.rel.expressions.literals.Literal;
+import org.apache.gravitino.rel.expressions.literals.Literals;
+import org.apache.gravitino.rel.types.Decimal;
+import org.apache.gravitino.rel.types.Type;
+import org.apache.gravitino.rel.types.Types;
 
 public class ClickHouseColumnDefaultValueConverter extends 
JdbcColumnDefaultValueConverter {
-  // Implement ClickHouse specific default value conversions if needed
+
+  protected static final String NOW = "now";
+  Expression DEFAULT_VALUE_OF_NOW = FunctionExpression.of("now");
+
+  public String fromGravitino(Expression defaultValue) {
+    if (DEFAULT_VALUE_NOT_SET.equals(defaultValue)) {
+      return null;
+    }
+
+    if (defaultValue instanceof FunctionExpression) {
+      FunctionExpression functionExpression = (FunctionExpression) 
defaultValue;
+      return String.format("(%s)", functionExpression);
+    }
+
+    if (defaultValue instanceof Literal) {
+      Literal<?> literal = (Literal<?>) defaultValue;
+      Type type = literal.dataType();
+      if (defaultValue.equals(Literals.NULL)) {
+        return NULL;
+      } else if (type instanceof Type.NumericType) {
+        return literal.value().toString();
+      } else {
+        Object value = literal.value();
+        if (value instanceof LocalDateTime) {
+          value = ((LocalDateTime) value).format(DATE_TIME_FORMATTER);
+        }
+        return String.format("'%s'", value);
+      }
+    }
+
+    throw new IllegalArgumentException("Not a supported column default value: 
" + defaultValue);
+  }
+
+  @Override
+  public Expression toGravitino(
+      JdbcTypeConverter.JdbcTypeBean type,
+      String columnDefaultValue,
+      boolean isExpression,
+      boolean nullable) {
+    if (columnDefaultValue == null || columnDefaultValue.isEmpty()) {
+      return nullable ? Literals.NULL : DEFAULT_VALUE_NOT_SET;
+    }
+
+    String reallyType = type.getTypeName();
+    if (reallyType.startsWith("Nullable(")) {
+      reallyType = type.getTypeName().substring(9, type.getTypeName().length() 
- 1);
+    }
+
+    if (reallyType.startsWith("Decimal(")) {
+      reallyType = "Decimal";
+    }
+
+    if (reallyType.startsWith("FixedString(")) {
+      reallyType = "FixedString";
+    }

Review Comment:
   Complex logic with potential bugs in toGravitino conversion: The toGravitino 
method has complex string manipulation logic for handling Nullable and 
parameterized types (lines 76-92). This parsing logic could be fragile if the 
type string doesn't match expected patterns. Consider adding validation or 
using a more robust parsing approach to handle edge cases.



##########
catalogs/catalog-jdbc-common/src/main/java/org/apache/gravitino/catalog/jdbc/operation/TableOperation.java:
##########
@@ -74,6 +75,32 @@ void create(
       Index[] indexes)
       throws TableAlreadyExistsException;
 
+  /**
+   * Create a table with sort orders.
+   *
+   * @param databaseName database name of the table
+   * @param tableName name of the table
+   * @param columns columns of the table
+   * @param comment comment of the table
+   * @param properties properties of the table
+   * @param partitioning partitioning of the table
+   * @param distribution distribution information of the table
+   * @param indexes indexes of the table
+   * @param sortOrders sort orders of the table
+   * @throws TableAlreadyExistsException if the table already exists
+   */
+  void create(
+      String databaseName,
+      String tableName,
+      JdbcColumn[] columns,
+      String comment,
+      Map<String, String> properties,
+      Transform[] partitioning,
+      Distribution distribution,
+      Index[] indexes,
+      SortOrder[] sortOrders)
+      throws TableAlreadyExistsException;

Review Comment:
   Missing JavaDoc for public API: The create method with sortOrders parameter 
is a new public API method but lacks comprehensive JavaDoc documentation. While 
there is a JavaDoc comment, it doesn't explain what sortOrders parameter does 
or how it should be used.



##########
catalogs-contrib/catalog-jdbc-clickhouse/src/main/java/org/apache/gravitino/catalog/clickhouse/converter/ClickHouseColumnDefaultValueConverter.java:
##########
@@ -1,25 +1,164 @@
 /*
- *  Licensed to the Apache Software Foundation (ASF) under one
- *  or more contributor license agreements.  See the NOTICE file
- *  distributed with this work for additional information
- *  regarding copyright ownership.  The ASF licenses this file
- *  to you under the Apache License, Version 2.0 (the
- *  "License"); you may not use this file except in compliance
- *  with the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
  *
- *      http://www.apache.org/licenses/LICENSE-2.0
+ *  http://www.apache.org/licenses/LICENSE-2.0
  *
- *  Unless required by applicable law or agreed to in writing,
- *  software distributed under the License is distributed on an
- *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- *  KIND, either express or implied.  See the License for the
- *  specific language governing permissions and limitations
- *  under the License.
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
  */
 package org.apache.gravitino.catalog.clickhouse.converter;
 
+import static org.apache.gravitino.rel.Column.DEFAULT_VALUE_NOT_SET;
+import static 
org.apache.gravitino.rel.Column.DEFAULT_VALUE_OF_CURRENT_TIMESTAMP;
+
+import java.time.LocalDate;
+import java.time.LocalDateTime;
 import 
org.apache.gravitino.catalog.jdbc.converter.JdbcColumnDefaultValueConverter;
+import org.apache.gravitino.catalog.jdbc.converter.JdbcTypeConverter;
+import org.apache.gravitino.rel.expressions.Expression;
+import org.apache.gravitino.rel.expressions.FunctionExpression;
+import org.apache.gravitino.rel.expressions.UnparsedExpression;
+import org.apache.gravitino.rel.expressions.literals.Literal;
+import org.apache.gravitino.rel.expressions.literals.Literals;
+import org.apache.gravitino.rel.types.Decimal;
+import org.apache.gravitino.rel.types.Type;
+import org.apache.gravitino.rel.types.Types;
 
 public class ClickHouseColumnDefaultValueConverter extends 
JdbcColumnDefaultValueConverter {
-  // Implement ClickHouse specific default value conversions if needed
+
+  protected static final String NOW = "now";
+  Expression DEFAULT_VALUE_OF_NOW = FunctionExpression.of("now");
+
+  public String fromGravitino(Expression defaultValue) {
+    if (DEFAULT_VALUE_NOT_SET.equals(defaultValue)) {
+      return null;
+    }
+
+    if (defaultValue instanceof FunctionExpression) {
+      FunctionExpression functionExpression = (FunctionExpression) 
defaultValue;
+      return String.format("(%s)", functionExpression);
+    }
+
+    if (defaultValue instanceof Literal) {
+      Literal<?> literal = (Literal<?>) defaultValue;
+      Type type = literal.dataType();
+      if (defaultValue.equals(Literals.NULL)) {
+        return NULL;
+      } else if (type instanceof Type.NumericType) {
+        return literal.value().toString();
+      } else {
+        Object value = literal.value();
+        if (value instanceof LocalDateTime) {
+          value = ((LocalDateTime) value).format(DATE_TIME_FORMATTER);
+        }
+        return String.format("'%s'", value);
+      }
+    }
+
+    throw new IllegalArgumentException("Not a supported column default value: 
" + defaultValue);
+  }
+
+  @Override
+  public Expression toGravitino(
+      JdbcTypeConverter.JdbcTypeBean type,
+      String columnDefaultValue,
+      boolean isExpression,
+      boolean nullable) {
+    if (columnDefaultValue == null || columnDefaultValue.isEmpty()) {
+      return nullable ? Literals.NULL : DEFAULT_VALUE_NOT_SET;
+    }
+
+    String reallyType = type.getTypeName();
+    if (reallyType.startsWith("Nullable(")) {
+      reallyType = type.getTypeName().substring(9, type.getTypeName().length() 
- 1);
+    }
+
+    if (reallyType.startsWith("Decimal(")) {
+      reallyType = "Decimal";
+    }
+
+    if (reallyType.startsWith("FixedString(")) {
+      reallyType = "FixedString";
+    }
+
+    if (nullable) {
+      if (columnDefaultValue.equals("NULL")) {
+        return Literals.NULL;
+      }
+    }
+
+    // TODO clickhouse has bug which isExpression is false when is really 
expression
+    if (isExpression) {

Review Comment:
   TODO comment without tracking issue: Line 100 contains a TODO about 
ClickHouse having a bug where isExpression is false when it's really an 
expression. This should either be fixed, or a tracking issue should be created 
and referenced in the comment.
   ```suggestion
       // ClickHouse may incorrectly set isExpression to false even when the 
default value is an
       // expression. Treat known expression defaults as expressions in that 
case.
       if (isExpression || NOW.equals(columnDefaultValue)) {
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to