This is an automated email from the ASF dual-hosted git repository.

yuqi4733 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/gravitino.git


The following commit(s) were added to refs/heads/main by this push:
     new aefb2740fb [#3302][Sub-Task] StarRocks catalog E2E test (#7792)
aefb2740fb is described below

commit aefb2740fb3a6a2d30d39e119b88374b35f9e534
Author: Jarvis <jar...@apache.org>
AuthorDate: Thu Jul 31 16:02:06 2025 +0800

    [#3302][Sub-Task] StarRocks catalog E2E test (#7792)
    
    ### What changes were proposed in this pull request?
    
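    Add end-to-end (E2E) tests for the StarRocks catalog implementation, together
    with the StarRocks test container and user documentation.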
    
    ### Why are the changes needed?
    
    To support the StarRocks catalog.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No
    
    ### How was this patch tested?
    
    By the E2E tests added in this commit (e.g. `CatalogStarRocksIT`).
---
 .../gravitino/catalog/jdbc/config/JdbcConfig.java  |   3 +-
 .../integration/test/CatalogStarRocksIT.java       | 928 +++++++++++++++++++++
 .../catalog/starrocks/operation/TestStarRocks.java |  85 ++
 .../operation/TestStarRocksDatabaseOperations.java |  52 ++
 .../operation/TestStarRocksTableOperations.java    | 583 +++++++++++++
 .../TestStarRocksTablePartitionOperations.java     | 361 ++++++++
 .../src/main/java/org/apache/gravitino/Config.java |   2 +-
 .../apache/gravitino/config/ConfigConstants.java   |   5 +-
 docs/index.md                                      |   1 +
 docs/jdbc-starrocks-catalog.md                     | 197 +++++
 .../integration/test/container/ContainerSuite.java |  27 +-
 .../test/container/StarRocksContainer.java         | 135 +++
 12 files changed, 2375 insertions(+), 4 deletions(-)

diff --git 
a/catalogs/catalog-jdbc-common/src/main/java/org/apache/gravitino/catalog/jdbc/config/JdbcConfig.java
 
b/catalogs/catalog-jdbc-common/src/main/java/org/apache/gravitino/catalog/jdbc/config/JdbcConfig.java
index dc29343af9..28c6aa9fb4 100644
--- 
a/catalogs/catalog-jdbc-common/src/main/java/org/apache/gravitino/catalog/jdbc/config/JdbcConfig.java
+++ 
b/catalogs/catalog-jdbc-common/src/main/java/org/apache/gravitino/catalog/jdbc/config/JdbcConfig.java
@@ -20,6 +20,7 @@
 package org.apache.gravitino.catalog.jdbc.config;
 
 import java.util.Map;
+import java.util.Objects;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.gravitino.Config;
 import org.apache.gravitino.config.ConfigBuilder;
@@ -65,7 +66,7 @@ public class JdbcConfig extends Config {
           .doc("The password of the Jdbc connection")
           .version(ConfigConstants.VERSION_0_3_0)
           .stringConf()
-          .checkValue(StringUtils::isNotBlank, 
ConfigConstants.NOT_BLANK_ERROR_MSG)
+          .checkValue(Objects::nonNull, ConfigConstants.NOT_NULL_ERROR_MSG)
           .create();
 
   public static final ConfigEntry<Integer> POOL_MIN_SIZE =
diff --git 
a/catalogs/catalog-jdbc-starrocks/src/test/java/org/apache/gravitino/catalog/starrocks/integration/test/CatalogStarRocksIT.java
 
b/catalogs/catalog-jdbc-starrocks/src/test/java/org/apache/gravitino/catalog/starrocks/integration/test/CatalogStarRocksIT.java
new file mode 100644
index 0000000000..5437f363fe
--- /dev/null
+++ 
b/catalogs/catalog-jdbc-starrocks/src/test/java/org/apache/gravitino/catalog/starrocks/integration/test/CatalogStarRocksIT.java
@@ -0,0 +1,928 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.catalog.starrocks.integration.test;
+
+import static 
org.apache.gravitino.integration.test.util.ITUtils.assertPartition;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Maps;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import org.apache.commons.lang3.ArrayUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.gravitino.Catalog;
+import org.apache.gravitino.NameIdentifier;
+import org.apache.gravitino.Schema;
+import org.apache.gravitino.SupportsSchemas;
+import org.apache.gravitino.catalog.jdbc.config.JdbcConfig;
+import org.apache.gravitino.client.GravitinoMetalake;
+import org.apache.gravitino.exceptions.ConnectionFailedException;
+import org.apache.gravitino.exceptions.NoSuchPartitionException;
+import org.apache.gravitino.exceptions.NoSuchSchemaException;
+import org.apache.gravitino.exceptions.SchemaAlreadyExistsException;
+import org.apache.gravitino.integration.test.container.ContainerSuite;
+import org.apache.gravitino.integration.test.container.StarRocksContainer;
+import org.apache.gravitino.integration.test.util.BaseIT;
+import org.apache.gravitino.integration.test.util.GravitinoITUtils;
+import org.apache.gravitino.integration.test.util.ITUtils;
+import org.apache.gravitino.rel.Column;
+import org.apache.gravitino.rel.SupportsPartitions;
+import org.apache.gravitino.rel.Table;
+import org.apache.gravitino.rel.TableCatalog;
+import org.apache.gravitino.rel.TableChange;
+import org.apache.gravitino.rel.expressions.Expression;
+import org.apache.gravitino.rel.expressions.NamedReference;
+import org.apache.gravitino.rel.expressions.distributions.Distribution;
+import org.apache.gravitino.rel.expressions.distributions.Distributions;
+import org.apache.gravitino.rel.expressions.distributions.Strategy;
+import org.apache.gravitino.rel.expressions.literals.Literal;
+import org.apache.gravitino.rel.expressions.literals.Literals;
+import org.apache.gravitino.rel.expressions.sorts.SortOrder;
+import org.apache.gravitino.rel.expressions.transforms.Transform;
+import org.apache.gravitino.rel.expressions.transforms.Transforms;
+import org.apache.gravitino.rel.partitions.ListPartition;
+import org.apache.gravitino.rel.partitions.Partition;
+import org.apache.gravitino.rel.partitions.Partitions;
+import org.apache.gravitino.rel.partitions.RangePartition;
+import org.apache.gravitino.rel.types.Types;
+import org.apache.gravitino.utils.RandomNameUtils;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.api.Test;
+import org.testcontainers.shaded.org.awaitility.Awaitility;
+
+@Tag("gravitino-docker-test")
+public class CatalogStarRocksIT extends BaseIT {
+
+  private static final String provider = "jdbc-starrocks";
+
+  private static final String DRIVER_CLASS_NAME = "com.mysql.jdbc.Driver";
+
+  public String metalakeName = 
GravitinoITUtils.genRandomName("starrocks_it_metalake");
+  public String catalogName = 
GravitinoITUtils.genRandomName("starrocks_it_catalog");
+  public String schemaName = 
GravitinoITUtils.genRandomName("starrocks_it_schema");
+  public String tableName = 
GravitinoITUtils.genRandomName("starrocks_it_table");
+
+  public String table_comment = "table_comment_by_gravitino_it";
+
+  // StarRocks doesn't support schema comments
+  public String schema_comment = null;
+  public String STARROCKS_COL_NAME1 = "starrocks_col_name1";
+  public String STARROCKS_COL_NAME2 = "starrocks_col_name2";
+  public String STARROCKS_COL_NAME3 = "starrocks_col_name3";
+  public String STARROCKS_COL_NAME4 = "starrocks_col_name4";
+
+  // Because a schema change is an asynchronous process, we need to wait a while for it to complete.
+  // For more information, you can refer to the comment in
+  // StarRocksTableOperations.generateAlterTableSql().
+  private static final long MAX_WAIT_IN_SECONDS = 30;
+
+  private static final long WAIT_INTERVAL_IN_SECONDS = 1;
+
+  private static final ContainerSuite containerSuite = 
ContainerSuite.getInstance();
+  private GravitinoMetalake metalake;
+  private String jdbcUrl;
+
+  protected Catalog catalog;
+
+  @BeforeAll
+  public void startup() throws IOException {
+    containerSuite.startStarRocksContainer();
+
+    createMetalake();
+    createCatalog();
+    createSchema();
+  }
+
+  @AfterAll
+  public void stop() {
+    clearTableAndSchema();
+    metalake.dropCatalog(catalogName, true);
+    client.dropMetalake(metalakeName, true);
+  }
+
+  @AfterEach
+  public void resetSchema() {
+    clearTableAndSchema();
+    createSchema();
+  }
+
+  private void clearTableAndSchema() {
+    catalog.asSchemas().dropSchema(schemaName, true);
+  }
+
+  private void createMetalake() {
+    GravitinoMetalake[] gravitinoMetaLakes = client.listMetalakes();
+    assertEquals(0, gravitinoMetaLakes.length);
+
+    client.createMetalake(metalakeName, "comment", Collections.emptyMap());
+    GravitinoMetalake loadMetalake = client.loadMetalake(metalakeName);
+    assertEquals(metalakeName, loadMetalake.name());
+
+    metalake = loadMetalake;
+  }
+
+  private void createCatalog() {
+    Map<String, String> catalogProperties = Maps.newHashMap();
+
+    StarRocksContainer starRocksContainer = 
containerSuite.getStarRocksContainer();
+
+    jdbcUrl =
+        String.format(
+            "jdbc:mysql://%s:%d/",
+            starRocksContainer.getContainerIpAddress(), 
StarRocksContainer.FE_MYSQL_PORT);
+
+    catalogProperties.put(JdbcConfig.JDBC_URL.getKey(), jdbcUrl);
+    catalogProperties.put(JdbcConfig.JDBC_DRIVER.getKey(), DRIVER_CLASS_NAME);
+    catalogProperties.put(JdbcConfig.USERNAME.getKey(), 
StarRocksContainer.USER_NAME);
+    catalogProperties.put(JdbcConfig.PASSWORD.getKey(), "");
+
+    Catalog createdCatalog =
+        metalake.createCatalog(
+            catalogName,
+            Catalog.Type.RELATIONAL,
+            provider,
+            "starrocks catalog comment",
+            catalogProperties);
+    Catalog loadCatalog = metalake.loadCatalog(catalogName);
+    assertEquals(createdCatalog, loadCatalog);
+
+    catalog = loadCatalog;
+  }
+
+  private void createSchema() {
+    NameIdentifier ident = NameIdentifier.of(metalakeName, catalogName, 
schemaName);
+    Map<String, String> prop = Maps.newHashMap();
+
+    Schema createdSchema = catalog.asSchemas().createSchema(ident.name(), 
schema_comment, prop);
+    Schema loadSchema = catalog.asSchemas().loadSchema(ident.name());
+    assertEquals(createdSchema.name(), loadSchema.name());
+  }
+
+  private Column[] createColumns() {
+    Column col1 =
+        Column.of(
+            STARROCKS_COL_NAME1, Types.IntegerType.get(), "col_1_comment", 
false, false, null);
+    Column col2 = Column.of(STARROCKS_COL_NAME2, Types.VarCharType.of(10), 
"col_2_comment");
+    Column col3 = Column.of(STARROCKS_COL_NAME3, Types.VarCharType.of(10), 
"col_3_comment");
+    Column col4 =
+        Column.of(STARROCKS_COL_NAME4, Types.DateType.get(), "col_4_comment", 
false, false, null);
+    return new Column[] {col1, col2, col3, col4};
+  }
+
+  private Map<String, String> createTableProperties() {
+    return ImmutableMap.of();
+  }
+
+  private Distribution createDistribution() {
+    return Distributions.hash(2, NamedReference.field(STARROCKS_COL_NAME1));
+  }
+
+  @Test
+  void testStarRocksSchemaBasicOperation() {
+    SupportsSchemas schemas = catalog.asSchemas();
+
+    // test list schemas
+    String[] schemaNames = schemas.listSchemas();
+    assertTrue(Arrays.asList(schemaNames).contains(schemaName));
+
+    // test create schema already exists
+    String testSchemaName = 
GravitinoITUtils.genRandomName("create_schema_test");
+    NameIdentifier schemaIdent = NameIdentifier.of(metalakeName, catalogName, 
testSchemaName);
+    schemas.createSchema(schemaIdent.name(), schema_comment, 
Collections.emptyMap());
+
+    List<String> schemaNameList = Arrays.asList(schemas.listSchemas());
+    assertTrue(schemaNameList.contains(testSchemaName));
+
+    assertThrows(
+        SchemaAlreadyExistsException.class,
+        () -> schemas.createSchema(schemaIdent.name(), schema_comment, 
Collections.emptyMap()));
+
+    // test drop schema
+    assertTrue(schemas.dropSchema(schemaIdent.name(), false));
+
+    // check schema is deleted
+    // 1. check by load schema
+    assertThrows(NoSuchSchemaException.class, () -> 
schemas.loadSchema(schemaIdent.name()));
+
+    // 2. check by list schema
+    schemaNameList = Arrays.asList(schemas.listSchemas());
+    assertFalse(schemaNameList.contains(testSchemaName));
+
+    // test drop schema not exists
+    NameIdentifier notExistsSchemaIdent = NameIdentifier.of(metalakeName, 
catalogName, "no-exits");
+    assertFalse(schemas.dropSchema(notExistsSchemaIdent.name(), false));
+  }
+
+  @Test
+  void testDropStarRocksSchema() {
+    String schemaName = 
GravitinoITUtils.genRandomName("starrocks_it_schema_dropped").toLowerCase();
+
+    catalog.asSchemas().createSchema(schemaName, "", ImmutableMap.of());
+
+    catalog
+        .asTableCatalog()
+        .createTable(
+            NameIdentifier.of(schemaName, tableName),
+            createColumns(),
+            "Created by Gravitino client",
+            createTableProperties(),
+            Transforms.EMPTY_TRANSFORM,
+            createDistribution(),
+            null);
+
+    // Try to drop a database, and cascade equals to false, it should not be 
allowed.
+    Throwable excep =
+        assertThrows(
+            RuntimeException.class, () -> 
catalog.asSchemas().dropSchema(schemaName, false));
+    assertTrue(excep.getMessage().contains("the value of cascade should be 
true."));
+
+    // Check the database still exists
+    catalog.asSchemas().loadSchema(schemaName);
+
+    // Try to drop a database, and cascade equals to true, it should be 
allowed.
+    assertTrue(catalog.asSchemas().dropSchema(schemaName, true));
+
+    // Check database has been dropped
+    SupportsSchemas schemas = catalog.asSchemas();
+    assertThrows(NoSuchSchemaException.class, () -> 
schemas.loadSchema(schemaName));
+  }
+
+  @Test
+  void testSchemaWithIllegalName() {
+    SupportsSchemas schemas = catalog.asSchemas();
+    String databaseName = RandomNameUtils.genRandomName("it_db");
+    Map<String, String> properties = new HashMap<>();
+    String comment = "comment";
+
+    // should throw an exception with string that might contain SQL injection
+    String sqlInjection = databaseName + "`; DROP TABLE important_table; -- ";
+    assertThrows(
+        IllegalArgumentException.class,
+        () -> schemas.createSchema(sqlInjection, comment, properties));
+    assertThrows(IllegalArgumentException.class, () -> 
schemas.dropSchema(sqlInjection, false));
+
+    String sqlInjection1 = databaseName + "`; SLEEP(10); -- ";
+    assertThrows(
+        IllegalArgumentException.class,
+        () -> schemas.createSchema(sqlInjection1, comment, properties));
+    assertThrows(IllegalArgumentException.class, () -> 
schemas.dropSchema(sqlInjection1, false));
+
+    String sqlInjection2 =
+        databaseName + "`; UPDATE Users SET password = 'newpassword' WHERE 
username = 'admin'; -- ";
+    assertThrows(
+        IllegalArgumentException.class,
+        () -> schemas.createSchema(sqlInjection2, comment, properties));
+    assertThrows(IllegalArgumentException.class, () -> 
schemas.dropSchema(sqlInjection2, false));
+
+    // should throw an exception with input that has more than 64 characters
+    String invalidInput = StringUtils.repeat("a", 65);
+    assertThrows(
+        IllegalArgumentException.class,
+        () -> schemas.createSchema(invalidInput, comment, properties));
+    assertThrows(IllegalArgumentException.class, () -> 
schemas.dropSchema(invalidInput, false));
+  }
+
+  @Test
+  void testStarRocksTableBasicOperation() {
+    // create a table
+    NameIdentifier tableIdentifier = NameIdentifier.of(schemaName, tableName);
+    Column[] columns = createColumns();
+
+    Distribution distribution = createDistribution();
+
+    Map<String, String> properties = createTableProperties();
+    TableCatalog tableCatalog = catalog.asTableCatalog();
+    tableCatalog.createTable(
+        tableIdentifier,
+        columns,
+        table_comment,
+        properties,
+        Transforms.EMPTY_TRANSFORM,
+        distribution,
+        null,
+        null);
+
+    // load table
+    Table loadTable = tableCatalog.loadTable(tableIdentifier);
+    ITUtils.assertionsTableInfo(
+        tableName,
+        table_comment,
+        Arrays.asList(columns),
+        properties,
+        null,
+        Transforms.EMPTY_TRANSFORM,
+        loadTable);
+
+    // rename table
+    String newTableName = GravitinoITUtils.genRandomName("new_table_name");
+    tableCatalog.alterTable(tableIdentifier, TableChange.rename(newTableName));
+    NameIdentifier newTableIdentifier = NameIdentifier.of(schemaName, 
newTableName);
+    Table renamedTable = tableCatalog.loadTable(newTableIdentifier);
+    ITUtils.assertionsTableInfo(
+        newTableName,
+        table_comment,
+        Arrays.asList(columns),
+        properties,
+        null,
+        Transforms.EMPTY_TRANSFORM,
+        renamedTable);
+  }
+
+  @Test
+  void testStarRocksIllegalTableName() {
+    Map<String, String> properties = createTableProperties();
+    TableCatalog tableCatalog = catalog.asTableCatalog();
+    String table_name = "t123";
+
+    String t1_name = table_name + "`; DROP TABLE important_table; -- ";
+    Column t1_col = Column.of(t1_name, Types.LongType.get(), "id", false, 
false, null);
+    Column[] columns = {t1_col};
+    NameIdentifier tableIdentifier =
+        NameIdentifier.of(metalakeName, catalogName, schemaName, t1_name);
+
+    assertThrows(
+        IllegalArgumentException.class,
+        () ->
+            tableCatalog.createTable(
+                tableIdentifier,
+                columns,
+                table_comment,
+                properties,
+                Transforms.EMPTY_TRANSFORM,
+                Distributions.NONE,
+                new SortOrder[0],
+                null));
+    assertThrows(
+        IllegalArgumentException.class, () -> 
catalog.asTableCatalog().dropTable(tableIdentifier));
+
+    String t2_name = table_name + "`; SLEEP(10); -- ";
+    Column t2_col = Column.of(t2_name, Types.LongType.get(), "id", false, 
false, null);
+    Column[] columns2 = new Column[] {t2_col};
+    NameIdentifier tableIdentifier2 =
+        NameIdentifier.of(metalakeName, catalogName, schemaName, t2_name);
+
+    assertThrows(
+        IllegalArgumentException.class,
+        () ->
+            tableCatalog.createTable(
+                tableIdentifier2,
+                columns2,
+                table_comment,
+                properties,
+                Transforms.EMPTY_TRANSFORM,
+                Distributions.NONE,
+                new SortOrder[0],
+                null));
+    assertThrows(
+        IllegalArgumentException.class, () -> 
catalog.asTableCatalog().dropTable(tableIdentifier2));
+
+    String t3_name =
+        table_name + "`; UPDATE Users SET password = 'newpassword' WHERE 
username = 'admin'; -- ";
+    Column t3_col = Column.of(t3_name, Types.LongType.get(), "id", false, 
false, null);
+    Column[] columns3 = new Column[] {t3_col};
+    NameIdentifier tableIdentifier3 =
+        NameIdentifier.of(metalakeName, catalogName, schemaName, t3_name);
+
+    assertThrows(
+        IllegalArgumentException.class,
+        () ->
+            tableCatalog.createTable(
+                tableIdentifier3,
+                columns3,
+                table_comment,
+                properties,
+                Transforms.EMPTY_TRANSFORM,
+                Distributions.NONE,
+                new SortOrder[0],
+                null));
+    assertThrows(
+        IllegalArgumentException.class, () -> 
catalog.asTableCatalog().dropTable(tableIdentifier3));
+
+    String invalidInput = StringUtils.repeat("a", 65);
+    Column t4_col = Column.of(invalidInput, Types.LongType.get(), "id", false, 
false, null);
+    Column[] columns4 = new Column[] {t4_col};
+    NameIdentifier tableIdentifier4 =
+        NameIdentifier.of(metalakeName, catalogName, schemaName, invalidInput);
+
+    assertThrows(
+        IllegalArgumentException.class,
+        () ->
+            tableCatalog.createTable(
+                tableIdentifier4,
+                columns4,
+                table_comment,
+                properties,
+                Transforms.EMPTY_TRANSFORM,
+                Distributions.NONE,
+                new SortOrder[0],
+                null));
+    assertThrows(
+        IllegalArgumentException.class, () -> 
catalog.asTableCatalog().dropTable(tableIdentifier4));
+  }
+
+  @Test
+  void testTestConnection() {
+    Map<String, String> catalogProperties = Maps.newHashMap();
+    catalogProperties.put(JdbcConfig.JDBC_URL.getKey(), jdbcUrl);
+    catalogProperties.put(JdbcConfig.JDBC_DRIVER.getKey(), DRIVER_CLASS_NAME);
+    catalogProperties.put(JdbcConfig.USERNAME.getKey(), 
StarRocksContainer.USER_NAME);
+    catalogProperties.put(JdbcConfig.PASSWORD.getKey(), "wrong_password");
+
+    Exception exception =
+        assertThrows(
+            ConnectionFailedException.class,
+            () ->
+                metalake.testConnection(
+                    GravitinoITUtils.genRandomName("starrocks_it_catalog"),
+                    Catalog.Type.RELATIONAL,
+                    provider,
+                    "starrocks catalog comment",
+                    catalogProperties));
+    Assertions.assertTrue(exception.getMessage().contains("Access denied for 
user"));
+  }
+
+  @Test
+  void testAlterStarRocksTable() {
+    // create a table
+    NameIdentifier tableIdentifier = NameIdentifier.of(schemaName, tableName);
+    Column[] columns = createColumns();
+
+    Distribution distribution = createDistribution();
+
+    Map<String, String> properties = createTableProperties();
+    TableCatalog tableCatalog = catalog.asTableCatalog();
+    tableCatalog.createTable(
+        tableIdentifier,
+        columns,
+        table_comment,
+        properties,
+        Transforms.EMPTY_TRANSFORM,
+        distribution,
+        null,
+        null);
+    Table loadedTable = tableCatalog.loadTable(tableIdentifier);
+
+    ITUtils.assertionsTableInfo(
+        tableName,
+        table_comment,
+        Arrays.asList(columns),
+        properties,
+        null,
+        Transforms.EMPTY_TRANSFORM,
+        loadedTable);
+
+    // Alter column type
+    tableCatalog.alterTable(
+        tableIdentifier,
+        TableChange.updateColumnType(
+            new String[] {STARROCKS_COL_NAME3}, Types.VarCharType.of(255)));
+
+    Awaitility.await()
+        .atMost(MAX_WAIT_IN_SECONDS, TimeUnit.SECONDS)
+        .pollInterval(WAIT_INTERVAL_IN_SECONDS, TimeUnit.SECONDS)
+        .untilAsserted(
+            () ->
+                ITUtils.assertColumn(
+                    Column.of(STARROCKS_COL_NAME3, Types.VarCharType.of(255), 
"col_3_comment"),
+                    tableCatalog.loadTable(tableIdentifier).columns()[2]));
+
+    // add new column
+    tableCatalog.alterTable(
+        tableIdentifier,
+        TableChange.addColumn(
+            new String[] {"col_5"}, Types.VarCharType.of(255), 
"col_5_comment", true));
+    Awaitility.await()
+        .atMost(MAX_WAIT_IN_SECONDS, TimeUnit.SECONDS)
+        .pollInterval(WAIT_INTERVAL_IN_SECONDS, TimeUnit.SECONDS)
+        .untilAsserted(
+            () -> assertEquals(5, 
tableCatalog.loadTable(tableIdentifier).columns().length));
+
+    ITUtils.assertColumn(
+        Column.of("col_5", Types.VarCharType.of(255), "col_5_comment"),
+        tableCatalog.loadTable(tableIdentifier).columns()[4]);
+
+    // change column position
+    // TODO: change column position is unstable, add it later
+
+    // drop column
+    tableCatalog.alterTable(
+        tableIdentifier, TableChange.deleteColumn(new String[] {"col_5"}, 
true));
+
+    Awaitility.await()
+        .atMost(MAX_WAIT_IN_SECONDS, TimeUnit.SECONDS)
+        .pollInterval(WAIT_INTERVAL_IN_SECONDS, TimeUnit.SECONDS)
+        .untilAsserted(
+            () -> assertEquals(4, 
tableCatalog.loadTable(tableIdentifier).columns().length));
+  }
+
+  @Test
+  void testStarRocksTablePartitionOperation() {
+    // create a partitioned table
+    String tableName = 
GravitinoITUtils.genRandomName("test_partitioned_table");
+    NameIdentifier tableIdentifier = NameIdentifier.of(schemaName, tableName);
+    Column[] columns = createColumns();
+    Distribution distribution = createDistribution();
+    Map<String, String> properties = createTableProperties();
+    Transform[] partitioning = {Transforms.list(new String[][] 
{{STARROCKS_COL_NAME1}})};
+    TableCatalog tableCatalog = catalog.asTableCatalog();
+    tableCatalog.createTable(
+        tableIdentifier,
+        columns,
+        table_comment,
+        properties,
+        partitioning,
+        distribution,
+        null,
+        null);
+
+    // load table
+    Table loadTable = tableCatalog.loadTable(tableIdentifier);
+    ITUtils.assertionsTableInfo(
+        tableName,
+        table_comment,
+        Arrays.asList(columns),
+        properties,
+        null,
+        partitioning,
+        loadTable);
+
+    // get table partition operations
+    SupportsPartitions tablePartitionOperations = 
loadTable.supportPartitions();
+
+    // assert partition info when there is no partitions actually
+    String[] emptyPartitionNames = 
tablePartitionOperations.listPartitionNames();
+    assertEquals(0, emptyPartitionNames.length);
+    Partition[] emptyPartitions = tablePartitionOperations.listPartitions();
+    assertEquals(0, emptyPartitions.length);
+
+    // get non-existing partition
+    assertThrows(NoSuchPartitionException.class, () -> 
tablePartitionOperations.getPartition("p1"));
+
+    // add partition with incorrect type
+    Partition incorrectType =
+        Partitions.range("p1", Literals.NULL, Literals.NULL, 
Collections.emptyMap());
+    assertThrows(
+        IllegalArgumentException.class, () -> 
tablePartitionOperations.addPartition(incorrectType));
+
+    // add partition with incorrect value
+    Partition incorrectValue =
+        Partitions.list(
+            "p1", new Literal[][] {{Literals.NULL, Literals.NULL}}, 
Collections.emptyMap());
+    assertThrows(
+        IllegalArgumentException.class,
+        () -> tablePartitionOperations.addPartition(incorrectValue));
+
+    // add partition
+    Literal[][] p1Values = {{Literals.integerLiteral(1)}};
+    Literal[][] p2Values = {{Literals.integerLiteral(2)}};
+    Literal[][] p3Values = {{Literals.integerLiteral(3)}};
+
+    ListPartition p1 = Partitions.list("p1", p1Values, Collections.emptyMap());
+    ListPartition p2 = Partitions.list("p2", p2Values, Collections.emptyMap());
+    ListPartition p3 = Partitions.list("p3", p3Values, Collections.emptyMap());
+    ListPartition p1Added = (ListPartition) 
tablePartitionOperations.addPartition(p1);
+    assertPartition(p1, p1Added);
+    ListPartition p2Added = (ListPartition) 
tablePartitionOperations.addPartition(p2);
+    assertPartition(p2, p2Added);
+    ListPartition p3Added = (ListPartition) 
tablePartitionOperations.addPartition(p3);
+    assertPartition(p3, p3Added);
+
+    // check partitions
+    Set<String> partitionNames =
+        
Arrays.stream(tablePartitionOperations.listPartitionNames()).collect(Collectors.toSet());
+    assertEquals(3, partitionNames.size());
+    assertTrue(partitionNames.contains("p1"));
+    assertTrue(partitionNames.contains("p2"));
+    assertTrue(partitionNames.contains("p3"));
+
+    Map<String, ListPartition> partitions =
+        Arrays.stream(tablePartitionOperations.listPartitions())
+            .collect(Collectors.toMap(p -> p.name(), p -> (ListPartition) p));
+    assertEquals(3, partitions.size());
+    assertPartition(p1, partitions.get("p1"));
+    assertPartition(p2, partitions.get("p2"));
+    assertPartition(p3, partitions.get("p3"));
+
+    assertPartition(p1, tablePartitionOperations.getPartition("p1"));
+    assertPartition(p2, tablePartitionOperations.getPartition("p2"));
+    assertPartition(p3, tablePartitionOperations.getPartition("p3"));
+
+    // drop partition
+    assertTrue(tablePartitionOperations.dropPartition("p3"));
+    partitionNames =
+        
Arrays.stream(tablePartitionOperations.listPartitionNames()).collect(Collectors.toSet());
+    assertEquals(2, partitionNames.size());
+    assertFalse(partitionNames.contains("p3"));
+    assertThrows(NoSuchPartitionException.class, () -> 
tablePartitionOperations.getPartition("p3"));
+
+    // drop non-existing partition
+    assertFalse(tablePartitionOperations.dropPartition("p3"));
+  }
+
+  @Test
+  void testCreatePartitionedTable() {
+    // create a range-partitioned table with assignments
+    String tableName = 
GravitinoITUtils.genRandomName("test_create_range_partitioned_table");
+    NameIdentifier tableIdentifier = NameIdentifier.of(schemaName, tableName);
+    Column[] columns = createColumns();
+    Distribution distribution = createDistribution();
+
+    Map<String, String> properties = createTableProperties();
+    Literal todayLiteral = Literals.of("2024-07-24", Types.DateType.get());
+    Literal tomorrowLiteral = Literals.of("2024-07-25", Types.DateType.get());
+    RangePartition p1 = Partitions.range("p1", todayLiteral, Literals.NULL, 
Collections.emptyMap());
+    RangePartition p2 =
+        Partitions.range("p2", tomorrowLiteral, todayLiteral, 
Collections.emptyMap());
+    RangePartition p3 =
+        Partitions.range("p3", Literals.NULL, Literals.NULL, 
Collections.emptyMap());
+    Transform[] partitioning = {
+      Transforms.range(new String[] {STARROCKS_COL_NAME4}, new 
RangePartition[] {p1, p2, p3})
+    };
+    TableCatalog tableCatalog = catalog.asTableCatalog();
+    tableCatalog.createTable(
+        tableIdentifier,
+        columns,
+        table_comment,
+        properties,
+        partitioning,
+        distribution,
+        null,
+        null);
+    Table loadedTable = tableCatalog.loadTable(tableIdentifier);
+    ITUtils.assertionsTableInfo(
+        tableName,
+        table_comment,
+        Arrays.asList(columns),
+        properties,
+        null,
+        new Transform[] {Transforms.range(new String[] {STARROCKS_COL_NAME4})},
+        loadedTable);
+
+    // assert partition info
+    SupportsPartitions tablePartitionOperations = 
loadedTable.supportPartitions();
+    Map<String, RangePartition> loadedRangePartitions =
+        Arrays.stream(tablePartitionOperations.listPartitions())
+            .collect(Collectors.toMap(Partition::name, p -> (RangePartition) 
p));
+    assertTrue(loadedRangePartitions.size() == 3);
+    assertTrue(loadedRangePartitions.containsKey("p1"));
+    assertPartition(
+        Partitions.range(
+            "p1",
+            todayLiteral,
+            Literals.of("0000-01-01", Types.DateType.get()),
+            Collections.emptyMap()),
+        loadedRangePartitions.get("p1"));
+    assertTrue(loadedRangePartitions.containsKey("p2"));
+    assertPartition(p2, loadedRangePartitions.get("p2"));
+    assertTrue(loadedRangePartitions.containsKey("p3"));
+    assertPartition(
+        Partitions.range(
+            "p3",
+            Literals.of("MAXVALUE", Types.DateType.get()),
+            tomorrowLiteral,
+            Collections.emptyMap()),
+        loadedRangePartitions.get("p3"));
+
+    // create a list-partitioned table with assignments
+    tableName = 
GravitinoITUtils.genRandomName("test_create_list_partitioned_table");
+    tableIdentifier = NameIdentifier.of(schemaName, tableName);
+    Literal<Integer> integerLiteral1 = Literals.integerLiteral(1);
+    Literal<Integer> integerLiteral2 = Literals.integerLiteral(2);
+    ListPartition p4 =
+        Partitions.list(
+            "p4",
+            new Literal[][] {{integerLiteral1, todayLiteral}, 
{integerLiteral1, tomorrowLiteral}},
+            Collections.emptyMap());
+    ListPartition p5 =
+        Partitions.list(
+            "p5",
+            new Literal[][] {{integerLiteral2, todayLiteral}, 
{integerLiteral2, tomorrowLiteral}},
+            Collections.emptyMap());
+    partitioning =
+        new Transform[] {
+          Transforms.list(
+              new String[][] {{STARROCKS_COL_NAME1}, {STARROCKS_COL_NAME4}},
+              new ListPartition[] {p4, p5})
+        };
+    tableCatalog.createTable(
+        tableIdentifier,
+        columns,
+        table_comment,
+        properties,
+        partitioning,
+        distribution,
+        null,
+        null);
+    loadedTable = tableCatalog.loadTable(tableIdentifier);
+    ITUtils.assertionsTableInfo(
+        tableName,
+        table_comment,
+        Arrays.asList(columns),
+        properties,
+        null,
+        new Transform[] {
+          Transforms.list(new String[][] {{STARROCKS_COL_NAME1}, 
{STARROCKS_COL_NAME4}})
+        },
+        loadedTable);
+
+    // assert partition info
+    tablePartitionOperations = loadedTable.supportPartitions();
+    Map<String, ListPartition> loadedListPartitions =
+        Arrays.stream(tablePartitionOperations.listPartitions())
+            .collect(Collectors.toMap(Partition::name, p -> (ListPartition) 
p));
+    assertTrue(loadedListPartitions.size() == 2);
+    assertTrue(loadedListPartitions.containsKey("p4"));
+    assertPartition(p4, loadedListPartitions.get("p4"));
+    assertTrue(loadedListPartitions.containsKey("p5"));
+    assertPartition(p5, loadedListPartitions.get("p5"));
+  }
+
+  /**
+   * Creates a table that has an extra timestamp column and verifies that the column is loaded back
+   * as a timestamp without time zone.
+   */
+  @Test
+  void testTableWithTimeStampColumn() {
+    // create a table
+    String tableName = GravitinoITUtils.genRandomName("test_table_with_timestamp_column");
+    NameIdentifier tableIdentifier = NameIdentifier.of(schemaName, tableName);
+    Column[] columns = createColumns();
+    columns =
+        ArrayUtils.add(columns, Column.of("timestamp_col", Types.TimestampType.withoutTimeZone()));
+    Distribution distribution = createDistribution();
+
+    Map<String, String> properties = createTableProperties();
+    TableCatalog tableCatalog = catalog.asTableCatalog();
+    tableCatalog.createTable(
+        tableIdentifier,
+        columns,
+        table_comment,
+        properties,
+        Transforms.EMPTY_TRANSFORM,
+        distribution,
+        null,
+        null);
+
+    // load table
+    Table loadTable = tableCatalog.loadTable(tableIdentifier);
+    // Fail with an explicit message instead of a bare NoSuchElementException if the column is lost.
+    Column timestampColumn =
+        Arrays.stream(loadTable.columns())
+            .filter(c -> "timestamp_col".equals(c.name()))
+            .findFirst()
+            .orElseThrow(
+                () -> new AssertionError("Column timestamp_col not found in loaded table"));
+
+    Assertions.assertEquals(Types.TimestampType.withoutTimeZone(), timestampColumn.dataType());
+  }
+
+  /**
+   * Creates a table without any partitioning and checks that every partition operation on it is
+   * rejected with {@link UnsupportedOperationException}.
+   */
+  @Test
+  void testNonPartitionedTable() {
+    String tableName = GravitinoITUtils.genRandomName("test_non_partitioned_table");
+    NameIdentifier tableIdentifier = NameIdentifier.of(schemaName, tableName);
+    Column[] columns = createColumns();
+    Distribution distribution = createDistribution();
+    Map<String, String> properties = createTableProperties();
+    Transform[] noPartitioning = Transforms.EMPTY_TRANSFORM;
+
+    TableCatalog tableCatalog = catalog.asTableCatalog();
+    tableCatalog.createTable(
+        tableIdentifier,
+        columns,
+        table_comment,
+        properties,
+        noPartitioning,
+        distribution,
+        null,
+        null);
+
+    // The loaded table must match what was requested, including the empty partitioning.
+    Table loaded = tableCatalog.loadTable(tableIdentifier);
+    ITUtils.assertionsTableInfo(
+        tableName,
+        table_comment,
+        Arrays.asList(columns),
+        properties,
+        null,
+        noPartitioning,
+        loaded);
+
+    // Every partition operation is expected to be unsupported on this table.
+    SupportsPartitions partitionOps = loaded.supportPartitions();
+
+    assertThrows(UnsupportedOperationException.class, () -> partitionOps.listPartitionNames());
+
+    assertThrows(UnsupportedOperationException.class, () -> partitionOps.listPartitions());
+
+    assertThrows(
+        UnsupportedOperationException.class,
+        () ->
+            partitionOps.addPartition(
+                Partitions.range("p1", Literals.NULL, Literals.NULL, Collections.emptyMap())));
+
+    assertThrows(UnsupportedOperationException.class, () -> partitionOps.getPartition("p1"));
+
+    assertThrows(UnsupportedOperationException.class, () -> partitionOps.dropPartition("p1"));
+  }
+
+  /**
+   * Creates and drops one table per distribution variant, checking that the strategy and the
+   * distribution expressions are preserved when the table is loaded back.
+   */
+  @Test
+  void testAllDistribution() {
+    Distribution[] candidates = {
+      Distributions.even(1, Expression.EMPTY_EXPRESSION),
+      Distributions.hash(1, NamedReference.field(STARROCKS_COL_NAME1)),
+      Distributions.even(10, Expression.EMPTY_EXPRESSION),
+      Distributions.hash(0, NamedReference.field(STARROCKS_COL_NAME1)),
+      Distributions.hash(11, NamedReference.field(STARROCKS_COL_NAME1)),
+      Distributions.hash(
+          12,
+          NamedReference.field(STARROCKS_COL_NAME1),
+          NamedReference.field(STARROCKS_COL_NAME2))
+    };
+
+    for (Distribution expected : candidates) {
+      String tableName = GravitinoITUtils.genRandomName("test_distribution_table");
+      NameIdentifier tableIdentifier = NameIdentifier.of(schemaName, tableName);
+      Column[] columns = createColumns();
+      Map<String, String> properties = createTableProperties();
+      TableCatalog tableCatalog = catalog.asTableCatalog();
+      tableCatalog.createTable(
+          tableIdentifier,
+          columns,
+          table_comment,
+          properties,
+          Transforms.EMPTY_TRANSFORM,
+          expected,
+          null,
+          null);
+
+      // Compare only strategy and expressions of the loaded distribution.
+      Table loaded = tableCatalog.loadTable(tableIdentifier);
+      Assertions.assertEquals(expected.strategy(), loaded.distribution().strategy());
+      Assertions.assertArrayEquals(expected.expressions(), loaded.distribution().expressions());
+
+      tableCatalog.dropTable(tableIdentifier);
+    }
+  }
+
+  /**
+   * Creates a table with an auto hash distribution and verifies that strategy, number and
+   * expressions of the loaded distribution equal the requested ones.
+   */
+  @Test
+  void testAllDistributionWithAuto() {
+    Distribution expected =
+        Distributions.auto(Strategy.HASH, NamedReference.field(STARROCKS_COL_NAME1));
+
+    String tableName = GravitinoITUtils.genRandomName("test_distribution_table");
+    NameIdentifier tableIdentifier = NameIdentifier.of(schemaName, tableName);
+    Column[] columns = createColumns();
+    Map<String, String> properties = createTableProperties();
+    TableCatalog tableCatalog = catalog.asTableCatalog();
+    tableCatalog.createTable(
+        tableIdentifier,
+        columns,
+        table_comment,
+        properties,
+        Transforms.EMPTY_TRANSFORM,
+        expected,
+        null,
+        null);
+
+    // Reload the table and compare each distribution attribute in turn.
+    Table loaded = tableCatalog.loadTable(tableIdentifier);
+    Assertions.assertEquals(expected.strategy(), loaded.distribution().strategy());
+    Assertions.assertEquals(expected.number(), loaded.distribution().number());
+    Assertions.assertArrayEquals(expected.expressions(), loaded.distribution().expressions());
+
+    tableCatalog.dropTable(tableIdentifier);
+  }
+}
diff --git 
a/catalogs/catalog-jdbc-starrocks/src/test/java/org/apache/gravitino/catalog/starrocks/operation/TestStarRocks.java
 
b/catalogs/catalog-jdbc-starrocks/src/test/java/org/apache/gravitino/catalog/starrocks/operation/TestStarRocks.java
new file mode 100644
index 0000000000..7f5abf97bc
--- /dev/null
+++ 
b/catalogs/catalog-jdbc-starrocks/src/test/java/org/apache/gravitino/catalog/starrocks/operation/TestStarRocks.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.catalog.starrocks.operation;
+
+import com.google.common.collect.Maps;
+import java.util.Collections;
+import java.util.Map;
+import org.apache.gravitino.catalog.jdbc.TestJdbc;
+import org.apache.gravitino.catalog.jdbc.config.JdbcConfig;
+import org.apache.gravitino.catalog.jdbc.utils.DataSourceUtils;
+import 
org.apache.gravitino.catalog.starrocks.converter.StarRocksColumnDefaultValueConverter;
+import 
org.apache.gravitino.catalog.starrocks.converter.StarRocksExceptionConverter;
+import org.apache.gravitino.catalog.starrocks.converter.StarRocksTypeConverter;
+import 
org.apache.gravitino.catalog.starrocks.operations.StarRocksDatabaseOperations;
+import 
org.apache.gravitino.catalog.starrocks.operations.StarRocksTableOperations;
+import org.apache.gravitino.integration.test.container.ContainerSuite;
+import org.apache.gravitino.integration.test.container.StarRocksContainer;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+
+/**
+ * Common base for the StarRocks operation tests. It starts the StarRocks container and initializes
+ * the shared data source, database operations and table operations against it.
+ */
+public class TestStarRocks extends TestJdbc {
+  protected static final String DRIVER_CLASS_NAME = "com.mysql.jdbc.Driver";
+  private static final ContainerSuite containerSuite = ContainerSuite.getInstance();
+
+  /** Starts the StarRocks container and wires up the JDBC operations used by the tests. */
+  @BeforeAll
+  public static void startup() {
+    containerSuite.startStarRocksContainer();
+
+    DATA_SOURCE = DataSourceUtils.createDataSource(getStarRocksCatalogProperties());
+    JDBC_EXCEPTION_CONVERTER = new StarRocksExceptionConverter();
+
+    DATABASE_OPERATIONS = new StarRocksDatabaseOperations();
+    TABLE_OPERATIONS = new StarRocksTableOperations();
+    DATABASE_OPERATIONS.initialize(DATA_SOURCE, JDBC_EXCEPTION_CONVERTER, Collections.emptyMap());
+    TABLE_OPERATIONS.initialize(
+        DATA_SOURCE,
+        JDBC_EXCEPTION_CONVERTER,
+        new StarRocksTypeConverter(),
+        new StarRocksColumnDefaultValueConverter(),
+        Collections.emptyMap());
+  }
+
+  /**
+   * Closes the data source and, when a container reference is set, stops that container.
+   *
+   * <p>NOTE(review): the original comment states that this replaces the parent class's stop
+   * method; confirm against TestJdbc.
+   */
+  @AfterAll
+  public static void stop() {
+    DataSourceUtils.closeDataSource(DATA_SOURCE);
+    if (CONTAINER != null) {
+      CONTAINER.stop();
+    }
+  }
+
+  /** Builds the JDBC catalog properties (URL, driver, user, password) for the container. */
+  private static Map<String, String> getStarRocksCatalogProperties() {
+    StarRocksContainer container = containerSuite.getStarRocksContainer();
+    // StarRocks is reached through its MySQL-protocol port on the FE node.
+    String url =
+        String.format(
+            "jdbc:mysql://%s:%d/",
+            container.getContainerIpAddress(), StarRocksContainer.FE_MYSQL_PORT);
+
+    Map<String, String> props = Maps.newHashMap();
+    props.put(JdbcConfig.JDBC_URL.getKey(), url);
+    props.put(JdbcConfig.JDBC_DRIVER.getKey(), DRIVER_CLASS_NAME);
+    props.put(JdbcConfig.USERNAME.getKey(), StarRocksContainer.USER_NAME);
+    props.put(JdbcConfig.PASSWORD.getKey(), StarRocksContainer.PASSWORD);
+    return props;
+  }
+}
diff --git 
a/catalogs/catalog-jdbc-starrocks/src/test/java/org/apache/gravitino/catalog/starrocks/operation/TestStarRocksDatabaseOperations.java
 
b/catalogs/catalog-jdbc-starrocks/src/test/java/org/apache/gravitino/catalog/starrocks/operation/TestStarRocksDatabaseOperations.java
new file mode 100644
index 0000000000..288816d86d
--- /dev/null
+++ 
b/catalogs/catalog-jdbc-starrocks/src/test/java/org/apache/gravitino/catalog/starrocks/operation/TestStarRocksDatabaseOperations.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.catalog.starrocks.operation;
+
+import java.util.Collections;
+import java.util.List;
+import org.apache.gravitino.exceptions.SchemaAlreadyExistsException;
+import org.apache.gravitino.utils.RandomNameUtils;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.api.Test;
+
+@Tag("gravitino-docker-test")
+public class TestStarRocksDatabaseOperations extends TestStarRocks {
+
+  @Test
+  public void testBaseOperationDatabase() {
+    String databaseName = RandomNameUtils.genRandomName("it_db");
+    String comment = "";
+    // StarRocks can't get properties after set, so the test case can't 
include properties
+    testBaseOperation(databaseName, Collections.emptyMap(), comment);
+
+    // recreate database, get exception.
+    Assertions.assertThrowsExactly(
+        SchemaAlreadyExistsException.class,
+        () -> DATABASE_OPERATIONS.create(databaseName, "", 
Collections.emptyMap()));
+
+    testDropDatabase(databaseName);
+  }
+
+  @Test
+  void testListSystemDatabase() {
+    List<String> databaseNames = DATABASE_OPERATIONS.listDatabases();
+    Assertions.assertFalse(databaseNames.contains("information_schema"));
+  }
+}
diff --git 
a/catalogs/catalog-jdbc-starrocks/src/test/java/org/apache/gravitino/catalog/starrocks/operation/TestStarRocksTableOperations.java
 
b/catalogs/catalog-jdbc-starrocks/src/test/java/org/apache/gravitino/catalog/starrocks/operation/TestStarRocksTableOperations.java
new file mode 100644
index 0000000000..fc8b804aec
--- /dev/null
+++ 
b/catalogs/catalog-jdbc-starrocks/src/test/java/org/apache/gravitino/catalog/starrocks/operation/TestStarRocksTableOperations.java
@@ -0,0 +1,583 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.catalog.starrocks.operation;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import com.google.common.collect.Maps;
+import java.time.LocalDate;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import org.apache.gravitino.catalog.jdbc.JdbcColumn;
+import org.apache.gravitino.catalog.jdbc.JdbcTable;
+import org.apache.gravitino.catalog.jdbc.converter.JdbcTypeConverter;
+import 
org.apache.gravitino.catalog.jdbc.operation.JdbcTablePartitionOperations;
+import org.apache.gravitino.catalog.starrocks.converter.StarRocksTypeConverter;
+import 
org.apache.gravitino.catalog.starrocks.operations.StarRocksTablePartitionOperations;
+import org.apache.gravitino.integration.test.util.GravitinoITUtils;
+import org.apache.gravitino.rel.TableChange;
+import org.apache.gravitino.rel.expressions.Expression;
+import org.apache.gravitino.rel.expressions.NamedReference;
+import org.apache.gravitino.rel.expressions.distributions.Distribution;
+import org.apache.gravitino.rel.expressions.distributions.Distributions;
+import org.apache.gravitino.rel.expressions.literals.Literal;
+import org.apache.gravitino.rel.expressions.literals.Literals;
+import org.apache.gravitino.rel.expressions.transforms.Transform;
+import org.apache.gravitino.rel.expressions.transforms.Transforms;
+import org.apache.gravitino.rel.indexes.Index;
+import org.apache.gravitino.rel.indexes.Indexes;
+import org.apache.gravitino.rel.partitions.ListPartition;
+import org.apache.gravitino.rel.partitions.Partition;
+import org.apache.gravitino.rel.partitions.Partitions;
+import org.apache.gravitino.rel.partitions.RangePartition;
+import org.apache.gravitino.rel.types.Type;
+import org.apache.gravitino.rel.types.Types;
+import org.apache.gravitino.utils.RandomNameUtils;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.api.Test;
+import org.testcontainers.shaded.org.awaitility.Awaitility;
+
+@Tag("gravitino-docker-test")
+public class TestStarRocksTableOperations extends TestStarRocks {
+  private static final JdbcTypeConverter TYPE_CONVERTER = new 
StarRocksTypeConverter();
+  private static final Type VARCHAR_255 = Types.VarCharType.of(255);
+  private static final Type VARCHAR_1024 = Types.VarCharType.of(1024);
+
+  private static final Type INT = Types.IntegerType.get();
+
+  private static final Integer DEFAULT_BUCKET_SIZE = 1;
+
+  private static final String databaseName = 
GravitinoITUtils.genRandomName("starrocks_test_db");
+
+  // A schema change is created asynchronously, so tests need to wait for it to take effect.
+  // For more information, refer to the comment in
+  // StarRocksTableOperations.generateAlterTableSql().
+  private static final long MAX_WAIT_IN_SECONDS = 30;
+
+  private static final long WAIT_INTERVAL_IN_SECONDS = 1;
+
+  /** Runs the base-class startup and then creates the database used by the tests in this class. */
+  @BeforeAll
+  public static void startup() {
+    TestStarRocks.startup();
+    createDatabase();
+  }
+
+  /** Creates the test database with an empty comment and no properties. */
+  private static void createDatabase() {
+    String emptyComment = "";
+    Map<String, String> noProperties = new HashMap<>();
+    DATABASE_OPERATIONS.create(databaseName, emptyComment, noProperties);
+  }
+
+  /** Returns a fresh, empty, mutable map of table properties. */
+  private static Map<String, String> createProperties() {
+    return Maps.newHashMap();
+  }
+
+  /**
+   * Creates a three-column table for each distribution variant and verifies that the table info,
+   * the strategy and the distribution expressions match after loading; each table is dropped again.
+   */
+  @Test
+  void testAllDistribution() {
+    Distribution[] candidates = {
+      Distributions.even(DEFAULT_BUCKET_SIZE, Expression.EMPTY_EXPRESSION),
+      Distributions.hash(DEFAULT_BUCKET_SIZE, NamedReference.field("col_1")),
+      Distributions.even(10, Expression.EMPTY_EXPRESSION),
+      Distributions.hash(0, NamedReference.field("col_1")),
+      Distributions.hash(11, NamedReference.field("col_1")),
+      Distributions.hash(12, NamedReference.field("col_1"), NamedReference.field("col_2"))
+    };
+
+    for (Distribution expected : candidates) {
+      String tableName = GravitinoITUtils.genRandomName("starrocks_basic_test_table");
+      String tableComment = "gravitino_table_test_comment";
+      JdbcColumn idColumn =
+          JdbcColumn.builder().withName("col_1").withType(INT).withComment("id").build();
+      JdbcColumn secondColumn =
+          JdbcColumn.builder().withName("col_2").withType(VARCHAR_255).withComment("col_2").build();
+      JdbcColumn thirdColumn =
+          JdbcColumn.builder().withName("col_3").withType(VARCHAR_255).withComment("col_3").build();
+      List<JdbcColumn> columns = Arrays.asList(idColumn, secondColumn, thirdColumn);
+      Map<String, String> expectedProperties = new HashMap<>();
+      Index[] noIndexes = new Index[] {};
+
+      // create table
+      TABLE_OPERATIONS.create(
+          databaseName,
+          tableName,
+          columns.toArray(new JdbcColumn[0]),
+          tableComment,
+          createProperties(),
+          null,
+          expected,
+          noIndexes);
+
+      // load it back and compare the metadata
+      JdbcTable loaded = TABLE_OPERATIONS.load(databaseName, tableName);
+      assertionsTableInfo(
+          tableName,
+          tableComment,
+          columns,
+          expectedProperties,
+          noIndexes,
+          Transforms.EMPTY_TRANSFORM,
+          loaded);
+
+      Assertions.assertEquals(expected.strategy(), loaded.distribution().strategy());
+      Assertions.assertArrayEquals(expected.expressions(), loaded.distribution().expressions());
+      TABLE_OPERATIONS.drop(databaseName, tableName);
+    }
+  }
+
+  @Test
+  public void testBasicTableOperation() {
+    String tableName = 
GravitinoITUtils.genRandomName("starrocks_basic_test_table");
+    String tableComment = "test_comment";
+    List<JdbcColumn> columns = new ArrayList<>();
+    JdbcColumn col_1 =
+        
JdbcColumn.builder().withName("col_1").withType(INT).withComment("id").build();
+    columns.add(col_1);
+    JdbcColumn col_2 =
+        
JdbcColumn.builder().withName("col_2").withType(VARCHAR_255).withComment("col_2").build();
+    columns.add(col_2);
+    JdbcColumn col_3 =
+        
JdbcColumn.builder().withName("col_3").withType(VARCHAR_255).withComment("col_3").build();
+    columns.add(col_3);
+    Map<String, String> properties = new HashMap<>();
+
+    Distribution distribution =
+        Distributions.hash(DEFAULT_BUCKET_SIZE, NamedReference.field("col_1"));
+    Index[] indexes = new Index[] {};
+
+    // create table
+    TABLE_OPERATIONS.create(
+        databaseName,
+        tableName,
+        columns.toArray(new JdbcColumn[0]),
+        tableComment,
+        createProperties(),
+        null,
+        distribution,
+        indexes);
+    List<String> listTables = TABLE_OPERATIONS.listTables(databaseName);
+    assertTrue(listTables.contains(tableName));
+    JdbcTable load = TABLE_OPERATIONS.load(databaseName, tableName);
+    assertionsTableInfo(
+        tableName, tableComment, columns, properties, indexes, 
Transforms.EMPTY_TRANSFORM, load);
+
+    // rename table
+    String newName = GravitinoITUtils.genRandomName("new_table");
+    Assertions.assertDoesNotThrow(() -> TABLE_OPERATIONS.rename(databaseName, 
tableName, newName));
+    Assertions.assertDoesNotThrow(() -> TABLE_OPERATIONS.load(databaseName, 
newName));
+
+    Assertions.assertTrue(TABLE_OPERATIONS.drop(databaseName, newName), "table 
should be dropped");
+
+    listTables = TABLE_OPERATIONS.listTables(databaseName);
+    Assertions.assertFalse(listTables.contains(newName));
+
+    Assertions.assertFalse(
+        TABLE_OPERATIONS.drop(databaseName, newName), "table should be 
non-existent");
+  }
+
+  /**
+   * Exercises column-level table changes (type update, add, reposition, delete) and, after each
+   * change, polls until the loaded table matches the expected column list. Also checks the error
+   * raised when a non-existent column is deleted without the if-exists flag.
+   */
+  @Test
+  public void testAlterTable() {
+    String tableName = GravitinoITUtils.genRandomName("starrocks_alter_test_table");
+
+    String tableComment = "test_comment";
+    List<JdbcColumn> columns = new ArrayList<>();
+    JdbcColumn col_1 =
+        JdbcColumn.builder().withName("col_1").withType(INT).withComment("id").build();
+    columns.add(col_1);
+    JdbcColumn col_2 =
+        JdbcColumn.builder().withName("col_2").withType(VARCHAR_255).withComment("col_2").build();
+    columns.add(col_2);
+    JdbcColumn col_3 =
+        JdbcColumn.builder().withName("col_3").withType(VARCHAR_255).withComment("col_3").build();
+    columns.add(col_3);
+    Map<String, String> properties = new HashMap<>();
+
+    Distribution distribution =
+        Distributions.hash(DEFAULT_BUCKET_SIZE, NamedReference.field("col_1"));
+    Index[] indexes = new Index[] {};
+
+    // create table
+    TABLE_OPERATIONS.create(
+        databaseName,
+        tableName,
+        columns.toArray(new JdbcColumn[0]),
+        tableComment,
+        createProperties(),
+        null,
+        distribution,
+        indexes);
+    JdbcTable load = TABLE_OPERATIONS.load(databaseName, tableName);
+    assertionsTableInfo(
+        tableName, tableComment, columns, properties, indexes, Transforms.EMPTY_TRANSFORM, load);
+
+    TABLE_OPERATIONS.alterTable(
+        databaseName,
+        tableName,
+        TableChange.updateColumnType(new String[] {col_3.name()}, VARCHAR_1024));
+
+    // After modifying the type, check it
+    columns.clear();
+    col_3 =
+        JdbcColumn.builder()
+            .withName(col_3.name())
+            .withType(VARCHAR_1024)
+            .withComment(col_3.comment())
+            .build();
+    columns.add(col_1);
+    columns.add(col_2);
+    columns.add(col_3);
+    awaitExpectedTableInfo(tableName, tableComment, columns, properties, indexes);
+
+    // add new column
+    TABLE_OPERATIONS.alterTable(
+        databaseName,
+        tableName,
+        TableChange.addColumn(new String[] {"col_4"}, VARCHAR_255, "txt4", true));
+
+    columns.clear();
+    JdbcColumn col_4 =
+        JdbcColumn.builder().withName("col_4").withType(VARCHAR_255).withComment("txt4").build();
+    columns.add(col_1);
+    columns.add(col_2);
+    columns.add(col_3);
+    columns.add(col_4);
+    awaitExpectedTableInfo(tableName, tableComment, columns, properties, indexes);
+
+    // change column position
+    TABLE_OPERATIONS.alterTable(
+        databaseName,
+        tableName,
+        TableChange.updateColumnPosition(
+            new String[] {"col_3"}, TableChange.ColumnPosition.after("col_4")));
+
+    columns.clear();
+    columns.add(col_1);
+    columns.add(col_2);
+    columns.add(col_4);
+    columns.add(col_3);
+    awaitExpectedTableInfo(tableName, tableComment, columns, properties, indexes);
+
+    // drop column if exist
+    TABLE_OPERATIONS.alterTable(
+        databaseName, tableName, TableChange.deleteColumn(new String[] {"col_4"}, true));
+    columns.clear();
+    columns.add(col_1);
+    columns.add(col_2);
+    columns.add(col_3);
+    awaitExpectedTableInfo(tableName, tableComment, columns, properties, indexes);
+
+    // delete column that does not exist
+    IllegalArgumentException illegalArgumentException =
+        Assertions.assertThrows(
+            IllegalArgumentException.class,
+            () ->
+                TABLE_OPERATIONS.alterTable(
+                    databaseName,
+                    tableName,
+                    TableChange.deleteColumn(new String[] {"col_4"}, false)));
+
+    Assertions.assertEquals(
+        "Delete column does not exist: col_4", illegalArgumentException.getMessage());
+    // With the if-exists flag set, deleting the missing column must not fail.
+    Assertions.assertDoesNotThrow(
+        () ->
+            TABLE_OPERATIONS.alterTable(
+                databaseName, tableName, TableChange.deleteColumn(new String[] {"col_4"}, true)));
+  }
+
+  /**
+   * Repeatedly loads the table until its metadata matches the expectation, polling every
+   * WAIT_INTERVAL_IN_SECONDS for at most MAX_WAIT_IN_SECONDS.
+   */
+  private void awaitExpectedTableInfo(
+      String tableName,
+      String tableComment,
+      List<JdbcColumn> columns,
+      Map<String, String> properties,
+      Index[] indexes) {
+    Awaitility.await()
+        .atMost(MAX_WAIT_IN_SECONDS, TimeUnit.SECONDS)
+        .pollInterval(WAIT_INTERVAL_IN_SECONDS, TimeUnit.SECONDS)
+        .untilAsserted(
+            () ->
+                assertionsTableInfo(
+                    tableName,
+                    tableComment,
+                    columns,
+                    properties,
+                    indexes,
+                    Transforms.EMPTY_TRANSFORM,
+                    TABLE_OPERATIONS.load(databaseName, tableName)));
+  }
+
+  /**
+   * Creates a table with one column per listed Gravitino type (named col_1 to col_14 in order)
+   * and verifies the loaded table info.
+   */
+  @Test
+  public void testCreateAllTypeTable() {
+    String tableName = GravitinoITUtils.genRandomName("all_type_table");
+    String tableComment = "test_comment";
+
+    // Position i in this array becomes column "col_" + (i + 1).
+    Type[] columnTypes = {
+      Types.IntegerType.get(),
+      Types.BooleanType.get(),
+      Types.ByteType.get(),
+      Types.ShortType.get(),
+      Types.IntegerType.get(),
+      Types.LongType.get(),
+      Types.FloatType.get(),
+      Types.DoubleType.get(),
+      Types.DecimalType.of(21, 2),
+      Types.DateType.get(),
+      Types.FixedCharType.of(10),
+      Types.VarCharType.of(10),
+      Types.StringType.get(),
+      Types.TimestampType.withoutTimeZone()
+    };
+    List<JdbcColumn> columns = new ArrayList<>();
+    for (int i = 0; i < columnTypes.length; i++) {
+      String columnName = "col_" + (i + 1);
+      columns.add(JdbcColumn.builder().withName(columnName).withType(columnTypes[i]).build());
+    }
+
+    Distribution distribution =
+        Distributions.hash(DEFAULT_BUCKET_SIZE, NamedReference.field("col_1"));
+    Index[] noIndexes = new Index[] {};
+    // create table
+    TABLE_OPERATIONS.create(
+        databaseName,
+        tableName,
+        columns.toArray(new JdbcColumn[0]),
+        tableComment,
+        createProperties(),
+        null,
+        distribution,
+        noIndexes);
+
+    JdbcTable loaded = TABLE_OPERATIONS.load(databaseName, tableName);
+    assertionsTableInfo(
+        tableName,
+        tableComment,
+        columns,
+        Collections.emptyMap(),
+        null,
+        Transforms.EMPTY_TRANSFORM,
+        loaded);
+  }
+
+  @Test
+  public void testCreateNotSupportTypeTable() { // creation must fail for every type in notSupportType
+    String tableName = RandomNameUtils.genRandomName("unsupported_type_table");
+    String tableComment = "test_comment";
+    List<JdbcColumn> columns = new ArrayList<>();
+    List<Type> notSupportType =
+        Arrays.asList(
+            Types.FixedType.of(10),
+            Types.IntervalDayType.get(),
+            Types.IntervalYearType.get(),
+            Types.UUIDType.get(),
+            Types.ListType.of(Types.DateType.get(), true),
+            Types.MapType.of(Types.StringType.get(), Types.IntegerType.get(), 
true),
+            Types.UnionType.of(Types.IntegerType.get()),
+            Types.StructType.of(
+                Types.StructType.Field.notNullField("col_1", 
Types.IntegerType.get())));
+
+    for (Type type : notSupportType) {
+      columns.clear(); // reuse the list: each iteration puts exactly one unsupported type in col_2
+      
columns.add(JdbcColumn.builder().withName("col_1").withType(Types.IntegerType.get()).build());
+      columns.add(
+          
JdbcColumn.builder().withName("col_2").withType(type).withNullable(false).build());
+
+      JdbcColumn[] jdbcCols = columns.toArray(new JdbcColumn[0]); // array handed to create() in the lambda
+      IllegalArgumentException illegalArgumentException =
+          Assertions.assertThrows(
+              IllegalArgumentException.class,
+              () -> {
+                TABLE_OPERATIONS.create(
+                    databaseName,
+                    tableName,
+                    jdbcCols,
+                    tableComment,
+                    createProperties(),
+                    null,
+                    Distributions.hash(DEFAULT_BUCKET_SIZE, 
NamedReference.field("col_1")),
+                    Indexes.EMPTY_INDEXES);
+              });
+      Assertions.assertTrue( // the error message must name the offending type
+          illegalArgumentException
+              .getMessage()
+              .contains(
+                  String.format(
+                      "Couldn't convert Gravitino type %s to StarRocks type",
+                      type.simpleString())));
+    }
+  }
+
+  @Test
+  public void testCreatePartitionedTable() { // create range/list tables with predefined partitions, then reload
+    String tableComment = "partition_table_comment";
+    JdbcColumn col1 =
+        JdbcColumn.builder()
+            .withName("col_1")
+            .withType(Types.IntegerType.get())
+            .withNullable(false)
+            .build();
+    JdbcColumn col2 =
+        
JdbcColumn.builder().withName("col_2").withType(Types.BooleanType.get()).build();
+    JdbcColumn col3 =
+        
JdbcColumn.builder().withName("col_3").withType(Types.DoubleType.get()).build();
+    JdbcColumn col4 =
+        JdbcColumn.builder()
+            .withName("col_4")
+            .withType(Types.DateType.get())
+            .withNullable(false)
+            .build();
+    List<JdbcColumn> columns = Arrays.asList(col1, col2, col3, col4);
+    Distribution distribution =
+        Distributions.hash(DEFAULT_BUCKET_SIZE, NamedReference.field("col_1"));
+    Index[] indexes = new Index[] {};
+
+    // create a table range-partitioned on the date column col_4 with partitions p1, p2 and p3
+    String rangePartitionTableName = 
GravitinoITUtils.genRandomName("range_partition_table");
+    LocalDate today = LocalDate.now();
+    LocalDate tomorrow = today.plusDays(1);
+    Literal<LocalDate> todayLiteral = Literals.dateLiteral(today);
+    Literal<LocalDate> tomorrowLiteral = Literals.dateLiteral(tomorrow);
+    RangePartition rangePartition1 = Partitions.range("p1", todayLiteral, 
Literals.NULL, null);
+    RangePartition rangePartition2 = Partitions.range("p2", tomorrowLiteral, 
todayLiteral, null);
+    RangePartition rangePartition3 = Partitions.range("p3", Literals.NULL, 
tomorrowLiteral, null);
+    Transform[] rangePartition =
+        new Transform[] {
+          Transforms.range(
+              new String[] {col4.name()},
+              new RangePartition[] {rangePartition1, rangePartition2, 
rangePartition3})
+        };
+    TABLE_OPERATIONS.create(
+        databaseName,
+        rangePartitionTableName,
+        columns.toArray(new JdbcColumn[] {}),
+        tableComment,
+        createProperties(),
+        rangePartition,
+        distribution,
+        indexes);
+    JdbcTable rangePartitionTable = TABLE_OPERATIONS.load(databaseName, 
rangePartitionTableName);
+    assertionsTableInfo(
+        rangePartitionTableName,
+        tableComment,
+        columns,
+        Collections.emptyMap(),
+        null,
+        new Transform[] {Transforms.range(new String[] {col4.name()})},
+        rangePartitionTable);
+
+    // assert partition info: NULL bounds are expected back as 0000-01-01 (p1 lower) and MAXVALUE (p3 upper)
+    JdbcTablePartitionOperations tablePartitionOperations =
+        new StarRocksTablePartitionOperations(
+            DATA_SOURCE, rangePartitionTable, JDBC_EXCEPTION_CONVERTER, 
TYPE_CONVERTER);
+    Map<String, RangePartition> loadedRangePartitions =
+        Arrays.stream(tablePartitionOperations.listPartitions())
+            .collect(Collectors.toMap(Partition::name, p -> (RangePartition) 
p));
+    assertTrue(loadedRangePartitions.containsKey("p1"));
+    RangePartition actualP1 = loadedRangePartitions.get("p1");
+    assertEquals(todayLiteral, actualP1.upper());
+    assertEquals(Literals.of("0000-01-01", Types.DateType.get()), 
actualP1.lower());
+    assertTrue(loadedRangePartitions.containsKey("p2"));
+    RangePartition actualP2 = loadedRangePartitions.get("p2");
+    assertEquals(tomorrowLiteral, actualP2.upper());
+    assertEquals(todayLiteral, actualP2.lower());
+    assertTrue(loadedRangePartitions.containsKey("p3"));
+    RangePartition actualP3 = loadedRangePartitions.get("p3");
+    assertEquals(Literals.of("MAXVALUE", Types.DateType.get()), 
actualP3.upper());
+    assertEquals(tomorrowLiteral, actualP3.lower());
+
+    // create a table list-partitioned on the (col_1, col_4) pair with partitions p1 and p2
+    String listPartitionTableName = 
GravitinoITUtils.genRandomName("list_partition_table");
+    Literal<Integer> integerLiteral1 = Literals.integerLiteral(1);
+    Literal<Integer> integerLiteral2 = Literals.integerLiteral(2);
+    ListPartition listPartition1 =
+        Partitions.list(
+            "p1",
+            new Literal[][] {{integerLiteral1, todayLiteral}, 
{integerLiteral1, tomorrowLiteral}},
+            null);
+    ListPartition listPartition2 =
+        Partitions.list(
+            "p2",
+            new Literal[][] {{integerLiteral2, todayLiteral}, 
{integerLiteral2, tomorrowLiteral}},
+            null);
+    Transform[] listPartition =
+        new Transform[] {
+          Transforms.list(
+              new String[][] {{col1.name()}, {col4.name()}},
+              new ListPartition[] {listPartition1, listPartition2})
+        };
+    TABLE_OPERATIONS.create(
+        databaseName,
+        listPartitionTableName,
+        columns.toArray(new JdbcColumn[] {}),
+        tableComment,
+        createProperties(),
+        listPartition,
+        distribution,
+        indexes);
+    JdbcTable listPartitionTable = TABLE_OPERATIONS.load(databaseName, 
listPartitionTableName);
+    assertionsTableInfo(
+        listPartitionTableName,
+        tableComment,
+        columns,
+        Collections.emptyMap(),
+        null,
+        new Transform[] {Transforms.list(new String[][] {{col1.name()}, 
{col4.name()}})},
+        listPartitionTable);
+
+    // assert that the loaded list partitions carry the same value tuples as the ones defined above
+    tablePartitionOperations =
+        new StarRocksTablePartitionOperations(
+            DATA_SOURCE, listPartitionTable, JDBC_EXCEPTION_CONVERTER, 
TYPE_CONVERTER);
+    Map<String, ListPartition> loadedListPartitions =
+        Arrays.stream(tablePartitionOperations.listPartitions())
+            .collect(Collectors.toMap(Partition::name, p -> (ListPartition) p, 
(p1, p2) -> p2));
+    assertTrue(loadedListPartitions.containsKey("p1"));
+    assertTrue(Arrays.deepEquals(listPartition1.lists(), 
loadedListPartitions.get("p1").lists()));
+    assertTrue(loadedListPartitions.containsKey("p2"));
+    assertTrue(Arrays.deepEquals(listPartition2.lists(), 
loadedListPartitions.get("p2").lists()));
+  }
+}
diff --git 
a/catalogs/catalog-jdbc-starrocks/src/test/java/org/apache/gravitino/catalog/starrocks/operation/TestStarRocksTablePartitionOperations.java
 
b/catalogs/catalog-jdbc-starrocks/src/test/java/org/apache/gravitino/catalog/starrocks/operation/TestStarRocksTablePartitionOperations.java
new file mode 100644
index 0000000000..1a722f9b98
--- /dev/null
+++ 
b/catalogs/catalog-jdbc-starrocks/src/test/java/org/apache/gravitino/catalog/starrocks/operation/TestStarRocksTablePartitionOperations.java
@@ -0,0 +1,361 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.catalog.starrocks.operation;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertThrows;
+import static org.junit.Assert.assertTrue;
+
+import com.google.common.collect.ImmutableMap;
+import java.time.LocalDate;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+import org.apache.gravitino.catalog.jdbc.JdbcColumn;
+import org.apache.gravitino.catalog.jdbc.JdbcTable;
+import org.apache.gravitino.catalog.jdbc.converter.JdbcTypeConverter;
+import 
org.apache.gravitino.catalog.jdbc.operation.JdbcTablePartitionOperations;
+import org.apache.gravitino.catalog.starrocks.converter.StarRocksTypeConverter;
+import 
org.apache.gravitino.catalog.starrocks.operations.StarRocksTablePartitionOperations;
+import org.apache.gravitino.exceptions.NoSuchPartitionException;
+import org.apache.gravitino.exceptions.PartitionAlreadyExistsException;
+import org.apache.gravitino.integration.test.util.GravitinoITUtils;
+import org.apache.gravitino.rel.expressions.NamedReference;
+import org.apache.gravitino.rel.expressions.distributions.Distribution;
+import org.apache.gravitino.rel.expressions.distributions.Distributions;
+import org.apache.gravitino.rel.expressions.literals.Literal;
+import org.apache.gravitino.rel.expressions.literals.Literals;
+import org.apache.gravitino.rel.expressions.transforms.Transform;
+import org.apache.gravitino.rel.expressions.transforms.Transforms;
+import org.apache.gravitino.rel.indexes.Index;
+import org.apache.gravitino.rel.partitions.ListPartition;
+import org.apache.gravitino.rel.partitions.Partition;
+import org.apache.gravitino.rel.partitions.Partitions;
+import org.apache.gravitino.rel.partitions.RangePartition;
+import org.apache.gravitino.rel.types.Types;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.api.Test;
+
+@Tag("gravitino-docker-test")
+public class TestStarRocksTablePartitionOperations extends TestStarRocks { // add/list/get/drop partition tests
+  private static final String databaseName = 
GravitinoITUtils.genRandomName("starrocks_test_db");
+  private static final Integer DEFAULT_BUCKET_SIZE = 1;
+  private static final JdbcTypeConverter TYPE_CONVERTER = new 
StarRocksTypeConverter();
+
+  @BeforeAll
+  public static void startup() {
+    TestStarRocks.startup(); // parent setup first, then the database used by every test in this class
+    createDatabase();
+  }
+
+  private static void createDatabase() {
+    DATABASE_OPERATIONS.create(databaseName, "", new HashMap<>());
+  }
+
+  private static Map<String, String> createProperties() {
+    return ImmutableMap.of();
+  }
+
+  @Test
+  public void testRangePartition() {
+    String tableComment = "range_partitioned_table_comment";
+    JdbcColumn col1 =
+        JdbcColumn.builder()
+            .withName("col_1")
+            .withType(Types.IntegerType.get())
+            .withNullable(false)
+            .build();
+    JdbcColumn col2 =
+        
JdbcColumn.builder().withName("col_2").withType(Types.BooleanType.get()).build();
+    JdbcColumn col3 =
+        
JdbcColumn.builder().withName("col_3").withType(Types.DoubleType.get()).build();
+    JdbcColumn col4 =
+        JdbcColumn.builder()
+            .withName("col_4")
+            .withType(Types.DateType.get())
+            .withNullable(false)
+            .build();
+    List<JdbcColumn> columns = Arrays.asList(col1, col2, col3, col4);
+    Distribution distribution =
+        Distributions.hash(DEFAULT_BUCKET_SIZE, NamedReference.field("col_1"));
+    Index[] indexes = new Index[] {};
+    String rangePartitionTableName = 
GravitinoITUtils.genRandomName("range_partition_table");
+    Transform[] rangePartition = new Transform[] {Transforms.range(new 
String[] {col4.name()})};
+    TABLE_OPERATIONS.create(
+        databaseName,
+        rangePartitionTableName,
+        columns.toArray(new JdbcColumn[] {}),
+        tableComment,
+        createProperties(),
+        rangePartition,
+        distribution,
+        indexes);
+
+    // assert that the loaded table matches the definition and is listed in the database
+    JdbcTable rangePartitionTable = TABLE_OPERATIONS.load(databaseName, 
rangePartitionTableName);
+    assertionsTableInfo(
+        rangePartitionTableName,
+        tableComment,
+        columns,
+        Collections.emptyMap(),
+        null,
+        rangePartition,
+        rangePartitionTable);
+    List<String> listTables = TABLE_OPERATIONS.listTables(databaseName);
+    assertTrue(listTables.contains(rangePartitionTableName));
+
+    // build the partition operations for the loaded table directly via the constructor
+    JdbcTablePartitionOperations tablePartitionOperations =
+        new StarRocksTablePartitionOperations(
+            DATA_SOURCE, rangePartitionTable, JDBC_EXCEPTION_CONVERTER, 
TYPE_CONVERTER);
+
+    // assert partition info while the table has no partitions yet
+    String[] emptyPartitionNames = 
tablePartitionOperations.listPartitionNames();
+    assertEquals(0, emptyPartitionNames.length);
+    Partition[] emptyPartitions = tablePartitionOperations.listPartitions();
+    assertEquals(0, emptyPartitions.length);
+
+    // getting a non-existing partition must throw
+    assertThrows(NoSuchPartitionException.class, () -> 
tablePartitionOperations.getPartition("p1"));
+
+    // adding a list partition to a range-partitioned table must be rejected
+    Partition incorrect =
+        Partitions.list("test_incorrect", new Literal[][] {{Literals.NULL}}, 
null);
+    IllegalArgumentException exception =
+        assertThrows(
+            IllegalArgumentException.class, () -> 
tablePartitionOperations.addPartition(incorrect));
+    assertEquals(
+        "Table "
+            + rangePartitionTableName
+            + " is non-list-partitioned, but trying to add a list partition",
+        exception.getMessage());
+
+    // add range partitions: p1 has a NULL lower bound, p2 has both bounds, p3 has a NULL upper bound
+    LocalDate today = LocalDate.now();
+    LocalDate tomorrow = today.plusDays(1);
+    Literal<LocalDate> todayLiteral = Literals.dateLiteral(today);
+    Literal<LocalDate> tomorrowLiteral = Literals.dateLiteral(tomorrow);
+    Partition p1 = Partitions.range("p1", todayLiteral, Literals.NULL, 
Collections.emptyMap());
+    Partition p2 = Partitions.range("p2", tomorrowLiteral, todayLiteral, 
Collections.emptyMap());
+    Partition p3 = Partitions.range("p3", Literals.NULL, tomorrowLiteral, 
Collections.emptyMap());
+    assertEquals(p1, tablePartitionOperations.addPartition(p1));
+    assertEquals(p2, tablePartitionOperations.addPartition(p2));
+    assertEquals(p3, tablePartitionOperations.addPartition(p3));
+
+    // adding a partition whose name already exists must fail
+    Partition p4 = Partitions.range("p3", Literals.NULL, Literals.NULL, 
Collections.emptyMap());
+    assertThrows(
+        PartitionAlreadyExistsException.class, () -> 
tablePartitionOperations.addPartition(p4));
+
+    // check partitions: NULL bounds are expected back as 0000-01-01 (lower) and MAXVALUE (upper)
+    Set<String> partitionNames =
+        
Arrays.stream(tablePartitionOperations.listPartitionNames()).collect(Collectors.toSet());
+    assertEquals(3, partitionNames.size());
+    assertTrue(partitionNames.contains("p1"));
+    assertTrue(partitionNames.contains("p2"));
+    assertTrue(partitionNames.contains("p3"));
+
+    Map<String, RangePartition> partitions =
+        Arrays.stream(tablePartitionOperations.listPartitions())
+            .collect(Collectors.toMap(p -> p.name(), p -> (RangePartition) p));
+    assertEquals(3, partitions.size());
+    RangePartition actualP1 = partitions.get("p1");
+    assertEquals(todayLiteral, actualP1.upper());
+    assertEquals(Literals.of("0000-01-01", Types.DateType.get()), 
actualP1.lower());
+    RangePartition actualP2 = partitions.get("p2");
+    assertEquals(tomorrowLiteral, actualP2.upper());
+    assertEquals(todayLiteral, actualP2.lower());
+    RangePartition actualP3 = partitions.get("p3");
+    assertEquals(Literals.of("MAXVALUE", Types.DateType.get()), 
actualP3.upper());
+    assertEquals(tomorrowLiteral, actualP3.lower());
+
+    actualP1 = (RangePartition) tablePartitionOperations.getPartition("p1");
+    assertEquals(todayLiteral, actualP1.upper());
+    assertEquals(Literals.of("0000-01-01", Types.DateType.get()), 
actualP1.lower());
+    actualP2 = (RangePartition) tablePartitionOperations.getPartition("p2");
+    assertEquals(tomorrowLiteral, actualP2.upper());
+    assertEquals(todayLiteral, actualP2.lower());
+    actualP3 = (RangePartition) tablePartitionOperations.getPartition("p3");
+    assertEquals(Literals.of("MAXVALUE", Types.DateType.get()), 
actualP3.upper());
+    assertEquals(tomorrowLiteral, actualP3.lower());
+
+    // drop a partition and check that it can no longer be listed or fetched
+    assertTrue(tablePartitionOperations.dropPartition("p3"));
+    partitionNames =
+        
Arrays.stream(tablePartitionOperations.listPartitionNames()).collect(Collectors.toSet());
+    assertEquals(2, partitionNames.size());
+    assertFalse(partitionNames.contains("p3"));
+    assertThrows(NoSuchPartitionException.class, () -> 
tablePartitionOperations.getPartition("p3"));
+
+    // dropping a non-existing partition returns false instead of throwing
+    assertFalse(tablePartitionOperations.dropPartition("p3"));
+  }
+
+  @Test
+  public void testListPartition() {
+    String tableComment = "list_partitioned_table_comment";
+    JdbcColumn col1 =
+        JdbcColumn.builder()
+            .withName("col_1")
+            .withType(Types.IntegerType.get())
+            .withNullable(false)
+            .build();
+    JdbcColumn col2 =
+        
JdbcColumn.builder().withName("col_2").withType(Types.BooleanType.get()).build();
+    JdbcColumn col3 =
+        
JdbcColumn.builder().withName("col_3").withType(Types.DoubleType.get()).build();
+    JdbcColumn col4 =
+        JdbcColumn.builder()
+            .withName("col_4")
+            .withType(Types.DateType.get())
+            .withNullable(false)
+            .build();
+    List<JdbcColumn> columns = Arrays.asList(col1, col2, col3, col4);
+    Distribution distribution =
+        Distributions.hash(DEFAULT_BUCKET_SIZE, NamedReference.field("col_1"));
+    Index[] indexes = new Index[] {};
+    String listPartitionTableName = 
GravitinoITUtils.genRandomName("list_partition_table");
+    Transform[] listPartition =
+        new Transform[] {Transforms.list(new String[][] {{col1.name()}, 
{col4.name()}})};
+    TABLE_OPERATIONS.create(
+        databaseName,
+        listPartitionTableName,
+        columns.toArray(new JdbcColumn[] {}),
+        tableComment,
+        createProperties(),
+        listPartition,
+        distribution,
+        indexes);
+
+    // assert that the loaded table matches the definition and is listed in the database
+    JdbcTable listPartitionTable = TABLE_OPERATIONS.load(databaseName, 
listPartitionTableName);
+    assertionsTableInfo(
+        listPartitionTableName,
+        tableComment,
+        columns,
+        Collections.emptyMap(),
+        null,
+        listPartition,
+        listPartitionTable);
+    List<String> listTables = TABLE_OPERATIONS.listTables(databaseName);
+    assertTrue(listTables.contains(listPartitionTableName));
+
+    // build the partition operations for the loaded table directly via the constructor
+    JdbcTablePartitionOperations tablePartitionOperations =
+        new StarRocksTablePartitionOperations(
+            DATA_SOURCE, listPartitionTable, JDBC_EXCEPTION_CONVERTER, 
TYPE_CONVERTER);
+
+    // assert partition info while the table has no partitions yet
+    String[] emptyPartitionNames = 
tablePartitionOperations.listPartitionNames();
+    assertEquals(0, emptyPartitionNames.length);
+    Partition[] emptyPartitions = tablePartitionOperations.listPartitions();
+    assertEquals(0, emptyPartitions.length);
+
+    // getting a non-existing partition must throw
+    assertThrows(NoSuchPartitionException.class, () -> 
tablePartitionOperations.getPartition("p1"));
+
+    // adding a range partition to a list-partitioned table must be rejected
+    Partition incorrectType =
+        Partitions.range("p1", Literals.NULL, Literals.NULL, 
Collections.emptyMap());
+    IllegalArgumentException exception =
+        assertThrows(
+            IllegalArgumentException.class,
+            () -> tablePartitionOperations.addPartition(incorrectType));
+    assertEquals(
+        "Table "
+            + listPartitionTableName
+            + " is non-range-partitioned, but trying to add a range partition",
+        exception.getMessage());
+
+    // a list partition with one value per tuple does not match the two partitioning columns
+    Partition incorrectValue =
+        Partitions.list("p1", new Literal[][] {{Literals.NULL}}, 
Collections.emptyMap());
+    exception =
+        assertThrows(
+            IllegalArgumentException.class,
+            () -> tablePartitionOperations.addPartition(incorrectValue));
+    assertEquals("The number of partitioning columns must be consistent", 
exception.getMessage());
+
+    // add four list partitions, one per (col_1, col_4) value combination
+    LocalDate today = LocalDate.now();
+    LocalDate tomorrow = today.plusDays(1);
+    Literal<LocalDate> todayLiteral = Literals.dateLiteral(today);
+    Literal<LocalDate> tomorrowLiteral = Literals.dateLiteral(tomorrow);
+    Literal[][] p1Values = {{Literals.integerLiteral(1), todayLiteral}};
+    Literal[][] p2Values = {{Literals.integerLiteral(2), todayLiteral}};
+    Literal[][] p3Values = {{Literals.integerLiteral(1), tomorrowLiteral}};
+    Literal[][] p4Values = {{Literals.integerLiteral(2), tomorrowLiteral}};
+    Partition p1 = Partitions.list("p1", p1Values, Collections.emptyMap());
+    Partition p2 = Partitions.list("p2", p2Values, Collections.emptyMap());
+    Partition p3 = Partitions.list("p3", p3Values, Collections.emptyMap());
+    Partition p4 = Partitions.list("p4", p4Values, Collections.emptyMap());
+    assertEquals(p1, tablePartitionOperations.addPartition(p1));
+    assertEquals(p2, tablePartitionOperations.addPartition(p2));
+    assertEquals(p3, tablePartitionOperations.addPartition(p3));
+    assertEquals(p4, tablePartitionOperations.addPartition(p4));
+
+    // check that names and value tuples are read back unchanged via list and get
+    Set<String> partitionNames =
+        
Arrays.stream(tablePartitionOperations.listPartitionNames()).collect(Collectors.toSet());
+    assertEquals(4, partitionNames.size());
+    assertTrue(partitionNames.contains("p1"));
+    assertTrue(partitionNames.contains("p2"));
+    assertTrue(partitionNames.contains("p3"));
+    assertTrue(partitionNames.contains("p4"));
+
+    Map<String, ListPartition> partitions =
+        Arrays.stream(tablePartitionOperations.listPartitions())
+            .collect(Collectors.toMap(p -> p.name(), p -> (ListPartition) p));
+    assertEquals(4, partitions.size());
+    ListPartition actualP1 = partitions.get("p1");
+    assertTrue(Arrays.deepEquals(actualP1.lists(), p1Values));
+    ListPartition actualP2 = partitions.get("p2");
+    assertTrue(Arrays.deepEquals(actualP2.lists(), p2Values));
+    ListPartition actualP3 = partitions.get("p3");
+    assertTrue(Arrays.deepEquals(actualP3.lists(), p3Values));
+    ListPartition actualP4 = partitions.get("p4");
+    assertTrue(Arrays.deepEquals(actualP4.lists(), p4Values));
+
+    actualP1 = (ListPartition) tablePartitionOperations.getPartition("p1");
+    assertTrue(Arrays.deepEquals(actualP1.lists(), p1Values));
+    actualP2 = (ListPartition) tablePartitionOperations.getPartition("p2");
+    assertTrue(Arrays.deepEquals(actualP2.lists(), p2Values));
+    actualP3 = (ListPartition) tablePartitionOperations.getPartition("p3");
+    assertTrue(Arrays.deepEquals(actualP3.lists(), p3Values));
+    actualP4 = (ListPartition) tablePartitionOperations.getPartition("p4");
+    assertTrue(Arrays.deepEquals(actualP4.lists(), p4Values));
+
+    // drop a partition and check that it can no longer be listed or fetched
+    assertTrue(tablePartitionOperations.dropPartition("p3"));
+    partitionNames =
+        
Arrays.stream(tablePartitionOperations.listPartitionNames()).collect(Collectors.toSet());
+    assertEquals(3, partitionNames.size());
+    assertFalse(partitionNames.contains("p3"));
+    assertThrows(NoSuchPartitionException.class, () -> 
tablePartitionOperations.getPartition("p3"));
+
+    // dropping a non-existing partition returns false instead of throwing
+    assertFalse(tablePartitionOperations.dropPartition("p3"));
+  }
+}
diff --git a/core/src/main/java/org/apache/gravitino/Config.java 
b/core/src/main/java/org/apache/gravitino/Config.java
index 8696e42063..3200854db9 100644
--- a/core/src/main/java/org/apache/gravitino/Config.java
+++ b/core/src/main/java/org/apache/gravitino/Config.java
@@ -198,7 +198,7 @@ public abstract class Config {
         (k, v) -> {
           String trimmedK = k.trim();
           String trimmedV = v.trim();
-          if (!trimmedK.isEmpty() && !trimmedV.isEmpty()) {
+          if (!trimmedK.isEmpty()) {
             if (predicate.test(trimmedK)) {
               configMap.put(trimmedK, trimmedV);
             }
diff --git 
a/core/src/main/java/org/apache/gravitino/config/ConfigConstants.java 
b/core/src/main/java/org/apache/gravitino/config/ConfigConstants.java
index b2b9ed4884..17c08ac0e9 100644
--- a/core/src/main/java/org/apache/gravitino/config/ConfigConstants.java
+++ b/core/src/main/java/org/apache/gravitino/config/ConfigConstants.java
@@ -29,9 +29,12 @@ public final class ConfigConstants {
   /** HTTPS Server port, reused by Gravitino server and Iceberg REST server */
   public static final String WEBSERVER_HTTPS_PORT = "httpsPort";
 
-  /** The value of messages used to indicate that the configuration is not 
set. */
+  /** The value of messages used to indicate that the configuration is set to 
an empty value. */
   public static final String NOT_BLANK_ERROR_MSG = "The value can't be blank";
 
+  /** The value of messages used to indicate that the configuration is not 
set. */
+  public static final String NOT_NULL_ERROR_MSG = "The value can't be null";
+
   /** The value of messages used to indicate that the configuration should be 
a positive number. */
   public static final String POSITIVE_NUMBER_ERROR_MSG = "The value must be a 
positive number";
 
diff --git a/docs/index.md b/docs/index.md
index 1a1bbc236b..3b45c6e871 100644
--- a/docs/index.md
+++ b/docs/index.md
@@ -85,6 +85,7 @@ Gravitino currently supports the following catalogs:
 * [**Paimon catalog**](./lakehouse-paimon-catalog.md)
 * [**PostgreSQL catalog**](./jdbc-postgresql-catalog.md)
 * [**OceanBase catalog**](./jdbc-oceanbase-catalog.md)
+* [**StarRocks catalog**](./jdbc-starrocks-catalog.md)
 
 **Fileset catalogs:**
 
diff --git a/docs/jdbc-starrocks-catalog.md b/docs/jdbc-starrocks-catalog.md
new file mode 100644
index 0000000000..eecfc7aa6e
--- /dev/null
+++ b/docs/jdbc-starrocks-catalog.md
@@ -0,0 +1,197 @@
+---
+title: "StarRocks catalog"
+slug: /jdbc-starrocks-catalog
+keywords:
+- jdbc
+- starrocks
+- metadata
+license: "This software is licensed under the Apache License version 2."
+---
+
+import Tabs from '@theme/Tabs';
+import TabItem from '@theme/TabItem';
+
+## Introduction
+
+Apache Gravitino provides the ability to manage 
[StarRocks](https://www.starrocks.io/) metadata through JDBC connection.
+
+:::caution
+Gravitino saves some system information in table comments, like
+`(From Gravitino, DO NOT EDIT: gravitino.v1.uid1078334182909406185)`, please 
don't change or remove this message.
+:::
+
+## Catalog
+
+### Catalog capabilities
+
+- Gravitino catalog corresponds to the StarRocks instance.
+- Supports metadata management of StarRocks (3.3.x).
+- Supports [column default 
value](./manage-relational-metadata-using-gravitino.md#table-column-default-value).
+
+### Catalog properties
+
+You can pass to a StarRocks data source any property that isn't defined by 
Gravitino by adding
+`gravitino.bypass.` prefix as a catalog property. For example, catalog property
+`gravitino.bypass.maxWaitMillis` will pass `maxWaitMillis` to the data source 
property.
+
+You can check the relevant data source configuration in
+[data source 
properties](https://commons.apache.org/proper/commons-dbcp/configuration.html) 
for
+more details.
+
+Besides the [common catalog 
properties](./gravitino-server-config.md#apache-gravitino-catalog-properties-configuration),
 the StarRocks catalog has the following properties:
+
+| Configuration item   | Description                                           
                                                                                
                                                                                
                                                                                
                                                                                
                           | Default value | Required | Since Version    |
+|----------------------|------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|---------------|----------|------------------|
+| `jdbc-url`           | JDBC URL for connecting to the database. For example, 
`jdbc:mysql://localhost:9030`                                                   
                                                                                
                                                                                
                                                                                
                           | (none)        | Yes      | 1.0.0            |
+| `jdbc-driver`        | The driver of the JDBC connection. For example, 
`com.mysql.jdbc.Driver`.                                                        
                                                                                
                                                                                
                                                                                
                                 | (none)        | Yes      | 1.0.0            |
+| `jdbc-user`          | The JDBC user name.                                   
                                                                                
                                                                                
                                                                                
                                                                                
                           | (none)        | Yes      | 1.0.0            |
+| `jdbc-password`      | The JDBC password.                                    
                                                                                
                                                                                
                                                                                
                                                                                
                           | (none)        | Yes      | 1.0.0            |
+| `jdbc.pool.min-size` | The minimum number of connections in the pool. `2` by 
default.                                                                        
                                                                                
                                                                                
                                                                                
                           | `2`           | No       | 1.0.0            |
+| `jdbc.pool.max-size` | The maximum number of connections in the pool. `10` by default. | `10`          | No       | 1.0.0            |
+
+
+Before using the StarRocks Catalog, you must download the corresponding JDBC 
driver to the `catalogs/jdbc-starrocks/libs` directory.
+Gravitino doesn't package the JDBC driver for StarRocks due to licensing 
issues.
+
+### Driver Version Compatibility
+
+The StarRocks catalog includes driver version compatibility checks for 
datetime precision calculation:
+
+- **MySQL Connector/J versions >= 8.0.16**: Full support for datetime 
precision calculation
+- **MySQL Connector/J versions < 8.0.16**: Limited support - datetime 
precision calculation returns `null` with a warning log
+
+This limitation affects the following datetime types:
+- `DATETIME(p)` - datetime precision
+
+When using an unsupported driver version, the system will:
+1. Continue to work normally with default precision (0)
+2. Log a warning message indicating the driver version limitation
+3. Return `null` for precision calculations to avoid incorrect results
+
+**Example warning log:**
+```
+WARN: MySQL driver version mysql-connector-java-8.0.11 is below 8.0.16, 
+columnSize may not be accurate for precision calculation. 
+Returning null for DATETIME type precision. Driver version: 
mysql-connector-java-8.0.11
+```
+
+**Recommended driver versions:**
+- `mysql-connector-java-8.0.16` or higher
+
+### Catalog operations
+
+Refer to [Manage Relational Metadata Using 
Gravitino](./manage-relational-metadata-using-gravitino.md#catalog-operations) 
for more details.
+
+## Schema
+
+### Schema capabilities
+
+- Gravitino's schema concept corresponds to the StarRocks database.
+- Supports creating schema.
+- Supports dropping schema.
+
+### Schema properties
+
+StarRocks can't return schema properties after they are set, so setting schema 
properties is not supported for now.
+
+### Schema operations
+
+Please refer to
+[Manage Relational Metadata Using 
Gravitino](./manage-relational-metadata-using-gravitino.md#schema-operations) 
for more details.
+
+## Table
+
+### Table capabilities
+
+- Gravitino's table concept corresponds to the StarRocks table.
+- Supports [column default 
value](./manage-relational-metadata-using-gravitino.md#table-column-default-value).
+
+#### Table column types
+
+| Gravitino Type | StarRocks Type |
+|----------------|----------------|
+| `Boolean`      | `Boolean`      |
+| `Byte`         | `TinyInt`      |
+| `Short`        | `SmallInt`     |
+| `Integer`      | `Int`          |
+| `Long`         | `BigInt`       |
+| `Float`        | `Float`        |
+| `Double`       | `Double`       |
+| `Decimal`      | `Decimal`      |
+| `Date`         | `Date`         |
+| `Timestamp`    | `Datetime`     |
+| `VarChar`      | `VarChar`      |
+| `FixedChar`    | `Char`         |
+| `String`       | `String`       |
+| `Binary`       | `Binary`       |
+
+
+StarRocks doesn't support the Gravitino `Fixed`, `Timestamp_tz`, `IntervalDay`, 
`IntervalYear`, `Union`, and `UUID` types.
+The data types other than those listed above are mapped to Gravitino's 
**[Unparsed 
Type](./manage-relational-metadata-using-gravitino.md#unparsed-type)** that 
represents an unresolvable data type since 1.0.0.
+
+:::note
+Gravitino cannot load StarRocks `array`, `map`, and `struct` types correctly, 
because StarRocks doesn't support these types in JDBC.
+:::
+
+
+### Table column auto-increment
+
+Unsupported for now.
+
+### Table properties
+
+- StarRocks supports table properties, and you can set them in the table 
properties.
+- Only supports StarRocks table properties and doesn't support user-defined 
properties.
+
+### Table indexes
+
+Unsupported
+
+### Table partitioning
+
+The StarRocks catalog supports partitioned tables. 
+Users can create partitioned tables in the StarRocks catalog with specific 
partitioning attributes. Partitions can also be pre-assigned when 
creating StarRocks tables. 
+Note that although Gravitino supports several partitioning strategies, 
StarRocks inherently only supports these two partitioning strategies:
+
+- `RANGE`
+- `LIST`
+
+:::caution
+The `fieldName` specified in the partitioning attributes must be the name of 
columns defined in the table.
+:::
+
+### Table distribution
+
+Users can also specify the distribution strategy when creating tables in the 
StarRocks catalog. Currently, the StarRocks catalog supports the following 
distribution strategies:
+- `HASH`
+- `RANDOM`
+
+For the `RANDOM` distribution strategy, Gravitino uses `EVEN` to represent 
it. More information about the distribution strategies defined in Gravitino can 
be found 
[here](./table-partitioning-distribution-sort-order-indexes.md#table-distribution).
+
+
+### Table operations
+
+Please refer to [Manage Relational Metadata Using 
Gravitino](./manage-relational-metadata-using-gravitino.md#table-operations) 
for more details.
+
+#### Alter table operations
+
+Gravitino supports these table alteration operations:
+
+- `RenameTable`
+- `UpdateComment`
+- `AddColumn`
+- `DeleteColumn`
+- `UpdateColumnType`
+- `UpdateColumnPosition`
+- `SetProperty`
+
+Please be aware that:
+
+ - Not all table alteration operations can be processed in batches.
+ - Schema changes, such as adding/modifying/dropping columns can be processed 
in batches.
+ - The schema alteration in StarRocks is asynchronous. You might get an 
outdated schema if you
+   execute a schema query immediately after the alteration. It is recommended 
to pause briefly
+   after the schema alteration. Gravitino will add the schema alteration 
status into
+   the schema information in the upcoming version to solve this problem.
+ - StarRocks has limited support for [altering table 
properties](https://docs.starrocks.io/docs/3.3/sql-reference/sql-statements/table_bucket_part_index/ALTER_TABLE/#modify-table-properties),
 and it suggests modifying one property at a time.
\ No newline at end of file
diff --git 
a/integration-test-common/src/test/java/org/apache/gravitino/integration/test/container/ContainerSuite.java
 
b/integration-test-common/src/test/java/org/apache/gravitino/integration/test/container/ContainerSuite.java
index d2a5ee6152..0127752068 100644
--- 
a/integration-test-common/src/test/java/org/apache/gravitino/integration/test/container/ContainerSuite.java
+++ 
b/integration-test-common/src/test/java/org/apache/gravitino/integration/test/container/ContainerSuite.java
@@ -66,7 +66,7 @@ public class ContainerSuite implements Closeable {
   private static volatile DorisContainer dorisContainer;
   private static volatile HiveContainer kerberosHiveContainer;
   private static volatile HiveContainer sqlBaseHiveContainer;
-
+  private static volatile StarRocksContainer starRocksContainer;
   private static volatile MySQLContainer mySQLContainer;
   private static volatile MySQLContainer mySQLVersion5Container;
   private static volatile Map<PGImageName, PostgreSQLContainer> pgContainerMap 
=
@@ -480,6 +480,27 @@ public class ContainerSuite implements Closeable {
     }
   }
 
+  /**
+   * Starts the shared StarRocks container if it has not been started yet.
+   *
+   * <p>The container is built on the suite's network, registered with {@code closer}, started,
+   * and only then published to the static {@code starRocksContainer} field. Calls made after the
+   * field has been assigned return immediately.
+   *
+   * @throws RuntimeException if the container fails to start; the original exception is the cause
+   */
+  public void startStarRocksContainer() {
+    // Fast path: already started, no locking needed.
+    if (starRocksContainer != null) {
+      return;
+    }
+
+    synchronized (ContainerSuite.class) {
+      // Re-check under the lock: another thread may have started it in the meantime.
+      if (starRocksContainer != null) {
+        return;
+      }
+
+      initIfNecessary();
+      StarRocksContainer startedContainer =
+          closer.register(StarRocksContainer.builder().withNetwork(network).build());
+      try {
+        startedContainer.start();
+      } catch (Exception e) {
+        LOG.error("Failed to start StarRocks container", e);
+        throw new RuntimeException("Failed to start StarRocks container", e);
+      }
+      // Publish only after a successful start.
+      starRocksContainer = startedContainer;
+    }
+  }
+
   public GravitinoLocalStackContainer getLocalStackContainer() {
     return gravitinoLocalStackContainer;
   }
@@ -556,6 +577,10 @@ public class ContainerSuite implements Closeable {
     return mySQLVersion5Container;
   }
 
+  /**
+   * Returns the shared StarRocks container.
+   *
+   * @return the container assigned by {@link #startStarRocksContainer()}, or {@code null} if it
+   *     has not been started
+   */
+  public StarRocksContainer getStarRocksContainer() {
+    return starRocksContainer;
+  }
+
   public PostgreSQLContainer getPostgreSQLContainer() throws 
NoSuchElementException {
     return getPostgreSQLContainer(PGImageName.VERSION_13);
   }
diff --git 
a/integration-test-common/src/test/java/org/apache/gravitino/integration/test/container/StarRocksContainer.java
 
b/integration-test-common/src/test/java/org/apache/gravitino/integration/test/container/StarRocksContainer.java
new file mode 100644
index 0000000000..99be7a0c6b
--- /dev/null
+++ 
b/integration-test-common/src/test/java/org/apache/gravitino/integration/test/container/StarRocksContainer.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.integration.test.container;
+
+import static java.lang.String.format;
+import static org.testcontainers.shaded.org.awaitility.Awaitility.await;
+
+import com.google.common.collect.ImmutableSet;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.Statement;
+import java.time.Duration;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import org.rnorth.ducttape.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.Network;
+
+/**
+ * Integration-test container for StarRocks, built on {@link BaseContainer}.
+ *
+ * <p>By default the {@link Builder} uses the {@link #DEFAULT_IMAGE} image, the {@link #HOST_NAME}
+ * host name and exposes {@link #FE_HTTP_PORT} and {@link #FE_MYSQL_PORT}. After the underlying
+ * container has started, {@link #start()} additionally waits until a backend reports itself as
+ * alive with a non-zero capacity.
+ */
+public class StarRocksContainer extends BaseContainer {
+  public static final Logger LOG = LoggerFactory.getLogger(StarRocksContainer.class);
+
+  public static final String DEFAULT_IMAGE = "starrocks/allin1-ubuntu:3.3-latest";
+  public static final String HOST_NAME = "gravitino-ci-starrocks";
+  public static final String USER_NAME = "root";
+  public static final String PASSWORD = "";
+  public static final int FE_HTTP_PORT = 8030;
+  public static final int FE_MYSQL_PORT = 9030;
+
+  /** Upper bound, in seconds, on how long {@link #checkContainerStatus(int)} waits. */
+  private static final int READY_TIMEOUT_SECONDS = 30;
+
+  /** Query used to list the backends and their state. */
+  private static final String SHOW_BACKENDS_QUERY = "SHOW PROC '/backends';";
+
+  /**
+   * Creates a new builder pre-populated with the StarRocks defaults.
+   *
+   * @return a new {@link Builder}
+   */
+  public static Builder builder() {
+    return new Builder();
+  }
+
+  /**
+   * Creates the container; all arguments are forwarded unchanged to {@link BaseContainer}.
+   *
+   * @param image the image to run
+   * @param hostName the host name of the container
+   * @param ports the ports to expose
+   * @param extraHosts additional host entries
+   * @param filesToMount files to mount into the container
+   * @param envVars environment variables for the container
+   * @param network the network to attach to, if any
+   */
+  protected StarRocksContainer(
+      String image,
+      String hostName,
+      Set<Integer> ports,
+      Map<String, String> extraHosts,
+      Map<String, String> filesToMount,
+      Map<String, String> envVars,
+      Optional<Network> network) {
+    super(image, hostName, ports, extraHosts, filesToMount, envVars, network);
+  }
+
+  /** Adds a prefixed log consumer and a five-minute startup timeout to the base setup. */
+  @Override
+  protected void setupContainer() {
+    super.setupContainer();
+    withLogConsumer(new PrintingContainerLog(format("%-14s| ", "StarRocksContainer")));
+    withStartupTimeout(Duration.ofMinutes(5));
+  }
+
+  /**
+   * Starts the container and then verifies that StarRocks is ready to serve requests.
+   *
+   * <p>Fails through {@code Preconditions.check} with the message "StarRocks container startup
+   * failed!" when {@link #checkContainerStatus(int)} reports that no backend became ready.
+   */
+  @Override
+  public void start() {
+    super.start();
+    Preconditions.check("StarRocks container startup failed!", checkContainerStatus(5));
+  }
+
+  @Override
+  public void close() {
+    super.close();
+  }
+
+  /**
+   * Polls StarRocks over JDBC until a backend is alive with a non-zero total capacity.
+   *
+   * @param retryLimit the number of polls to spread over the {@value #READY_TIMEOUT_SECONDS}
+   *     second window; values below 1 are treated as 1
+   * @return {@code true} if a backend became ready within the window, {@code false} if the wait
+   *     timed out
+   */
+  @Override
+  protected boolean checkContainerStatus(int retryLimit) {
+    String starRocksJdbcUrl =
+        format("jdbc:mysql://%s:%d/", getContainerIpAddress(), FE_MYSQL_PORT);
+    LOG.info("StarRocks url is {}", starRocksJdbcUrl);
+
+    // Guard against a zero/negative retryLimit (division by zero) and against a retryLimit larger
+    // than the timeout, where integer division would produce a 0-second poll interval.
+    int pollIntervalSeconds = Math.max(1, READY_TIMEOUT_SECONDS / Math.max(1, retryLimit));
+
+    try {
+      await()
+          .atMost(READY_TIMEOUT_SECONDS, TimeUnit.SECONDS)
+          .pollInterval(pollIntervalSeconds, TimeUnit.SECONDS)
+          .until(() -> isBackendReady(starRocksJdbcUrl));
+    } catch (org.testcontainers.shaded.org.awaitility.core.ConditionTimeoutException e) {
+      // Awaitility signals a timeout by throwing; translate it into the boolean contract so the
+      // caller can report the failure with its own message.
+      LOG.error("StarRocks container was not ready within {} seconds", READY_TIMEOUT_SECONDS, e);
+      return false;
+    }
+
+    return true;
+  }
+
+  /**
+   * Runs a single readiness probe against StarRocks.
+   *
+   * @param jdbcUrl the JDBC URL of the StarRocks frontend
+   * @return {@code true} if at least one backend row reports {@code Alive} as true and a
+   *     {@code TotalCapacity} greater than zero; {@code false} otherwise, including when the probe
+   *     itself fails
+   */
+  private static boolean isBackendReady(String jdbcUrl) {
+    try (Connection connection = DriverManager.getConnection(jdbcUrl, USER_NAME, PASSWORD);
+        Statement statement = connection.createStatement();
+        ResultSet resultSet = statement.executeQuery(SHOW_BACKENDS_QUERY)) {
+      while (resultSet.next()) {
+        String alive = resultSet.getString("Alive");
+        String totalCapacity = resultSet.getString("TotalCapacity");
+        // Only the first space-separated token of TotalCapacity is parsed as the number.
+        float totalCapacityFloat = Float.parseFloat(totalCapacity.split(" ")[0]);
+
+        // alive should be true and totalCapacity should not be 0.000
+        if ("true".equalsIgnoreCase(alive) && totalCapacityFloat > 0.0f) {
+          LOG.info("StarRocks container startup success!");
+          return true;
+        }
+      }
+      LOG.info("StarRocks container is not ready yet!");
+    } catch (Exception e) {
+      // Failures are expected while StarRocks is still starting up; the caller polls again.
+      LOG.warn("StarRocks readiness probe failed, will retry", e);
+    }
+    return false;
+  }
+
+  /** Builder for {@link StarRocksContainer} pre-populated with the StarRocks defaults. */
+  public static class Builder
+      extends BaseContainer.Builder<StarRocksContainer.Builder, StarRocksContainer> {
+    private Builder() {
+      this.image = DEFAULT_IMAGE;
+      this.hostName = HOST_NAME;
+      this.exposePorts = ImmutableSet.of(FE_HTTP_PORT, FE_MYSQL_PORT);
+    }
+
+    @Override
+    public StarRocksContainer build() {
+      return new StarRocksContainer(
+          image, hostName, exposePorts, extraHosts, filesToMount, envVars, network);
+    }
+  }
+}

Reply via email to