This is an automated email from the ASF dual-hosted git repository.

diqiu50 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/gravitino.git


The following commit(s) were added to refs/heads/main by this push:
     new 1249473750 [#9755] feat(clickhouse): support clickhouse table 
operations (#9776)
1249473750 is described below

commit 1249473750bd10aacafdb6bc80198bb1cf173ac2
Author: Qi Yu <[email protected]>
AuthorDate: Thu Feb 5 10:55:09 2026 +0800

    [#9755] feat(clickhouse): support clickhouse table operations (#9776)
    
    ### What changes were proposed in this pull request?
    
    Add ClickHouse create/drop/load/list operations and some tests. Table
    alteration will be handled in a follow-up change.
    
    ### Why are the changes needed?
    
    Fix: #9755
    
    ### Does this PR introduce _any_ user-facing change?
    
    N/A
    
    ### How was this patch tested?
    
    UTs
---
 .../org/apache/gravitino/rel/indexes/Indexes.java  |  15 +-
 .../catalog-jdbc-clickhouse/build.gradle.kts       |   3 +
 .../catalog/clickhouse/ClickHouseConfig.java       |  12 +-
 .../catalog/clickhouse/ClickHouseConstants.java    |  33 +-
 .../ClickHouseTablePropertiesMetadata.java         | 233 ++++++++
 .../ClickHouseColumnDefaultValueConverter.java     | 166 +++++-
 .../converter/ClickHouseTypeConverter.java         | 193 ++++++-
 .../catalog/clickhouse/converter/TypeUtils.java    |  39 ++
 .../operations/ClickHouseTableOperations.java      | 306 +++++++++-
 .../catalog/clickhouse/ClickHouseUtils.java        |  30 +
 .../converter/TestClickHouseTypeConverter.java     | 156 +++++
 .../clickhouse/operations/TestClickHouse.java      |  75 +++
 .../TestClickHouseCatalogOperations.java           |  36 ++
 .../operations/TestClickHouseTableOperations.java  | 636 +++++++++++++++++++++
 .../operation/OceanBaseTableOperations.java        |   5 +-
 .../integration/test/CatalogOceanBaseIT.java       |   4 +-
 .../operation/TestOceanBaseTableOperations.java    |   2 +-
 .../converter/JdbcColumnDefaultValueConverter.java |  13 +-
 .../jdbc/operation/JdbcTableOperations.java        |  56 +-
 .../catalog/jdbc/operation/TableOperation.java     |  27 +
 .../mysql/operation/MysqlTableOperations.java      |   5 +-
 .../mysql/integration/test/CatalogMysqlIT.java     |   4 +-
 .../mysql/operation/TestMysqlTableOperations.java  |   2 +-
 .../dto/requests/TestTableUpdatesRequest.java      |   2 +-
 gradle/libs.versions.toml                          |   5 +
 .../test/container/ClickHouseContainer.java        | 158 +++++
 .../integration/test/container/ContainerSuite.java |  35 +-
 .../integration/test/util/TestDatabaseName.java    |   4 +
 .../server/web/rest/TestTableOperations.java       |   2 +-
 29 files changed, 2182 insertions(+), 75 deletions(-)

diff --git a/api/src/main/java/org/apache/gravitino/rel/indexes/Indexes.java 
b/api/src/main/java/org/apache/gravitino/rel/indexes/Indexes.java
index c6303e7938..16ea392109 100644
--- a/api/src/main/java/org/apache/gravitino/rel/indexes/Indexes.java
+++ b/api/src/main/java/org/apache/gravitino/rel/indexes/Indexes.java
@@ -27,8 +27,11 @@ public class Indexes {
   /** An empty array of indexes. */
   public static final Index[] EMPTY_INDEXES = new Index[0];
 
-  /** MySQL does not support setting the name of the primary key, so the 
default name is used. */
-  public static final String DEFAULT_MYSQL_PRIMARY_KEY_NAME = "PRIMARY";
+  /**
+   * Name of the default primary key. MySQL, ClickHouse, OceanBase and many 
other databases support
+   * setting the name of the primary key and use it as the primary key name.
+   */
+  public static final String DEFAULT_PRIMARY_KEY_NAME = "PRIMARY";
 
   /**
    * Create a unique index on columns. Like unique (a) or unique (a, b), for 
complex like unique
@@ -48,7 +51,7 @@ public class Indexes {
    * @return The primary key index
    */
   public static Index createMysqlPrimaryKey(String[][] fieldNames) {
-    return primary(DEFAULT_MYSQL_PRIMARY_KEY_NAME, fieldNames);
+    return primary(DEFAULT_PRIMARY_KEY_NAME, fieldNames);
   }
 
   /**
@@ -165,7 +168,7 @@ public class Indexes {
        * @param indexType The type of the index
        * @return The builder for creating a new instance of IndexImpl.
        */
-      public Indexes.IndexImpl.Builder withIndexType(IndexType indexType) {
+      public Builder withIndexType(IndexType indexType) {
         this.indexType = indexType;
         return this;
       }
@@ -176,7 +179,7 @@ public class Indexes {
        * @param name The name of the index
        * @return The builder for creating a new instance of IndexImpl.
        */
-      public Indexes.IndexImpl.Builder withName(String name) {
+      public Builder withName(String name) {
         this.name = name;
         return this;
       }
@@ -187,7 +190,7 @@ public class Indexes {
        * @param fieldNames The field names of the index
        * @return The builder for creating a new instance of IndexImpl.
        */
-      public Indexes.IndexImpl.Builder withFieldNames(String[][] fieldNames) {
+      public Builder withFieldNames(String[][] fieldNames) {
         this.fieldNames = fieldNames;
         return this;
       }
diff --git a/catalogs-contrib/catalog-jdbc-clickhouse/build.gradle.kts 
b/catalogs-contrib/catalog-jdbc-clickhouse/build.gradle.kts
index 94de7b8b15..a42652835a 100644
--- a/catalogs-contrib/catalog-jdbc-clickhouse/build.gradle.kts
+++ b/catalogs-contrib/catalog-jdbc-clickhouse/build.gradle.kts
@@ -50,9 +50,12 @@ dependencies {
   testImplementation(project(":server-common"))
 
   testImplementation(libs.awaitility)
+  testImplementation(libs.clickhouse.driver)
   testImplementation(libs.junit.jupiter.api)
   testImplementation(libs.junit.jupiter.params)
+  testImplementation(libs.lz4.java)
   testImplementation(libs.testcontainers)
+  testImplementation(libs.testcontainers.clickhouse)
 
   testRuntimeOnly(libs.junit.jupiter.engine)
 }
diff --git 
a/catalogs-contrib/catalog-jdbc-clickhouse/src/main/java/org/apache/gravitino/catalog/clickhouse/ClickHouseConfig.java
 
b/catalogs-contrib/catalog-jdbc-clickhouse/src/main/java/org/apache/gravitino/catalog/clickhouse/ClickHouseConfig.java
index 83e99fc0ca..4f456775b7 100644
--- 
a/catalogs-contrib/catalog-jdbc-clickhouse/src/main/java/org/apache/gravitino/catalog/clickhouse/ClickHouseConfig.java
+++ 
b/catalogs-contrib/catalog-jdbc-clickhouse/src/main/java/org/apache/gravitino/catalog/clickhouse/ClickHouseConfig.java
@@ -19,11 +19,9 @@
 
 package org.apache.gravitino.catalog.clickhouse;
 
-import static 
org.apache.gravitino.catalog.clickhouse.ClickHouseConstants.CLUSTER_NAME;
-import static 
org.apache.gravitino.catalog.clickhouse.ClickHouseConstants.CLUSTER_SHARDING_KEY;
-import static 
org.apache.gravitino.catalog.clickhouse.ClickHouseConstants.ON_CLUSTER;
-
 import org.apache.commons.lang3.StringUtils;
+import 
org.apache.gravitino.catalog.clickhouse.ClickHouseConstants.ClusterConstants;
+import 
org.apache.gravitino.catalog.clickhouse.ClickHouseConstants.DistributedTableConstants;
 import org.apache.gravitino.config.ConfigBuilder;
 import org.apache.gravitino.config.ConfigConstants;
 import org.apache.gravitino.config.ConfigEntry;
@@ -34,7 +32,7 @@ public class ClickHouseConfig {
 
   // Constants part
   public static final ConfigEntry<String> CK_CLUSTER_NAME =
-      new ConfigBuilder(CLUSTER_NAME)
+      new ConfigBuilder(ClusterConstants.NAME)
           .doc("Cluster name for ClickHouse distributed tables")
           .version(ConfigConstants.VERSION_1_2_0)
           .stringConf()
@@ -42,14 +40,14 @@ public class ClickHouseConfig {
           .create();
 
   public static final ConfigEntry<Boolean> CK_ON_CLUSTER =
-      new ConfigBuilder(ON_CLUSTER)
+      new ConfigBuilder(ClusterConstants.ON_CLUSTER)
           .doc("Whether to use 'ON CLUSTER' clause when creating tables in 
ClickHouse")
           .version(ConfigConstants.VERSION_1_2_0)
           .booleanConf()
           .createWithDefault(DEFAULT_CK_ON_CLUSTER);
 
   public static final ConfigEntry<String> CK_CLUSTER_SHARDING_KEY =
-      new ConfigBuilder(CLUSTER_SHARDING_KEY)
+      new ConfigBuilder(DistributedTableConstants.SHARDING_KEY)
           .doc("Sharding key for ClickHouse distributed tables")
           .version(ConfigConstants.VERSION_1_2_0)
           .stringConf()
diff --git 
a/catalogs-contrib/catalog-jdbc-clickhouse/src/main/java/org/apache/gravitino/catalog/clickhouse/ClickHouseConstants.java
 
b/catalogs-contrib/catalog-jdbc-clickhouse/src/main/java/org/apache/gravitino/catalog/clickhouse/ClickHouseConstants.java
index fab6db164b..19b85a0dd0 100644
--- 
a/catalogs-contrib/catalog-jdbc-clickhouse/src/main/java/org/apache/gravitino/catalog/clickhouse/ClickHouseConstants.java
+++ 
b/catalogs-contrib/catalog-jdbc-clickhouse/src/main/java/org/apache/gravitino/catalog/clickhouse/ClickHouseConstants.java
@@ -21,10 +21,31 @@ package org.apache.gravitino.catalog.clickhouse;
 
 public class ClickHouseConstants {
 
-  // Name of the clickhouse cluster
-  public static final String CLUSTER_NAME = "cluster-name";
-  // Whether to use 'ON CLUSTER' clause when creating tables
-  public static final String ON_CLUSTER = "on-cluster";
-  // Sharding key for the clickhouse cluster
-  public static final String CLUSTER_SHARDING_KEY = "cluster-sharding-key";
+  /** Constants for tables with the distributed engine. */
+  public static final class DistributedTableConstants {
+    private DistributedTableConstants() {}
+    // Sharding key for the clickhouse cluster
+    public static final String SHARDING_KEY = "cluster-sharding-key";
+    public static final String REMOTE_DATABASE = "cluster-remote-database";
+    public static final String REMOTE_TABLE = "cluster-remote-table";
+  }
+
+  /** Constants for cluster tables. */
+  public static final class ClusterConstants {
+    private ClusterConstants() {}
+
+    // Name of the clickhouse cluster
+    public static final String NAME = "cluster-name";
+    // Whether to use 'ON CLUSTER' clause when creating tables
+    public static final String ON_CLUSTER = "on-cluster";
+  }
+
+  /** Table-scoped properties. */
+  public static final class TableConstants {
+    private TableConstants() {}
+
+    public static final String ENGINE = "engine";
+    public static final String ENGINE_UPPER = "ENGINE";
+    public static final String SETTINGS_PREFIX = "settings.";
+  }
 }
diff --git 
a/catalogs-contrib/catalog-jdbc-clickhouse/src/main/java/org/apache/gravitino/catalog/clickhouse/ClickHouseTablePropertiesMetadata.java
 
b/catalogs-contrib/catalog-jdbc-clickhouse/src/main/java/org/apache/gravitino/catalog/clickhouse/ClickHouseTablePropertiesMetadata.java
new file mode 100644
index 0000000000..f7330c6952
--- /dev/null
+++ 
b/catalogs-contrib/catalog-jdbc-clickhouse/src/main/java/org/apache/gravitino/catalog/clickhouse/ClickHouseTablePropertiesMetadata.java
@@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.catalog.clickhouse;
+
+import static 
org.apache.gravitino.connector.PropertyEntry.enumImmutablePropertyEntry;
+import static 
org.apache.gravitino.connector.PropertyEntry.stringOptionalPropertyEntry;
+import static 
org.apache.gravitino.connector.PropertyEntry.stringReservedPropertyEntry;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.commons.collections4.BidiMap;
+import org.apache.commons.collections4.bidimap.TreeBidiMap;
+import 
org.apache.gravitino.catalog.clickhouse.ClickHouseConstants.ClusterConstants;
+import 
org.apache.gravitino.catalog.clickhouse.ClickHouseConstants.DistributedTableConstants;
+import 
org.apache.gravitino.catalog.clickhouse.ClickHouseConstants.TableConstants;
+import org.apache.gravitino.catalog.jdbc.JdbcTablePropertiesMetadata;
+import org.apache.gravitino.connector.PropertyEntry;
+
+public class ClickHouseTablePropertiesMetadata extends 
JdbcTablePropertiesMetadata {
+  public static final String GRAVITINO_ENGINE_KEY = TableConstants.ENGINE;
+  public static final String CLICKHOUSE_ENGINE_KEY = 
TableConstants.ENGINE_UPPER;
+
+  // The following two properties are mapped to ClickHouse table properties 
and can be used by
+  // tables with different engines.
+  public static final PropertyEntry<String> COMMENT_PROPERTY_ENTRY =
+      stringReservedPropertyEntry(COMMENT_KEY, "The table comment", true);
+  public static final PropertyEntry<ENGINE> ENGINE_PROPERTY_ENTRY =
+      enumImmutablePropertyEntry(
+          GRAVITINO_ENGINE_KEY,
+          "The table engine",
+          false,
+          ENGINE.class,
+          ENGINE.MERGETREE,
+          false,
+          false);
+
+  // The following two are for cluster tables
+  public static final PropertyEntry<String> CLUSTER_NAME_PROPERTY_ENTRY =
+      stringOptionalPropertyEntry(
+          ClusterConstants.NAME,
+          "The cluster name for DDL operations, for example, creating on 
cluster tables",
+          false,
+          "",
+          false);
+  public static final PropertyEntry<String> ON_CLUSTER_PROPERTY_ENTRY =
+      stringOptionalPropertyEntry(
+          ClusterConstants.ON_CLUSTER,
+          "Whether to use 'ON CLUSTER' clause when creating tables in 
ClickHouse",
+          false,
+          "",
+          false);
+
+  // The following three are for ClickHouse Distributed engine
+  public static final PropertyEntry<String> 
CLUSTER_REMOTE_DATABASE_PROPERTY_ENTRY =
+      stringOptionalPropertyEntry(
+          DistributedTableConstants.REMOTE_DATABASE,
+          "The remote database name for ClickHouse distributed tables",
+          false,
+          "",
+          false);
+  public static final PropertyEntry<String> 
CLUSTER_REMOTE_TABLE_PROPERTY_ENTRY =
+      stringOptionalPropertyEntry(
+          DistributedTableConstants.REMOTE_TABLE,
+          "The remote table name for ClickHouse distributed tables",
+          false,
+          "",
+          false);
+  public static final PropertyEntry<String> 
CLUSTER_SHARDING_KEY_PROPERTY_ENTRY =
+      stringOptionalPropertyEntry(
+          DistributedTableConstants.SHARDING_KEY,
+          "The sharding key for ClickHouse distributed tables",
+          false,
+          "",
+          false);
+
+  private static final Map<String, PropertyEntry<?>> PROPERTIES_METADATA =
+      createPropertiesMetadata();
+
+  public static final BidiMap<String, String> GRAVITINO_CONFIG_TO_CLICKHOUSE =
+      createGravitinoConfigToClickhouse();
+
+  private static BidiMap<String, String> createGravitinoConfigToClickhouse() {
+    BidiMap<String, String> map = new TreeBidiMap<>();
+    map.put(GRAVITINO_ENGINE_KEY, CLICKHOUSE_ENGINE_KEY);
+    return map;
+  }
+
+  private static Map<String, PropertyEntry<?>> createPropertiesMetadata() {
+    Map<String, PropertyEntry<?>> map = new HashMap<>();
+    // For all tables with different engines
+    map.put(COMMENT_PROPERTY_ENTRY.getName(), COMMENT_PROPERTY_ENTRY);
+    map.put(ENGINE_PROPERTY_ENTRY.getName(), ENGINE_PROPERTY_ENTRY);
+    // For cluster tables and the ClickHouse Distributed engine
+    map.put(ON_CLUSTER_PROPERTY_ENTRY.getName(), ON_CLUSTER_PROPERTY_ENTRY);
+    map.put(CLUSTER_NAME_PROPERTY_ENTRY.getName(), 
CLUSTER_NAME_PROPERTY_ENTRY);
+    map.put(
+        CLUSTER_REMOTE_DATABASE_PROPERTY_ENTRY.getName(), 
CLUSTER_REMOTE_DATABASE_PROPERTY_ENTRY);
+    map.put(CLUSTER_REMOTE_TABLE_PROPERTY_ENTRY.getName(), 
CLUSTER_REMOTE_TABLE_PROPERTY_ENTRY);
+    map.put(CLUSTER_SHARDING_KEY_PROPERTY_ENTRY.getName(), 
CLUSTER_SHARDING_KEY_PROPERTY_ENTRY);
+
+    return Collections.unmodifiableMap(map);
+  }
+
+  /** Refer to https://clickhouse.com/docs/en/engines/table-engines. */
+  public enum ENGINE {
+    // MergeTree
+    MERGETREE("MergeTree", true),
+    REPLACINGMERGETREE("ReplacingMergeTree", true),
+    SUMMINGMERGETREE("SummingMergeTree", true),
+    AGGREGATINGMERGETREE("AggregatingMergeTree", true),
+    COLLAPSINGMERGETREE("CollapsingMergeTree", true),
+    VERSIONEDCOLLAPSINGMERGETREE("VersionedCollapsingMergeTree", true),
+    GRAPHITEMERGETREE("GraphiteMergeTree"),
+
+    // Log
+    TINYLOG("TinyLog"),
+    STRIPELOG("StripeLog"),
+    LOG("Log"),
+
+    // Integration Engines
+    ODBC("ODBC"),
+    JDBC("JDBC"),
+    MySQL("MySQL"),
+    MONGODB("MongoDB"),
+    Redis("Redis"),
+    HDFS("HDFS"),
+    S3("S3"),
+    KAFKA("Kafka"),
+    EMBEDDEDROCKSDB("EmbeddedRocksDB"),
+    RABBITMQ("RabbitMQ"),
+    POSTGRESQL("PostgreSQL"),
+    S3QUEUE("S3Queue"),
+    TIMESERIES("TimeSeries"),
+
+    // Special Engines
+    DISTRIBUTED("Distributed"),
+    DICTIONARY("Dictionary"),
+    MERGE("Merge"),
+    FILE("File"),
+    NULL("Null"),
+    SET("Set"),
+    JOIN("Join"),
+    URL("URL"),
+    VIEW("View"),
+    MEMORY("Memory"),
+    BUFFER("Buffer"),
+    KEEPER_MAP("KeeperMap");
+
+    private final String value;
+    private final boolean requireOrderBy;
+
+    ENGINE(String value) {
+      this.value = value;
+      this.requireOrderBy = false;
+    }
+
+    ENGINE(String value, boolean requireOrderBy) {
+      this.value = value;
+      this.requireOrderBy = requireOrderBy;
+    }
+
+    public String getValue() {
+      return value;
+    }
+
+    public boolean isRequireOrderBy() {
+      return requireOrderBy;
+    }
+
+    public static ENGINE fromString(String engineText) {
+      for (ENGINE engine : ENGINE.values()) {
+        if (engine.value.equalsIgnoreCase(engineText)) {
+          return engine;
+        }
+      }
+      throw new IllegalArgumentException("Unknown ClickHouse engine: " + 
engineText);
+    }
+  }
+
+  @Override
+  protected Map<String, PropertyEntry<?>> specificPropertyEntries() {
+    return PROPERTIES_METADATA;
+  }
+
+  @Override
+  public Map<String, String> transformToJdbcProperties(Map<String, String> 
properties) {
+    return Collections.unmodifiableMap(
+        new HashMap<String, String>() {
+          {
+            properties.forEach(
+                (key, value) -> {
+                  if (GRAVITINO_CONFIG_TO_CLICKHOUSE.containsKey(key)) {
+                    put(GRAVITINO_CONFIG_TO_CLICKHOUSE.get(key), value);
+                  }
+                });
+          }
+        });
+  }
+
+  @Override
+  public Map<String, String> convertFromJdbcProperties(Map<String, String> 
properties) {
+    BidiMap<String, String> clickhouseConfigToGravitino =
+        GRAVITINO_CONFIG_TO_CLICKHOUSE.inverseBidiMap();
+    return Collections.unmodifiableMap(
+        new HashMap<String, String>() {
+          {
+            properties.forEach(
+                (key, value) -> {
+                  if (clickhouseConfigToGravitino.containsKey(key)) {
+                    put(clickhouseConfigToGravitino.get(key), value);
+                  }
+                });
+          }
+        });
+  }
+}
diff --git 
a/catalogs-contrib/catalog-jdbc-clickhouse/src/main/java/org/apache/gravitino/catalog/clickhouse/converter/ClickHouseColumnDefaultValueConverter.java
 
b/catalogs-contrib/catalog-jdbc-clickhouse/src/main/java/org/apache/gravitino/catalog/clickhouse/converter/ClickHouseColumnDefaultValueConverter.java
index 121f40cca3..236b498172 100644
--- 
a/catalogs-contrib/catalog-jdbc-clickhouse/src/main/java/org/apache/gravitino/catalog/clickhouse/converter/ClickHouseColumnDefaultValueConverter.java
+++ 
b/catalogs-contrib/catalog-jdbc-clickhouse/src/main/java/org/apache/gravitino/catalog/clickhouse/converter/ClickHouseColumnDefaultValueConverter.java
@@ -1,25 +1,161 @@
 /*
- *  Licensed to the Apache Software Foundation (ASF) under one
- *  or more contributor license agreements.  See the NOTICE file
- *  distributed with this work for additional information
- *  regarding copyright ownership.  The ASF licenses this file
- *  to you under the Apache License, Version 2.0 (the
- *  "License"); you may not use this file except in compliance
- *  with the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
  *
- *      http://www.apache.org/licenses/LICENSE-2.0
+ *  http://www.apache.org/licenses/LICENSE-2.0
  *
- *  Unless required by applicable law or agreed to in writing,
- *  software distributed under the License is distributed on an
- *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- *  KIND, either express or implied.  See the License for the
- *  specific language governing permissions and limitations
- *  under the License.
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
  */
 package org.apache.gravitino.catalog.clickhouse.converter;
 
+import static org.apache.gravitino.rel.Column.DEFAULT_VALUE_NOT_SET;
+import static 
org.apache.gravitino.rel.Column.DEFAULT_VALUE_OF_CURRENT_TIMESTAMP;
+
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import org.apache.commons.lang3.StringUtils;
 import 
org.apache.gravitino.catalog.jdbc.converter.JdbcColumnDefaultValueConverter;
+import org.apache.gravitino.catalog.jdbc.converter.JdbcTypeConverter;
+import org.apache.gravitino.rel.expressions.Expression;
+import org.apache.gravitino.rel.expressions.FunctionExpression;
+import org.apache.gravitino.rel.expressions.UnparsedExpression;
+import org.apache.gravitino.rel.expressions.literals.Literal;
+import org.apache.gravitino.rel.expressions.literals.Literals;
+import org.apache.gravitino.rel.types.Decimal;
+import org.apache.gravitino.rel.types.Type;
+import org.apache.gravitino.rel.types.Types;
 
 public class ClickHouseColumnDefaultValueConverter extends 
JdbcColumnDefaultValueConverter {
-  // Implement ClickHouse specific default value conversions if needed
+
+  protected static final String NOW = "now";
+  protected static final Expression DEFAULT_VALUE_OF_NOW = 
FunctionExpression.of(NOW);
+
+  public String fromGravitino(Expression defaultValue) {
+    if (DEFAULT_VALUE_NOT_SET.equals(defaultValue)) {
+      return null;
+    }
+
+    if (defaultValue instanceof FunctionExpression functionExpression) {
+      return String.format("(%s)", functionExpression);
+    }
+
+    if (defaultValue instanceof Literal<?> literal) {
+      Type type = literal.dataType();
+      if (defaultValue.equals(Literals.NULL)) {
+        return NULL;
+      } else if (type instanceof Type.NumericType) {
+        return literal.value().toString();
+      } else {
+        Object value = literal.value();
+        if (value instanceof LocalDateTime) {
+          value = ((LocalDateTime) value).format(DATE_TIME_FORMATTER);
+        }
+        return String.format("'%s'", value);
+      }
+    }
+
+    throw new IllegalArgumentException("Not a supported column default value: 
" + defaultValue);
+  }
+
+  @Override
+  public Expression toGravitino(
+      JdbcTypeConverter.JdbcTypeBean type,
+      String columnDefaultValue,
+      boolean isExpression,
+      boolean nullable) {
+    if (columnDefaultValue == null || columnDefaultValue.isEmpty()) {
+      return nullable ? Literals.NULL : DEFAULT_VALUE_NOT_SET;
+    }
+
+    // Handle Nullable type Nullable(type) -> type
+    String realType = TypeUtils.stripNullable(type.getTypeName());
+
+    if (realType.startsWith("Decimal(")) {
+      realType = "Decimal";
+    }
+
+    if (realType.startsWith("FixedString(")) {
+      realType = "FixedString";
+    }
+
+    if (nullable) {
+      if (columnDefaultValue.equals("NULL")) {
+        return Literals.NULL;
+      }
+    }
+
+    // TODO: ClickHouse has a bug where isExpression is false even when the 
default value is really an expression
+    if (isExpression) {
+      if (columnDefaultValue.equals(NOW)) {
+        return DEFAULT_VALUE_OF_NOW;
+      }
+      // The parsing of ClickHouse expressions is complex, so we are not 
currently undertaking the
+      // parsing.
+      return UnparsedExpression.of(columnDefaultValue);
+    }
+
+    // Strip the leading and trailing single quote when the value starts with one.
+    String reallyValue =
+        columnDefaultValue.startsWith("'")
+            ? columnDefaultValue.substring(1, columnDefaultValue.length() - 1)
+            : columnDefaultValue;
+
+    try {
+      switch (realType) {
+        case ClickHouseTypeConverter.INT8:
+          return Literals.byteLiteral(Byte.valueOf(reallyValue));
+        case ClickHouseTypeConverter.UINT8:
+          return Literals.unsignedByteLiteral(Short.valueOf(reallyValue));
+        case ClickHouseTypeConverter.INT16:
+          return Literals.shortLiteral(Short.valueOf(reallyValue));
+        case ClickHouseTypeConverter.UINT16:
+          return Literals.unsignedShortLiteral(Integer.valueOf(reallyValue));
+        case ClickHouseTypeConverter.INT32:
+          return Literals.integerLiteral(Integer.valueOf(reallyValue));
+        case ClickHouseTypeConverter.UINT32:
+          return Literals.unsignedIntegerLiteral(Long.valueOf(reallyValue));
+        case ClickHouseTypeConverter.INT64:
+          return Literals.longLiteral(Long.valueOf(reallyValue));
+        case ClickHouseTypeConverter.UINT64:
+          return Literals.unsignedLongLiteral(Decimal.of(reallyValue));
+        case ClickHouseTypeConverter.FLOAT32:
+          return Literals.floatLiteral(Float.valueOf(reallyValue));
+        case ClickHouseTypeConverter.FLOAT64:
+          return Literals.doubleLiteral(Double.valueOf(reallyValue));
+        case ClickHouseTypeConverter.DECIMAL:
+          if (reallyValue.equals("0.")) {
+            reallyValue = "0.0";
+          }
+          return Literals.decimalLiteral(
+              Decimal.of(reallyValue, type.getColumnSize(), type.getScale()));
+        case ClickHouseTypeConverter.DATE:
+          if (StringUtils.isBlank(realType)) {
+            throw new IllegalArgumentException("Can't convert blank default 
value for DATE type.");
+          }
+          return Literals.dateLiteral(LocalDate.parse(reallyValue, 
DATE_FORMATTER));
+        case ClickHouseTypeConverter.DATETIME:
+          return CURRENT_TIMESTAMP.equals(reallyValue)
+              ? DEFAULT_VALUE_OF_CURRENT_TIMESTAMP
+              : Literals.timestampLiteral(LocalDateTime.parse(reallyValue, 
DATE_TIME_FORMATTER));
+        case ClickHouseTypeConverter.STRING:
+          return Literals.of(reallyValue, Types.StringType.get());
+        case ClickHouseTypeConverter.FIXEDSTRING:
+          return Literals.of(reallyValue, 
Types.FixedCharType.of(type.getColumnSize()));
+        default:
+          return UnparsedExpression.of(reallyValue);
+      }
+    } catch (Exception ex) {
+      return UnparsedExpression.of(reallyValue);
+    }
+  }
 }
diff --git 
a/catalogs-contrib/catalog-jdbc-clickhouse/src/main/java/org/apache/gravitino/catalog/clickhouse/converter/ClickHouseTypeConverter.java
 
b/catalogs-contrib/catalog-jdbc-clickhouse/src/main/java/org/apache/gravitino/catalog/clickhouse/converter/ClickHouseTypeConverter.java
index 26196685df..14acbfa84c 100644
--- 
a/catalogs-contrib/catalog-jdbc-clickhouse/src/main/java/org/apache/gravitino/catalog/clickhouse/converter/ClickHouseTypeConverter.java
+++ 
b/catalogs-contrib/catalog-jdbc-clickhouse/src/main/java/org/apache/gravitino/catalog/clickhouse/converter/ClickHouseTypeConverter.java
@@ -1,37 +1,190 @@
 /*
- *  Licensed to the Apache Software Foundation (ASF) under one
- *  or more contributor license agreements.  See the NOTICE file
- *  distributed with this work for additional information
- *  regarding copyright ownership.  The ASF licenses this file
- *  to you under the Apache License, Version 2.0 (the
- *  "License"); you may not use this file except in compliance
- *  with the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
  *
- *      http://www.apache.org/licenses/LICENSE-2.0
+ *  http://www.apache.org/licenses/LICENSE-2.0
  *
- *  Unless required by applicable law or agreed to in writing,
- *  software distributed under the License is distributed on an
- *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- *  KIND, either express or implied.  See the License for the
- *  specific language governing permissions and limitations
- *  under the License.
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
  */
 package org.apache.gravitino.catalog.clickhouse.converter;
 
 import org.apache.gravitino.catalog.jdbc.converter.JdbcTypeConverter;
 import org.apache.gravitino.rel.types.Type;
+import org.apache.gravitino.rel.types.Types;
 
+/** Type converter for ClickHouse. */
 public class ClickHouseTypeConverter extends JdbcTypeConverter {
 
+  static final String INT8 = "Int8";
+  static final String INT16 = "Int16";
+  static final String INT32 = "Int32";
+  static final String INT64 = "Int64";
+  static final String INT128 = "Int128";
+  static final String INT256 = "Int256";
+  static final String UINT8 = "UInt8";
+  static final String UINT16 = "UInt16";
+  static final String UINT32 = "UInt32";
+  static final String UINT64 = "UInt64";
+  static final String UINT128 = "UInt128";
+  static final String UINT256 = "UInt256";
+
+  static final String FLOAT32 = "Float32";
+  static final String FLOAT64 = "Float64";
+  static final String BFLOAT16 = "BFloat16";
+  static final String DECIMAL = "Decimal";
+  static final String STRING = "String";
+  static final String FIXEDSTRING = "FixedString";
+  static final String DATE = "Date";
+  static final String DATE32 = "Date32";
+  // DateTime with an explicit time zone is not directly supported in Gravitino.
+  static final String DATETIME = "DateTime";
+  static final String DATETIME64 = "DateTime64";
+  static final String ENUM = "Enum";
+  static final String BOOL = "Bool";
+  static final String UUID = "UUID";
+
+  // Object data types are listed below.
+  static final String IPV4 = "IPv4";
+  static final String IPV6 = "IPv6";
+  static final String ARRAY = "Array";
+  static final String TUPLE = "Tuple";
+  static final String MAP = "Map";
+  static final String VARIANT = "Variant";
+  static final String LOWCARDINALITY = "LowCardinality";
+  static final String NULLABLE = "Nullable";
+  static final String AGGREGATEFUNCTION = "AggregateFunction";
+  static final String SIMPLEAGGREGATEFUNCTION = "SimpleAggregateFunction";
+  static final String GEO = "Geo";
+
+  // Special data types are listed below.
+  static final String Domains = "Domains";
+  static final String Nested = "Nested";
+  static final String Dynamic = "Dynamic";
+  static final String JSON = "JSON";
+
+  private static final int MAX_PRECISION = 76;
+
   @Override
-  public String fromGravitino(Type type) {
-    throw new UnsupportedOperationException(
-        "ClickHouseTypeConverter.fromGravitino is not implemented yet.");
+  public Type toGravitino(JdbcTypeBean typeBean) {
+    // Convert Nullable(T) exactly like T: only the inner type name is inspected below.
+    String normalized = TypeUtils.stripNullable(typeBean.getTypeName());
+
+    // A DateTime name carrying a numeric argument is mapped using that number as precision.
+    Integer precisionOfDateTime = TypeUtils.extractDateTimePrecision(normalized);
+    if (precisionOfDateTime != null) {
+      return Types.TimestampType.withoutTimeZone(precisionOfDateTime);
+    }
+
+    // Parameterized Decimal(...) and FixedString(...) names collapse to the bare type name; their
+    // arguments are read from the type bean rather than parsed out of the string.
+    if (normalized.startsWith("Decimal(")) {
+      normalized = DECIMAL;
+    } else if (normalized.startsWith("FixedString(")) {
+      normalized = FIXEDSTRING;
+    }
+
+    return switch (normalized) {
+      case INT8 -> Types.ByteType.get();
+      case INT16 -> Types.ShortType.get();
+      case INT32 -> Types.IntegerType.get();
+      case INT64 -> Types.LongType.get();
+      case UINT8 -> Types.ByteType.unsigned();
+      case UINT16 -> Types.ShortType.unsigned();
+      case UINT32 -> Types.IntegerType.unsigned();
+      case UINT64 -> Types.LongType.unsigned();
+      case FLOAT32 -> Types.FloatType.get();
+      case FLOAT64 -> Types.DoubleType.get();
+      case DECIMAL -> {
+        int precision = typeBean.getColumnSize();
+        int scale = typeBean.getScale();
+        boolean precisionInRange = precision >= 1 && precision <= MAX_PRECISION;
+        if (!precisionInRange) {
+          throw new IllegalArgumentException(
+              String.format(
+                  "Decimal precision %s is out of range [1, %s]", precision, MAX_PRECISION));
+        }
+
+        boolean scaleInRange = scale >= 0 && scale <= precision;
+        if (!scaleInRange) {
+          throw new IllegalArgumentException(
+              String.format("Decimal scale %s is out of range [0, %s]", scale, precision));
+        }
+
+        yield Types.DecimalType.of(precision, scale);
+      }
+      case STRING -> Types.StringType.get();
+      case FIXEDSTRING -> Types.FixedCharType.of(typeBean.getColumnSize());
+      case DATE -> Types.DateType.get();
+      // A plain DateTime has precision 0.
+      case DATETIME -> Types.TimestampType.withoutTimeZone(0);
+      case BOOL -> Types.BooleanType.get();
+      case UUID -> Types.UUIDType.get();
+      // Everything without a mapping (Date32 included) is exposed as an external type that keeps
+      // the original, un-stripped type name.
+      default -> Types.ExternalType.of(typeBean.getTypeName());
+    };
   }
 
   @Override
-  public Type toGravitino(JdbcTypeBean jdbcTypeBean) {
-    throw new UnsupportedOperationException(
-        "ClickHouseTypeConverter.toGravitino is not implemented yet.");
+  public String fromGravitino(Type type) {
+    // Integral types: signedness selects between the IntN and UIntN spelling.
+    if (type instanceof Types.ByteType byteType) {
+      return byteType.signed() ? INT8 : UINT8;
+    }
+    if (type instanceof Types.ShortType shortType) {
+      return shortType.signed() ? INT16 : UINT16;
+    }
+    if (type instanceof Types.IntegerType integerType) {
+      return integerType.signed() ? INT32 : UINT32;
+    }
+    if (type instanceof Types.LongType longType) {
+      return longType.signed() ? INT64 : UINT64;
+    }
+
+    // Floating point and fixed point numbers.
+    if (type instanceof Types.FloatType) {
+      return FLOAT32;
+    }
+    if (type instanceof Types.DoubleType) {
+      return FLOAT64;
+    }
+    if (type instanceof Types.DecimalType decimalType) {
+      return DECIMAL + "(" + decimalType.precision() + "," + decimalType.scale() + ")";
+    }
+
+    // Character data: both unbounded strings and varchar become String.
+    if (type instanceof Types.StringType || type instanceof Types.VarCharType) {
+      return STRING;
+    }
+    if (type instanceof Types.FixedCharType fixedCharType) {
+      return String.format("%s(%s)", FIXEDSTRING, fixedCharType.length());
+    }
+
+    // Temporal types. Every Gravitino timestamp becomes a plain DateTime (precision 0);
+    // DateTime64 has to be requested through an external type.
+    if (type instanceof Types.DateType) {
+      return DATE;
+    }
+    if (type instanceof Types.TimestampType) {
+      return DATETIME;
+    }
+    if (type instanceof Types.TimeType) {
+      return TIME;
+    }
+
+    if (type instanceof Types.BooleanType) {
+      return BOOL;
+    }
+    if (type instanceof Types.UUIDType) {
+      return UUID;
+    }
+
+    // External types carry the native type string and are passed through as-is.
+    if (type instanceof Types.ExternalType externalType) {
+      return externalType.catalogString();
+    }
+
+    throw new IllegalArgumentException(
+        String.format(
+            "Couldn't convert Gravitino type %s to ClickHouse type", type.simpleString()));
   }
 }
diff --git 
a/catalogs-contrib/catalog-jdbc-clickhouse/src/main/java/org/apache/gravitino/catalog/clickhouse/converter/TypeUtils.java
 
b/catalogs-contrib/catalog-jdbc-clickhouse/src/main/java/org/apache/gravitino/catalog/clickhouse/converter/TypeUtils.java
new file mode 100644
index 0000000000..8a9bb76bb4
--- /dev/null
+++ 
b/catalogs-contrib/catalog-jdbc-clickhouse/src/main/java/org/apache/gravitino/catalog/clickhouse/converter/TypeUtils.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.catalog.clickhouse.converter;
+
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+/** Static helpers for picking apart ClickHouse type-name strings. */
+public class TypeUtils {
+
+  /** Matches a whole {@code Nullable(...)} wrapper and captures the wrapped type name. */
+  private static final Pattern NULLABLE_WRAPPER = Pattern.compile("^Nullable\\((.*)\\)$");
+
+  /** Matches {@code DateTime(<digits>)} and captures the digits. */
+  private static final Pattern DATETIME_WITH_PRECISION =
+      Pattern.compile("^DateTime\\((\\d+)\\)$");
+
+  private TypeUtils() {}
+
+  /**
+   * Removes one enclosing {@code Nullable(...)} wrapper from a type name.
+   *
+   * @param typeName the type name to unwrap; must not be null
+   * @return the wrapped type name, or {@code typeName} unchanged when it is not of the form
+   *     {@code Nullable(...)}
+   */
+  public static String stripNullable(String typeName) {
+    // Patterns are compiled once above instead of on every call.
+    return NULLABLE_WRAPPER.matcher(typeName).replaceFirst("$1");
+  }
+
+  /**
+   * Reads the numeric argument of a type name of the exact form {@code DateTime(<digits>)}.
+   *
+   * @param typeName the type name to inspect; must not be null
+   * @return the parsed number, or {@code null} when the name does not have that form
+   * @throws NumberFormatException if the digits do not fit into an {@code int}
+   */
+  public static Integer extractDateTimePrecision(String typeName) {
+    Matcher matcher = DATETIME_WITH_PRECISION.matcher(typeName);
+    if (matcher.matches()) {
+      return Integer.parseInt(matcher.group(1));
+    }
+    return null;
+  }
+}
diff --git 
a/catalogs-contrib/catalog-jdbc-clickhouse/src/main/java/org/apache/gravitino/catalog/clickhouse/operations/ClickHouseTableOperations.java
 
b/catalogs-contrib/catalog-jdbc-clickhouse/src/main/java/org/apache/gravitino/catalog/clickhouse/operations/ClickHouseTableOperations.java
index 39e5a7c666..8356c84291 100644
--- 
a/catalogs-contrib/catalog-jdbc-clickhouse/src/main/java/org/apache/gravitino/catalog/clickhouse/operations/ClickHouseTableOperations.java
+++ 
b/catalogs-contrib/catalog-jdbc-clickhouse/src/main/java/org/apache/gravitino/catalog/clickhouse/operations/ClickHouseTableOperations.java
@@ -18,16 +18,85 @@
  */
 package org.apache.gravitino.catalog.clickhouse.operations;
 
+import static 
org.apache.gravitino.catalog.clickhouse.ClickHouseTablePropertiesMetadata.CLICKHOUSE_ENGINE_KEY;
+import static 
org.apache.gravitino.catalog.clickhouse.ClickHouseTablePropertiesMetadata.ENGINE_PROPERTY_ENTRY;
+import static org.apache.gravitino.rel.Column.DEFAULT_VALUE_NOT_SET;
+
+import com.google.common.base.Preconditions;
+import java.sql.Connection;
+import java.sql.DatabaseMetaData;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
+import java.util.Objects;
+import java.util.stream.Collectors;
+import org.apache.commons.collections4.MapUtils;
+import org.apache.commons.lang3.ArrayUtils;
+import org.apache.commons.lang3.StringUtils;
+import 
org.apache.gravitino.catalog.clickhouse.ClickHouseConstants.TableConstants;
+import 
org.apache.gravitino.catalog.clickhouse.ClickHouseTablePropertiesMetadata;
 import org.apache.gravitino.catalog.jdbc.JdbcColumn;
 import org.apache.gravitino.catalog.jdbc.operation.JdbcTableOperations;
+import org.apache.gravitino.exceptions.NoSuchTableException;
 import org.apache.gravitino.rel.TableChange;
 import org.apache.gravitino.rel.expressions.distributions.Distribution;
+import org.apache.gravitino.rel.expressions.distributions.Distributions;
+import org.apache.gravitino.rel.expressions.sorts.NullOrdering;
+import org.apache.gravitino.rel.expressions.sorts.SortDirection;
+import org.apache.gravitino.rel.expressions.sorts.SortOrder;
 import org.apache.gravitino.rel.expressions.transforms.Transform;
 import org.apache.gravitino.rel.indexes.Index;
+import org.apache.gravitino.rel.indexes.Indexes;
 
 public class ClickHouseTableOperations extends JdbcTableOperations {
 
+  @SuppressWarnings("unused")
+  private static final String CLICKHOUSE_NOT_SUPPORT_NESTED_COLUMN_MSG =
+      "Clickhouse does not support nested column names.";
+
+  /**
+   * Lists the primary-key columns of one table from {@code system.tables}, shaped like the rows
+   * of {@link DatabaseMetaData#getPrimaryKeys}. The two {@code ?} placeholders are, in order, the
+   * database name and the table name.
+   */
+  private static final String QUERY_INDEXES_SQL =
+      """
+      SELECT NULL AS TABLE_CAT,
+             system.tables.database AS TABLE_SCHEM,
+             system.tables.name AS TABLE_NAME,
+             trim(c.1) AS COLUMN_NAME,
+             c.2 AS KEY_SEQ,
+             'PRIMARY' AS PK_NAME
+      FROM system.tables
+      ARRAY JOIN arrayZip(splitByChar(',', primary_key),
+                          arrayEnumerate(splitByChar(',', primary_key))) as c
+      WHERE system.tables.primary_key <> ''
+        AND system.tables.database = ?
+        AND system.tables.name = ?
+      ORDER BY COLUMN_NAME
+      """;
+
+  /**
+   * Loads the primary-key information of a table.
+   *
+   * <p>The ClickHouse JDBC driver does not implement {@code getPrimaryKeys} yet (see
+   * https://github.com/ClickHouse/clickhouse-java/issues/1625), so the key columns are read from
+   * {@code system.tables} instead.
+   *
+   * <p>NOTE(review): each key column is returned as its own single-column PRIMARY_KEY index and
+   * rows are ordered by column name, not by KEY_SEQ - confirm this is intended for composite keys.
+   *
+   * @param connection the connection used to run the lookup
+   * @param databaseName the database that owns the table
+   * @param tableName the table whose primary key is requested
+   * @return one index entry per primary-key column; empty when the table has no primary key
+   */
+  @Override
+  protected List<Index> getIndexes(Connection connection, String databaseName, String tableName) {
+    // The names are compared against string columns, so they are bound as statement parameters
+    // rather than formatted into the SQL text as quoted identifiers.
+    try (PreparedStatement statement = connection.prepareStatement(QUERY_INDEXES_SQL)) {
+      statement.setString(1, databaseName);
+      statement.setString(2, tableName);
+
+      try (ResultSet resultSet = statement.executeQuery()) {
+        List<Index> indexes = new ArrayList<>();
+        while (resultSet.next()) {
+          String indexName = resultSet.getString("PK_NAME");
+          String columnName = resultSet.getString("COLUMN_NAME");
+          indexes.add(
+              Indexes.of(Index.IndexType.PRIMARY_KEY, indexName, new String[][] {{columnName}}));
+        }
+        return indexes;
+      }
+    } catch (SQLException e) {
+      throw exceptionMapper.toGravitinoException(e);
+    }
+  }
+
   @Override
   protected String generateCreateTableSql(
       String tableName,
@@ -38,13 +107,221 @@ public class ClickHouseTableOperations extends 
JdbcTableOperations {
       Distribution distribution,
       Index[] indexes) {
     throw new UnsupportedOperationException(
-        "ClickHouseTableOperations.generateCreateTableSql is not implemented 
yet.");
+        "generateCreateTableSql with out sortOrders in clickhouse is not 
supported");
+  }
+
+  @Override
+  protected String generateCreateTableSql(
+      String tableName,
+      JdbcColumn[] columns,
+      String comment,
+      Map<String, String> properties,
+      Transform[] partitioning,
+      Distribution distribution,
+      Index[] indexes,
+      SortOrder[] sortOrders) {
+
+    // Partitioning and distribution are rejected up front; neither is supported here yet.
+    if (ArrayUtils.isNotEmpty(partitioning)) {
+      throw new UnsupportedOperationException(
+          "Currently we do not support Partitioning in clickhouse");
+    }
+    Preconditions.checkArgument(
+        Distributions.NONE.equals(distribution), "ClickHouse does not support distribution");
+
+    StringBuilder ddl = new StringBuilder();
+
+    // CREATE TABLE `name` ( <columns> [, PRIMARY KEY (...)] )
+    ddl.append(String.format("CREATE TABLE %s (\n", quoteIdentifier(tableName)));
+    buildColumnsDefinition(columns, ddl);
+    // Only the primary key is emitted; any other index type is rejected by the helper.
+    appendIndexesSql(indexes, ddl);
+    ddl.append("\n)");
+
+    // ENGINE = ... followed by ORDER BY, whose presence depends on the chosen engine.
+    ClickHouseTablePropertiesMetadata.ENGINE tableEngine = appendTableEngine(properties, ddl);
+    appendOrderBy(sortOrders, ddl, tableEngine);
+
+    // Optional table comment; single quotes are doubled to stay inside the literal.
+    if (StringUtils.isNotEmpty(comment)) {
+      ddl.append(" COMMENT '").append(comment.replace("'", "''")).append("'");
+    }
+
+    // Optional SETTINGS clause built from the settings-prefixed properties.
+    appendTableProperties(properties, ddl);
+
+    String createTableSql = ddl.toString();
+    LOG.info("Generated create table:{} sql: {}", tableName, createTableSql);
+    return createTableSql;
+  }
+
+  /**
+   * Appends a {@code SETTINGS} clause built from the properties whose key starts with {@link
+   * TableConstants#SETTINGS_PREFIX}; the prefix is stripped from each key.
+   *
+   * <p>Nothing is appended when the map is null or empty, or when none of its keys carries the
+   * settings prefix, so the statement never ends in a bare {@code SETTINGS} keyword.
+   *
+   * @param properties the table properties, may be null
+   * @param sqlBuilder the builder holding the statement generated so far
+   */
+  private static void appendTableProperties(
+      Map<String, String> properties, StringBuilder sqlBuilder) {
+    if (MapUtils.isEmpty(properties)) {
+      return;
+    }
+
+    List<String> settings =
+        properties.entrySet().stream()
+            .filter(entry -> entry.getKey().startsWith(TableConstants.SETTINGS_PREFIX))
+            .map(
+                entry ->
+                    entry.getKey().substring(TableConstants.SETTINGS_PREFIX.length())
+                        + " = "
+                        + entry.getValue())
+            .collect(Collectors.toList());
+
+    // Properties without the settings prefix must not produce an empty SETTINGS clause.
+    if (settings.isEmpty()) {
+      return;
+    }
+
+    sqlBuilder.append(" \n SETTINGS ").append(String.join(",\n ", settings));
+  }
+
+  /**
+   * Appends the {@code ORDER BY} clause when the engine requires one.
+   *
+   * @param sortOrders the requested sort orders; exactly one is accepted for engines that need
+   *     ORDER BY, none for engines that do not
+   * @param sqlBuilder the builder holding the statement generated so far
+   * @param engine the table engine that decides whether ORDER BY is required
+   */
+  private static void appendOrderBy(
+      SortOrder[] sortOrders,
+      StringBuilder sqlBuilder,
+      ClickHouseTablePropertiesMetadata.ENGINE engine) {
+    boolean hasSortOrders = ArrayUtils.isNotEmpty(sortOrders);
+
+    // Engines that do not take ORDER BY: reject any sort order, otherwise there is nothing to add.
+    if (!engine.isRequireOrderBy()) {
+      if (hasSortOrders) {
+        throw new UnsupportedOperationException(
+            "ORDER BY clause is not supported for engine: " + engine.getValue());
+      }
+      return;
+    }
+
+    // Engines that need ORDER BY: exactly one sort order must be given.
+    if (!hasSortOrders) {
+      throw new IllegalArgumentException(
+          "ORDER BY clause is required for engine: " + engine.getValue());
+    }
+    if (sortOrders.length > 1) {
+      throw new UnsupportedOperationException(
+          "Currently ClickHouse does not support sortOrders with more than 1 element");
+    }
+
+    SortOrder onlyOrder = sortOrders[0];
+    NullOrdering nullOrdering = onlyOrder.nullOrdering();
+    SortDirection sortDirection = onlyOrder.direction();
+    boolean orderingFullySpecified = nullOrdering != null && sortDirection != null;
+    if (orderingFullySpecified) {
+      // NULLS FIRST/LAST is not emitted; only the sort expression ends up in the statement.
+      LOG.warn(
+          "ClickHouse currently does not support nullOrdering: {}, and will ignore it",
+          nullOrdering);
+    }
+
+    sqlBuilder.append(String.format("\n ORDER BY `%s`\n", onlyOrder.expression()));
+  }
+
+  /**
+   * Appends the {@code ENGINE = ...} clause and reports which engine was chosen.
+   *
+   * <p>The engine entry is removed from {@code properties} while it is read, so the map passed in
+   * is modified.
+   *
+   * @param properties the table properties, may be null or empty
+   * @param sqlBuilder the builder holding the statement generated so far
+   * @return the engine named in the properties, or the default engine when none is set
+   */
+  private ClickHouseTablePropertiesMetadata.ENGINE appendTableEngine(
+      Map<String, String> properties, StringBuilder sqlBuilder) {
+    ClickHouseTablePropertiesMetadata.ENGINE chosenEngine =
+        ENGINE_PROPERTY_ENTRY.getDefaultValue();
+
+    String requestedEngine =
+        MapUtils.isNotEmpty(properties) ? properties.remove(CLICKHOUSE_ENGINE_KEY) : null;
+    if (StringUtils.isNotEmpty(requestedEngine)) {
+      chosenEngine = ClickHouseTablePropertiesMetadata.ENGINE.fromString(requestedEngine);
+    }
+
+    sqlBuilder.append(String.format("\n ENGINE = %s", chosenEngine.getValue()));
+    return chosenEngine;
+  }
+
+  /**
+   * Appends one definition per column, separated by a comma and a line break.
+   *
+   * @param columns the columns of the table being created
+   * @param sqlBuilder the builder holding the statement generated so far
+   */
+  private void buildColumnsDefinition(JdbcColumn[] columns, StringBuilder sqlBuilder) {
+    int lastIndex = columns.length - 1;
+    int index = 0;
+    for (JdbcColumn column : columns) {
+      sqlBuilder.append(String.format("  %s", quoteIdentifier(column.name())));
+      appendColumnDefinition(column, sqlBuilder);
+
+      // Every column except the last one is followed by a separator.
+      if (index < lastIndex) {
+        sqlBuilder.append(",\n");
+      }
+      index++;
+    }
+  }
+
+  /**
+   * Appends the index definitions to the column list. Only primary keys are accepted for now;
+   * any other index type is rejected.
+   *
+   * <p>The indexes themselves are not validated here. ClickHouse requires the primary key to be
+   * a subset of the ORDER BY columns, and that check is left to ClickHouse.
+   *
+   * @param indexes the indexes requested for the table, may be null or empty
+   * @param sqlBuilder the builder holding the statement generated so far
+   */
+  private void appendIndexesSql(Index[] indexes, StringBuilder sqlBuilder) {
+    if (ArrayUtils.isEmpty(indexes)) {
+      return;
+    }
+
+    for (Index index : indexes) {
+      // getIndexFieldStr returns the field list already quoted.
+      String quotedFields = getIndexFieldStr(index.fieldNames());
+      sqlBuilder.append(",\n");
+
+      switch (index.type()) {
+        case PRIMARY_KEY -> {
+          String indexName = index.name();
+          boolean customName =
+              indexName != null
+                  && !StringUtils.equalsIgnoreCase(indexName, Indexes.DEFAULT_PRIMARY_KEY_NAME);
+          if (customName) {
+            LOG.warn(
+                "Primary key name must be PRIMARY in ClickHouse, the name {} will be ignored.",
+                indexName);
+          }
+          sqlBuilder.append(" PRIMARY KEY (" + quotedFields + ")");
+        }
+        default ->
+            throw new IllegalArgumentException(
+                "Gravitino Clickhouse doesn't support index : " + index.type());
+      }
+    }
+  }
+
+  /** Reports whether the current row's IS_AUTOINCREMENT column reads "YES", ignoring case. */
+  @Override
+  protected boolean getAutoIncrementInfo(ResultSet resultSet) throws SQLException {
+    String autoIncrementFlag = resultSet.getString("IS_AUTOINCREMENT");
+    return "YES".equalsIgnoreCase(autoIncrementFlag);
+  }
+
+  /**
+   * Reads the comment and the engine of a table from {@code system.tables}.
+   *
+   * <p>NOTE(review): the lookup filters on the table name only, so a table with the same name in
+   * another database could be returned - confirm whether a database filter is needed.
+   *
+   * @param connection the connection used to run the lookup
+   * @param tableName the table whose properties are requested
+   * @return an unmodifiable map holding the comment and engine of the first matching row
+   * @throws SQLException if the lookup fails
+   * @throws NoSuchTableException if no row with that table name exists
+   */
+  @Override
+  protected Map<String, String> getTableProperties(Connection connection, String tableName)
+      throws SQLException {
+    try (PreparedStatement statement =
+        connection.prepareStatement("select * from system.tables where name = ? ")) {
+      statement.setString(1, tableName);
+      try (ResultSet resultSet = statement.executeQuery()) {
+        while (resultSet.next()) {
+          String name = resultSet.getString("name");
+          if (Objects.equals(name, tableName)) {
+            // A plain HashMap is filled here (values may be null) instead of an anonymous
+            // subclass, which would hold on to this object and the result set.
+            Map<String, String> tableProperties = new HashMap<>();
+            tableProperties.put(COMMENT, resultSet.getString(COMMENT));
+            tableProperties.put(
+                CLICKHOUSE_ENGINE_KEY, resultSet.getString(CLICKHOUSE_ENGINE_KEY));
+            return Collections.unmodifiableMap(tableProperties);
+          }
+        }
+
+        throw new NoSuchTableException(
+            "Table %s does not exist in %s.", tableName, connection.getCatalog());
+      }
+    }
+  }
+
+  /**
+   * Lists the entries of type "TABLE" in the connection's current catalog and schema.
+   *
+   * @param connection the connection whose metadata is queried
+   * @return the metadata result set describing the matching tables
+   * @throws SQLException if the metadata lookup fails
+   */
+  protected ResultSet getTables(Connection connection) throws SQLException {
+    // ClickHouse also reports DICTIONARY, LOG TABLE, MEMORY TABLE, REMOTE TABLE, VIEW,
+    // SYSTEM TABLE and TEMPORARY TABLE; only plain TABLE entries are requested.
+    DatabaseMetaData databaseMetaData = connection.getMetaData();
+    String catalog = connection.getCatalog();
+    String schema = connection.getSchema();
+    String[] tableTypes = {"TABLE"};
+    return databaseMetaData.getTables(catalog, schema, null, tableTypes);
   }
 
   @Override
   protected String generatePurgeTableSql(String tableName) {
     throw new UnsupportedOperationException(
-        "ClickHouseTableOperations.generatePurgeTableSql is not implemented 
yet.");
+        "ClickHouse does not support purge table in Gravitino, please use drop 
table");
   }
 
   @Override
@@ -53,4 +330,29 @@ public class ClickHouseTableOperations extends 
JdbcTableOperations {
     throw new UnsupportedOperationException(
         "ClickHouseTableOperations.generateAlterTableSql is not implemented 
yet.");
   }
+
+  /**
+   * Appends the type, the optional default value and the optional comment of one column.
+   *
+   * @param column the column to describe
+   * @param sqlBuilder the builder holding the statement generated so far
+   * @return {@code sqlBuilder}, to allow chaining
+   */
+  private StringBuilder appendColumnDefinition(JdbcColumn column, StringBuilder sqlBuilder) {
+    // Nullable columns wrap their type in Nullable(...).
+    String clickHouseType = typeConverter.fromGravitino(column.dataType());
+    String typeClause =
+        column.nullable() ? String.format("Nullable(%s)", clickHouseType) : clickHouseType;
+    sqlBuilder.append(String.format(" %s ", typeClause));
+
+    // DEFAULT is written only when a default value was set on the column.
+    boolean hasDefault = !DEFAULT_VALUE_NOT_SET.equals(column.defaultValue());
+    if (hasDefault) {
+      sqlBuilder.append(
+          String.format(
+              " DEFAULT %s ", columnDefaultValueConverter.fromGravitino(column.defaultValue())));
+    }
+
+    // Single quotes in the comment are doubled to stay inside the literal.
+    String columnComment = column.comment();
+    if (StringUtils.isNotEmpty(columnComment)) {
+      sqlBuilder.append("COMMENT '" + columnComment.replace("'", "''") + "' ");
+    }
+
+    return sqlBuilder;
+  }
 }
diff --git 
a/catalogs-contrib/catalog-jdbc-clickhouse/src/test/java/org/apache/gravitino/catalog/clickhouse/ClickHouseUtils.java
 
b/catalogs-contrib/catalog-jdbc-clickhouse/src/test/java/org/apache/gravitino/catalog/clickhouse/ClickHouseUtils.java
new file mode 100644
index 0000000000..d5bd968fcd
--- /dev/null
+++ 
b/catalogs-contrib/catalog-jdbc-clickhouse/src/test/java/org/apache/gravitino/catalog/clickhouse/ClickHouseUtils.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.catalog.clickhouse;
+
+import org.apache.gravitino.rel.expressions.NamedReference;
+import org.apache.gravitino.rel.expressions.sorts.SortOrder;
+import org.apache.gravitino.rel.expressions.sorts.SortOrders;
+
+/** Helpers shared by the ClickHouse catalog tests. */
+public class ClickHouseUtils {
+
+  /**
+   * Builds a one-element sort order array on the given column, with neither a sort direction nor
+   * a null ordering set.
+   *
+   * @param colName the column to sort by
+   * @return an array holding exactly one sort order on {@code colName}
+   */
+  public static SortOrder[] getSortOrders(String colName) {
+    SortOrders.SortImpl singleOrder = SortOrders.of(NamedReference.field(colName), null, null);
+    return new SortOrders.SortImpl[] {singleOrder};
+  }
+}
diff --git 
a/catalogs-contrib/catalog-jdbc-clickhouse/src/test/java/org/apache/gravitino/catalog/clickhouse/converter/TestClickHouseTypeConverter.java
 
b/catalogs-contrib/catalog-jdbc-clickhouse/src/test/java/org/apache/gravitino/catalog/clickhouse/converter/TestClickHouseTypeConverter.java
new file mode 100644
index 0000000000..36e2ad51dc
--- /dev/null
+++ 
b/catalogs-contrib/catalog-jdbc-clickhouse/src/test/java/org/apache/gravitino/catalog/clickhouse/converter/TestClickHouseTypeConverter.java
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.catalog.clickhouse.converter;
+
+import static 
org.apache.gravitino.catalog.clickhouse.converter.ClickHouseTypeConverter.BOOL;
+import static 
org.apache.gravitino.catalog.clickhouse.converter.ClickHouseTypeConverter.DATE;
+import static 
org.apache.gravitino.catalog.clickhouse.converter.ClickHouseTypeConverter.DATE32;
+import static 
org.apache.gravitino.catalog.clickhouse.converter.ClickHouseTypeConverter.DATETIME;
+import static 
org.apache.gravitino.catalog.clickhouse.converter.ClickHouseTypeConverter.DATETIME64;
+import static 
org.apache.gravitino.catalog.clickhouse.converter.ClickHouseTypeConverter.DECIMAL;
+import static 
org.apache.gravitino.catalog.clickhouse.converter.ClickHouseTypeConverter.FIXEDSTRING;
+import static 
org.apache.gravitino.catalog.clickhouse.converter.ClickHouseTypeConverter.FLOAT32;
+import static 
org.apache.gravitino.catalog.clickhouse.converter.ClickHouseTypeConverter.FLOAT64;
+import static 
org.apache.gravitino.catalog.clickhouse.converter.ClickHouseTypeConverter.INT16;
+import static 
org.apache.gravitino.catalog.clickhouse.converter.ClickHouseTypeConverter.INT32;
+import static 
org.apache.gravitino.catalog.clickhouse.converter.ClickHouseTypeConverter.INT64;
+import static 
org.apache.gravitino.catalog.clickhouse.converter.ClickHouseTypeConverter.INT8;
+import static 
org.apache.gravitino.catalog.clickhouse.converter.ClickHouseTypeConverter.STRING;
+import static 
org.apache.gravitino.catalog.clickhouse.converter.ClickHouseTypeConverter.UINT16;
+import static 
org.apache.gravitino.catalog.clickhouse.converter.ClickHouseTypeConverter.UINT32;
+import static 
org.apache.gravitino.catalog.clickhouse.converter.ClickHouseTypeConverter.UINT64;
+import static 
org.apache.gravitino.catalog.clickhouse.converter.ClickHouseTypeConverter.UINT8;
+import static 
org.apache.gravitino.catalog.clickhouse.converter.ClickHouseTypeConverter.UUID;
+import static 
org.apache.gravitino.catalog.jdbc.converter.JdbcTypeConverter.TIME;
+
+import org.apache.gravitino.catalog.jdbc.converter.JdbcTypeConverter;
+import org.apache.gravitino.rel.types.Type;
+import org.apache.gravitino.rel.types.Types;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+/** Test class for {@link ClickHouseTypeConverter} */
+public class TestClickHouseTypeConverter {
+
+  private static final ClickHouseTypeConverter CLICKHOUSE_TYPE_CONVERTER =
+      new ClickHouseTypeConverter();
+  private static final String USER_DEFINED_TYPE = "user-defined";
+
+  /**
+   * Checks the ClickHouse-to-Gravitino direction of the converter: plain type names, a Nullable
+   * wrapper, unmapped types that must surface as external types, and invalid decimal arguments.
+   */
+  @Test
+  public void testToGravitinoType() {
+    // Plain type names; the last two arguments are the column size and scale of the type bean.
+    checkJdbcTypeToGravitinoType(Types.ByteType.get(), INT8, null, null);
+    checkJdbcTypeToGravitinoType(Types.ByteType.unsigned(), UINT8, null, null);
+    checkJdbcTypeToGravitinoType(Types.ShortType.get(), INT16, null, null);
+    checkJdbcTypeToGravitinoType(Types.ShortType.unsigned(), UINT16, null, 
null);
+    checkJdbcTypeToGravitinoType(Types.IntegerType.get(), INT32, null, null);
+    checkJdbcTypeToGravitinoType(Types.IntegerType.unsigned(), UINT32, null, 
null);
+    checkJdbcTypeToGravitinoType(Types.LongType.get(), INT64, null, null);
+    checkJdbcTypeToGravitinoType(Types.LongType.unsigned(), UINT64, null, 
null);
+    checkJdbcTypeToGravitinoType(Types.FloatType.get(), FLOAT32, null, null);
+    checkJdbcTypeToGravitinoType(Types.DoubleType.get(), FLOAT64, null, null);
+    checkJdbcTypeToGravitinoType(Types.DateType.get(), DATE, null, null);
+    checkJdbcTypeToGravitinoType(Types.ExternalType.of("Date32"), DATE32, 
null, null);
+    checkJdbcTypeToGravitinoType(Types.TimestampType.withoutTimeZone(0), 
DATETIME, null, null);
+    checkJdbcTypeToGravitinoType(Types.DecimalType.of(10, 2), DECIMAL, 10, 2);
+    checkJdbcTypeToGravitinoType(Types.StringType.get(), STRING, 20, null);
+    checkJdbcTypeToGravitinoType(Types.FixedCharType.of(20), FIXEDSTRING, 20, 
null);
+    checkJdbcTypeToGravitinoType(Types.BooleanType.get(), BOOL, 20, null);
+    checkJdbcTypeToGravitinoType(Types.UUIDType.get(), UUID, 20, null);
+    // A name the converter does not know is expected back as an external type.
+    checkJdbcTypeToGravitinoType(
+        Types.ExternalType.of(USER_DEFINED_TYPE), USER_DEFINED_TYPE, null, 
null);
+
+    // DateTime64 is expected as an external type even when a datetime precision is set.
+    JdbcTypeConverter.JdbcTypeBean dateTime64 = createTypeBean(DATETIME64, 
null, null);
+    dateTime64.setDatetimePrecision(3);
+    Assertions.assertEquals(
+        Types.ExternalType.of("DateTime64"), 
CLICKHOUSE_TYPE_CONVERTER.toGravitino(dateTime64));
+
+    // A Nullable(...) wrapper around Decimal is expected to convert like the inner Decimal.
+    JdbcTypeConverter.JdbcTypeBean nullableDecimal =
+        createTypeBean("Nullable(" + DECIMAL + ")", 12, 2);
+    nullableDecimal.setColumnSize(12);
+    nullableDecimal.setScale(2);
+    Assertions.assertEquals(
+        Types.DecimalType.of(12, 2), 
CLICKHOUSE_TYPE_CONVERTER.toGravitino(nullableDecimal));
+
+    // Date32 and IPv4 are expected as external types.
+    JdbcTypeConverter.JdbcTypeBean date32 = createTypeBean(DATE32, null, null);
+    Assertions.assertEquals(
+        Types.ExternalType.of(DATE32), 
CLICKHOUSE_TYPE_CONVERTER.toGravitino(date32));
+
+    JdbcTypeConverter.JdbcTypeBean ipv4 = createTypeBean("IPv4", null, null);
+    Assertions.assertEquals(
+        Types.ExternalType.of("IPv4"), 
CLICKHOUSE_TYPE_CONVERTER.toGravitino(ipv4));
+
+    // A decimal precision of 77 must be rejected.
+    JdbcTypeConverter.JdbcTypeBean decimalTooLarge = createTypeBean("Decimal", 
77, 2);
+    Assertions.assertThrows(
+        IllegalArgumentException.class,
+        () -> CLICKHOUSE_TYPE_CONVERTER.toGravitino(decimalTooLarge));
+
+    // A decimal scale (20) larger than the precision (10) must be rejected.
+    JdbcTypeConverter.JdbcTypeBean decimalScaleTooHigh = 
createTypeBean("Decimal", 10, 20);
+    Assertions.assertThrows(
+        IllegalArgumentException.class,
+        () -> CLICKHOUSE_TYPE_CONVERTER.toGravitino(decimalScaleTooHigh));
+  }
+
+  @Test
+  public void testFromGravitinoType() {
+    checkGravitinoTypeToJdbcType(INT8, Types.ByteType.get());
+    checkGravitinoTypeToJdbcType(UINT8, Types.ByteType.unsigned());
+    checkGravitinoTypeToJdbcType(INT16, Types.ShortType.get());
+    checkGravitinoTypeToJdbcType(UINT16, Types.ShortType.unsigned());
+    checkGravitinoTypeToJdbcType(INT32, Types.IntegerType.get());
+    checkGravitinoTypeToJdbcType(UINT32, Types.IntegerType.unsigned());
+    checkGravitinoTypeToJdbcType(INT64, Types.LongType.get());
+    checkGravitinoTypeToJdbcType(UINT64, Types.LongType.unsigned());
+    checkGravitinoTypeToJdbcType(FLOAT32, Types.FloatType.get());
+    checkGravitinoTypeToJdbcType(FLOAT64, Types.DoubleType.get());
+    checkGravitinoTypeToJdbcType(DATE, Types.DateType.get());
+    checkGravitinoTypeToJdbcType(DATETIME, 
Types.TimestampType.withoutTimeZone(0));
+    checkGravitinoTypeToJdbcType(DECIMAL + "(10,2)", Types.DecimalType.of(10, 
2));
+    checkGravitinoTypeToJdbcType(STRING, Types.VarCharType.of(20));
+    checkGravitinoTypeToJdbcType(FIXEDSTRING + "(20)", 
Types.FixedCharType.of(20));
+    checkGravitinoTypeToJdbcType(STRING, Types.StringType.get());
+    checkGravitinoTypeToJdbcType(BOOL, Types.BooleanType.get());
+    checkGravitinoTypeToJdbcType(UUID, Types.UUIDType.get());
+    checkGravitinoTypeToJdbcType(USER_DEFINED_TYPE, 
Types.ExternalType.of(USER_DEFINED_TYPE));
+    checkGravitinoTypeToJdbcType("DateTime", 
Types.TimestampType.withoutTimeZone(0));
+    checkGravitinoTypeToJdbcType(TIME, Types.TimeType.get());
+    Assertions.assertThrows(
+        IllegalArgumentException.class,
+        () -> 
CLICKHOUSE_TYPE_CONVERTER.fromGravitino(Types.UnparsedType.of(USER_DEFINED_TYPE)));
+  }
+
+  /**
+   * Asserts that the converter renders a Gravitino type as the given ClickHouse type name.
+   *
+   * @param jdbcTypeName the ClickHouse type name expected from the converter
+   * @param gravitinoType the Gravitino type handed to the converter
+   */
+  protected void checkGravitinoTypeToJdbcType(String jdbcTypeName, Type gravitinoType) {
+    String converted = CLICKHOUSE_TYPE_CONVERTER.fromGravitino(gravitinoType);
+    Assertions.assertEquals(jdbcTypeName, converted);
+  }
+
+  /**
+   * Asserts that a ClickHouse type description converts to the given Gravitino type.
+   *
+   * @param gravitinoType the Gravitino type expected from the converter
+   * @param jdbcTypeName the ClickHouse type name placed in the type bean
+   * @param columnSize the column size placed in the type bean, may be {@code null}
+   * @param scale the scale placed in the type bean, may be {@code null}
+   */
+  protected void checkJdbcTypeToGravitinoType(
+      Type gravitinoType, String jdbcTypeName, Integer columnSize, Integer scale) {
+    Type converted =
+        CLICKHOUSE_TYPE_CONVERTER.toGravitino(createTypeBean(jdbcTypeName, columnSize, scale));
+    Assertions.assertEquals(gravitinoType, converted);
+  }
+
+  /**
+   * Builds a type bean for the given ClickHouse type name.
+   *
+   * <p>The bean is created directly rather than through double-brace initialization, so the
+   * converter under test receives a plain {@code JdbcTypeBean} and not an anonymous subclass.
+   *
+   * @param typeName the ClickHouse type name
+   * @param columnSize the column size to set on the bean, may be {@code null}
+   * @param scale the scale to set on the bean, may be {@code null}
+   * @return a bean carrying the three supplied values
+   */
+  protected static JdbcTypeConverter.JdbcTypeBean createTypeBean(
+      String typeName, Integer columnSize, Integer scale) {
+    JdbcTypeConverter.JdbcTypeBean typeBean = new JdbcTypeConverter.JdbcTypeBean(typeName);
+    typeBean.setColumnSize(columnSize);
+    typeBean.setScale(scale);
+    return typeBean;
+  }
+}
diff --git 
a/catalogs-contrib/catalog-jdbc-clickhouse/src/test/java/org/apache/gravitino/catalog/clickhouse/operations/TestClickHouse.java
 
b/catalogs-contrib/catalog-jdbc-clickhouse/src/test/java/org/apache/gravitino/catalog/clickhouse/operations/TestClickHouse.java
new file mode 100644
index 0000000000..082b50b5b9
--- /dev/null
+++ 
b/catalogs-contrib/catalog-jdbc-clickhouse/src/test/java/org/apache/gravitino/catalog/clickhouse/operations/TestClickHouse.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.catalog.clickhouse.operations;
+
+import com.google.common.collect.Maps;
+import java.sql.SQLException;
+import java.util.Collections;
+import java.util.Map;
+import javax.sql.DataSource;
+import 
org.apache.gravitino.catalog.clickhouse.converter.ClickHouseColumnDefaultValueConverter;
+import 
org.apache.gravitino.catalog.clickhouse.converter.ClickHouseExceptionConverter;
+import 
org.apache.gravitino.catalog.clickhouse.converter.ClickHouseTypeConverter;
+import org.apache.gravitino.catalog.jdbc.TestJdbc;
+import org.apache.gravitino.catalog.jdbc.config.JdbcConfig;
+import org.apache.gravitino.catalog.jdbc.utils.DataSourceUtils;
+import org.apache.gravitino.integration.test.container.ClickHouseContainer;
+import org.apache.gravitino.integration.test.container.ContainerSuite;
+import org.apache.gravitino.integration.test.util.TestDatabaseName;
+import org.junit.jupiter.api.BeforeAll;
+
+/**
+ * Base class for ClickHouse tests that need a running ClickHouse container.
+ *
+ * <p>{@link #startup()} starts the container and initializes the {@code DATABASE_OPERATIONS},
+ * {@code TABLE_OPERATIONS} and {@code JDBC_EXCEPTION_CONVERTER} fields, which are not declared in
+ * this class (presumably inherited from {@code TestJdbc}).
+ */
+public class TestClickHouse extends TestJdbc {
+  protected static final ContainerSuite containerSuite = ContainerSuite.getInstance();
+  protected static TestDatabaseName TEST_DB_NAME;
+
+  /**
+   * Starts the ClickHouse container and wires the database and table operations to it.
+   *
+   * @throws Exception if the container or the data source cannot be set up
+   */
+  @BeforeAll
+  public static void startup() throws Exception {
+    // Use the class-level containerSuite here; a shadowing local would let setup and
+    // getClickHouseCatalogProperties() drift apart.
+    TEST_DB_NAME = TestDatabaseName.CLICKHOUSE_CLICKHOUSE_ABSTRACT_IT;
+    containerSuite.startClickHouseContainer(TEST_DB_NAME);
+    DataSource dataSource = DataSourceUtils.createDataSource(getClickHouseCatalogProperties());
+
+    DATABASE_OPERATIONS = new ClickHouseDatabaseOperations();
+    TABLE_OPERATIONS = new ClickHouseTableOperations();
+    JDBC_EXCEPTION_CONVERTER = new ClickHouseExceptionConverter();
+    DATABASE_OPERATIONS.initialize(dataSource, JDBC_EXCEPTION_CONVERTER, Collections.emptyMap());
+    TABLE_OPERATIONS.initialize(
+        dataSource,
+        JDBC_EXCEPTION_CONVERTER,
+        new ClickHouseTypeConverter(),
+        new ClickHouseColumnDefaultValueConverter(),
+        Collections.emptyMap());
+  }
+
+  /**
+   * Collects the JDBC connection settings of the running ClickHouse container.
+   *
+   * @return a mutable map holding the JDBC URL, driver class name, user name and password
+   * @throws SQLException declared for callers; propagated from the container accessors if raised
+   */
+  protected static Map<String, String> getClickHouseCatalogProperties() throws SQLException {
+    Map<String, String> catalogProperties = Maps.newHashMap();
+
+    ClickHouseContainer clickHouseContainer = containerSuite.getClickHouseContainer();
+
+    String jdbcUrl = clickHouseContainer.getJdbcUrl(TEST_DB_NAME);
+
+    catalogProperties.put(JdbcConfig.JDBC_URL.getKey(), jdbcUrl);
+    catalogProperties.put(
+        JdbcConfig.JDBC_DRIVER.getKey(), clickHouseContainer.getDriverClassName(TEST_DB_NAME));
+    catalogProperties.put(JdbcConfig.USERNAME.getKey(), clickHouseContainer.getUsername());
+    catalogProperties.put(JdbcConfig.PASSWORD.getKey(), clickHouseContainer.getPassword());
+
+    return catalogProperties;
+  }
+}
diff --git 
a/catalogs-contrib/catalog-jdbc-clickhouse/src/test/java/org/apache/gravitino/catalog/clickhouse/operations/TestClickHouseCatalogOperations.java
 
b/catalogs-contrib/catalog-jdbc-clickhouse/src/test/java/org/apache/gravitino/catalog/clickhouse/operations/TestClickHouseCatalogOperations.java
new file mode 100644
index 0000000000..317f6cc3eb
--- /dev/null
+++ 
b/catalogs-contrib/catalog-jdbc-clickhouse/src/test/java/org/apache/gravitino/catalog/clickhouse/operations/TestClickHouseCatalogOperations.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.catalog.clickhouse.operations;
+
+import java.sql.SQLException;
+import org.apache.gravitino.catalog.clickhouse.ClickHouseCatalog;
+import org.apache.gravitino.catalog.jdbc.JdbcCatalogOperations;
+import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.api.Test;
+
+/** Docker-backed check that JDBC catalog operations can be initialized with ClickHouse settings. */
+@Tag("gravitino-docker-test")
+public class TestClickHouseCatalogOperations extends TestClickHouse {
+
+  /**
+   * Initializes catalog operations with the container's connection properties; the test passes
+   * when initialization completes without throwing.
+   */
+  @Test
+  public void testCheckJDBCDriver() throws SQLException {
+    ClickHouseCatalog clickHouseCatalog = new ClickHouseCatalog();
+    JdbcCatalogOperations operations =
+        new JdbcCatalogOperations(null, null, DATABASE_OPERATIONS, TABLE_OPERATIONS, null);
+    operations.initialize(getClickHouseCatalogProperties(), null, clickHouseCatalog);
+  }
+}
diff --git 
a/catalogs-contrib/catalog-jdbc-clickhouse/src/test/java/org/apache/gravitino/catalog/clickhouse/operations/TestClickHouseTableOperations.java
 
b/catalogs-contrib/catalog-jdbc-clickhouse/src/test/java/org/apache/gravitino/catalog/clickhouse/operations/TestClickHouseTableOperations.java
new file mode 100644
index 0000000000..b5bc0af80f
--- /dev/null
+++ 
b/catalogs-contrib/catalog-jdbc-clickhouse/src/test/java/org/apache/gravitino/catalog/clickhouse/operations/TestClickHouseTableOperations.java
@@ -0,0 +1,636 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.catalog.clickhouse.operations;
+
+import static 
org.apache.gravitino.catalog.clickhouse.ClickHouseTablePropertiesMetadata.CLICKHOUSE_ENGINE_KEY;
+import static 
org.apache.gravitino.catalog.clickhouse.ClickHouseUtils.getSortOrders;
+import static org.apache.gravitino.rel.Column.DEFAULT_VALUE_NOT_SET;
+
+import java.time.LocalDateTime;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.commons.lang3.RandomStringUtils;
+import org.apache.gravitino.catalog.clickhouse.ClickHouseConstants;
+import 
org.apache.gravitino.catalog.clickhouse.ClickHouseConstants.TableConstants;
+import 
org.apache.gravitino.catalog.clickhouse.ClickHouseTablePropertiesMetadata;
+import org.apache.gravitino.catalog.clickhouse.ClickHouseUtils;
+import 
org.apache.gravitino.catalog.clickhouse.converter.ClickHouseColumnDefaultValueConverter;
+import 
org.apache.gravitino.catalog.clickhouse.converter.ClickHouseExceptionConverter;
+import 
org.apache.gravitino.catalog.clickhouse.converter.ClickHouseTypeConverter;
+import org.apache.gravitino.catalog.jdbc.JdbcColumn;
+import org.apache.gravitino.catalog.jdbc.JdbcTable;
+import org.apache.gravitino.rel.expressions.NamedReference;
+import org.apache.gravitino.rel.expressions.distributions.Distributions;
+import org.apache.gravitino.rel.expressions.literals.Literals;
+import org.apache.gravitino.rel.expressions.sorts.SortOrder;
+import org.apache.gravitino.rel.expressions.transforms.Transform;
+import org.apache.gravitino.rel.expressions.transforms.Transforms;
+import org.apache.gravitino.rel.indexes.Index;
+import org.apache.gravitino.rel.indexes.Indexes;
+import org.apache.gravitino.rel.types.Decimal;
+import org.apache.gravitino.rel.types.Type;
+import org.apache.gravitino.rel.types.Types;
+import org.apache.gravitino.utils.RandomNameUtils;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.api.Test;
+
+@Tag("gravitino-docker-test")
+public class TestClickHouseTableOperations extends TestClickHouse {
+  // Shorthand for the Gravitino string type, used when declaring test columns.
+  private static final Type STRING = Types.StringType.get();
+  // Shorthand for the Gravitino integer type, used when declaring test columns.
+  private static final Type INT = Types.IntegerType.get();
+
+  /**
+   * Creates a table with decimal, long, timestamp and date columns (three of them with default
+   * values), loads it back and compares the loaded metadata with what was requested.
+   */
+  @Test
+  public void testCreateAndLoadTable() {
+    String tableName = RandomStringUtils.randomAlphabetic(16) + "_cl_table";
+    String tableComment = "test_comment";
+
+    JdbcColumn decimalColumn =
+        JdbcColumn.builder()
+            .withName("col_1")
+            .withType(Types.DecimalType.of(10, 2))
+            .withComment("test_decimal")
+            .withNullable(false)
+            .withDefaultValue(Literals.decimalLiteral(Decimal.of("0.00", 10, 2)))
+            .build();
+    JdbcColumn longColumn =
+        JdbcColumn.builder()
+            .withName("col_2")
+            .withType(Types.LongType.get())
+            .withNullable(false)
+            .withDefaultValue(Literals.longLiteral(0L))
+            .withComment("long type")
+            .build();
+    JdbcColumn timestampColumn =
+        JdbcColumn.builder()
+            .withName("col_3")
+            .withType(Types.TimestampType.withoutTimeZone(0))
+            .withNullable(false)
+            .withComment("timestamp")
+            .withDefaultValue(Literals.timestampLiteral(LocalDateTime.parse("2013-01-01T00:00:00")))
+            .build();
+    // The only nullable column, and the only one without a default value.
+    JdbcColumn dateColumn =
+        JdbcColumn.builder()
+            .withName("col_4")
+            .withType(Types.DateType.get())
+            .withNullable(true)
+            .withComment("date")
+            .withDefaultValue(DEFAULT_VALUE_NOT_SET)
+            .build();
+    List<JdbcColumn> columns =
+        new ArrayList<>(Arrays.asList(decimalColumn, longColumn, timestampColumn, dateColumn));
+
+    Map<String, String> properties = new HashMap<>();
+    Index[] indexes = new Index[0];
+
+    // Create the table ordered by col_1, without partitioning or distribution.
+    TABLE_OPERATIONS.create(
+        TEST_DB_NAME.toString(),
+        tableName,
+        columns.toArray(new JdbcColumn[0]),
+        tableComment,
+        properties,
+        null,
+        Distributions.NONE,
+        indexes,
+        getSortOrders("col_1"));
+
+    JdbcTable loaded = TABLE_OPERATIONS.load(TEST_DB_NAME.toString(), tableName);
+    assertionsTableInfo(
+        tableName, tableComment, columns, properties, indexes, Transforms.EMPTY_TRANSFORM, loaded);
+  }
+
+  @Test
+  public void testTypeConversionAgainstCluster() {
+    String tableName = RandomStringUtils.randomAlphabetic(16) + "_type_conv";
+
+    List<JdbcColumn> columns = new ArrayList<>();
+    columns.add(
+        JdbcColumn.builder()
+            .withName("c_int8")
+            .withType(Types.ByteType.get())
+            .withNullable(false)
+            .build());
+    columns.add(
+        JdbcColumn.builder()
+            .withName("c_uint8")
+            .withType(Types.ByteType.unsigned())
+            .withNullable(false)
+            .build());
+    columns.add(
+        JdbcColumn.builder()
+            .withName("c_int16")
+            .withType(Types.ShortType.get())
+            .withNullable(false)
+            .build());
+    columns.add(
+        JdbcColumn.builder()
+            .withName("c_uint16")
+            .withType(Types.ShortType.unsigned())
+            .withNullable(false)
+            .build());
+    columns.add(
+        JdbcColumn.builder()
+            .withName("c_int32")
+            .withType(Types.IntegerType.get())
+            .withNullable(false)
+            .build());
+    columns.add(
+        JdbcColumn.builder()
+            .withName("c_uint32")
+            .withType(Types.IntegerType.unsigned())
+            .withNullable(false)
+            .build());
+    columns.add(
+        JdbcColumn.builder()
+            .withName("c_decimal")
+            .withType(Types.DecimalType.of(10, 2))
+            .withNullable(false)
+            .build());
+    columns.add(
+        JdbcColumn.builder()
+            .withName("c_uint64")
+            .withType(Types.LongType.unsigned())
+            .withNullable(false)
+            .build());
+    columns.add(
+        JdbcColumn.builder()
+            .withName("c_float32")
+            .withType(Types.FloatType.get())
+            .withNullable(false)
+            .build());
+    columns.add(
+        JdbcColumn.builder()
+            .withName("c_float64")
+            .withType(Types.DoubleType.get())
+            .withNullable(false)
+            .build());
+    columns.add(
+        JdbcColumn.builder()
+            .withName("c_fixed")
+            .withType(Types.FixedCharType.of(3))
+            .withNullable(false)
+            .build());
+    columns.add(
+        JdbcColumn.builder()
+            .withName("c_string")
+            .withType(Types.StringType.get())
+            .withNullable(false)
+            .build());
+    columns.add(
+        JdbcColumn.builder()
+            .withName("c_varchar")
+            .withType(Types.VarCharType.of(5))
+            .withNullable(false)
+            .build());
+    columns.add(
+        JdbcColumn.builder()
+            .withName("c_date")
+            .withType(Types.DateType.get())
+            .withNullable(false)
+            .build());
+    columns.add(
+        JdbcColumn.builder()
+            .withName("c_ts")
+            .withType(Types.TimestampType.withoutTimeZone(0))
+            .withNullable(false)
+            .build());
+    columns.add(
+        JdbcColumn.builder()
+            .withName("c_dt64")
+            .withType(Types.ExternalType.of("DateTime64(3)"))
+            .withNullable(false)
+            .build());
+    columns.add(
+        JdbcColumn.builder()
+            .withName("c_bool")
+            .withType(Types.BooleanType.get())
+            .withNullable(false)
+            .build());
+    columns.add(
+        JdbcColumn.builder()
+            .withName("c_uuid")
+            .withType(Types.UUIDType.get())
+            .withNullable(false)
+            .build());
+    columns.add(
+        JdbcColumn.builder()
+            .withName("c_ipv4")
+            .withType(Types.ExternalType.of("IPv4"))
+            .withNullable(false)
+            .build());
+
+    Map<String, String> properties = new HashMap<>();
+    Index[] indexes =
+        new Index[] {
+          Indexes.primary(Indexes.DEFAULT_PRIMARY_KEY_NAME, new String[][] 
{{"c_int8"}})
+        };
+
+    TABLE_OPERATIONS.create(
+        TEST_DB_NAME.toString(),
+        tableName,
+        columns.toArray(new JdbcColumn[0]),
+        "type conversion",
+        properties,
+        null,
+        Distributions.NONE,
+        indexes,
+        getSortOrders("c_int8"));
+
+    JdbcTable loaded = TABLE_OPERATIONS.load(TEST_DB_NAME.toString(), 
tableName);
+
+    Assertions.assertEquals(Types.ByteType.get(), 
loaded.columns()[0].dataType());
+    Assertions.assertEquals(Types.ByteType.unsigned(), 
loaded.columns()[1].dataType());
+    Assertions.assertEquals(Types.ShortType.get(), 
loaded.columns()[2].dataType());
+    Assertions.assertEquals(Types.ShortType.unsigned(), 
loaded.columns()[3].dataType());
+    Assertions.assertEquals(Types.IntegerType.get(), 
loaded.columns()[4].dataType());
+    Assertions.assertEquals(Types.IntegerType.unsigned(), 
loaded.columns()[5].dataType());
+    Assertions.assertEquals(Types.DecimalType.of(10, 2), 
loaded.columns()[6].dataType());
+    Assertions.assertEquals(Types.LongType.unsigned(), 
loaded.columns()[7].dataType());
+    Assertions.assertEquals(Types.FloatType.get(), 
loaded.columns()[8].dataType());
+    Assertions.assertEquals(Types.DoubleType.get(), 
loaded.columns()[9].dataType());
+    Assertions.assertEquals(Types.FixedCharType.of(3), 
loaded.columns()[10].dataType());
+    Assertions.assertEquals(Types.StringType.get(), 
loaded.columns()[11].dataType());
+    Assertions.assertEquals(Types.StringType.get(), 
loaded.columns()[12].dataType());
+    Assertions.assertEquals(Types.DateType.get(), 
loaded.columns()[13].dataType());
+    Assertions.assertEquals(
+        Types.TimestampType.withoutTimeZone(0), 
loaded.columns()[14].dataType());
+    Assertions.assertEquals(
+        Types.ExternalType.of("DateTime64(3)"), 
loaded.columns()[15].dataType());
+    Assertions.assertEquals(Types.BooleanType.get(), 
loaded.columns()[16].dataType());
+    Assertions.assertEquals(Types.UUIDType.get(), 
loaded.columns()[17].dataType());
+    Assertions.assertEquals(Types.ExternalType.of("IPv4"), 
loaded.columns()[18].dataType());
+
+    Assertions.assertTrue(TABLE_OPERATIONS.drop(TEST_DB_NAME.toString(), 
tableName));
+  }
+
+  @Test
+  public void testCreateAllTypeTable() {
+    String tableName = RandomNameUtils.genRandomName("type_table_");
+    String tableComment = "test_comment";
+    List<JdbcColumn> columns = new ArrayList<>();
+    columns.add(
+        JdbcColumn.builder()
+            .withName("col_1")
+            .withType(Types.ByteType.get())
+            .withNullable(false)
+            .build());
+    columns.add(
+        JdbcColumn.builder()
+            .withName("col_2")
+            .withType(Types.ShortType.get())
+            .withNullable(true)
+            .build());
+    
columns.add(JdbcColumn.builder().withName("col_3").withType(INT).withNullable(false).build());
+    columns.add(
+        JdbcColumn.builder()
+            .withName("col_4")
+            .withType(Types.LongType.get())
+            .withNullable(false)
+            .build());
+    columns.add(
+        JdbcColumn.builder()
+            .withName("col_5")
+            .withType(Types.FloatType.get())
+            .withNullable(false)
+            .build());
+    columns.add(
+        JdbcColumn.builder()
+            .withName("col_6")
+            .withType(Types.DoubleType.get())
+            .withNullable(false)
+            .build());
+    columns.add(
+        JdbcColumn.builder()
+            .withName("col_7")
+            .withType(Types.DateType.get())
+            .withNullable(false)
+            .build());
+    columns.add(
+        JdbcColumn.builder()
+            .withName("col_9")
+            .withType(Types.TimestampType.withoutTimeZone(0))
+            .withNullable(false)
+            .build());
+    columns.add(
+        
JdbcColumn.builder().withName("col_10").withType(Types.DecimalType.of(10, 
2)).build());
+    columns.add(
+        
JdbcColumn.builder().withName("col_11").withType(STRING).withNullable(false).build());
+    columns.add(
+        JdbcColumn.builder()
+            .withName("col_12")
+            .withType(Types.FixedCharType.of(10))
+            .withNullable(false)
+            .build());
+    columns.add(
+        JdbcColumn.builder()
+            .withName("col_13")
+            .withType(Types.StringType.get())
+            .withNullable(false)
+            .build());
+    columns.add(
+        JdbcColumn.builder()
+            .withName("col_15")
+            .withType(Types.FixedCharType.of(10))
+            .withNullable(false)
+            .build());
+
+    // create table
+    TABLE_OPERATIONS.create(
+        TEST_DB_NAME.toString(),
+        tableName,
+        columns.toArray(new JdbcColumn[0]),
+        tableComment,
+        Collections.emptyMap(),
+        null,
+        Distributions.NONE,
+        Indexes.EMPTY_INDEXES,
+        getSortOrders("col_1"));
+
+    JdbcTable load = TABLE_OPERATIONS.load(TEST_DB_NAME.toString(), tableName);
+    assertionsTableInfo(
+        tableName,
+        tableComment,
+        columns,
+        Collections.emptyMap(),
+        null,
+        Transforms.EMPTY_TRANSFORM,
+        load);
+  }
+
+  /**
+   * Tries to create a single-column table for each Gravitino type listed below and expects every
+   * attempt to fail with an {@link IllegalArgumentException} whose message names the offending
+   * type.
+   */
+  @Test
+  public void testCreateNotSupportTypeTable() {
+    String tableName = RandomNameUtils.genRandomName("type_table_");
+    String tableComment = "test_comment";
+    List<Type> unsupportedTypes =
+        Arrays.asList(
+            Types.FixedType.of(10),
+            Types.IntervalDayType.get(),
+            Types.IntervalYearType.get(),
+            Types.ListType.of(Types.DateType.get(), true),
+            Types.MapType.of(Types.StringType.get(), Types.IntegerType.get(), true),
+            Types.UnionType.of(Types.IntegerType.get()),
+            Types.StructType.of(
+                Types.StructType.Field.notNullField("col_1", Types.IntegerType.get())));
+
+    for (Type unsupported : unsupportedTypes) {
+      JdbcColumn[] singleColumn = {
+        JdbcColumn.builder().withName("col_1").withType(unsupported).withNullable(false).build()
+      };
+
+      IllegalArgumentException thrown =
+          Assertions.assertThrows(
+              IllegalArgumentException.class,
+              () ->
+                  TABLE_OPERATIONS.create(
+                      TEST_DB_NAME.toString(),
+                      tableName,
+                      singleColumn,
+                      tableComment,
+                      Collections.emptyMap(),
+                      null,
+                      Distributions.NONE,
+                      Indexes.EMPTY_INDEXES,
+                      getSortOrders("col_1")));
+
+      String expectedFragment =
+          String.format(
+              "Couldn't convert Gravitino type %s to ClickHouse type",
+              unsupported.simpleString());
+      Assertions.assertTrue(thrown.getMessage().contains(expectedFragment));
+    }
+  }
+
+  @Test
+  public void testCreateMultipleTables() {
+    String testTable1 = "test_table_1";
+    TABLE_OPERATIONS.create(
+        TEST_DB_NAME.toString(),
+        testTable1,
+        new JdbcColumn[] {
+          JdbcColumn.builder()
+              .withName("col_1")
+              .withType(Types.DecimalType.of(10, 2))
+              .withComment("test_decimal")
+              .withNullable(false)
+              .withDefaultValue(Literals.decimalLiteral(Decimal.of("0.00")))
+              .build()
+        },
+        "test_comment",
+        null,
+        null,
+        Distributions.NONE,
+        Indexes.EMPTY_INDEXES,
+        getSortOrders("col_1"));
+
+    String testDb = "test_db_2";
+
+    DATABASE_OPERATIONS.create(testDb, null, null);
+    List<String> tables = TABLE_OPERATIONS.listTables(testDb);
+    Assertions.assertFalse(tables.contains(testTable1));
+
+    String testTable2 = "test_table_2";
+    TABLE_OPERATIONS.create(
+        testDb,
+        testTable2,
+        new JdbcColumn[] {
+          JdbcColumn.builder()
+              .withName("col_1")
+              .withType(Types.DecimalType.of(10, 2))
+              .withComment("test_decimal")
+              .withNullable(false)
+              .withDefaultValue(Literals.decimalLiteral(Decimal.of("0.00")))
+              .build()
+        },
+        "test_comment",
+        null,
+        null,
+        Distributions.NONE,
+        Indexes.EMPTY_INDEXES,
+        getSortOrders("col_1"));
+
+    tables = TABLE_OPERATIONS.listTables(TEST_DB_NAME.toString());
+    Assertions.assertFalse(tables.contains(testTable2));
+  }
+
+  /**
+   * Creates a table without passing any properties and checks that the loaded table reports
+   * {@code MergeTree} under the engine property key.
+   */
+  @Test
+  public void testLoadTableDefaultProperties() {
+    String tableName = RandomNameUtils.genRandomName("properties_table_");
+    JdbcColumn decimalColumn =
+        JdbcColumn.builder()
+            .withName("col_1")
+            .withType(Types.DecimalType.of(10, 2))
+            .withComment("test_decimal")
+            .withNullable(false)
+            .withDefaultValue(Literals.decimalLiteral(Decimal.of("0.0")))
+            .build();
+
+    TABLE_OPERATIONS.create(
+        TEST_DB_NAME.toString(),
+        tableName,
+        new JdbcColumn[] {decimalColumn},
+        "test_comment",
+        null,
+        null,
+        Distributions.NONE,
+        Indexes.EMPTY_INDEXES,
+        getSortOrders("col_1"));
+
+    JdbcTable loaded = TABLE_OPERATIONS.load(TEST_DB_NAME.toString(), tableName);
+    String engine = loaded.properties().get(CLICKHOUSE_ENGINE_KEY);
+    Assertions.assertEquals("MergeTree", engine);
+  }
+
+  /**
+   * Exercises the validation branches of CREATE TABLE SQL generation without a database
+   * connection, then checks that engine, settings and an escaped comment appear in the SQL.
+   */
+  @Test
+  public void testGenerateCreateTableSqlBranchCoverage() {
+    TestableClickHouseTableOperations ops = new TestableClickHouseTableOperations();
+    // No data source is supplied; only SQL generation is exercised here.
+    ops.initialize(
+        null,
+        new ClickHouseExceptionConverter(),
+        new ClickHouseTypeConverter(),
+        new ClickHouseColumnDefaultValueConverter(),
+        new HashMap<>());
+
+    JdbcColumn[] cols = {
+      JdbcColumn.builder()
+          .withName("c1")
+          .withType(Types.IntegerType.get())
+          .withNullable(false)
+          .build()
+    };
+    Index[] indexes = {
+      Indexes.of(
+          Index.IndexType.PRIMARY_KEY, Indexes.DEFAULT_PRIMARY_KEY_NAME, new String[][] {{"c1"}})
+    };
+    Transform[] noPartitioning = new Transform[0];
+
+    // Partitioning is rejected.
+    Assertions.assertThrows(
+        UnsupportedOperationException.class,
+        () ->
+            ops.buildCreateSql(
+                "t1",
+                cols,
+                null,
+                new HashMap<>(),
+                new Transform[] {Transforms.identity("p")},
+                Distributions.NONE,
+                indexes,
+                getSortOrders("c1")));
+
+    // A distribution other than NONE is rejected.
+    Assertions.assertThrows(
+        IllegalArgumentException.class,
+        () ->
+            ops.buildCreateSql(
+                "t1",
+                cols,
+                null,
+                new HashMap<>(),
+                noPartitioning,
+                Distributions.hash(4, NamedReference.field("c1")),
+                indexes,
+                getSortOrders("c1")));
+
+    // With the default engine, an empty sort order list is rejected.
+    Assertions.assertThrows(
+        IllegalArgumentException.class,
+        () ->
+            ops.buildCreateSql(
+                "t1",
+                cols,
+                null,
+                new HashMap<>(),
+                noPartitioning,
+                Distributions.NONE,
+                indexes,
+                new SortOrder[0]));
+
+    // With the default engine, more than one sort order is rejected.
+    Assertions.assertThrows(
+        UnsupportedOperationException.class,
+        () ->
+            ops.buildCreateSql(
+                "t1",
+                cols,
+                null,
+                new HashMap<>(),
+                noPartitioning,
+                Distributions.NONE,
+                indexes,
+                new SortOrder[] {getSortOrders("c1")[0], getSortOrders("c1")[0]}));
+
+    // The Log engine is rejected when sort orders are supplied.
+    Map<String, String> logEngineProps = new HashMap<>();
+    logEngineProps.put(
+        TableConstants.ENGINE_UPPER, ClickHouseTablePropertiesMetadata.ENGINE.LOG.getValue());
+    Assertions.assertThrows(
+        UnsupportedOperationException.class,
+        () ->
+            ops.buildCreateSql(
+                "t1",
+                cols,
+                null,
+                logEngineProps,
+                noPartitioning,
+                Distributions.NONE,
+                indexes,
+                getSortOrders("c1")));
+
+    // Happy path: engine, a prefixed setting and a comment containing a quote are all emitted.
+    Map<String, String> mergeTreeProps = new HashMap<>();
+    mergeTreeProps.put(
+        TableConstants.ENGINE_UPPER, ClickHouseTablePropertiesMetadata.ENGINE.MERGETREE.getValue());
+    mergeTreeProps.put(TableConstants.SETTINGS_PREFIX + "max_threads", "8");
+    String createSql =
+        ops.buildCreateSql(
+            "t1",
+            cols,
+            "co'mment",
+            mergeTreeProps,
+            noPartitioning,
+            Distributions.NONE,
+            indexes,
+            getSortOrders("c1"));
+    Assertions.assertTrue(createSql.contains("ENGINE = MergeTree"));
+    Assertions.assertTrue(createSql.contains("SETTINGS max_threads = 8"));
+    Assertions.assertTrue(createSql.contains("COMMENT 'co''mment'"));
+  }
+
+  /**
+   * Test-only subclass that makes {@code generateCreateTableSql} callable from the enclosing
+   * test class.
+   */
+  private static final class TestableClickHouseTableOperations extends ClickHouseTableOperations {
+    /** Forwards every argument unchanged to {@code generateCreateTableSql} and returns its SQL. */
+    String buildCreateSql(
+        String tableName,
+        JdbcColumn[] columns,
+        String comment,
+        Map<String, String> properties,
+        Transform[] partitioning,
+        org.apache.gravitino.rel.expressions.distributions.Distribution distribution,
+        Index[] indexes,
+        SortOrder[] sortOrders) {
+      String createSql =
+          generateCreateTableSql(
+              tableName,
+              columns,
+              comment,
+              properties,
+              partitioning,
+              distribution,
+              indexes,
+              sortOrders);
+      return createSql;
+    }
+  }
+}
diff --git 
a/catalogs-contrib/catalog-jdbc-oceanbase/src/main/java/org/apache/gravitino/catalog/oceanbase/operation/OceanBaseTableOperations.java
 
b/catalogs-contrib/catalog-jdbc-oceanbase/src/main/java/org/apache/gravitino/catalog/oceanbase/operation/OceanBaseTableOperations.java
index ff3fef5e45..8c61a03069 100644
--- 
a/catalogs-contrib/catalog-jdbc-oceanbase/src/main/java/org/apache/gravitino/catalog/oceanbase/operation/OceanBaseTableOperations.java
+++ 
b/catalogs-contrib/catalog-jdbc-oceanbase/src/main/java/org/apache/gravitino/catalog/oceanbase/operation/OceanBaseTableOperations.java
@@ -133,8 +133,7 @@ public class OceanBaseTableOperations extends 
JdbcTableOperations {
       switch (index.type()) {
         case PRIMARY_KEY:
           if (null != index.name()
-              && !StringUtils.equalsIgnoreCase(
-                  index.name(), Indexes.DEFAULT_MYSQL_PRIMARY_KEY_NAME)) {
+              && !StringUtils.equalsIgnoreCase(index.name(), 
Indexes.DEFAULT_PRIMARY_KEY_NAME)) {
             throw new IllegalArgumentException("Primary key name must be 
PRIMARY in OceanBase");
           }
           sqlBuilder.append("CONSTRAINT ").append("PRIMARY KEY 
(").append(fieldStr).append(")");
@@ -396,7 +395,7 @@ public class OceanBaseTableOperations extends 
JdbcTableOperations {
       case PRIMARY_KEY:
         if (null != addIndex.getName()
             && !StringUtils.equalsIgnoreCase(
-                addIndex.getName(), Indexes.DEFAULT_MYSQL_PRIMARY_KEY_NAME)) {
+                addIndex.getName(), Indexes.DEFAULT_PRIMARY_KEY_NAME)) {
           throw new IllegalArgumentException("Primary key name must be PRIMARY 
in OceanBase");
         }
         sqlBuilder.append("PRIMARY KEY ");
diff --git 
a/catalogs-contrib/catalog-jdbc-oceanbase/src/test/java/org/apache/gravitino/catalog/oceanbase/integration/test/CatalogOceanBaseIT.java
 
b/catalogs-contrib/catalog-jdbc-oceanbase/src/test/java/org/apache/gravitino/catalog/oceanbase/integration/test/CatalogOceanBaseIT.java
index 60ef1c1a37..260d851ac7 100644
--- 
a/catalogs-contrib/catalog-jdbc-oceanbase/src/test/java/org/apache/gravitino/catalog/oceanbase/integration/test/CatalogOceanBaseIT.java
+++ 
b/catalogs-contrib/catalog-jdbc-oceanbase/src/test/java/org/apache/gravitino/catalog/oceanbase/integration/test/CatalogOceanBaseIT.java
@@ -1622,7 +1622,7 @@ public class CatalogOceanBaseIT extends BaseIT {
         NameIdentifier.of(schemaName, tableName),
         TableChange.addIndex(
             Index.IndexType.PRIMARY_KEY,
-            Indexes.DEFAULT_MYSQL_PRIMARY_KEY_NAME,
+            Indexes.DEFAULT_PRIMARY_KEY_NAME,
             new String[][] {{"col_1"}}));
 
     Table table = tableCatalog.loadTable(NameIdentifier.of(schemaName, 
tableName));
@@ -1728,7 +1728,7 @@ public class CatalogOceanBaseIT extends BaseIT {
             true),
         TableChange.addIndex(
             Index.IndexType.PRIMARY_KEY,
-            Indexes.DEFAULT_MYSQL_PRIMARY_KEY_NAME,
+            Indexes.DEFAULT_PRIMARY_KEY_NAME,
             new String[][] {{"col_6"}}));
 
     Table table = tableCatalog.loadTable(tableIdentifier);
diff --git 
a/catalogs-contrib/catalog-jdbc-oceanbase/src/test/java/org/apache/gravitino/catalog/oceanbase/operation/TestOceanBaseTableOperations.java
 
b/catalogs-contrib/catalog-jdbc-oceanbase/src/test/java/org/apache/gravitino/catalog/oceanbase/operation/TestOceanBaseTableOperations.java
index cedcabe038..c26d25b608 100644
--- 
a/catalogs-contrib/catalog-jdbc-oceanbase/src/test/java/org/apache/gravitino/catalog/oceanbase/operation/TestOceanBaseTableOperations.java
+++ 
b/catalogs-contrib/catalog-jdbc-oceanbase/src/test/java/org/apache/gravitino/catalog/oceanbase/operation/TestOceanBaseTableOperations.java
@@ -985,7 +985,7 @@ public class TestOceanBaseTableOperations extends 
TestOceanBase {
     successIndex =
         new TableChange.AddIndex(
             Index.IndexType.PRIMARY_KEY,
-            Indexes.DEFAULT_MYSQL_PRIMARY_KEY_NAME,
+            Indexes.DEFAULT_PRIMARY_KEY_NAME,
             new String[][] {{"col_1"}, {"col_2"}});
     sql = OceanBaseTableOperations.addIndexDefinition(successIndex);
     Assertions.assertEquals("ADD PRIMARY KEY  (`col_1`, `col_2`)", sql);
diff --git 
a/catalogs/catalog-jdbc-common/src/main/java/org/apache/gravitino/catalog/jdbc/converter/JdbcColumnDefaultValueConverter.java
 
b/catalogs/catalog-jdbc-common/src/main/java/org/apache/gravitino/catalog/jdbc/converter/JdbcColumnDefaultValueConverter.java
index 0998c9f2d6..f2fa7d85b0 100644
--- 
a/catalogs/catalog-jdbc-common/src/main/java/org/apache/gravitino/catalog/jdbc/converter/JdbcColumnDefaultValueConverter.java
+++ 
b/catalogs/catalog-jdbc-common/src/main/java/org/apache/gravitino/catalog/jdbc/converter/JdbcColumnDefaultValueConverter.java
@@ -22,6 +22,8 @@ import static 
org.apache.gravitino.rel.Column.DEFAULT_VALUE_NOT_SET;
 
 import java.time.LocalDateTime;
 import java.time.format.DateTimeFormatter;
+import java.time.format.DateTimeFormatterBuilder;
+import java.time.temporal.ChronoField;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.gravitino.rel.expressions.Expression;
 import org.apache.gravitino.rel.expressions.FunctionExpression;
@@ -35,11 +37,18 @@ public class JdbcColumnDefaultValueConverter {
   protected static final String CURRENT_TIMESTAMP = "CURRENT_TIMESTAMP";
   protected static final String NULL = "NULL";
   protected static final DateTimeFormatter DATE_TIME_FORMATTER =
-      DateTimeFormatter.ofPattern("yyyy-MM-dd 
HH:mm:ss[.SSSSSS][.SSSSS][.SSSS][.SSS][.SS][.S]");
+      new DateTimeFormatterBuilder()
+          .appendPattern("yyyy-MM-dd HH:mm:ss")
+          .appendFraction(ChronoField.NANO_OF_SECOND, 0, 6, true)
+          .toFormatter();
   protected static final DateTimeFormatter DATE_FORMATTER =
       DateTimeFormatter.ofPattern("yyyy-MM-dd");
+
   protected static final DateTimeFormatter TIME_FORMATTER =
-      
DateTimeFormatter.ofPattern("HH:mm:ss[.SSSSSS][.SSSSS][.SSSS][.SSS][.SS][.S]");
+      new DateTimeFormatterBuilder()
+          .appendPattern("HH:mm:ss")
+          .appendFraction(ChronoField.NANO_OF_SECOND, 0, 6, true)
+          .toFormatter();
 
   public String fromGravitino(Expression defaultValue) {
     if (DEFAULT_VALUE_NOT_SET.equals(defaultValue)) {
diff --git 
a/catalogs/catalog-jdbc-common/src/main/java/org/apache/gravitino/catalog/jdbc/operation/JdbcTableOperations.java
 
b/catalogs/catalog-jdbc-common/src/main/java/org/apache/gravitino/catalog/jdbc/operation/JdbcTableOperations.java
index 2cce434a05..1877ecf519 100644
--- 
a/catalogs/catalog-jdbc-common/src/main/java/org/apache/gravitino/catalog/jdbc/operation/JdbcTableOperations.java
+++ 
b/catalogs/catalog-jdbc-common/src/main/java/org/apache/gravitino/catalog/jdbc/operation/JdbcTableOperations.java
@@ -58,6 +58,7 @@ import org.apache.gravitino.rel.expressions.Expression;
 import org.apache.gravitino.rel.expressions.distributions.Distribution;
 import org.apache.gravitino.rel.expressions.distributions.Distributions;
 import org.apache.gravitino.rel.expressions.literals.Literals;
+import org.apache.gravitino.rel.expressions.sorts.SortOrder;
 import org.apache.gravitino.rel.expressions.transforms.Transform;
 import org.apache.gravitino.rel.expressions.transforms.Transforms;
 import org.apache.gravitino.rel.indexes.Index;
@@ -116,12 +117,43 @@ public abstract class JdbcTableOperations implements 
TableOperation {
       Distribution distribution,
       Index[] indexes)
       throws TableAlreadyExistsException {
+    create(
+        databaseName,
+        tableName,
+        columns,
+        comment,
+        properties,
+        partitioning,
+        distribution,
+        indexes,
+        null);
+  }
+
+  @Override
+  public void create(
+      String databaseName,
+      String tableName,
+      JdbcColumn[] columns,
+      String comment,
+      Map<String, String> properties,
+      Transform[] partitioning,
+      Distribution distribution,
+      Index[] indexes,
+      SortOrder[] sortOrders)
+      throws TableAlreadyExistsException {
     LOG.info("Attempting to create table {} in database {}", tableName, 
databaseName);
     try (Connection connection = getConnection(databaseName)) {
       JdbcConnectorUtils.executeUpdate(
           connection,
           generateCreateTableSql(
-              tableName, columns, comment, properties, partitioning, 
distribution, indexes));
+              tableName,
+              columns,
+              comment,
+              properties,
+              partitioning,
+              distribution,
+              indexes,
+              sortOrders));
       LOG.info("Created table {} in database {}", tableName, databaseName);
     } catch (final SQLException se) {
       throw this.exceptionMapper.toGravitinoException(se);
@@ -483,6 +515,24 @@ public abstract class JdbcTableOperations implements 
TableOperation {
       Distribution distribution,
       Index[] indexes);
 
+  /**
+   * Generates the CREATE TABLE statement for a table that may declare sort orders.
+   *
+   * <p>This base implementation only covers the case without sort orders, in which it delegates
+   * to the overload that takes no sort orders. Subclasses that support sort orders are expected
+   * to override this method.
+   *
+   * @param tableName name of the table
+   * @param columns columns of the table
+   * @param comment comment of the table
+   * @param properties properties of the table
+   * @param partitioning partitioning of the table
+   * @param distribution distribution information of the table
+   * @param indexes indexes of the table
+   * @param sortOrders sort orders of the table; {@code null} or empty means none
+   * @return the SQL produced by the overload without sort orders
+   * @throws UnsupportedOperationException if {@code sortOrders} contains at least one element
+   */
+  protected String generateCreateTableSql(
+      String tableName,
+      JdbcColumn[] columns,
+      String comment,
+      Map<String, String> properties,
+      Transform[] partitioning,
+      Distribution distribution,
+      Index[] indexes,
+      SortOrder[] sortOrders) {
+    boolean sortOrdersRequested = sortOrders != null && sortOrders.length > 0;
+    if (sortOrdersRequested) {
+      throw new UnsupportedOperationException(
+          "generateCreateTableSql with sortOrders defined is not supported");
+    }
+
+    return generateCreateTableSql(
+        tableName, columns, comment, properties, partitioning, distribution, indexes);
+  }
+
   /**
    * The default implementation of this method is based on MySQL syntax, and 
if the catalog does not
    * support MySQL syntax, this method needs to be rewritten.
@@ -768,4 +818,8 @@ public abstract class JdbcTableOperations implements 
TableOperation {
 
     return false;
   }
+
+  /**
+   * Wraps an identifier in backticks so it can be embedded in a SQL statement.
+   *
+   * <p>Backticks inside the identifier are doubled. Without this, a name containing a backtick
+   * would end the quoted section early and the rest of the name would be parsed as SQL.
+   *
+   * @param identifier the raw, unquoted identifier
+   * @return the backtick-quoted identifier
+   * @throws IllegalArgumentException if {@code identifier} is {@code null}
+   */
+  protected String quoteIdentifier(String identifier) {
+    if (identifier == null) {
+      // String concatenation would otherwise silently produce the identifier `null`.
+      throw new IllegalArgumentException("Identifier to quote must not be null");
+    }
+    return "`" + identifier.replace("`", "``") + "`";
+  }
 }
diff --git 
a/catalogs/catalog-jdbc-common/src/main/java/org/apache/gravitino/catalog/jdbc/operation/TableOperation.java
 
b/catalogs/catalog-jdbc-common/src/main/java/org/apache/gravitino/catalog/jdbc/operation/TableOperation.java
index ff5bd6048d..479a9baab5 100644
--- 
a/catalogs/catalog-jdbc-common/src/main/java/org/apache/gravitino/catalog/jdbc/operation/TableOperation.java
+++ 
b/catalogs/catalog-jdbc-common/src/main/java/org/apache/gravitino/catalog/jdbc/operation/TableOperation.java
@@ -32,6 +32,7 @@ import org.apache.gravitino.exceptions.NoSuchTableException;
 import org.apache.gravitino.exceptions.TableAlreadyExistsException;
 import org.apache.gravitino.rel.TableChange;
 import org.apache.gravitino.rel.expressions.distributions.Distribution;
+import org.apache.gravitino.rel.expressions.sorts.SortOrder;
 import org.apache.gravitino.rel.expressions.transforms.Transform;
 import org.apache.gravitino.rel.indexes.Index;
 
@@ -74,6 +75,32 @@ public interface TableOperation {
       Index[] indexes)
       throws TableAlreadyExistsException;
 
+  /**
+   * Create a table with sort orders.
+   *
+   * <p>This is the variant of {@code create} that additionally accepts sort orders. In {@code
+   * JdbcTableOperations} the overload without sort orders delegates here with {@code null}.
+   *
+   * @param databaseName database name of the table
+   * @param tableName name of the table
+   * @param columns columns of the table
+   * @param comment comment of the table
+   * @param properties properties of the table
+   * @param partitioning partitioning of the table
+   * @param distribution distribution information of the table
+   * @param indexes indexes of the table
+   * @param sortOrders sort orders of the table; {@code null} or an empty array means the table
+   *     declares no sort orders
+   * @throws TableAlreadyExistsException if the table already exists
+   */
+  void create(
+      String databaseName,
+      String tableName,
+      JdbcColumn[] columns,
+      String comment,
+      Map<String, String> properties,
+      Transform[] partitioning,
+      Distribution distribution,
+      Index[] indexes,
+      SortOrder[] sortOrders)
+      throws TableAlreadyExistsException;
+
   /**
    * @param databaseName The name of the database.
    * @param tableName The name of the table.
diff --git 
a/catalogs/catalog-jdbc-mysql/src/main/java/org/apache/gravitino/catalog/mysql/operation/MysqlTableOperations.java
 
b/catalogs/catalog-jdbc-mysql/src/main/java/org/apache/gravitino/catalog/mysql/operation/MysqlTableOperations.java
index e14fd38076..0a597ef214 100644
--- 
a/catalogs/catalog-jdbc-mysql/src/main/java/org/apache/gravitino/catalog/mysql/operation/MysqlTableOperations.java
+++ 
b/catalogs/catalog-jdbc-mysql/src/main/java/org/apache/gravitino/catalog/mysql/operation/MysqlTableOperations.java
@@ -138,8 +138,7 @@ public class MysqlTableOperations extends 
JdbcTableOperations {
       switch (index.type()) {
         case PRIMARY_KEY:
           if (null != index.name()
-              && !StringUtils.equalsIgnoreCase(
-                  index.name(), Indexes.DEFAULT_MYSQL_PRIMARY_KEY_NAME)) {
+              && !StringUtils.equalsIgnoreCase(index.name(), 
Indexes.DEFAULT_PRIMARY_KEY_NAME)) {
             throw new IllegalArgumentException("Primary key name must be 
PRIMARY in MySQL");
           }
           sqlBuilder.append("CONSTRAINT ").append("PRIMARY KEY 
(").append(fieldStr).append(")");
@@ -381,7 +380,7 @@ public class MysqlTableOperations extends 
JdbcTableOperations {
       case PRIMARY_KEY:
         if (null != addIndex.getName()
             && !StringUtils.equalsIgnoreCase(
-                addIndex.getName(), Indexes.DEFAULT_MYSQL_PRIMARY_KEY_NAME)) {
+                addIndex.getName(), Indexes.DEFAULT_PRIMARY_KEY_NAME)) {
           throw new IllegalArgumentException("Primary key name must be PRIMARY 
in MySQL");
         }
         sqlBuilder.append("PRIMARY KEY ");
diff --git 
a/catalogs/catalog-jdbc-mysql/src/test/java/org/apache/gravitino/catalog/mysql/integration/test/CatalogMysqlIT.java
 
b/catalogs/catalog-jdbc-mysql/src/test/java/org/apache/gravitino/catalog/mysql/integration/test/CatalogMysqlIT.java
index 1e47f72496..17ad8cb3bd 100644
--- 
a/catalogs/catalog-jdbc-mysql/src/test/java/org/apache/gravitino/catalog/mysql/integration/test/CatalogMysqlIT.java
+++ 
b/catalogs/catalog-jdbc-mysql/src/test/java/org/apache/gravitino/catalog/mysql/integration/test/CatalogMysqlIT.java
@@ -1934,7 +1934,7 @@ public class CatalogMysqlIT extends BaseIT {
             Index.IndexType.UNIQUE_KEY, "u1_key", new String[][] {{"col_2"}, 
{"col_3"}}),
         TableChange.addIndex(
             Index.IndexType.PRIMARY_KEY,
-            Indexes.DEFAULT_MYSQL_PRIMARY_KEY_NAME,
+            Indexes.DEFAULT_PRIMARY_KEY_NAME,
             new String[][] {{"col_1"}}));
 
     Table table = tableCatalog.loadTable(NameIdentifier.of(schemaName, 
tableName));
@@ -2059,7 +2059,7 @@ public class CatalogMysqlIT extends BaseIT {
             true),
         TableChange.addIndex(
             Index.IndexType.PRIMARY_KEY,
-            Indexes.DEFAULT_MYSQL_PRIMARY_KEY_NAME,
+            Indexes.DEFAULT_PRIMARY_KEY_NAME,
             new String[][] {{"col_6"}}));
 
     Table table = tableCatalog.loadTable(tableIdentifier);
diff --git 
a/catalogs/catalog-jdbc-mysql/src/test/java/org/apache/gravitino/catalog/mysql/operation/TestMysqlTableOperations.java
 
b/catalogs/catalog-jdbc-mysql/src/test/java/org/apache/gravitino/catalog/mysql/operation/TestMysqlTableOperations.java
index 1146be6b72..48ff38e7fc 100644
--- 
a/catalogs/catalog-jdbc-mysql/src/test/java/org/apache/gravitino/catalog/mysql/operation/TestMysqlTableOperations.java
+++ 
b/catalogs/catalog-jdbc-mysql/src/test/java/org/apache/gravitino/catalog/mysql/operation/TestMysqlTableOperations.java
@@ -1046,7 +1046,7 @@ public class TestMysqlTableOperations extends TestMysql {
     successIndex =
         new TableChange.AddIndex(
             Index.IndexType.PRIMARY_KEY,
-            Indexes.DEFAULT_MYSQL_PRIMARY_KEY_NAME,
+            Indexes.DEFAULT_PRIMARY_KEY_NAME,
             new String[][] {{"col_1"}, {"col_2"}});
     sql = MysqlTableOperations.addIndexDefinition(successIndex);
     Assertions.assertEquals("ADD PRIMARY KEY  (`col_1`, `col_2`)", sql);
diff --git 
a/common/src/test/java/org/apache/gravitino/dto/requests/TestTableUpdatesRequest.java
 
b/common/src/test/java/org/apache/gravitino/dto/requests/TestTableUpdatesRequest.java
index 9276383066..11804ae7e5 100644
--- 
a/common/src/test/java/org/apache/gravitino/dto/requests/TestTableUpdatesRequest.java
+++ 
b/common/src/test/java/org/apache/gravitino/dto/requests/TestTableUpdatesRequest.java
@@ -284,7 +284,7 @@ public class TestTableUpdatesRequest {
     TableUpdateRequest tableUpdateRequest =
         new TableUpdateRequest.AddTableIndexRequest(
             Index.IndexType.PRIMARY_KEY,
-            Indexes.DEFAULT_MYSQL_PRIMARY_KEY_NAME,
+            Indexes.DEFAULT_PRIMARY_KEY_NAME,
             new String[][] {{"column1"}});
     String jsonString = 
JsonUtils.objectMapper().writeValueAsString(tableUpdateRequest);
     String expected =
diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml
index 0ac9055b79..3e636015ca 100644
--- a/gradle/libs.versions.toml
+++ b/gradle/libs.versions.toml
@@ -79,6 +79,8 @@ okhttp3 = "4.11.0"
 opencsv = "2.3"
 metrics = "4.2.25"
 prometheus = "0.16.0"
+clickhouse = "0.7.1"
+lz4 = "1.8.0"
 mysql = "8.0.33"
 postgresql = "42.6.0"
 immutables-value = "2.11.7"
@@ -250,6 +252,7 @@ trino-client= { group = "io.trino", name = "trino-client", 
version.ref = "trino"
 sqlite-jdbc = { group = "org.xerial", name = "sqlite-jdbc", version.ref = 
"sqlite-jdbc" }
 commons-dbcp2 = { group = "org.apache.commons", name = "commons-dbcp2", 
version.ref = "commons-dbcp2" }
 testcontainers = { group = "org.testcontainers", name = "testcontainers", 
version.ref = "testcontainers" }
+testcontainers-clickhouse = { group = "org.testcontainers", name = 
"clickhouse", version.ref = "testcontainers" }
 testcontainers-mysql = { group = "org.testcontainers", name = "mysql", 
version.ref = "testcontainers" }
 testcontainers-postgresql = { group = "org.testcontainers", name = 
"postgresql", version.ref = "testcontainers" }
 testcontainers-junit-jupiter = { group = "org.testcontainers", name = 
"junit-jupiter", version.ref = "testcontainers" }
@@ -271,6 +274,8 @@ metrics-servlets = { group = "io.dropwizard.metrics", name 
= "metrics-servlets",
 prometheus-client = { group = "io.prometheus", name = "simpleclient", 
version.ref = "prometheus" }
 prometheus-dropwizard = { group = "io.prometheus", name = 
"simpleclient_dropwizard", version.ref = "prometheus" }
 prometheus-servlet = { group = "io.prometheus", name = "simpleclient_servlet", 
version.ref = "prometheus" }
+clickhouse-driver = { group = "com.clickhouse", name = "clickhouse-jdbc", 
version.ref = "clickhouse" }
+lz4-java = { group = "org.lz4", name = "lz4-java", version.ref = "lz4" }
 mysql-driver = { group = "mysql", name = "mysql-connector-java", version.ref = 
"mysql" }
 postgresql-driver = { group = "org.postgresql", name = "postgresql", 
version.ref = "postgresql" }
 minikdc = { group = "org.apache.hadoop", name = "hadoop-minikdc", version.ref 
= "hadoop-minikdc"}
diff --git 
a/integration-test-common/src/test/java/org/apache/gravitino/integration/test/container/ClickHouseContainer.java
 
b/integration-test-common/src/test/java/org/apache/gravitino/integration/test/container/ClickHouseContainer.java
new file mode 100644
index 0000000000..c41425f0fc
--- /dev/null
+++ 
b/integration-test-common/src/test/java/org/apache/gravitino/integration/test/container/ClickHouseContainer.java
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.integration.test.container;
+
+import static java.lang.String.format;
+
+import com.google.common.collect.ImmutableSet;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.time.Duration;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.gravitino.integration.test.util.TestDatabaseName;
+import org.awaitility.Awaitility;
+import org.rnorth.ducttape.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.Network;
+
+/**
+ * Test container wrapper for a ClickHouse server that is reached through JDBC on {@link
+ * #CLICKHOUSE_PORT}.
+ *
+ * <p>Instances are created through {@link #builder()}. {@link #start()} waits until the server
+ * answers a {@code SELECT 1} probe and fails its precondition check otherwise.
+ */
+public class ClickHouseContainer extends BaseContainer {
+  public static final Logger LOG = LoggerFactory.getLogger(ClickHouseContainer.class);
+
+  public static final String DEFAULT_IMAGE = "clickhouse:24.8.14";
+  public static final String HOST_NAME = "gravitino-ci-clickhouse";
+  public static final int CLICKHOUSE_PORT = 8123;
+  public static final String USER_NAME = "default";
+  public static final String PASSWORD = "default";
+
+  /** Returns a builder preconfigured with the default image, host name and exposed port. */
+  public static Builder builder() {
+    return new Builder();
+  }
+
+  protected ClickHouseContainer(
+      String image,
+      String hostName,
+      Set<Integer> ports,
+      Map<String, String> extraHosts,
+      Map<String, String> filesToMount,
+      Map<String, String> envVars,
+      Optional<Network> network) {
+    super(image, hostName, ports, extraHosts, filesToMount, envVars, network);
+  }
+
+  @Override
+  protected void setupContainer() {
+    super.setupContainer();
+    withLogConsumer(new PrintingContainerLog(format("%-14s| ", "clickHouseContainer")));
+  }
+
+  /** Starts the container and verifies that the server accepts JDBC connections. */
+  @Override
+  public void start() {
+    super.start();
+    Preconditions.check("clickHouse container startup failed!", checkContainerStatus(5));
+  }
+
+  /**
+   * Polls the server with {@code SELECT 1} every 5 seconds for up to 2 minutes.
+   *
+   * @param retryLimit not used by this implementation; the wait is bounded by the timeout above
+   * @return {@code true} once a probe succeeds, {@code false} if none succeeds before the timeout
+   */
+  @Override
+  protected boolean checkContainerStatus(int retryLimit) {
+    try {
+      Awaitility.await()
+          .atMost(Duration.ofMinutes(2))
+          .pollInterval(Duration.ofSeconds(5))
+          .until(this::isServerReachable);
+    } catch (org.awaitility.core.ConditionTimeoutException e) {
+      // Awaitility reports a timeout by throwing. Translate it into the boolean this method
+      // promises so that start() can report the failure through its precondition check.
+      LOG.error("clickHouse container did not become healthy before the timeout", e);
+      return false;
+    }
+
+    return true;
+  }
+
+  /** Runs one health probe; connection failures are logged and reported as {@code false}. */
+  private boolean isServerReachable() {
+    try (Connection connection =
+            DriverManager.getConnection(getJdbcUrl(), USER_NAME, getPassword());
+        Statement statement = connection.createStatement()) {
+      // Simple health check query; ClickHouse should respond if it is ready.
+      statement.execute("SELECT 1");
+      LOG.info("clickHouse container is healthy");
+      return true;
+    } catch (SQLException e) {
+      LOG.warn(
+          "Failed to connect to clickHouse container during Awaitility health check: {}",
+          e.getMessage(),
+          e);
+      return false;
+    }
+  }
+
+  /**
+   * Creates the given database if it does not exist yet.
+   *
+   * <p>Failures are logged and not rethrown, so callers are not notified when creation fails.
+   *
+   * @param testDatabaseName the database to create
+   */
+  public void createDatabase(TestDatabaseName testDatabaseName) {
+    // Drop the trailing "/<database>" segment so the connection targets the server itself.
+    String databaseUrl = getJdbcUrl(testDatabaseName);
+    String clickHouseJdbcUrl =
+        StringUtils.substring(databaseUrl, 0, databaseUrl.lastIndexOf("/"));
+
+    // Use the default username and password to connect and create the database;
+    // any non-default password must be configured via Gravitino catalog properties.
+    try (Connection connection =
+            DriverManager.getConnection(clickHouseJdbcUrl, USER_NAME, getPassword());
+        Statement statement = connection.createStatement()) {
+
+      String query = String.format("CREATE DATABASE IF NOT EXISTS %s;", testDatabaseName);
+      // FIXME: String, which is used in SQL, can be unsafe
+      statement.execute(query);
+      LOG.info("clickHouse container database {} has been created", testDatabaseName);
+    } catch (Exception e) {
+      LOG.error(e.getMessage(), e);
+    }
+  }
+
+  public String getUsername() {
+    return USER_NAME;
+  }
+
+  public String getPassword() {
+    return PASSWORD;
+  }
+
+  /** Returns the JDBC URL of the server without a database segment. */
+  public String getJdbcUrl() {
+    return format("jdbc:clickhouse://%s:%d", getContainerIpAddress(), CLICKHOUSE_PORT);
+  }
+
+  /** Returns the JDBC URL of the server with {@code testDatabaseName} as the database segment. */
+  public String getJdbcUrl(TestDatabaseName testDatabaseName) {
+    return format(
+        "jdbc:clickhouse://%s:%d/%s", getContainerIpAddress(), CLICKHOUSE_PORT, testDatabaseName);
+  }
+
+  /**
+   * Returns the class name of the JDBC driver that {@link DriverManager} resolves for the
+   * database URL.
+   *
+   * @throws SQLException if no registered driver accepts the URL
+   */
+  public String getDriverClassName(TestDatabaseName testDatabaseName) throws SQLException {
+    return DriverManager.getDriver(getJdbcUrl(testDatabaseName)).getClass().getName();
+  }
+
+  /** Builder that defaults to {@link #DEFAULT_IMAGE}, {@link #HOST_NAME} and the JDBC port. */
+  public static class Builder extends BaseContainer.Builder<Builder, ClickHouseContainer> {
+
+    private Builder() {
+      this.image = DEFAULT_IMAGE;
+      this.hostName = HOST_NAME;
+      this.exposePorts = ImmutableSet.of(CLICKHOUSE_PORT);
+    }
+
+    @Override
+    public ClickHouseContainer build() {
+      return new ClickHouseContainer(
+          image, hostName, exposePorts, extraHosts, filesToMount, envVars, network);
+    }
+  }
+}
diff --git 
a/integration-test-common/src/test/java/org/apache/gravitino/integration/test/container/ContainerSuite.java
 
b/integration-test-common/src/test/java/org/apache/gravitino/integration/test/container/ContainerSuite.java
index 51d0030c10..4a194a387a 100644
--- 
a/integration-test-common/src/test/java/org/apache/gravitino/integration/test/container/ContainerSuite.java
+++ 
b/integration-test-common/src/test/java/org/apache/gravitino/integration/test/container/ContainerSuite.java
@@ -72,6 +72,7 @@ public class ContainerSuite implements Closeable {
   private static volatile Map<PGImageName, PostgreSQLContainer> pgContainerMap 
=
       new EnumMap<>(PGImageName.class);
   private static volatile OceanBaseContainer oceanBaseContainer;
+  private static volatile ClickHouseContainer clickHouseContainer;
 
   private static volatile GravitinoLocalStackContainer 
gravitinoLocalStackContainer;
 
@@ -506,6 +507,37 @@ public class ContainerSuite implements Closeable {
     }
   }
 
+  /**
+   * Starts the shared ClickHouse container on first use and then creates the requested database.
+   *
+   * <p>The container is created at most once: the volatile field is checked, then re-checked
+   * while holding the {@code ContainerSuite} class lock. Database creation runs on every call
+   * under the {@code ClickHouseContainer} class lock.
+   *
+   * @param testDatabaseName the database to create inside the container
+   */
+  public void startClickHouseContainer(TestDatabaseName testDatabaseName) {
+    if (clickHouseContainer == null) {
+      synchronized (ContainerSuite.class) {
+        if (clickHouseContainer == null) {
+          initIfNecessary();
+
+          ImmutableMap<String, String> clickHouseEnv =
+              ImmutableMap.of("CLICKHOUSE_PASSWORD", ClickHouseContainer.PASSWORD);
+          ClickHouseContainer started =
+              closer.register(
+                  ClickHouseContainer.builder()
+                      .withHostName("gravitino-ci-clickhouse")
+                      .withEnvVars(clickHouseEnv)
+                      .withExposePorts(ImmutableSet.of(ClickHouseContainer.CLICKHOUSE_PORT))
+                      .withNetwork(network)
+                      .build());
+          started.start();
+          // Publish the container only after it has started successfully.
+          clickHouseContainer = started;
+        }
+      }
+    }
+
+    synchronized (ClickHouseContainer.class) {
+      clickHouseContainer.createDatabase(testDatabaseName);
+    }
+  }
+
+  /**
+   * Returns the shared ClickHouse container.
+   *
+   * @return the container assigned by {@link #startClickHouseContainer(TestDatabaseName)}, or
+   *     {@code null} if it has not been started
+   */
+  public ClickHouseContainer getClickHouseContainer() {
+    return clickHouseContainer;
+  }
+
   public GravitinoLocalStackContainer getLocalStackContainer() {
     return gravitinoLocalStackContainer;
   }
@@ -652,8 +684,7 @@ public class ContainerSuite implements Closeable {
       }
     }
 
-    com.github.dockerjava.api.model.Network.Ipam.Config ipamConfig =
-        new com.github.dockerjava.api.model.Network.Ipam.Config();
+    Config ipamConfig = new Config();
     ipamConfig
         .withSubnet(CONTAINER_NETWORK_SUBNET)
         .withGateway(CONTAINER_NETWORK_GATEWAY)
diff --git 
a/integration-test-common/src/test/java/org/apache/gravitino/integration/test/util/TestDatabaseName.java
 
b/integration-test-common/src/test/java/org/apache/gravitino/integration/test/util/TestDatabaseName.java
index 515e79c007..dd5ac8ea61 100644
--- 
a/integration-test-common/src/test/java/org/apache/gravitino/integration/test/util/TestDatabaseName.java
+++ 
b/integration-test-common/src/test/java/org/apache/gravitino/integration/test/util/TestDatabaseName.java
@@ -116,6 +116,10 @@ public enum TestDatabaseName {
   },
   PG_ICEBERG_AUTHZ_IT,
 
+  CLICKHOUSE_CLICKHOUSE_ABSTRACT_IT,
+  CLICKHOUSE_CATALOG_CLICKHOUSE_IT,
+  CLICKHOUSE_AUDIT_CATALOG_CLICKHOUSE_IT,
+
   /** Represents the PostgreSQL database for partition statistics integration 
tests. */
   PG_TEST_PARTITION_STATS {
     /** PostgreSQL only accept lowercase database name */
diff --git 
a/server/src/test/java/org/apache/gravitino/server/web/rest/TestTableOperations.java
 
b/server/src/test/java/org/apache/gravitino/server/web/rest/TestTableOperations.java
index 33d4ebcbb8..a9143254f1 100644
--- 
a/server/src/test/java/org/apache/gravitino/server/web/rest/TestTableOperations.java
+++ 
b/server/src/test/java/org/apache/gravitino/server/web/rest/TestTableOperations.java
@@ -718,7 +718,7 @@ public class TestTableOperations extends BaseOperationsTest 
{
     TableUpdateRequest.AddTableIndexRequest req =
         new TableUpdateRequest.AddTableIndexRequest(
             Index.IndexType.PRIMARY_KEY,
-            Indexes.DEFAULT_MYSQL_PRIMARY_KEY_NAME,
+            Indexes.DEFAULT_PRIMARY_KEY_NAME,
             new String[][] {{"col1"}});
     Column[] columns =
         new Column[] {

Reply via email to