Copilot commented on code in PR #9776:
URL: https://github.com/apache/gravitino/pull/9776#discussion_r2752782978


##########
integration-test-common/src/test/java/org/apache/gravitino/integration/test/container/ClickHouseContainer.java:
##########
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.integration.test.container;
+
+import static java.lang.String.format;
+
+import com.google.common.collect.ImmutableSet;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.gravitino.integration.test.util.TestDatabaseName;
+import org.rnorth.ducttape.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.Network;
+
+public class ClickHouseContainer extends BaseContainer {
+  public static final Logger LOG = 
LoggerFactory.getLogger(ClickHouseContainer.class);
+
+  public static final String DEFAULT_IMAGE = "clickhouse:24.8.14";
+  public static final String HOST_NAME = "gravitino-ci-clickhouse";
+  public static final int CLICKHOUSE_PORT = 8123;
+  public static final String USER_NAME = "default";
+  public static final String PASSWORD = "default";
+
+  public static Builder builder() {
+    return new Builder();
+  }
+
+  protected ClickHouseContainer(
+      String image,
+      String hostName,
+      Set<Integer> ports,
+      Map<String, String> extraHosts,
+      Map<String, String> filesToMount,
+      Map<String, String> envVars,
+      Optional<Network> network) {
+    super(image, hostName, ports, extraHosts, filesToMount, envVars, network);
+  }
+
+  @Override
+  protected void setupContainer() {
+    super.setupContainer();
+    withLogConsumer(new PrintingContainerLog(format("%-14s| ", 
"clickHouseContainer")));
+  }
+
+  @Override
+  public void start() {
+    super.start();
+    Preconditions.check("clickHouse container startup failed!", 
checkContainerStatus(5));
+  }
+
+  @Override
+  protected boolean checkContainerStatus(int retryLimit) {
+    for (int attempt = 1; attempt <= retryLimit; attempt++) {
+      try (Connection connection =
+              DriverManager.getConnection(getJdbcUrl(), USER_NAME, 
getPassword());
+          Statement statement = connection.createStatement()) {
+        // Simple health check query; ClickHouse should respond if it is ready.
+        statement.execute("SELECT 1");
+        LOG.info("clickHouse container is healthy after {} attempt(s)", 
attempt);
+        return true;
+      } catch (SQLException e) {
+        LOG.warn(
+            "Failed to connect to clickHouse container on attempt {}/{}: {}",
+            attempt,
+            retryLimit,
+            e.getMessage(),
+            e);
+        if (attempt < retryLimit) {
+          try {
+            Thread.sleep(1000L);
+          } catch (InterruptedException interruptedException) {
+            Thread.currentThread().interrupt();
+            LOG.error(
+                "Interrupted while waiting to retry clickHouse container 
health check",
+                interruptedException);
+            break;
+          }
+        }
+      }
+    }
+    LOG.error("clickHouse container failed health checks after {} attempt(s)", 
retryLimit);
+    return false;
+  }
+
+  public void createDatabase(TestDatabaseName testDatabaseName) {
+    String clickHouseJdbcUrl =
+        StringUtils.substring(
+            getJdbcUrl(testDatabaseName), 0, 
getJdbcUrl(testDatabaseName).lastIndexOf("/"));
+
+    // Use the default username and password to connect and create the 
database;
+    // any non-default password must be configured via Gravitino catalog 
properties.
+    try (Connection connection =
+            DriverManager.getConnection(clickHouseJdbcUrl, USER_NAME, 
getPassword());
+        Statement statement = connection.createStatement()) {
+
+      String query = String.format("CREATE DATABASE IF NOT EXISTS %s;", 
testDatabaseName);
+      // FIXME: String, which is used in SQL, can be unsafe
+      statement.execute(query);

Review Comment:
   SQL injection vulnerability: The testDatabaseName parameter is directly 
interpolated into the SQL query without proper escaping or parameterization. 
While TestDatabaseName is an enum and may be considered controlled input in 
test scenarios, this pattern is unsafe and could lead to SQL injection if the 
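enum values are ever changed to include SQL metacharacters. Use a parameterized 
query or properly escape the database name. Note that JDBC `?` placeholders 
bind values rather than identifiers, so for a database name this in practice 
means quoting it as an identifier (e.g., with backticks) and escaping any 
embedded quote characters.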



##########
catalogs-contrib/catalog-jdbc-clickhouse/src/main/java/org/apache/gravitino/catalog/clickhouse/operations/ClickHouseTableOperations.java:
##########
@@ -38,13 +109,189 @@ protected String generateCreateTableSql(
       Distribution distribution,
       Index[] indexes) {
     throw new UnsupportedOperationException(
-        "ClickHouseTableOperations.generateCreateTableSql is not implemented 
yet.");
+        "generateCreateTableSql with out sortOrders in clickhouse is not 
supported");
+  }
+
+  @Override
+  protected String generateCreateTableSql(
+      String tableName,
+      JdbcColumn[] columns,
+      String comment,
+      Map<String, String> properties,
+      Transform[] partitioning,
+      Distribution distribution,
+      Index[] indexes,
+      SortOrder[] sortOrders) {
+
+    // These two are not yet supported in Gravitino now and will be supported 
in the future.
+    if (ArrayUtils.isNotEmpty(partitioning)) {
+      throw new UnsupportedOperationException(
+          "Currently we do not support Partitioning in clickhouse");
+    }
+    Preconditions.checkArgument(
+        Distributions.NONE.equals(distribution), "ClickHouse does not support 
distribution");
+
+    validateIncrementCol(columns, indexes);
+
+    // First build the CREATE TABLE statement
+    StringBuilder sqlBuilder = new StringBuilder();
+    sqlBuilder
+        .append("CREATE TABLE ")
+        .append(BACK_QUOTE)
+        .append(tableName)
+        .append(BACK_QUOTE)
+        .append(" (\n");
+
+    // Add columns
+    for (int i = 0; i < columns.length; i++) {
+      JdbcColumn column = columns[i];
+      sqlBuilder
+          .append(SPACE)
+          .append(SPACE)
+          .append(BACK_QUOTE)
+          .append(column.name())
+          .append(BACK_QUOTE);
+
+      appendColumnDefinition(column, sqlBuilder);
+      // Add a comma for the next column, unless it's the last one
+      if (i < columns.length - 1) {
+        sqlBuilder.append(",\n");
+      }
+    }
+
+    // Index definition, we only support primary index now, secondary index 
will be supported in
+    // future
+    appendIndexesSql(indexes, sqlBuilder);
+
+    sqlBuilder.append("\n)");
+
+    // Extract engine from properties
+    String engine = ENGINE_PROPERTY_ENTRY.getDefaultValue().getValue();
+    if (MapUtils.isNotEmpty(properties)) {
+      String userSetEngine = properties.remove(CLICKHOUSE_ENGINE_KEY);
+      if (StringUtils.isNotEmpty(userSetEngine)) {
+        engine = userSetEngine;
+      }
+    }
+    sqlBuilder.append("\n ENGINE = ").append(engine);
+
+    // Omit partition by clause as it will be supported in the next PR
+    // TODO: Add partition by clause support
+
+    // Add order by clause
+    if (ArrayUtils.isNotEmpty(sortOrders)) {
+      if (sortOrders.length > 1) {
+        throw new UnsupportedOperationException(
+            "Currently ClickHouse does not support sortOrders with more than 1 
element");
+      } else if (sortOrders[0].nullOrdering() != null || 
sortOrders[0].direction() != null) {
+        // If no value is set earlier, some default values will be set.
+        // It is difficult to determine whether the user has set a value.
+        LOG.warn(
+            "ClickHouse currently does not support nullOrdering: {} and 
direction: {} of sortOrders,and will ignore them",
+            sortOrders[0].nullOrdering(),
+            sortOrders[0].direction());
+      }
+      sqlBuilder.append(
+          " \n ORDER BY " + BACK_QUOTE + sortOrders[0].expression() + 
BACK_QUOTE + " \n");
+    }
+
+    // Add table comment if specified
+    if (StringUtils.isNotEmpty(comment)) {
+      sqlBuilder.append(" COMMENT '").append(comment).append("'");

Review Comment:
   SQL injection vulnerability: Comments containing single quotes are not 
escaped before being embedded in SQL. If a user provides a comment with a 
single quote (e.g., "test's table"), the generated SQL will be malformed and 
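could lead to SQL injection. Use proper SQL escaping (e.g., replace single 
quotes with two single quotes, and escape backslashes as well, since ClickHouse 
treats a backslash as an escape character inside string literals) or consider 
using parameterized queries where supported.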
   ```suggestion
         String escapedComment = comment.replace("'", "''");
         sqlBuilder.append(" COMMENT '").append(escapedComment).append("'");
   ```



##########
catalogs-contrib/catalog-jdbc-clickhouse/src/test/java/org/apache/gravitino/catalog/clickhouse/operations/TestClickHouseTableOperations.java:
##########
@@ -0,0 +1,386 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.catalog.clickhouse.operations;
+
+import static 
org.apache.gravitino.catalog.clickhouse.ClickHouseTablePropertiesMetadata.CLICKHOUSE_ENGINE_KEY;
+import static 
org.apache.gravitino.catalog.clickhouse.ClickHouseUtils.getSortOrders;
+import static org.apache.gravitino.rel.Column.DEFAULT_VALUE_NOT_SET;
+
+import java.time.LocalDateTime;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.commons.lang3.RandomStringUtils;
+import org.apache.gravitino.catalog.jdbc.JdbcColumn;
+import org.apache.gravitino.catalog.jdbc.JdbcTable;
+import org.apache.gravitino.rel.expressions.distributions.Distributions;
+import org.apache.gravitino.rel.expressions.literals.Literals;
+import org.apache.gravitino.rel.expressions.transforms.Transforms;
+import org.apache.gravitino.rel.indexes.Index;
+import org.apache.gravitino.rel.indexes.Indexes;
+import org.apache.gravitino.rel.types.Decimal;
+import org.apache.gravitino.rel.types.Type;
+import org.apache.gravitino.rel.types.Types;
+import org.apache.gravitino.utils.RandomNameUtils;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.api.Test;
+
+@Tag("gravitino-docker-test")
+public class TestClickHouseTableOperations extends TestClickHouse {
+  private static final Type STRING = Types.StringType.get();
+  private static final Type INT = Types.IntegerType.get();
+
+  @Test
+  public void testOperationTable() {
+    String tableName = RandomStringUtils.randomAlphabetic(16) + "_op_table";
+    String tableComment = "test_comment";
+    List<JdbcColumn> columns = new ArrayList<>();
+    columns.add(
+        JdbcColumn.builder()
+            .withName("col_1")
+            .withType(STRING)
+            .withComment("test_comment")
+            .withNullable(true)
+            .build());
+    columns.add(
+        JdbcColumn.builder()
+            .withName("col_2")
+            .withType(INT)
+            .withNullable(false)
+            .withComment("set primary key")
+            .build());
+    columns.add(
+        JdbcColumn.builder()
+            .withName("col_3")
+            .withType(INT)
+            .withNullable(true)
+            .withDefaultValue(Literals.NULL)
+            .build());
+    columns.add(
+        JdbcColumn.builder()
+            .withName("col_4")
+            .withType(STRING)
+            .withDefaultValue(Literals.of("hello world", STRING))
+            .withNullable(false)
+            .build());
+    Map<String, String> properties = new HashMap<>();
+
+    Index[] indexes = new Index[] {};
+    // create table
+    TABLE_OPERATIONS.create(
+        TEST_DB_NAME.toString(),
+        tableName,
+        columns.toArray(new JdbcColumn[0]),
+        tableComment,
+        properties,
+        null,
+        Distributions.NONE,
+        indexes,
+        getSortOrders("col_2"));
+
+    // list table
+    List<String> tables = TABLE_OPERATIONS.listTables(TEST_DB_NAME.toString());
+    Assertions.assertTrue(tables.contains(tableName));
+
+    // load table
+    JdbcTable load = TABLE_OPERATIONS.load(TEST_DB_NAME.toString(), tableName);
+    assertionsTableInfo(
+        tableName, tableComment, columns, properties, indexes, 
Transforms.EMPTY_TRANSFORM, load);
+  }
+
+  @Test
+  public void testCreateAndLoadTable() {
+    String tableName = RandomStringUtils.randomAlphabetic(16) + "_cl_table";
+    String tableComment = "test_comment";
+    List<JdbcColumn> columns = new ArrayList<>();
+    columns.add(
+        JdbcColumn.builder()
+            .withName("col_1")
+            .withType(Types.DecimalType.of(10, 2))
+            .withComment("test_decimal")
+            .withNullable(false)
+            .withDefaultValue(Literals.decimalLiteral(Decimal.of("0.00", 10, 
2)))
+            .build());
+    columns.add(
+        JdbcColumn.builder()
+            .withName("col_2")
+            .withType(Types.LongType.get())
+            .withNullable(false)
+            .withDefaultValue(Literals.longLiteral(0L))
+            .withComment("long type")
+            .build());
+    columns.add(
+        JdbcColumn.builder()
+            .withName("col_3")
+            .withType(Types.TimestampType.withoutTimeZone())
+            .withNullable(false)
+            .withComment("timestamp")
+            
.withDefaultValue(Literals.timestampLiteral(LocalDateTime.parse("2013-01-01T00:00:00")))
+            .build());
+    columns.add(
+        JdbcColumn.builder()
+            .withName("col_4")
+            .withType(Types.DateType.get())
+            .withNullable(true)
+            .withComment("date")
+            .withDefaultValue(DEFAULT_VALUE_NOT_SET)
+            .build());
+    Map<String, String> properties = new HashMap<>();
+
+    Index[] indexes = new Index[0];
+    // create table
+    TABLE_OPERATIONS.create(
+        TEST_DB_NAME.toString(),
+        tableName,
+        columns.toArray(new JdbcColumn[0]),
+        tableComment,
+        properties,
+        null,
+        Distributions.NONE,
+        indexes,
+        getSortOrders("col_1"));
+
+    JdbcTable loaded = TABLE_OPERATIONS.load(TEST_DB_NAME.toString(), 
tableName);
+    assertionsTableInfo(
+        tableName, tableComment, columns, properties, indexes, 
Transforms.EMPTY_TRANSFORM, loaded);
+  }
+
+  @Test
+  public void testCreateAllTypeTable() {
+    String tableName = RandomNameUtils.genRandomName("type_table_");
+    String tableComment = "test_comment";
+    List<JdbcColumn> columns = new ArrayList<>();
+    columns.add(
+        JdbcColumn.builder()
+            .withName("col_1")
+            .withType(Types.ByteType.get())
+            .withNullable(false)
+            .build());
+    columns.add(
+        JdbcColumn.builder()
+            .withName("col_2")
+            .withType(Types.ShortType.get())
+            .withNullable(true)
+            .build());
+    
columns.add(JdbcColumn.builder().withName("col_3").withType(INT).withNullable(false).build());
+    columns.add(
+        JdbcColumn.builder()
+            .withName("col_4")
+            .withType(Types.LongType.get())
+            .withNullable(false)
+            .build());
+    columns.add(
+        JdbcColumn.builder()
+            .withName("col_5")
+            .withType(Types.FloatType.get())
+            .withNullable(false)
+            .build());
+    columns.add(
+        JdbcColumn.builder()
+            .withName("col_6")
+            .withType(Types.DoubleType.get())
+            .withNullable(false)
+            .build());
+    columns.add(
+        JdbcColumn.builder()
+            .withName("col_7")
+            .withType(Types.DateType.get())
+            .withNullable(false)
+            .build());
+    columns.add(
+        JdbcColumn.builder()
+            .withName("col_9")
+            .withType(Types.TimestampType.withoutTimeZone())
+            .withNullable(false)
+            .build());
+    columns.add(
+        
JdbcColumn.builder().withName("col_10").withType(Types.DecimalType.of(10, 
2)).build());
+    columns.add(
+        
JdbcColumn.builder().withName("col_11").withType(STRING).withNullable(false).build());
+    columns.add(
+        JdbcColumn.builder()
+            .withName("col_12")
+            .withType(Types.FixedCharType.of(10))
+            .withNullable(false)
+            .build());
+    columns.add(
+        JdbcColumn.builder()
+            .withName("col_13")
+            .withType(Types.StringType.get())
+            .withNullable(false)
+            .build());
+    columns.add(
+        JdbcColumn.builder()
+            .withName("col_15")
+            .withType(Types.FixedCharType.of(10))
+            .withNullable(false)
+            .build());
+
+    // create table
+    TABLE_OPERATIONS.create(
+        TEST_DB_NAME.toString(),
+        tableName,
+        columns.toArray(new JdbcColumn[0]),
+        tableComment,
+        Collections.emptyMap(),
+        null,
+        Distributions.NONE,
+        Indexes.EMPTY_INDEXES,
+        getSortOrders("col_1"));
+
+    JdbcTable load = TABLE_OPERATIONS.load(TEST_DB_NAME.toString(), tableName);
+    assertionsTableInfo(
+        tableName,
+        tableComment,
+        columns,
+        Collections.emptyMap(),
+        null,
+        Transforms.EMPTY_TRANSFORM,
+        load);
+  }
+
+  @Test
+  public void testCreateNotSupportTypeTable() {
+    String tableName = RandomNameUtils.genRandomName("type_table_");
+    String tableComment = "test_comment";
+    List<JdbcColumn> columns = new ArrayList<>();
+    List<Type> notSupportType =
+        Arrays.asList(
+            Types.FixedType.of(10),
+            Types.IntervalDayType.get(),
+            Types.IntervalYearType.get(),
+            Types.ListType.of(Types.DateType.get(), true),
+            Types.MapType.of(Types.StringType.get(), Types.IntegerType.get(), 
true),
+            Types.UnionType.of(Types.IntegerType.get()),
+            Types.StructType.of(
+                Types.StructType.Field.notNullField("col_1", 
Types.IntegerType.get())));
+
+    for (Type type : notSupportType) {
+      columns.clear();
+      columns.add(
+          
JdbcColumn.builder().withName("col_1").withType(type).withNullable(false).build());
+
+      JdbcColumn[] jdbcCols = columns.toArray(new JdbcColumn[0]);
+      Map<String, String> emptyMap = Collections.emptyMap();
+      IllegalArgumentException illegalArgumentException =
+          Assertions.assertThrows(
+              IllegalArgumentException.class,
+              () -> {
+                TABLE_OPERATIONS.create(
+                    TEST_DB_NAME.toString(),
+                    tableName,
+                    jdbcCols,
+                    tableComment,
+                    emptyMap,
+                    null,
+                    Distributions.NONE,
+                    Indexes.EMPTY_INDEXES,
+                    getSortOrders("col_1"));
+              });
+      Assertions.assertTrue(
+          illegalArgumentException
+              .getMessage()
+              .contains(
+                  String.format(
+                      "Couldn't convert Gravitino type %s to ClickHouse type",
+                      type.simpleString())));
+    }
+  }
+
+  @Test
+  public void testCreateMultipleTables() {
+    String test_table_1 = "test_table_1";
+    TABLE_OPERATIONS.create(
+        TEST_DB_NAME.toString(),
+        test_table_1,
+        new JdbcColumn[] {
+          JdbcColumn.builder()
+              .withName("col_1")
+              .withType(Types.DecimalType.of(10, 2))
+              .withComment("test_decimal")
+              .withNullable(false)
+              .withDefaultValue(Literals.decimalLiteral(Decimal.of("0.00")))
+              .build()
+        },
+        "test_comment",
+        null,
+        null,
+        Distributions.NONE,
+        Indexes.EMPTY_INDEXES,
+        getSortOrders("col_1"));
+
+    String testDb = "test_db_2";
+
+    DATABASE_OPERATIONS.create(testDb, null, null);
+    List<String> tables = TABLE_OPERATIONS.listTables(testDb);
+    Assertions.assertFalse(tables.contains(test_table_1));
+
+    String test_table_2 = "test_table_2";
+    TABLE_OPERATIONS.create(
+        testDb,
+        test_table_2,
+        new JdbcColumn[] {
+          JdbcColumn.builder()
+              .withName("col_1")
+              .withType(Types.DecimalType.of(10, 2))
+              .withComment("test_decimal")
+              .withNullable(false)
+              .withDefaultValue(Literals.decimalLiteral(Decimal.of("0.00")))
+              .build()
+        },
+        "test_comment",
+        null,
+        null,
+        Distributions.NONE,
+        Indexes.EMPTY_INDEXES,
+        getSortOrders("col_1"));
+
+    tables = TABLE_OPERATIONS.listTables(TEST_DB_NAME.toString());
+    Assertions.assertFalse(tables.contains(test_table_2));
+  }
+
+  @Test
+  public void testLoadTableDefaultProperties() {
+    String test_table_1 = RandomNameUtils.genRandomName("properties_table_");

Review Comment:
   Variable naming convention violation: Java variable names should use 
camelCase, not snake_case. Rename test_table_1 to testTable1 following Java 
naming conventions.



##########
catalogs-contrib/catalog-jdbc-clickhouse/src/main/java/org/apache/gravitino/catalog/clickhouse/converter/ClickHouseColumnDefaultValueConverter.java:
##########
@@ -1,25 +1,164 @@
 /*
- *  Licensed to the Apache Software Foundation (ASF) under one
- *  or more contributor license agreements.  See the NOTICE file
- *  distributed with this work for additional information
- *  regarding copyright ownership.  The ASF licenses this file
- *  to you under the Apache License, Version 2.0 (the
- *  "License"); you may not use this file except in compliance
- *  with the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
  *
- *      http://www.apache.org/licenses/LICENSE-2.0
+ *  http://www.apache.org/licenses/LICENSE-2.0
  *
- *  Unless required by applicable law or agreed to in writing,
- *  software distributed under the License is distributed on an
- *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- *  KIND, either express or implied.  See the License for the
- *  specific language governing permissions and limitations
- *  under the License.
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
  */
 package org.apache.gravitino.catalog.clickhouse.converter;
 
+import static org.apache.gravitino.rel.Column.DEFAULT_VALUE_NOT_SET;
+import static 
org.apache.gravitino.rel.Column.DEFAULT_VALUE_OF_CURRENT_TIMESTAMP;
+
+import java.time.LocalDate;
+import java.time.LocalDateTime;
 import 
org.apache.gravitino.catalog.jdbc.converter.JdbcColumnDefaultValueConverter;
+import org.apache.gravitino.catalog.jdbc.converter.JdbcTypeConverter;
+import org.apache.gravitino.rel.expressions.Expression;
+import org.apache.gravitino.rel.expressions.FunctionExpression;
+import org.apache.gravitino.rel.expressions.UnparsedExpression;
+import org.apache.gravitino.rel.expressions.literals.Literal;
+import org.apache.gravitino.rel.expressions.literals.Literals;
+import org.apache.gravitino.rel.types.Decimal;
+import org.apache.gravitino.rel.types.Type;
+import org.apache.gravitino.rel.types.Types;
 
 public class ClickHouseColumnDefaultValueConverter extends 
JdbcColumnDefaultValueConverter {
-  // Implement ClickHouse specific default value conversions if needed
+
+  protected static final String NOW = "now";
+  Expression DEFAULT_VALUE_OF_NOW = FunctionExpression.of("now");
+
+  public String fromGravitino(Expression defaultValue) {
+    if (DEFAULT_VALUE_NOT_SET.equals(defaultValue)) {
+      return null;
+    }
+
+    if (defaultValue instanceof FunctionExpression) {
+      FunctionExpression functionExpression = (FunctionExpression) 
defaultValue;
+      return String.format("(%s)", functionExpression);
+    }
+
+    if (defaultValue instanceof Literal) {
+      Literal<?> literal = (Literal<?>) defaultValue;
+      Type type = literal.dataType();
+      if (defaultValue.equals(Literals.NULL)) {
+        return NULL;
+      } else if (type instanceof Type.NumericType) {
+        return literal.value().toString();
+      } else {
+        Object value = literal.value();
+        if (value instanceof LocalDateTime) {
+          value = ((LocalDateTime) value).format(DATE_TIME_FORMATTER);
+        }
+        return String.format("'%s'", value);

Review Comment:
   SQL injection vulnerability: String values are wrapped in single quotes 
without escaping. If a string literal contains a single quote (e.g., "it's"), 
the generated SQL will be malformed and could lead to SQL injection. Use proper 
SQL escaping for string values; in ClickHouse string literals, both single 
quotes and backslashes must be escaped.



##########
catalogs-contrib/catalog-jdbc-clickhouse/src/main/java/org/apache/gravitino/catalog/clickhouse/operations/ClickHouseTableOperations.java:
##########
@@ -38,13 +109,189 @@ protected String generateCreateTableSql(
       Distribution distribution,
       Index[] indexes) {
     throw new UnsupportedOperationException(
-        "ClickHouseTableOperations.generateCreateTableSql is not implemented 
yet.");
+        "generateCreateTableSql with out sortOrders in clickhouse is not 
supported");
+  }
+
+  @Override
+  protected String generateCreateTableSql(
+      String tableName,
+      JdbcColumn[] columns,
+      String comment,
+      Map<String, String> properties,
+      Transform[] partitioning,
+      Distribution distribution,
+      Index[] indexes,
+      SortOrder[] sortOrders) {
+
+    // These two are not yet supported in Gravitino now and will be supported 
in the future.
+    if (ArrayUtils.isNotEmpty(partitioning)) {
+      throw new UnsupportedOperationException(
+          "Currently we do not support Partitioning in clickhouse");
+    }
+    Preconditions.checkArgument(
+        Distributions.NONE.equals(distribution), "ClickHouse does not support 
distribution");
+
+    validateIncrementCol(columns, indexes);
+
+    // First build the CREATE TABLE statement
+    StringBuilder sqlBuilder = new StringBuilder();
+    sqlBuilder
+        .append("CREATE TABLE ")
+        .append(BACK_QUOTE)
+        .append(tableName)
+        .append(BACK_QUOTE)
+        .append(" (\n");
+
+    // Add columns
+    for (int i = 0; i < columns.length; i++) {
+      JdbcColumn column = columns[i];
+      sqlBuilder
+          .append(SPACE)
+          .append(SPACE)
+          .append(BACK_QUOTE)
+          .append(column.name())
+          .append(BACK_QUOTE);
+
+      appendColumnDefinition(column, sqlBuilder);
+      // Add a comma for the next column, unless it's the last one
+      if (i < columns.length - 1) {
+        sqlBuilder.append(",\n");
+      }
+    }
+
+    // Index definition, we only support primary index now, secondary index 
will be supported in
+    // future
+    appendIndexesSql(indexes, sqlBuilder);
+
+    sqlBuilder.append("\n)");
+
+    // Extract engine from properties
+    String engine = ENGINE_PROPERTY_ENTRY.getDefaultValue().getValue();
+    if (MapUtils.isNotEmpty(properties)) {
+      String userSetEngine = properties.remove(CLICKHOUSE_ENGINE_KEY);

Review Comment:
   Mutating input parameter: The properties map is modified by calling 
properties.remove(CLICKHOUSE_ENGINE_KEY), which mutates the caller's map. This 
is a side effect that may not be expected by the caller. Consider making a 
defensive copy of the properties map before modifying it, or document this side 
effect in the method's JavaDoc.
   ```suggestion
         String userSetEngine = properties.get(CLICKHOUSE_ENGINE_KEY);
   ```



##########
catalogs-contrib/catalog-jdbc-clickhouse/src/main/java/org/apache/gravitino/catalog/clickhouse/converter/ClickHouseColumnDefaultValueConverter.java:
##########
@@ -1,25 +1,164 @@
 /*
- *  Licensed to the Apache Software Foundation (ASF) under one
- *  or more contributor license agreements.  See the NOTICE file
- *  distributed with this work for additional information
- *  regarding copyright ownership.  The ASF licenses this file
- *  to you under the Apache License, Version 2.0 (the
- *  "License"); you may not use this file except in compliance
- *  with the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
  *
- *      http://www.apache.org/licenses/LICENSE-2.0
+ *  http://www.apache.org/licenses/LICENSE-2.0
  *
- *  Unless required by applicable law or agreed to in writing,
- *  software distributed under the License is distributed on an
- *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- *  KIND, either express or implied.  See the License for the
- *  specific language governing permissions and limitations
- *  under the License.
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
  */
 package org.apache.gravitino.catalog.clickhouse.converter;
 
+import static org.apache.gravitino.rel.Column.DEFAULT_VALUE_NOT_SET;
+import static 
org.apache.gravitino.rel.Column.DEFAULT_VALUE_OF_CURRENT_TIMESTAMP;
+
+import java.time.LocalDate;
+import java.time.LocalDateTime;
 import 
org.apache.gravitino.catalog.jdbc.converter.JdbcColumnDefaultValueConverter;
+import org.apache.gravitino.catalog.jdbc.converter.JdbcTypeConverter;
+import org.apache.gravitino.rel.expressions.Expression;
+import org.apache.gravitino.rel.expressions.FunctionExpression;
+import org.apache.gravitino.rel.expressions.UnparsedExpression;
+import org.apache.gravitino.rel.expressions.literals.Literal;
+import org.apache.gravitino.rel.expressions.literals.Literals;
+import org.apache.gravitino.rel.types.Decimal;
+import org.apache.gravitino.rel.types.Type;
+import org.apache.gravitino.rel.types.Types;
 
 public class ClickHouseColumnDefaultValueConverter extends 
JdbcColumnDefaultValueConverter {
-  // Implement ClickHouse specific default value conversions if needed
+
+  protected static final String NOW = "now";
+  Expression DEFAULT_VALUE_OF_NOW = FunctionExpression.of("now");

Review Comment:
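   Missing modifiers: DEFAULT_VALUE_OF_NOW is currently a package-private
instance field, but its UPPER_SNAKE_CASE name and fixed value mark it as a
constant. Declare it static final (and protected, like the NOW constant above
it) so it follows Java conventions and coding best practices for constants, and
build it from NOW rather than repeating the "now" literal.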
   ```suggestion
     protected static final Expression DEFAULT_VALUE_OF_NOW = 
FunctionExpression.of(NOW);
   ```



##########
catalogs-contrib/catalog-jdbc-clickhouse/src/main/java/org/apache/gravitino/catalog/clickhouse/operations/ClickHouseTableOperations.java:
##########
@@ -53,4 +300,28 @@ protected String generateAlterTableSql(
     throw new UnsupportedOperationException(
         "ClickHouseTableOperations.generateAlterTableSql is not implemented 
yet.");
   }
+
+  private StringBuilder appendColumnDefinition(JdbcColumn column, 
StringBuilder sqlBuilder) {
+    // Add Nullable data type
+    String dataType = typeConverter.fromGravitino(column.dataType());
+    if (column.nullable()) {
+      
sqlBuilder.append(SPACE).append("Nullable(").append(dataType).append(")").append(SPACE);
+    } else {
+      sqlBuilder.append(SPACE).append(dataType).append(SPACE);
+    }
+
+    // Add DEFAULT value if specified
+    if (!DEFAULT_VALUE_NOT_SET.equals(column.defaultValue())) {
+      sqlBuilder
+          .append("DEFAULT ")
+          
.append(columnDefaultValueConverter.fromGravitino(column.defaultValue()))
+          .append(SPACE);
+    }
+
+    // Add column comment if specified
+    if (StringUtils.isNotEmpty(column.comment())) {
+      sqlBuilder.append("COMMENT '").append(column.comment()).append("' ");

Review Comment:
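   SQL injection vulnerability: Column comments are embedded in the generated
SQL without escaping. If a user provides a column comment containing a single
quote (e.g., "user's column"), the quote terminates the string literal early, so
the generated SQL is malformed and could lead to SQL injection. Escape the
comment before appending it (e.g., replace each single quote with two single
quotes). Note that ClickHouse string literals also interpret backslash escapes,
so a comment ending in a backslash would escape the closing quote; backslashes
should be escaped as well.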
   ```suggestion
         String escapedComment = StringUtils.replace(column.comment(), "'", 
"''");
         sqlBuilder.append("COMMENT '").append(escapedComment).append("' ");
   ```



##########
catalogs-contrib/catalog-jdbc-clickhouse/src/main/java/org/apache/gravitino/catalog/clickhouse/operations/ClickHouseTableOperations.java:
##########
@@ -38,13 +109,189 @@ protected String generateCreateTableSql(
       Distribution distribution,
       Index[] indexes) {
     throw new UnsupportedOperationException(
-        "ClickHouseTableOperations.generateCreateTableSql is not implemented 
yet.");
+        "generateCreateTableSql with out sortOrders in clickhouse is not 
supported");
+  }
+
+  @Override
+  protected String generateCreateTableSql(
+      String tableName,
+      JdbcColumn[] columns,
+      String comment,
+      Map<String, String> properties,
+      Transform[] partitioning,
+      Distribution distribution,
+      Index[] indexes,
+      SortOrder[] sortOrders) {
+
+    // These two are not yet supported in Gravitino now and will be supported 
in the future.
+    if (ArrayUtils.isNotEmpty(partitioning)) {
+      throw new UnsupportedOperationException(
+          "Currently we do not support Partitioning in clickhouse");
+    }
+    Preconditions.checkArgument(
+        Distributions.NONE.equals(distribution), "ClickHouse does not support 
distribution");
+
+    validateIncrementCol(columns, indexes);
+
+    // First build the CREATE TABLE statement
+    StringBuilder sqlBuilder = new StringBuilder();
+    sqlBuilder
+        .append("CREATE TABLE ")
+        .append(BACK_QUOTE)
+        .append(tableName)
+        .append(BACK_QUOTE)
+        .append(" (\n");
+
+    // Add columns
+    for (int i = 0; i < columns.length; i++) {
+      JdbcColumn column = columns[i];
+      sqlBuilder
+          .append(SPACE)
+          .append(SPACE)
+          .append(BACK_QUOTE)
+          .append(column.name())
+          .append(BACK_QUOTE);
+
+      appendColumnDefinition(column, sqlBuilder);
+      // Add a comma for the next column, unless it's the last one
+      if (i < columns.length - 1) {
+        sqlBuilder.append(",\n");
+      }
+    }
+
+    // Index definition, we only support primary index now, secondary index 
will be supported in
+    // future
+    appendIndexesSql(indexes, sqlBuilder);
+
+    sqlBuilder.append("\n)");
+
+    // Extract engine from properties
+    String engine = ENGINE_PROPERTY_ENTRY.getDefaultValue().getValue();
+    if (MapUtils.isNotEmpty(properties)) {
+      String userSetEngine = properties.remove(CLICKHOUSE_ENGINE_KEY);
+      if (StringUtils.isNotEmpty(userSetEngine)) {
+        engine = userSetEngine;
+      }
+    }
+    sqlBuilder.append("\n ENGINE = ").append(engine);
+
+    // Omit partition by clause as it will be supported in the next PR
+    // TODO: Add partition by clause support
+
+    // Add order by clause
+    if (ArrayUtils.isNotEmpty(sortOrders)) {
+      if (sortOrders.length > 1) {
+        throw new UnsupportedOperationException(
+            "Currently ClickHouse does not support sortOrders with more than 1 
element");
+      } else if (sortOrders[0].nullOrdering() != null || 
sortOrders[0].direction() != null) {
+        // If no value is set earlier, some default values will be set.
+        // It is difficult to determine whether the user has set a value.
+        LOG.warn(
+            "ClickHouse currently does not support nullOrdering: {} and 
direction: {} of sortOrders,and will ignore them",
+            sortOrders[0].nullOrdering(),
+            sortOrders[0].direction());
+      }
+      sqlBuilder.append(
+          " \n ORDER BY " + BACK_QUOTE + sortOrders[0].expression() + 
BACK_QUOTE + " \n");

Review Comment:
   SQL injection vulnerability: The sortOrders expression is embedded directly
into the ORDER BY clause without validation or escaping. Although
sortOrders[0].expression() returns a NamedReference object that is converted to
a string, nothing validates that the resulting string is safe against SQL
injection. The backticks provide some protection, but not complete protection:
a value that itself contains a backtick would terminate the quoted identifier
early. Consider validating that the expression is a simple column name, or
escaping it properly.



##########
catalogs-contrib/catalog-jdbc-clickhouse/src/test/java/org/apache/gravitino/catalog/clickhouse/operations/TestClickHouseTableOperations.java:
##########
@@ -0,0 +1,386 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.catalog.clickhouse.operations;
+
+import static 
org.apache.gravitino.catalog.clickhouse.ClickHouseTablePropertiesMetadata.CLICKHOUSE_ENGINE_KEY;
+import static 
org.apache.gravitino.catalog.clickhouse.ClickHouseUtils.getSortOrders;
+import static org.apache.gravitino.rel.Column.DEFAULT_VALUE_NOT_SET;
+
+import java.time.LocalDateTime;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.commons.lang3.RandomStringUtils;
+import org.apache.gravitino.catalog.jdbc.JdbcColumn;
+import org.apache.gravitino.catalog.jdbc.JdbcTable;
+import org.apache.gravitino.rel.expressions.distributions.Distributions;
+import org.apache.gravitino.rel.expressions.literals.Literals;
+import org.apache.gravitino.rel.expressions.transforms.Transforms;
+import org.apache.gravitino.rel.indexes.Index;
+import org.apache.gravitino.rel.indexes.Indexes;
+import org.apache.gravitino.rel.types.Decimal;
+import org.apache.gravitino.rel.types.Type;
+import org.apache.gravitino.rel.types.Types;
+import org.apache.gravitino.utils.RandomNameUtils;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.api.Test;
+
+@Tag("gravitino-docker-test")
+public class TestClickHouseTableOperations extends TestClickHouse {
+  private static final Type STRING = Types.StringType.get();
+  private static final Type INT = Types.IntegerType.get();
+
+  @Test
+  public void testOperationTable() {
+    String tableName = RandomStringUtils.randomAlphabetic(16) + "_op_table";
+    String tableComment = "test_comment";
+    List<JdbcColumn> columns = new ArrayList<>();
+    columns.add(
+        JdbcColumn.builder()
+            .withName("col_1")
+            .withType(STRING)
+            .withComment("test_comment")
+            .withNullable(true)
+            .build());
+    columns.add(
+        JdbcColumn.builder()
+            .withName("col_2")
+            .withType(INT)
+            .withNullable(false)
+            .withComment("set primary key")
+            .build());
+    columns.add(
+        JdbcColumn.builder()
+            .withName("col_3")
+            .withType(INT)
+            .withNullable(true)
+            .withDefaultValue(Literals.NULL)
+            .build());
+    columns.add(
+        JdbcColumn.builder()
+            .withName("col_4")
+            .withType(STRING)
+            .withDefaultValue(Literals.of("hello world", STRING))
+            .withNullable(false)
+            .build());
+    Map<String, String> properties = new HashMap<>();
+
+    Index[] indexes = new Index[] {};
+    // create table
+    TABLE_OPERATIONS.create(
+        TEST_DB_NAME.toString(),
+        tableName,
+        columns.toArray(new JdbcColumn[0]),
+        tableComment,
+        properties,
+        null,
+        Distributions.NONE,
+        indexes,
+        getSortOrders("col_2"));
+
+    // list table
+    List<String> tables = TABLE_OPERATIONS.listTables(TEST_DB_NAME.toString());
+    Assertions.assertTrue(tables.contains(tableName));
+
+    // load table
+    JdbcTable load = TABLE_OPERATIONS.load(TEST_DB_NAME.toString(), tableName);
+    assertionsTableInfo(
+        tableName, tableComment, columns, properties, indexes, 
Transforms.EMPTY_TRANSFORM, load);
+  }
+
+  @Test
+  public void testCreateAndLoadTable() {
+    String tableName = RandomStringUtils.randomAlphabetic(16) + "_cl_table";
+    String tableComment = "test_comment";
+    List<JdbcColumn> columns = new ArrayList<>();
+    columns.add(
+        JdbcColumn.builder()
+            .withName("col_1")
+            .withType(Types.DecimalType.of(10, 2))
+            .withComment("test_decimal")
+            .withNullable(false)
+            .withDefaultValue(Literals.decimalLiteral(Decimal.of("0.00", 10, 
2)))
+            .build());
+    columns.add(
+        JdbcColumn.builder()
+            .withName("col_2")
+            .withType(Types.LongType.get())
+            .withNullable(false)
+            .withDefaultValue(Literals.longLiteral(0L))
+            .withComment("long type")
+            .build());
+    columns.add(
+        JdbcColumn.builder()
+            .withName("col_3")
+            .withType(Types.TimestampType.withoutTimeZone())
+            .withNullable(false)
+            .withComment("timestamp")
+            
.withDefaultValue(Literals.timestampLiteral(LocalDateTime.parse("2013-01-01T00:00:00")))
+            .build());
+    columns.add(
+        JdbcColumn.builder()
+            .withName("col_4")
+            .withType(Types.DateType.get())
+            .withNullable(true)
+            .withComment("date")
+            .withDefaultValue(DEFAULT_VALUE_NOT_SET)
+            .build());
+    Map<String, String> properties = new HashMap<>();
+
+    Index[] indexes = new Index[0];
+    // create table
+    TABLE_OPERATIONS.create(
+        TEST_DB_NAME.toString(),
+        tableName,
+        columns.toArray(new JdbcColumn[0]),
+        tableComment,
+        properties,
+        null,
+        Distributions.NONE,
+        indexes,
+        getSortOrders("col_1"));
+
+    JdbcTable loaded = TABLE_OPERATIONS.load(TEST_DB_NAME.toString(), 
tableName);
+    assertionsTableInfo(
+        tableName, tableComment, columns, properties, indexes, 
Transforms.EMPTY_TRANSFORM, loaded);
+  }
+
+  @Test
+  public void testCreateAllTypeTable() {
+    String tableName = RandomNameUtils.genRandomName("type_table_");
+    String tableComment = "test_comment";
+    List<JdbcColumn> columns = new ArrayList<>();
+    columns.add(
+        JdbcColumn.builder()
+            .withName("col_1")
+            .withType(Types.ByteType.get())
+            .withNullable(false)
+            .build());
+    columns.add(
+        JdbcColumn.builder()
+            .withName("col_2")
+            .withType(Types.ShortType.get())
+            .withNullable(true)
+            .build());
+    
columns.add(JdbcColumn.builder().withName("col_3").withType(INT).withNullable(false).build());
+    columns.add(
+        JdbcColumn.builder()
+            .withName("col_4")
+            .withType(Types.LongType.get())
+            .withNullable(false)
+            .build());
+    columns.add(
+        JdbcColumn.builder()
+            .withName("col_5")
+            .withType(Types.FloatType.get())
+            .withNullable(false)
+            .build());
+    columns.add(
+        JdbcColumn.builder()
+            .withName("col_6")
+            .withType(Types.DoubleType.get())
+            .withNullable(false)
+            .build());
+    columns.add(
+        JdbcColumn.builder()
+            .withName("col_7")
+            .withType(Types.DateType.get())
+            .withNullable(false)
+            .build());
+    columns.add(
+        JdbcColumn.builder()
+            .withName("col_9")
+            .withType(Types.TimestampType.withoutTimeZone())
+            .withNullable(false)
+            .build());
+    columns.add(
+        
JdbcColumn.builder().withName("col_10").withType(Types.DecimalType.of(10, 
2)).build());
+    columns.add(
+        
JdbcColumn.builder().withName("col_11").withType(STRING).withNullable(false).build());
+    columns.add(
+        JdbcColumn.builder()
+            .withName("col_12")
+            .withType(Types.FixedCharType.of(10))
+            .withNullable(false)
+            .build());
+    columns.add(
+        JdbcColumn.builder()
+            .withName("col_13")
+            .withType(Types.StringType.get())
+            .withNullable(false)
+            .build());
+    columns.add(
+        JdbcColumn.builder()
+            .withName("col_15")
+            .withType(Types.FixedCharType.of(10))
+            .withNullable(false)
+            .build());
+
+    // create table
+    TABLE_OPERATIONS.create(
+        TEST_DB_NAME.toString(),
+        tableName,
+        columns.toArray(new JdbcColumn[0]),
+        tableComment,
+        Collections.emptyMap(),
+        null,
+        Distributions.NONE,
+        Indexes.EMPTY_INDEXES,
+        getSortOrders("col_1"));
+
+    JdbcTable load = TABLE_OPERATIONS.load(TEST_DB_NAME.toString(), tableName);
+    assertionsTableInfo(
+        tableName,
+        tableComment,
+        columns,
+        Collections.emptyMap(),
+        null,
+        Transforms.EMPTY_TRANSFORM,
+        load);
+  }
+
+  @Test
+  public void testCreateNotSupportTypeTable() {
+    String tableName = RandomNameUtils.genRandomName("type_table_");
+    String tableComment = "test_comment";
+    List<JdbcColumn> columns = new ArrayList<>();
+    List<Type> notSupportType =
+        Arrays.asList(
+            Types.FixedType.of(10),
+            Types.IntervalDayType.get(),
+            Types.IntervalYearType.get(),
+            Types.ListType.of(Types.DateType.get(), true),
+            Types.MapType.of(Types.StringType.get(), Types.IntegerType.get(), 
true),
+            Types.UnionType.of(Types.IntegerType.get()),
+            Types.StructType.of(
+                Types.StructType.Field.notNullField("col_1", 
Types.IntegerType.get())));
+
+    for (Type type : notSupportType) {
+      columns.clear();
+      columns.add(
+          
JdbcColumn.builder().withName("col_1").withType(type).withNullable(false).build());
+
+      JdbcColumn[] jdbcCols = columns.toArray(new JdbcColumn[0]);
+      Map<String, String> emptyMap = Collections.emptyMap();
+      IllegalArgumentException illegalArgumentException =
+          Assertions.assertThrows(
+              IllegalArgumentException.class,
+              () -> {
+                TABLE_OPERATIONS.create(
+                    TEST_DB_NAME.toString(),
+                    tableName,
+                    jdbcCols,
+                    tableComment,
+                    emptyMap,
+                    null,
+                    Distributions.NONE,
+                    Indexes.EMPTY_INDEXES,
+                    getSortOrders("col_1"));
+              });
+      Assertions.assertTrue(
+          illegalArgumentException
+              .getMessage()
+              .contains(
+                  String.format(
+                      "Couldn't convert Gravitino type %s to ClickHouse type",
+                      type.simpleString())));
+    }
+  }
+
+  @Test
+  public void testCreateMultipleTables() {
+    String test_table_1 = "test_table_1";
+    TABLE_OPERATIONS.create(
+        TEST_DB_NAME.toString(),
+        test_table_1,
+        new JdbcColumn[] {
+          JdbcColumn.builder()
+              .withName("col_1")
+              .withType(Types.DecimalType.of(10, 2))
+              .withComment("test_decimal")
+              .withNullable(false)
+              .withDefaultValue(Literals.decimalLiteral(Decimal.of("0.00")))
+              .build()
+        },
+        "test_comment",
+        null,
+        null,
+        Distributions.NONE,
+        Indexes.EMPTY_INDEXES,
+        getSortOrders("col_1"));
+
+    String testDb = "test_db_2";
+
+    DATABASE_OPERATIONS.create(testDb, null, null);
+    List<String> tables = TABLE_OPERATIONS.listTables(testDb);
+    Assertions.assertFalse(tables.contains(test_table_1));
+
+    String test_table_2 = "test_table_2";

Review Comment:
   Variable naming convention violation: Java variable names should use 
camelCase, not snake_case. Rename test_table_2 to testTable2 following Java 
naming conventions.



##########
catalogs-contrib/catalog-jdbc-clickhouse/src/test/java/org/apache/gravitino/catalog/clickhouse/operations/TestClickHouseTableOperations.java:
##########
@@ -0,0 +1,386 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.catalog.clickhouse.operations;
+
+import static 
org.apache.gravitino.catalog.clickhouse.ClickHouseTablePropertiesMetadata.CLICKHOUSE_ENGINE_KEY;
+import static 
org.apache.gravitino.catalog.clickhouse.ClickHouseUtils.getSortOrders;
+import static org.apache.gravitino.rel.Column.DEFAULT_VALUE_NOT_SET;
+
+import java.time.LocalDateTime;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.commons.lang3.RandomStringUtils;
+import org.apache.gravitino.catalog.jdbc.JdbcColumn;
+import org.apache.gravitino.catalog.jdbc.JdbcTable;
+import org.apache.gravitino.rel.expressions.distributions.Distributions;
+import org.apache.gravitino.rel.expressions.literals.Literals;
+import org.apache.gravitino.rel.expressions.transforms.Transforms;
+import org.apache.gravitino.rel.indexes.Index;
+import org.apache.gravitino.rel.indexes.Indexes;
+import org.apache.gravitino.rel.types.Decimal;
+import org.apache.gravitino.rel.types.Type;
+import org.apache.gravitino.rel.types.Types;
+import org.apache.gravitino.utils.RandomNameUtils;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.api.Test;
+
+@Tag("gravitino-docker-test")
+public class TestClickHouseTableOperations extends TestClickHouse {
+  private static final Type STRING = Types.StringType.get();
+  private static final Type INT = Types.IntegerType.get();
+
+  @Test
+  public void testOperationTable() {
+    String tableName = RandomStringUtils.randomAlphabetic(16) + "_op_table";
+    String tableComment = "test_comment";
+    List<JdbcColumn> columns = new ArrayList<>();
+    columns.add(
+        JdbcColumn.builder()
+            .withName("col_1")
+            .withType(STRING)
+            .withComment("test_comment")
+            .withNullable(true)
+            .build());
+    columns.add(
+        JdbcColumn.builder()
+            .withName("col_2")
+            .withType(INT)
+            .withNullable(false)
+            .withComment("set primary key")
+            .build());
+    columns.add(
+        JdbcColumn.builder()
+            .withName("col_3")
+            .withType(INT)
+            .withNullable(true)
+            .withDefaultValue(Literals.NULL)
+            .build());
+    columns.add(
+        JdbcColumn.builder()
+            .withName("col_4")
+            .withType(STRING)
+            .withDefaultValue(Literals.of("hello world", STRING))
+            .withNullable(false)
+            .build());
+    Map<String, String> properties = new HashMap<>();
+
+    Index[] indexes = new Index[] {};
+    // create table
+    TABLE_OPERATIONS.create(
+        TEST_DB_NAME.toString(),
+        tableName,
+        columns.toArray(new JdbcColumn[0]),
+        tableComment,
+        properties,
+        null,
+        Distributions.NONE,
+        indexes,
+        getSortOrders("col_2"));
+
+    // list table
+    List<String> tables = TABLE_OPERATIONS.listTables(TEST_DB_NAME.toString());
+    Assertions.assertTrue(tables.contains(tableName));
+
+    // load table
+    JdbcTable load = TABLE_OPERATIONS.load(TEST_DB_NAME.toString(), tableName);
+    assertionsTableInfo(
+        tableName, tableComment, columns, properties, indexes, 
Transforms.EMPTY_TRANSFORM, load);
+  }
+
+  @Test
+  public void testCreateAndLoadTable() {
+    String tableName = RandomStringUtils.randomAlphabetic(16) + "_cl_table";
+    String tableComment = "test_comment";
+    List<JdbcColumn> columns = new ArrayList<>();
+    columns.add(
+        JdbcColumn.builder()
+            .withName("col_1")
+            .withType(Types.DecimalType.of(10, 2))
+            .withComment("test_decimal")
+            .withNullable(false)
+            .withDefaultValue(Literals.decimalLiteral(Decimal.of("0.00", 10, 
2)))
+            .build());
+    columns.add(
+        JdbcColumn.builder()
+            .withName("col_2")
+            .withType(Types.LongType.get())
+            .withNullable(false)
+            .withDefaultValue(Literals.longLiteral(0L))
+            .withComment("long type")
+            .build());
+    columns.add(
+        JdbcColumn.builder()
+            .withName("col_3")
+            .withType(Types.TimestampType.withoutTimeZone())
+            .withNullable(false)
+            .withComment("timestamp")
+            
.withDefaultValue(Literals.timestampLiteral(LocalDateTime.parse("2013-01-01T00:00:00")))
+            .build());
+    columns.add(
+        JdbcColumn.builder()
+            .withName("col_4")
+            .withType(Types.DateType.get())
+            .withNullable(true)
+            .withComment("date")
+            .withDefaultValue(DEFAULT_VALUE_NOT_SET)
+            .build());
+    Map<String, String> properties = new HashMap<>();
+
+    Index[] indexes = new Index[0];
+    // create table
+    TABLE_OPERATIONS.create(
+        TEST_DB_NAME.toString(),
+        tableName,
+        columns.toArray(new JdbcColumn[0]),
+        tableComment,
+        properties,
+        null,
+        Distributions.NONE,
+        indexes,
+        getSortOrders("col_1"));
+
+    JdbcTable loaded = TABLE_OPERATIONS.load(TEST_DB_NAME.toString(), 
tableName);
+    assertionsTableInfo(
+        tableName, tableComment, columns, properties, indexes, 
Transforms.EMPTY_TRANSFORM, loaded);
+  }
+
+  @Test
+  public void testCreateAllTypeTable() {
+    String tableName = RandomNameUtils.genRandomName("type_table_");
+    String tableComment = "test_comment";
+    List<JdbcColumn> columns = new ArrayList<>();
+    columns.add(
+        JdbcColumn.builder()
+            .withName("col_1")
+            .withType(Types.ByteType.get())
+            .withNullable(false)
+            .build());
+    columns.add(
+        JdbcColumn.builder()
+            .withName("col_2")
+            .withType(Types.ShortType.get())
+            .withNullable(true)
+            .build());
+    
columns.add(JdbcColumn.builder().withName("col_3").withType(INT).withNullable(false).build());
+    columns.add(
+        JdbcColumn.builder()
+            .withName("col_4")
+            .withType(Types.LongType.get())
+            .withNullable(false)
+            .build());
+    columns.add(
+        JdbcColumn.builder()
+            .withName("col_5")
+            .withType(Types.FloatType.get())
+            .withNullable(false)
+            .build());
+    columns.add(
+        JdbcColumn.builder()
+            .withName("col_6")
+            .withType(Types.DoubleType.get())
+            .withNullable(false)
+            .build());
+    columns.add(
+        JdbcColumn.builder()
+            .withName("col_7")
+            .withType(Types.DateType.get())
+            .withNullable(false)
+            .build());
+    columns.add(
+        JdbcColumn.builder()
+            .withName("col_9")
+            .withType(Types.TimestampType.withoutTimeZone())
+            .withNullable(false)
+            .build());
+    columns.add(
+        
JdbcColumn.builder().withName("col_10").withType(Types.DecimalType.of(10, 
2)).build());
+    columns.add(
+        
JdbcColumn.builder().withName("col_11").withType(STRING).withNullable(false).build());
+    columns.add(
+        JdbcColumn.builder()
+            .withName("col_12")
+            .withType(Types.FixedCharType.of(10))
+            .withNullable(false)
+            .build());
+    columns.add(
+        JdbcColumn.builder()
+            .withName("col_13")
+            .withType(Types.StringType.get())
+            .withNullable(false)
+            .build());
+    columns.add(
+        JdbcColumn.builder()
+            .withName("col_15")
+            .withType(Types.FixedCharType.of(10))
+            .withNullable(false)
+            .build());
+
+    // create table
+    TABLE_OPERATIONS.create(
+        TEST_DB_NAME.toString(),
+        tableName,
+        columns.toArray(new JdbcColumn[0]),
+        tableComment,
+        Collections.emptyMap(),
+        null,
+        Distributions.NONE,
+        Indexes.EMPTY_INDEXES,
+        getSortOrders("col_1"));
+
+    JdbcTable load = TABLE_OPERATIONS.load(TEST_DB_NAME.toString(), tableName);
+    assertionsTableInfo(
+        tableName,
+        tableComment,
+        columns,
+        Collections.emptyMap(),
+        null,
+        Transforms.EMPTY_TRANSFORM,
+        load);
+  }
+
+  @Test
+  public void testCreateNotSupportTypeTable() {
+    String tableName = RandomNameUtils.genRandomName("type_table_");
+    String tableComment = "test_comment";
+    List<JdbcColumn> columns = new ArrayList<>();
+    List<Type> notSupportType =
+        Arrays.asList(
+            Types.FixedType.of(10),
+            Types.IntervalDayType.get(),
+            Types.IntervalYearType.get(),
+            Types.ListType.of(Types.DateType.get(), true),
+            Types.MapType.of(Types.StringType.get(), Types.IntegerType.get(), 
true),
+            Types.UnionType.of(Types.IntegerType.get()),
+            Types.StructType.of(
+                Types.StructType.Field.notNullField("col_1", 
Types.IntegerType.get())));
+
+    for (Type type : notSupportType) {
+      columns.clear();
+      columns.add(
+          
JdbcColumn.builder().withName("col_1").withType(type).withNullable(false).build());
+
+      JdbcColumn[] jdbcCols = columns.toArray(new JdbcColumn[0]);
+      Map<String, String> emptyMap = Collections.emptyMap();
+      IllegalArgumentException illegalArgumentException =
+          Assertions.assertThrows(
+              IllegalArgumentException.class,
+              () -> {
+                TABLE_OPERATIONS.create(
+                    TEST_DB_NAME.toString(),
+                    tableName,
+                    jdbcCols,
+                    tableComment,
+                    emptyMap,
+                    null,
+                    Distributions.NONE,
+                    Indexes.EMPTY_INDEXES,
+                    getSortOrders("col_1"));
+              });
+      Assertions.assertTrue(
+          illegalArgumentException
+              .getMessage()
+              .contains(
+                  String.format(
+                      "Couldn't convert Gravitino type %s to ClickHouse type",
+                      type.simpleString())));
+    }
+  }
+
+  @Test
+  public void testCreateMultipleTables() {
+    String test_table_1 = "test_table_1";

Review Comment:
   Variable naming convention violation: Java variable names should use 
camelCase, not snake_case. Rename test_table_1 to testTable1 following Java 
naming conventions.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to