This is an automated email from the ASF dual-hosted git repository.

jshao pushed a commit to branch branch-0.9
in repository https://gitbox.apache.org/repos/asf/gravitino.git

commit 789504bce976d03119013b844bc6c61289a7c517
Author: Jarvis <[email protected]>
AuthorDate: Mon Jul 28 19:30:47 2025 +0800

    [#3302][Sub-Task] StarRocks catalog Table/DB ops (#7738)
    
    
    
    ### What changes were proposed in this pull request?
    
    Add the StarRocks Catalog implementation.
    
    ### Why are the changes needed?
    
    To support StarRocks Catalog.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No
    
    ### How was this patch tested?
    
    By E2E tests; the tests are in another PR:
    
    https://github.com/apache/gravitino/pull/7792
---
 .../jdbc/operation/JdbcTableOperations.java        |   6 +-
 .../StarRocksColumnDefaultValueConverter.java      |  69 ++-
 .../converter/StarRocksExceptionConverter.java     |  52 +-
 .../converter/StarRocksTypeConverter.java          | 124 ++++-
 .../operations/StarRocksDatabaseOperations.java    |  63 ++-
 .../operations/StarRocksTableOperations.java       | 522 ++++++++++++++++++++-
 .../catalog/starrocks/utils/StarRocksUtils.java    | 350 ++++++++++++++
 .../starrocks/utils/TestStarRocksUtils.java        | 209 +++++++++
 .../src/test/resources/log4j2.properties           |  73 +++
 9 files changed, 1443 insertions(+), 25 deletions(-)

diff --git 
a/catalogs/catalog-jdbc-common/src/main/java/org/apache/gravitino/catalog/jdbc/operation/JdbcTableOperations.java
 
b/catalogs/catalog-jdbc-common/src/main/java/org/apache/gravitino/catalog/jdbc/operation/JdbcTableOperations.java
index 6893a03446..c132780cd8 100644
--- 
a/catalogs/catalog-jdbc-common/src/main/java/org/apache/gravitino/catalog/jdbc/operation/JdbcTableOperations.java
+++ 
b/catalogs/catalog-jdbc-common/src/main/java/org/apache/gravitino/catalog/jdbc/operation/JdbcTableOperations.java
@@ -325,7 +325,7 @@ public abstract class JdbcTableOperations implements 
TableOperation {
   protected void purgeTable(String databaseName, String tableName) {
     LOG.info("Attempting to purge table {} from database {}", tableName, 
databaseName);
     try (Connection connection = getConnection(databaseName)) {
-      JdbcConnectorUtils.executeUpdate(connection, 
generatePurgeTableSql(tableName));
+      JdbcConnectorUtils.executeUpdate(connection, 
generatePurgeTableSql(databaseName, tableName));
       LOG.info("Purge table {} from database {}", tableName, databaseName);
     } catch (final SQLException se) {
       throw this.exceptionMapper.toGravitinoException(se);
@@ -483,6 +483,10 @@ public abstract class JdbcTableOperations implements 
TableOperation {
 
   protected abstract String generatePurgeTableSql(String tableName);
 
+  protected String generatePurgeTableSql(String databaseName, String 
tableName) {
+    return generatePurgeTableSql(tableName);
+  }
+
   protected abstract String generateAlterTableSql(
       String databaseName, String tableName, TableChange... changes);
 
diff --git 
a/catalogs/catalog-jdbc-starrocks/src/main/java/org/apache/gravitino/catalog/starrocks/converter/StarRocksColumnDefaultValueConverter.java
 
b/catalogs/catalog-jdbc-starrocks/src/main/java/org/apache/gravitino/catalog/starrocks/converter/StarRocksColumnDefaultValueConverter.java
index a1026e0e44..a9f8c5bd86 100644
--- 
a/catalogs/catalog-jdbc-starrocks/src/main/java/org/apache/gravitino/catalog/starrocks/converter/StarRocksColumnDefaultValueConverter.java
+++ 
b/catalogs/catalog-jdbc-starrocks/src/main/java/org/apache/gravitino/catalog/starrocks/converter/StarRocksColumnDefaultValueConverter.java
@@ -18,10 +18,29 @@
  */
 package org.apache.gravitino.catalog.starrocks.converter;
 
-import org.apache.commons.lang3.NotImplementedException;
+import static 
org.apache.gravitino.catalog.starrocks.converter.StarRocksTypeConverter.BIGINT;
+import static 
org.apache.gravitino.catalog.starrocks.converter.StarRocksTypeConverter.CHAR;
+import static 
org.apache.gravitino.catalog.starrocks.converter.StarRocksTypeConverter.DATE;
+import static 
org.apache.gravitino.catalog.starrocks.converter.StarRocksTypeConverter.DATETIME;
+import static 
org.apache.gravitino.catalog.starrocks.converter.StarRocksTypeConverter.DECIMAL;
+import static 
org.apache.gravitino.catalog.starrocks.converter.StarRocksTypeConverter.DOUBLE;
+import static 
org.apache.gravitino.catalog.starrocks.converter.StarRocksTypeConverter.FLOAT;
+import static 
org.apache.gravitino.catalog.starrocks.converter.StarRocksTypeConverter.INT;
+import static 
org.apache.gravitino.catalog.starrocks.converter.StarRocksTypeConverter.SMALLINT;
+import static 
org.apache.gravitino.catalog.starrocks.converter.StarRocksTypeConverter.TINYINT;
+import static 
org.apache.gravitino.catalog.starrocks.converter.StarRocksTypeConverter.VARCHAR;
+import static org.apache.gravitino.rel.Column.DEFAULT_VALUE_NOT_SET;
+import static 
org.apache.gravitino.rel.Column.DEFAULT_VALUE_OF_CURRENT_TIMESTAMP;
+
+import java.time.LocalDate;
+import java.time.LocalDateTime;
 import 
org.apache.gravitino.catalog.jdbc.converter.JdbcColumnDefaultValueConverter;
 import org.apache.gravitino.catalog.jdbc.converter.JdbcTypeConverter;
 import org.apache.gravitino.rel.expressions.Expression;
+import org.apache.gravitino.rel.expressions.UnparsedExpression;
+import org.apache.gravitino.rel.expressions.literals.Literals;
+import org.apache.gravitino.rel.types.Decimal;
+import org.apache.gravitino.rel.types.Types;
 
 public class StarRocksColumnDefaultValueConverter extends 
JdbcColumnDefaultValueConverter {
 
@@ -31,6 +50,52 @@ public class StarRocksColumnDefaultValueConverter extends 
JdbcColumnDefaultValue
       String columnDefaultValue,
       boolean isExpression,
       boolean nullable) {
-    throw new NotImplementedException("To be implemented in the future");
+    if (columnDefaultValue == null) {
+      return nullable ? Literals.NULL : DEFAULT_VALUE_NOT_SET;
+    }
+
+    if (columnDefaultValue.equalsIgnoreCase(NULL)) {
+      return Literals.NULL;
+    }
+
+    if (isExpression) {
+      if (columnDefaultValue.equals(CURRENT_TIMESTAMP)) {
+        return DEFAULT_VALUE_OF_CURRENT_TIMESTAMP;
+      }
+      // The parsing of StarRocks expressions is complex, so we are not currently
+      // undertaking the parsing.
+      return UnparsedExpression.of(columnDefaultValue);
+    }
+
+    switch (columnType.getTypeName().toLowerCase()) {
+      case TINYINT:
+        return Literals.byteLiteral(Byte.valueOf(columnDefaultValue));
+      case SMALLINT:
+        return Literals.shortLiteral(Short.valueOf(columnDefaultValue));
+      case INT:
+        return Literals.integerLiteral(Integer.valueOf(columnDefaultValue));
+      case BIGINT:
+        return Literals.longLiteral(Long.valueOf(columnDefaultValue));
+      case FLOAT:
+        return Literals.floatLiteral(Float.valueOf(columnDefaultValue));
+      case DOUBLE:
+        return Literals.doubleLiteral(Double.valueOf(columnDefaultValue));
+      case DECIMAL:
+        return Literals.decimalLiteral(
+            Decimal.of(columnDefaultValue, columnType.getColumnSize(), 
columnType.getScale()));
+      case DATE:
+        return Literals.dateLiteral(LocalDate.parse(columnDefaultValue, 
DATE_TIME_FORMATTER));
+      case DATETIME:
+        return CURRENT_TIMESTAMP.equals(columnDefaultValue)
+            ? DEFAULT_VALUE_OF_CURRENT_TIMESTAMP
+            : Literals.timestampLiteral(
+                LocalDateTime.parse(columnDefaultValue, DATE_TIME_FORMATTER));
+      case VARCHAR:
+        return Literals.of(columnDefaultValue, 
Types.VarCharType.of(columnType.getColumnSize()));
+      case CHAR:
+        return Literals.of(columnDefaultValue, 
Types.FixedCharType.of(columnType.getColumnSize()));
+      default:
+        throw new IllegalArgumentException("Unknown data columnType for 
literal: " + columnType);
+    }
   }
 }
diff --git 
a/catalogs/catalog-jdbc-starrocks/src/main/java/org/apache/gravitino/catalog/starrocks/converter/StarRocksExceptionConverter.java
 
b/catalogs/catalog-jdbc-starrocks/src/main/java/org/apache/gravitino/catalog/starrocks/converter/StarRocksExceptionConverter.java
index c5fe992532..889ba3bcf8 100644
--- 
a/catalogs/catalog-jdbc-starrocks/src/main/java/org/apache/gravitino/catalog/starrocks/converter/StarRocksExceptionConverter.java
+++ 
b/catalogs/catalog-jdbc-starrocks/src/main/java/org/apache/gravitino/catalog/starrocks/converter/StarRocksExceptionConverter.java
@@ -18,17 +18,65 @@
  */
 package org.apache.gravitino.catalog.starrocks.converter;
 
+import com.google.common.annotations.VisibleForTesting;
 import java.sql.SQLException;
 import org.apache.gravitino.catalog.jdbc.converter.JdbcExceptionConverter;
+import org.apache.gravitino.exceptions.ConnectionFailedException;
 import org.apache.gravitino.exceptions.GravitinoRuntimeException;
+import org.apache.gravitino.exceptions.NoSuchColumnException;
+import org.apache.gravitino.exceptions.NoSuchPartitionException;
+import org.apache.gravitino.exceptions.NoSuchSchemaException;
+import org.apache.gravitino.exceptions.NoSuchTableException;
+import org.apache.gravitino.exceptions.PartitionAlreadyExistsException;
+import org.apache.gravitino.exceptions.SchemaAlreadyExistsException;
+import org.apache.gravitino.exceptions.TableAlreadyExistsException;
+import org.apache.gravitino.exceptions.UnauthorizedException;
 
 /** Exception converter to Apache Gravitino exception for StarRocks. */
 public class StarRocksExceptionConverter extends JdbcExceptionConverter {
 
+  // see https://docs.starrocks.io/docs/3.3/sql-reference/Error_code/
+  @VisibleForTesting static final int CODE_DATABASE_EXISTS = 1007;
+  static final int CODE_TABLE_EXISTS = 1050;
+  static final int CODE_DATABASE_NOT_EXISTS = 1008;
+  static final int CODE_UNKNOWN_DATABASE = 1049;
+  static final int CODE_UNKNOWN_DATABASE_2 = 5501;
+  static final int CODE_NO_SUCH_TABLE = 1051;
+  static final int CODE_NO_SUCH_TABLE_2 = 5502;
+  static final int CODE_UNAUTHORIZED = 1045;
+  static final int CODE_NO_SUCH_COLUMN = 1054;
+  static final int CODE_DELETE_NON_EXISTING_PARTITION = 1507;
+  static final int CODE_PARTITION_ALREADY_EXISTS = 1517;
+
   @SuppressWarnings("FormatStringAnnotation")
   @Override
   public GravitinoRuntimeException toGravitinoException(SQLException se) {
-    throw new GravitinoRuntimeException(
-        String.format("StarRocks exception: %s", se.getMessage()), se);
+    int errorCode = se.getErrorCode();
+    switch (errorCode) {
+      case CODE_DATABASE_EXISTS:
+        return new SchemaAlreadyExistsException(se, se.getMessage());
+      case CODE_TABLE_EXISTS:
+        return new TableAlreadyExistsException(se, se.getMessage());
+      case CODE_DATABASE_NOT_EXISTS:
+      case CODE_UNKNOWN_DATABASE:
+      case CODE_UNKNOWN_DATABASE_2:
+        return new NoSuchSchemaException(se, se.getMessage());
+      case CODE_NO_SUCH_TABLE:
+      case CODE_NO_SUCH_TABLE_2:
+        return new NoSuchTableException(se, se.getMessage());
+      case CODE_UNAUTHORIZED:
+        return new UnauthorizedException(se, se.getMessage());
+      case CODE_NO_SUCH_COLUMN:
+        return new NoSuchColumnException(se, se.getMessage());
+      case CODE_DELETE_NON_EXISTING_PARTITION:
+        return new NoSuchPartitionException(se, se.getMessage());
+      case CODE_PARTITION_ALREADY_EXISTS:
+        return new PartitionAlreadyExistsException(se, se.getMessage());
+      default:
+        if (se.getMessage() != null && se.getMessage().contains("Access 
denied")) {
+          return new ConnectionFailedException(se, se.getMessage());
+        }
+        return new GravitinoRuntimeException(se, se.getMessage());
+    }
   }
 }
diff --git 
a/catalogs/catalog-jdbc-starrocks/src/main/java/org/apache/gravitino/catalog/starrocks/converter/StarRocksTypeConverter.java
 
b/catalogs/catalog-jdbc-starrocks/src/main/java/org/apache/gravitino/catalog/starrocks/converter/StarRocksTypeConverter.java
index 781394348c..1adebd7cf6 100644
--- 
a/catalogs/catalog-jdbc-starrocks/src/main/java/org/apache/gravitino/catalog/starrocks/converter/StarRocksTypeConverter.java
+++ 
b/catalogs/catalog-jdbc-starrocks/src/main/java/org/apache/gravitino/catalog/starrocks/converter/StarRocksTypeConverter.java
@@ -18,19 +18,137 @@
  */
 package org.apache.gravitino.catalog.starrocks.converter;
 
-import org.apache.commons.lang3.NotImplementedException;
 import org.apache.gravitino.catalog.jdbc.converter.JdbcTypeConverter;
 import org.apache.gravitino.rel.types.Type;
+import org.apache.gravitino.rel.types.Types;
 
 /** Type converter for StarRocks. */
 public class StarRocksTypeConverter extends JdbcTypeConverter {
+
+  static final String BIGINT = "bigint";
+  static final String BOOLEAN = "boolean";
+  static final String DECIMAL = "decimal";
+  static final String DOUBLE = "double";
+  static final String FLOAT = "float";
+  static final String INT = "int";
+  static final String LARGEINT = "largeint";
+  static final String SMALLINT = "smallint";
+  static final String TINYINT = "tinyint";
+
+  static final String BINARY = "binary";
+  static final String VARBINARY = "varbinary";
+  static final String CHAR = "char";
+  static final String STRING = "string";
+  static final String VARCHAR = "varchar";
+
+  static final String DATE = "date";
+  static final String DATETIME = "datetime";
+
+  static final String ARRAY = "array";
+  static final String JSON = "json";
+  static final String MAP = "map";
+  static final String STRUCT = "struct";
+
+  static final String BITMAP = "bitmap";
+  static final String HLL = "hll";
+
+  static final String BIT = "BIT";
+
   @Override
   public Type toGravitino(JdbcTypeBean typeBean) {
-    throw new NotImplementedException("To be implemented in the future");
+    switch (typeBean.getTypeName().toLowerCase()) {
+      case BIGINT:
+        return Types.LongType.get();
+      case BOOLEAN:
+        return Types.BooleanType.get();
+      case DECIMAL:
+        return Types.DecimalType.of(typeBean.getColumnSize(), 
typeBean.getScale());
+      case DOUBLE:
+        return Types.DoubleType.get();
+      case FLOAT:
+        return Types.FloatType.get();
+      case INT:
+        return Types.IntegerType.get();
+      case SMALLINT:
+        return Types.ShortType.get();
+      case TINYINT:
+        return Types.ByteType.get();
+      case BINARY:
+      case VARBINARY:
+        return Types.BinaryType.get();
+      case CHAR:
+        return Types.FixedCharType.of(typeBean.getColumnSize());
+      case STRING:
+        return Types.StringType.get();
+      case VARCHAR:
+        if (typeBean.getColumnSize() == 65533) {
+          return Types.StringType.get();
+        }
+        return Types.VarCharType.of(typeBean.getColumnSize());
+      case DATE:
+        return Types.DateType.get();
+      case DATETIME:
+        return Types.TimestampType.withoutTimeZone();
+      default:
+        if (typeBean.getTypeName().equals("BIT")
+            && typeBean.getColumnSize() == 1
+            && typeBean.getScale() == 0) {
+          return Types.BooleanType.get();
+        }
+        return Types.ExternalType.of(typeBean.getTypeName());
+    }
   }
 
   @Override
   public String fromGravitino(Type type) {
-    throw new NotImplementedException("To be implemented in the future");
+    if (type instanceof Types.LongType) {
+      return BIGINT;
+    } else if (type instanceof Types.BooleanType) {
+      return BOOLEAN;
+    } else if (type instanceof Types.DecimalType) {
+      return DECIMAL
+          + "("
+          + ((Types.DecimalType) type).precision()
+          + ","
+          + ((Types.DecimalType) type).scale()
+          + ")";
+    } else if (type instanceof Types.DoubleType) {
+      return DOUBLE;
+    } else if (type instanceof Types.FloatType) {
+      return FLOAT;
+    } else if (type instanceof Types.IntegerType) {
+      return INT;
+    } else if (type instanceof Types.ShortType) {
+      return SMALLINT;
+    } else if (type instanceof Types.ByteType) {
+      return TINYINT;
+    } else if (type instanceof Types.BinaryType) {
+      return BINARY;
+    } else if (type instanceof Types.FixedCharType) {
+      int length = ((Types.FixedCharType) type).length();
+      if (length < 1 || length > 255) {
+        throw new IllegalArgumentException(
+            String.format(
+                "Type %s is invalid, length should be between 1 and 255", 
type.simpleString()));
+      }
+
+      return CHAR + "(" + ((Types.FixedCharType) type).length() + ")";
+    } else if (type instanceof Types.StringType) {
+      return STRING;
+    } else if (type instanceof Types.VarCharType) {
+      int length = ((Types.VarCharType) type).length();
+      if (length < 1 || length > 1048576) {
+        throw new IllegalArgumentException(
+            String.format(
+                "Type %s is invalid, length should be between 1 and 1048576", 
type.simpleString()));
+      }
+      return VARCHAR + "(" + ((Types.VarCharType) type).length() + ")";
+    } else if (type instanceof Types.DateType) {
+      return DATE;
+    } else if (type instanceof Types.TimestampType) {
+      return DATETIME;
+    }
+    throw new IllegalArgumentException(
+        String.format("Couldn't convert Gravitino type %s to StarRocks type", 
type.simpleString()));
   }
 }
diff --git 
a/catalogs/catalog-jdbc-starrocks/src/main/java/org/apache/gravitino/catalog/starrocks/operations/StarRocksDatabaseOperations.java
 
b/catalogs/catalog-jdbc-starrocks/src/main/java/org/apache/gravitino/catalog/starrocks/operations/StarRocksDatabaseOperations.java
index 85607ed33c..4e7eda3b63 100644
--- 
a/catalogs/catalog-jdbc-starrocks/src/main/java/org/apache/gravitino/catalog/starrocks/operations/StarRocksDatabaseOperations.java
+++ 
b/catalogs/catalog-jdbc-starrocks/src/main/java/org/apache/gravitino/catalog/starrocks/operations/StarRocksDatabaseOperations.java
@@ -19,34 +19,87 @@
 package org.apache.gravitino.catalog.starrocks.operations;
 
 import com.google.common.collect.ImmutableSet;
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.Collections;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import org.apache.commons.lang3.NotImplementedException;
 import org.apache.gravitino.catalog.jdbc.JdbcSchema;
 import org.apache.gravitino.catalog.jdbc.operation.JdbcDatabaseOperations;
+import org.apache.gravitino.catalog.starrocks.utils.StarRocksUtils;
 import org.apache.gravitino.exceptions.NoSuchSchemaException;
+import org.apache.gravitino.meta.AuditInfo;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /** Database operations for StarRocks. */
 public class StarRocksDatabaseOperations extends JdbcDatabaseOperations {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(StarRocksDatabaseOperations.class);
+
   @Override
   public String generateCreateDatabaseSql(
       String databaseName, String comment, Map<String, String> properties) {
-    throw new NotImplementedException("To be implemented in the future");
+    StringBuilder sqlBuilder = new StringBuilder();
+    sqlBuilder.append(String.format("CREATE DATABASE `%s`", databaseName));
+
+    // Append properties
+    sqlBuilder.append("\n");
+    sqlBuilder.append(StarRocksUtils.generatePropertiesSql(properties));
+
+    String ddl = sqlBuilder.toString();
+    LOG.info("Generated create database:{} sql: {}", databaseName, ddl);
+    return ddl;
   }
 
   @Override
   public String generateDropDatabaseSql(String databaseName, boolean cascade) {
-    throw new NotImplementedException("To be implemented in the future");
+    StringBuilder sqlBuilder = new StringBuilder();
+    sqlBuilder.append(String.format("DROP DATABASE `%s`", databaseName));
+    if (cascade) {
+      sqlBuilder.append(" FORCE");
+      return sqlBuilder.toString();
+    }
+    String query = String.format("SHOW TABLES IN `%s`", databaseName);
+    try (final Connection connection = this.dataSource.getConnection();
+        Statement statement = connection.createStatement();
+        ResultSet resultSet = statement.executeQuery(query)) {
+      if (resultSet.next()) {
+        throw new IllegalStateException(
+            String.format(
+                "Database %s is not empty, the value of cascade should be 
true.", databaseName));
+      }
+    } catch (SQLException sqlException) {
+      throw this.exceptionMapper.toGravitinoException(sqlException);
+    }
+    return sqlBuilder.toString();
   }
 
   @Override
   public JdbcSchema load(String databaseName) throws NoSuchSchemaException {
-    throw new NotImplementedException("To be implemented in the future");
+    List<String> allDatabases = listDatabases();
+    String dbName =
+        allDatabases.stream()
+            .filter(db -> db.equals(databaseName))
+            .findFirst()
+            .orElseThrow(
+                () -> new NoSuchSchemaException("Database %s could not be 
found", databaseName));
+    // StarRocks support set properties, but can't get them after setting
+    // 
https://docs.starrocks.io/docs/3.3/sql-reference/sql-statements/Database/SHOW_CREATE_DATABASE/
+    return JdbcSchema.builder()
+        .withName(dbName)
+        .withComment("")
+        .withProperties(Collections.emptyMap())
+        .withAuditInfo(AuditInfo.EMPTY)
+        .build();
   }
 
   @Override
   protected boolean supportSchemaComment() {
-    return true;
+    return false;
   }
 
   @Override
diff --git 
a/catalogs/catalog-jdbc-starrocks/src/main/java/org/apache/gravitino/catalog/starrocks/operations/StarRocksTableOperations.java
 
b/catalogs/catalog-jdbc-starrocks/src/main/java/org/apache/gravitino/catalog/starrocks/operations/StarRocksTableOperations.java
index 11ae78b758..87b3d88279 100644
--- 
a/catalogs/catalog-jdbc-starrocks/src/main/java/org/apache/gravitino/catalog/starrocks/operations/StarRocksTableOperations.java
+++ 
b/catalogs/catalog-jdbc-starrocks/src/main/java/org/apache/gravitino/catalog/starrocks/operations/StarRocksTableOperations.java
@@ -18,24 +18,69 @@
  */
 package org.apache.gravitino.catalog.starrocks.operations;
 
+import static org.apache.gravitino.rel.Column.DEFAULT_VALUE_NOT_SET;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
 import java.sql.Connection;
+import java.sql.PreparedStatement;
 import java.sql.ResultSet;
 import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
-import org.apache.commons.lang3.NotImplementedException;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
+import org.apache.commons.lang3.ArrayUtils;
+import org.apache.commons.lang3.BooleanUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.gravitino.StringIdentifier;
 import org.apache.gravitino.catalog.jdbc.JdbcColumn;
 import org.apache.gravitino.catalog.jdbc.JdbcTable;
 import org.apache.gravitino.catalog.jdbc.operation.JdbcTableOperations;
 import 
org.apache.gravitino.catalog.jdbc.operation.JdbcTablePartitionOperations;
+import org.apache.gravitino.catalog.starrocks.utils.StarRocksUtils;
+import org.apache.gravitino.exceptions.NoSuchColumnException;
+import org.apache.gravitino.exceptions.NoSuchTableException;
+import org.apache.gravitino.rel.Column;
 import org.apache.gravitino.rel.TableChange;
 import org.apache.gravitino.rel.expressions.distributions.Distribution;
+import org.apache.gravitino.rel.expressions.distributions.Strategy;
+import org.apache.gravitino.rel.expressions.literals.Literal;
 import org.apache.gravitino.rel.expressions.transforms.Transform;
+import org.apache.gravitino.rel.expressions.transforms.Transforms;
 import org.apache.gravitino.rel.indexes.Index;
+import org.apache.gravitino.rel.indexes.Indexes;
+import org.apache.gravitino.rel.partitions.ListPartition;
+import org.apache.gravitino.rel.partitions.RangePartition;
 
 /** Table operations for StarRocks. */
 public class StarRocksTableOperations extends JdbcTableOperations {
 
+  private static final String BACK_QUOTE = "`";
+  private static final String AUTO_INCREMENT = "AUTO_INCREMENT";
+  private static final String NEW_LINE = "\n";
+
+  private static final Set<String> SUPPORTED_MODIFY_PROPERTIES =
+      new HashSet<>(
+          Arrays.asList(
+              "replication_num",
+              "default.replication_num",
+              "default.storage_medium",
+              "enable_persistent_index",
+              "bloom_filter_columns",
+              "colocate_with",
+              "bucket_size",
+              "base_compaction_forbidden_time_ranges"));
+  private static final String 
SUPPORTED_MODIFY_PROPERTIES_PREFIX_DYNAMIC_PARTITION =
+      "dynamic_partition.";
+  private static final String SUPPORTED_MODIFY_PROPERTIES_PREFIX_BINLOG = 
"binlog.";
+
   @Override
   public JdbcTablePartitionOperations 
createJdbcTablePartitionOperations(JdbcTable loadedTable) {
     return new StarRocksTablePartitionOperations(
@@ -51,59 +96,512 @@ public class StarRocksTableOperations extends 
JdbcTableOperations {
       Transform[] partitioning,
       Distribution distribution,
       Index[] indexes) {
-    throw new NotImplementedException("To be implemented in the future");
+
+    StringBuilder sqlBuilder = new StringBuilder();
+    sqlBuilder.append(String.format("CREATE TABLE `%s` ( \n", tableName));
+    // Add columns
+    sqlBuilder.append(
+        Arrays.stream(columns)
+            .map(
+                column -> {
+                  StringBuilder columnsSql = new StringBuilder();
+                  columnsSql
+                      .append(SPACE)
+                      .append(BACK_QUOTE)
+                      .append(column.name())
+                      .append(BACK_QUOTE);
+                  appendColumnDefinition(column, columnsSql);
+                  return columnsSql.toString();
+                })
+            .collect(Collectors.joining(",\n")));
+    sqlBuilder.append(")\n");
+    if (StringUtils.isNotEmpty(comment)) {
+      comment = StringIdentifier.addToComment(StringIdentifier.DUMMY_ID, 
comment);
+      sqlBuilder.append(" COMMENT \"").append(comment).append("\"");
+    }
+
+    appendPartitionSql(partitioning, columns, sqlBuilder);
+    addDistributionSql(distribution, sqlBuilder);
+    addPropertiesSql(properties, sqlBuilder);
+
+    // Return the generated SQL statement
+    String result = sqlBuilder.toString();
+
+    LOG.info("Generated create table:{} sql: {}", tableName, result);
+    return result;
+  }
+
+  @Override
+  protected String generateAlterTableSql(
+      String databaseName, String tableName, TableChange... changes) {
+    JdbcTable lazyLoadTable = null;
+    List<String> alterSql = new ArrayList<>();
+    boolean hasSetPropertyChange = false;
+    for (int i = 0; i < changes.length; i++) {
+      TableChange change = changes[i];
+      if (change instanceof TableChange.AddColumn) {
+        TableChange.AddColumn addColumn = (TableChange.AddColumn) change;
+        lazyLoadTable = getOrCreateTable(databaseName, tableName, 
lazyLoadTable);
+        alterSql.add(addColumnFieldDefinition(addColumn));
+      } else if (change instanceof TableChange.DeleteColumn) {
+        TableChange.DeleteColumn deleteColumn = (TableChange.DeleteColumn) 
change;
+        lazyLoadTable = getOrCreateTable(databaseName, tableName, 
lazyLoadTable);
+        alterSql.add(deleteColumnFieldDefinition(deleteColumn, lazyLoadTable));
+      } else if (change instanceof TableChange.RemoveProperty) {
+        throw new IllegalArgumentException("Remove property is not supported 
yet.");
+      } else if (change instanceof TableChange.RenameColumn) {
+        TableChange.RenameColumn renameColumn = (TableChange.RenameColumn) 
change;
+        lazyLoadTable = getOrCreateTable(databaseName, tableName, 
lazyLoadTable);
+        alterSql.add(renameColumnDefinition(renameColumn, lazyLoadTable));
+      } else if (change instanceof TableChange.RenameTable) {
+        TableChange.RenameTable renameTable = (TableChange.RenameTable) change;
+        lazyLoadTable = getOrCreateTable(databaseName, tableName, 
lazyLoadTable);
+        alterSql.add(renameTableDefinition(renameTable, lazyLoadTable));
+      } else if (change instanceof TableChange.UpdateColumnPosition) {
+        TableChange.UpdateColumnPosition updateColumnPosition =
+            (TableChange.UpdateColumnPosition) change;
+        lazyLoadTable = getOrCreateTable(databaseName, tableName, 
lazyLoadTable);
+        alterSql.add(updateColumnPositionFieldDefinition(updateColumnPosition, 
lazyLoadTable));
+      } else if (change instanceof TableChange.UpdateColumnType) {
+        TableChange.UpdateColumnType updateColumnType = 
(TableChange.UpdateColumnType) change;
+        lazyLoadTable = getOrCreateTable(databaseName, tableName, 
lazyLoadTable);
+        alterSql.add(updateColumnTypeFieldDefinition(updateColumnType, 
lazyLoadTable));
+      } else if (change instanceof TableChange.UpdateComment) {
+        TableChange.UpdateComment updateComment = (TableChange.UpdateComment) 
change;
+        String newComment = updateComment.getNewComment();
+        alterSql.add("MODIFY COMMENT \"" + newComment + "\"");
+      } else if (change instanceof TableChange.SetProperty) {
+        if (hasSetPropertyChange) {
+          throw new IllegalArgumentException(
+              "StarRocks suggest modify one property at a time, please split 
it to multiple request.");
+        }
+        TableChange.SetProperty setProperty = (TableChange.SetProperty) change;
+        if (!SUPPORTED_MODIFY_PROPERTIES.contains(setProperty.getProperty())
+            && setProperty
+                .getProperty()
+                
.startsWith(SUPPORTED_MODIFY_PROPERTIES_PREFIX_DYNAMIC_PARTITION)
+            && 
setProperty.getProperty().startsWith(SUPPORTED_MODIFY_PROPERTIES_PREFIX_BINLOG))
 {
+          throw new IllegalArgumentException(
+              "Current StarRocks not support modify this table property "
+                  + setProperty.getProperty());
+        }
+        alterSql.add(generateTableProperties(setProperty));
+        hasSetPropertyChange = true;
+      } else {
+        throw new IllegalArgumentException(
+            "Unsupported table change type : " + change.getClass().getName());
+      }
+    }
+
+    String result = "ALTER TABLE `" + tableName + "`\n" + String.join(",\n", 
alterSql) + ";";
+    LOG.info("Generated alter table:{}.{} sql: {}", databaseName, tableName, 
result);
+    return result;
   }
 
  /**
   * Reads the auto-increment flag from the current row of a JDBC column-metadata result set.
   *
   * @param resultSet a metadata result set positioned on a column row
   * @return true if the IS_AUTOINCREMENT column reports "YES" (case-insensitive)
   * @throws SQLException if the column value cannot be read
   */
  @Override
  protected boolean getAutoIncrementInfo(ResultSet resultSet) throws SQLException {
    return "YES".equalsIgnoreCase(resultSet.getString("IS_AUTOINCREMENT"));
  }
 
  /**
   * Loads table properties by parsing the output of {@code SHOW CREATE TABLE}.
   *
   * <p>StarRocks does not expose table properties through standard JDBC metadata, so the
   * PROPERTIES clause is extracted from the generated DDL text instead.
   *
   * @param connection an open connection whose current catalog is the table's database
   * @param tableName the table to inspect
   * @return an unmodifiable map of the table's properties
   * @throws SQLException if the SHOW CREATE TABLE query fails
   * @throws NoSuchTableException if the query returns no DDL (table does not exist)
   */
  @Override
  protected Map<String, String> getTableProperties(Connection connection, String tableName)
      throws SQLException {

    String showCreateTableSQL = String.format("SHOW CREATE TABLE `%s`", tableName);

    StringBuilder createTableSqlSb = new StringBuilder();
    try (Statement statement = connection.createStatement();
        ResultSet resultSet = statement.executeQuery(showCreateTableSQL)) {
      while (resultSet.next()) {
        createTableSqlSb.append(resultSet.getString("Create Table"));
      }
    }

    String createTableSql = createTableSqlSb.toString();

    // An empty DDL string means the table was not found; surface that explicitly.
    if (StringUtils.isEmpty(createTableSql)) {
      throw new NoSuchTableException(
          "Table %s does not exist in %s.", tableName, connection.getCatalog());
    }

    return Collections.unmodifiableMap(StarRocksUtils.extractPropertiesFromSql(createTableSql));
  }
 
  /**
   * Lists a table's indexes via {@code SHOW INDEX}.
   *
   * <p>NOTE(review): every result row is reported as a PRIMARY_KEY index, and a multi-column
   * index yields one {@link Index} per column rather than a single grouped index — confirm this
   * matches StarRocks' {@code SHOW INDEX} output semantics.
   *
   * @param connection an open connection
   * @param databaseName database containing the table
   * @param tableName table whose indexes are listed
   * @return the indexes reported by StarRocks
   * @throws SQLException if preparing the statement fails
   */
  @Override
  protected List<Index> getIndexes(Connection connection, String databaseName, String tableName)
      throws SQLException {
    String sql = String.format("SHOW INDEX FROM `%s` FROM `%s`", tableName, databaseName);

    // get Indexes from SQL
    try (PreparedStatement preparedStatement = connection.prepareStatement(sql);
        ResultSet resultSet = preparedStatement.executeQuery()) {

      List<Index> indexes = new ArrayList<>();
      while (resultSet.next()) {
        String indexName = resultSet.getString("Key_name");
        String columnName = resultSet.getString("Column_name");
        indexes.add(
            Indexes.of(Index.IndexType.PRIMARY_KEY, indexName, new String[][] {{columnName}}));
      }
      return indexes;
    } catch (SQLException e) {
      // Translate driver-level failures into Gravitino exceptions.
      throw exceptionMapper.toGravitinoException(e);
    }
  }
 
  /**
   * Derives the table's partitioning by parsing the PARTITION BY clause of its DDL.
   *
   * @param connection an open connection
   * @param databaseName database containing the table (unused here; the connection's catalog
   *     determines the database)
   * @param tableName table to inspect
   * @return a single-element transform array, or the empty transform when unpartitioned
   * @throws SQLException if the SHOW CREATE TABLE query fails
   */
  @Override
  protected Transform[] getTablePartitioning(
      Connection connection, String databaseName, String tableName) throws SQLException {
    String showCreateTableSql = String.format("SHOW CREATE TABLE `%s`", tableName);
    try (Statement statement = connection.createStatement();
        ResultSet result = statement.executeQuery(showCreateTableSql)) {
      StringBuilder createTableSql = new StringBuilder();
      if (result.next()) {
        createTableSql.append(result.getString("Create Table"));
      }
      // An absent PARTITION BY clause maps to "no partitioning" rather than an error.
      Optional<Transform> transform =
          StarRocksUtils.extractPartitionInfoFromSql(createTableSql.toString());
      return transform.map(t -> new Transform[] {t}).orElse(Transforms.EMPTY_TRANSFORM);
    }
  }
 
  /**
   * Fixes up fields the generic JDBC metadata path gets wrong for StarRocks.
   *
   * <p>The table comment is re-read from {@code SHOW CREATE TABLE}: the extraction regex
   * strips the "(From Gravitino, ...)" bookkeeping suffix appended to stored comments.
   *
   * @param tableBuilder builder for the table being loaded; its comment is overwritten
   * @throws SQLException if the SHOW CREATE TABLE query fails
   */
  @Override
  protected void correctJdbcTableFields(
      Connection connection, String databaseName, String tableName, JdbcTable.Builder tableBuilder)
      throws SQLException {
    String showCreateTableSql = String.format("SHOW CREATE TABLE `%s`", tableName);
    try (Statement statement = connection.createStatement();
        ResultSet result = statement.executeQuery(showCreateTableSql)) {
      StringBuilder createTableSql = new StringBuilder();
      if (result.next()) {
        createTableSql.append(result.getString("Create Table"));
      }
      String tableComment = StarRocksUtils.extractTableCommentFromSql(createTableSql.toString());
      tableBuilder.withComment(tableComment);
    }
  }
 
   @Override
   protected String generateRenameTableSql(String oldTableName, String 
newTableName) {
-    throw new NotImplementedException("To be implemented in the future");
+    return String.format("ALTER TABLE `%s` RENAME `%s`", oldTableName, 
newTableName);
   }
 
  /**
   * Single-argument purge is intentionally unsupported; the database-qualified overload
   * {@code generatePurgeTableSql(String, String)} is used instead.
   *
   * @throws UnsupportedOperationException always
   */
  @Override
  protected String generatePurgeTableSql(String tableName) {
    // never called, as implemented generatePurgeTableSql(String databaseName, String tableName)
    throw new UnsupportedOperationException(
        "StarRocks does not support purge table in Gravitino, please use drop table");
  }
 
   @Override
-  protected String generateAlterTableSql(
-      String databaseName, String tableName, TableChange... changes) {
-    throw new NotImplementedException("To be implemented in the future");
+  protected String generatePurgeTableSql(String databaseName, String 
tableName) {
+    return String.format("TRUNCATE TABLE `%s`.`%s`", databaseName, tableName);
   }
 
   @Override
   protected Distribution getDistributionInfo(
       Connection connection, String databaseName, String tableName) throws 
SQLException {
-    throw new NotImplementedException("To be implemented in the future");
+
+    String showCreateTableSql = String.format("SHOW CREATE TABLE `%s`", 
tableName);
+    try (Statement statement = connection.createStatement();
+        ResultSet result = statement.executeQuery(showCreateTableSql)) {
+      result.next();
+      String createTableSyntax = result.getString("Create Table");
+      return StarRocksUtils.extractDistributionInfoFromSql(createTableSyntax);
+    }
+  }
+
+  public StringBuilder appendColumnDefinition(JdbcColumn column, StringBuilder 
sqlBuilder) {
+    // Add data type
+    
sqlBuilder.append(SPACE).append(typeConverter.fromGravitino(column.dataType())).append(SPACE);
+
+    // Add NOT NULL if the column is marked as such
+    if (column.nullable()) {
+      sqlBuilder.append("NULL ");
+    } else {
+      sqlBuilder.append("NOT NULL ");
+    }
+
+    // Add DEFAULT value if specified
+    if (!DEFAULT_VALUE_NOT_SET.equals(column.defaultValue())) {
+      sqlBuilder
+          .append("DEFAULT ")
+          
.append(columnDefaultValueConverter.fromGravitino(column.defaultValue()))
+          .append(SPACE);
+    }
+
+    // Add column auto_increment if specified
+    if (column.autoIncrement()) {
+      sqlBuilder.append(AUTO_INCREMENT).append(" ");
+    }
+
+    // Add column comment if specified
+    if (StringUtils.isNotEmpty(column.comment())) {
+      sqlBuilder.append("COMMENT '").append(column.comment()).append("' ");
+    }
+    return sqlBuilder;
+  }
+
  /**
   * Appends the PARTITION BY clause to the create-table statement.
   *
   * <p>Exactly one RANGE or LIST transform is accepted; composite or other transform types are
   * rejected. Partition fields are validated against the table's column names.
   *
   * @param partitioning the requested partitioning (may be null/empty for none)
   * @param columns the table columns, used to validate partition fields
   * @param sqlBuilder the statement being built
   */
  private static void appendPartitionSql(
      Transform[] partitioning, JdbcColumn[] columns, StringBuilder sqlBuilder) {
    if (ArrayUtils.isEmpty(partitioning)) {
      return;
    }
    Preconditions.checkArgument(
        partitioning.length == 1, "Composite partition type is not supported");

    StringBuilder partitionSqlBuilder;
    Set<String> columnNames =
        Arrays.stream(columns).map(JdbcColumn::name).collect(Collectors.toSet());

    if (partitioning[0] instanceof Transforms.RangeTransform) {
      // We do not support multi-column range partitioning in StarRocks for now
      Transforms.RangeTransform rangePartition = (Transforms.RangeTransform) partitioning[0];
      partitionSqlBuilder = generateRangePartitionSql(rangePartition, columnNames);
    } else if (partitioning[0] instanceof Transforms.ListTransform) {
      Transforms.ListTransform listPartition = (Transforms.ListTransform) partitioning[0];
      partitionSqlBuilder = generateListPartitionSql(listPartition, columnNames);
    } else {
      throw new IllegalArgumentException("Unsupported partition type of StarRocks");
    }

    sqlBuilder.append(partitionSqlBuilder);
  }
+
  /**
   * Builds the {@code PARTITION BY RANGE(...)} clause, including any pre-assigned partitions.
   *
   * @param rangePartition the single-column range transform (nested fields rejected)
   * @param columnNames table column names, used to validate the partition field
   * @return the rendered clause
   */
  private static StringBuilder generateRangePartitionSql(
      Transforms.RangeTransform rangePartition, Set<String> columnNames) {
    Preconditions.checkArgument(
        rangePartition.fieldName().length == 1,
        "StarRocks partition does not support nested field");
    Preconditions.checkArgument(
        columnNames.contains(rangePartition.fieldName()[0]),
        "The partition field must be one of the columns");

    StringBuilder partitionSqlBuilder = new StringBuilder(NEW_LINE);
    String partitionDefinition =
        String.format(" PARTITION BY RANGE(`%s`)", rangePartition.fieldName()[0]);
    partitionSqlBuilder.append(partitionDefinition).append(NEW_LINE).append("(");

    // Assign range partitions
    RangePartition[] assignments = rangePartition.assignments();
    if (!ArrayUtils.isEmpty(assignments)) {
      String partitionSqlFragments =
          Arrays.stream(assignments)
              .map(StarRocksUtils::generatePartitionSqlFragment)
              .collect(Collectors.joining("," + NEW_LINE));
      partitionSqlBuilder.append(NEW_LINE).append(partitionSqlFragments);
    }

    partitionSqlBuilder.append(NEW_LINE).append(")");
    return partitionSqlBuilder;
  }
+
  /**
   * Builds the {@code PARTITION BY LIST(...)} clause, including any pre-assigned partitions.
   *
   * <p>Each partition field must be a single-level name and one of the table's columns; each
   * assigned partition must provide value tuples matching the number of partition columns.
   *
   * @param listPartition the list transform (possibly multi-column)
   * @param columnNames table column names, used to validate partition fields
   * @return the rendered clause
   */
  private static StringBuilder generateListPartitionSql(
      Transforms.ListTransform listPartition, Set<String> columnNames) {
    ImmutableList.Builder<String> partitionColumnsBuilder = ImmutableList.builder();
    String[][] filedNames = listPartition.fieldNames();
    for (String[] filedName : filedNames) {
      Preconditions.checkArgument(
          filedName.length == 1, "StarRocks partition does not support nested field");
      Preconditions.checkArgument(
          columnNames.contains(filedName[0]), "The partition field must be one of the columns");

      // Backquote each partition column for the generated DDL.
      partitionColumnsBuilder.add(BACK_QUOTE + filedName[0] + BACK_QUOTE);
    }
    String partitionColumns =
        partitionColumnsBuilder.build().stream().collect(Collectors.joining(","));

    StringBuilder partitionSqlBuilder = new StringBuilder(NEW_LINE);
    String partitionDefinition = String.format(" PARTITION BY LIST(%s)", partitionColumns);
    partitionSqlBuilder.append(partitionDefinition).append(NEW_LINE).append("(");

    // Assign list partitions
    ListPartition[] assignments = listPartition.assignments();
    if (!ArrayUtils.isEmpty(assignments)) {
      ImmutableList.Builder<String> partitions = ImmutableList.builder();
      for (ListPartition part : assignments) {
        Literal<?>[][] lists = part.lists();
        Preconditions.checkArgument(
            lists.length > 0, "The number of values in list partition must be greater than 0");
        // Every value tuple must have one value per partition column.
        Preconditions.checkArgument(
            Arrays.stream(lists).allMatch(p -> p.length == filedNames.length),
            "The number of partitioning columns must be consistent");

        partitions.add(StarRocksUtils.generatePartitionSqlFragment(part));
      }
      partitionSqlBuilder
          .append(NEW_LINE)
          .append(partitions.build().stream().collect(Collectors.joining("," + NEW_LINE)));
    }

    partitionSqlBuilder.append(NEW_LINE).append(")");
    return partitionSqlBuilder;
  }
+
+  private static void addDistributionSql(Distribution distribution, 
StringBuilder sqlBuilder) {
+    if (distribution == null || distribution.strategy() == Strategy.NONE) {
+      return;
+    }
+    if (distribution.strategy() == Strategy.HASH) {
+      sqlBuilder.append(NEW_LINE).append(" DISTRIBUTED BY HASH(");
+      sqlBuilder.append(
+          Arrays.stream(distribution.expressions())
+              .map(column -> BACK_QUOTE + column.toString() + BACK_QUOTE)
+              .collect(Collectors.joining(", ")));
+      sqlBuilder.append(")");
+    } else if (distribution.strategy() == Strategy.EVEN) {
+      sqlBuilder.append(NEW_LINE).append(" DISTRIBUTED BY ").append("RANDOM");
+    }
+    if (distribution.number() != -1) {
+      sqlBuilder
+          .append(" BUCKETS ")
+          .append(StarRocksUtils.toBucketNumberString(distribution.number()));
+    }
+  }
+
+  private static void addPropertiesSql(Map<String, String> properties, 
StringBuilder sqlBuilder) {
+    if (properties == null || properties.isEmpty()) {
+      return;
+    }
+    
sqlBuilder.append("\n").append(StarRocksUtils.generatePropertiesSql(properties));
+  }
+
+  private String addColumnFieldDefinition(TableChange.AddColumn addColumn) {
+    String dataType = typeConverter.fromGravitino(addColumn.getDataType());
+    if (addColumn.fieldName().length > 1) {
+      throw new UnsupportedOperationException("StarRocks does not support 
nested column names.");
+    }
+    String col = addColumn.fieldName()[0];
+
+    StringBuilder columnDefinition = new StringBuilder();
+    columnDefinition
+        .append("ADD COLUMN ")
+        .append(BACK_QUOTE)
+        .append(col)
+        .append(BACK_QUOTE)
+        .append(SPACE)
+        .append(dataType)
+        .append(SPACE);
+
+    // Append comment if available
+    if (StringUtils.isNotEmpty(addColumn.getComment())) {
+      columnDefinition.append("COMMENT 
'").append(addColumn.getComment()).append("' ");
+    }
+
+    // Append position if available
+    if (addColumn.getPosition() instanceof TableChange.First) {
+      columnDefinition.append("FIRST");
+    } else if (addColumn.getPosition() instanceof TableChange.After) {
+      TableChange.After afterPosition = (TableChange.After) 
addColumn.getPosition();
+      columnDefinition
+          .append("AFTER ")
+          .append(BACK_QUOTE)
+          .append(afterPosition.getColumn())
+          .append(BACK_QUOTE);
+    } else if (addColumn.getPosition() instanceof TableChange.Default) {
+      // do nothing
+    } else {
+      throw new IllegalArgumentException("Invalid column position.");
+    }
+    return columnDefinition.toString();
+  }
+
  /**
   * Renders a {@code DROP COLUMN} clause for one delete-column change.
   *
   * <p>If the column is missing and {@code ifExists} is set, an empty string is returned.
   * NOTE(review): that empty string is still joined into the ALTER statement by the caller and
   * may leave a dangling comma when combined with other changes — verify the caller tolerates
   * this.
   *
   * @param deleteColumn the change to render
   * @param jdbcTable the loaded table, used to verify the column exists
   * @return the DROP COLUMN clause, or "" when skipped via ifExists
   */
  private String deleteColumnFieldDefinition(
      TableChange.DeleteColumn deleteColumn, JdbcTable jdbcTable) {
    if (deleteColumn.fieldName().length > 1) {
      throw new UnsupportedOperationException("StarRocks does not support nested column names.");
    }
    String col = deleteColumn.fieldName()[0];
    try {
      getJdbcColumnFromTable(jdbcTable, col);
    } catch (NoSuchColumnException ex) {
      if (BooleanUtils.isTrue(deleteColumn.getIfExists())) {
        return "";
      } else {
        throw new IllegalArgumentException("Delete column does not exist: " + col);
      }
    }
    return "DROP COLUMN " + BACK_QUOTE + col + BACK_QUOTE;
  }
+
  /**
   * Renders a {@code RENAME COLUMN} clause, validating that the source column exists and the
   * target name is free.
   *
   * <p>Existence checks are performed by probing {@code getJdbcColumnFromTable}; a
   * NoSuchColumnException on the new name means the rename can proceed.
   *
   * <p>NOTE(review): unlike the other clauses, the generated identifiers are not backquoted —
   * confirm names with special characters are not expected here.
   *
   * @param renameColumn the change to render
   * @param jdbcTable the loaded table, used for both existence checks
   * @return the RENAME COLUMN clause
   */
  private String renameColumnDefinition(
      TableChange.RenameColumn renameColumn, JdbcTable jdbcTable) {
    if (renameColumn.fieldName().length > 1) {
      throw new UnsupportedOperationException("StarRocks does not support nested column names.");
    }
    String oldColName = renameColumn.fieldName()[0];
    try {
      getJdbcColumnFromTable(jdbcTable, oldColName);
    } catch (NoSuchColumnException ex) {
      throw new IllegalArgumentException("Original column does not exist: " + oldColName);
    }
    try {
      getJdbcColumnFromTable(jdbcTable, renameColumn.getNewName());
    } catch (NoSuchColumnException ex) {
      // New name not taken: safe to rename.
      return String.format("RENAME COLUMN %s TO %s", oldColName, renameColumn.getNewName());
    }
    throw new IllegalArgumentException("Column already exists: " + renameColumn.getNewName());
  }
+
  /**
   * Renders a {@code RENAME} clause, failing fast when the target table name is taken.
   *
   * <p>Existence is probed via {@code load}: a NoSuchTableException means the new name is free
   * and the rename may proceed; a successful load means the name is already in use.
   *
   * @param renameTable the change to render
   * @param jdbcTable the current table (supplies the database name for the probe)
   * @return the RENAME clause
   */
  private String renameTableDefinition(TableChange.RenameTable renameTable, JdbcTable jdbcTable) {
    try {
      load(jdbcTable.databaseName(), renameTable.getNewName());
    } catch (NoSuchTableException ex) {
      return "RENAME " + renameTable.getNewName();
    }
    throw new IllegalArgumentException("Table already exists: " + renameTable.getNewName());
  }
+
+  private String generateTableProperties(TableChange.SetProperty setProperty) {
+    return String.format(
+        "set ( \"%s\" = \"%s\" )", setProperty.getProperty(), 
setProperty.getValue());
+  }
+
  /**
   * Renders a {@code MODIFY COLUMN} clause that moves a column to a new position.
   *
   * <p>The column's full definition is re-emitted (StarRocks requires it when repositioning).
   * When no explicit position is given, the column is placed after the table's current last
   * column. NOTE(review): that fallback branch does not backquote the reference column name,
   * unlike the AFTER branch — confirm this is intended.
   *
   * @param updateColumnPosition the change to render
   * @param jdbcTable the loaded table, used to fetch the column definition and last column
   * @return the MODIFY COLUMN clause
   */
  private String updateColumnPositionFieldDefinition(
      TableChange.UpdateColumnPosition updateColumnPosition, JdbcTable jdbcTable) {
    if (updateColumnPosition.fieldName().length > 1) {
      throw new UnsupportedOperationException("StarRocks does not support nested column names.");
    }
    String col = updateColumnPosition.fieldName()[0];
    JdbcColumn column = getJdbcColumnFromTable(jdbcTable, col);
    StringBuilder columnDefinition = new StringBuilder();
    columnDefinition.append("MODIFY COLUMN ").append(BACK_QUOTE).append(col).append(BACK_QUOTE);
    appendColumnDefinition(column, columnDefinition);
    if (updateColumnPosition.getPosition() instanceof TableChange.First) {
      columnDefinition.append("FIRST");
    } else if (updateColumnPosition.getPosition() instanceof TableChange.After) {
      TableChange.After afterPosition = (TableChange.After) updateColumnPosition.getPosition();
      columnDefinition
          .append("AFTER ")
          .append(BACK_QUOTE)
          .append(afterPosition.getColumn())
          .append(BACK_QUOTE);
    } else {
      // Default position: place after the last existing column, if any.
      Arrays.stream(jdbcTable.columns())
          .reduce((column1, column2) -> column2)
          .map(Column::name)
          .ifPresent(s -> columnDefinition.append("AFTER ").append(s));
    }
    return columnDefinition.toString();
  }
+
  /**
   * Renders a {@code MODIFY COLUMN} clause that changes a column's type.
   *
   * <p>Comment, nullability and auto-increment are carried over from the existing column; the
   * default value is deliberately reset to DEFAULT_VALUE_NOT_SET, since the old default may not
   * be valid for the new type.
   *
   * @param updateColumnType the change to render
   * @param jdbcTable the loaded table, used to fetch the existing column attributes
   * @return the MODIFY COLUMN clause
   */
  private String updateColumnTypeFieldDefinition(
      TableChange.UpdateColumnType updateColumnType, JdbcTable jdbcTable) {
    if (updateColumnType.fieldName().length > 1) {
      throw new UnsupportedOperationException("StarRocks does not support nested column names.");
    }
    String col = updateColumnType.fieldName()[0];
    JdbcColumn column = getJdbcColumnFromTable(jdbcTable, col);
    StringBuilder sqlBuilder = new StringBuilder("MODIFY COLUMN " + BACK_QUOTE + col + BACK_QUOTE);
    JdbcColumn newColumn =
        JdbcColumn.builder()
            .withName(col)
            .withType(updateColumnType.getNewDataType())
            .withComment(column.comment())
            .withDefaultValue(DEFAULT_VALUE_NOT_SET)
            .withNullable(column.nullable())
            .withAutoIncrement(column.autoIncrement())
            .build();
    return appendColumnDefinition(newColumn, sqlBuilder).toString();
  }
 }
diff --git 
a/catalogs/catalog-jdbc-starrocks/src/main/java/org/apache/gravitino/catalog/starrocks/utils/StarRocksUtils.java
 
b/catalogs/catalog-jdbc-starrocks/src/main/java/org/apache/gravitino/catalog/starrocks/utils/StarRocksUtils.java
new file mode 100644
index 0000000000..a235562cd9
--- /dev/null
+++ 
b/catalogs/catalog-jdbc-starrocks/src/main/java/org/apache/gravitino/catalog/starrocks/utils/StarRocksUtils.java
@@ -0,0 +1,350 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.catalog.starrocks.utils;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+import org.apache.gravitino.rel.expressions.NamedReference;
+import org.apache.gravitino.rel.expressions.distributions.Distribution;
+import org.apache.gravitino.rel.expressions.distributions.Distributions;
+import org.apache.gravitino.rel.expressions.distributions.Strategy;
+import org.apache.gravitino.rel.expressions.literals.Literal;
+import org.apache.gravitino.rel.expressions.literals.Literals;
+import org.apache.gravitino.rel.expressions.transforms.Transform;
+import org.apache.gravitino.rel.expressions.transforms.Transforms;
+import org.apache.gravitino.rel.partitions.ListPartition;
+import org.apache.gravitino.rel.partitions.Partition;
+import org.apache.gravitino.rel.partitions.Partitions;
+import org.apache.gravitino.rel.partitions.RangePartition;
+import org.apache.gravitino.rel.types.Type;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class StarRocksUtils {
+
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(StarRocksUtils.class);
+
+  private static final Pattern PARTITION_INFO_PATTERN =
+      Pattern.compile("PARTITION BY \\b(LIST|RANGE)\\b\\((.+)\\)\\s*\\(?");
+
+  private static final Pattern DISTRIBUTION_INFO_PATTERN =
+      Pattern.compile(
+          "DISTRIBUTED 
BY\\s+(HASH|RANDOM)\\s*(\\(([^)]+)\\))?\\s*(BUCKETS\\s+(\\d+))?");
+
+  private static final Pattern TABLE_COMMENT_PATTERN =
+      Pattern.compile("COMMENT\\s*\"([^\\(]+?)\\s*\\(From Gravitino,.*\\)\"");
+
+  private static final String PARTITION_TYPE_VALUE_PATTERN_STRING =
+      "types: \\[([^\\]]+)\\]; keys: \\[([^\\]]+)\\];";
+  private static final Pattern PARTITION_TYPE_VALUE_PATTERN =
+      Pattern.compile(PARTITION_TYPE_VALUE_PATTERN_STRING);
+
+  private static final Pattern PARTITION_LIST_PATTERN = 
Pattern.compile("\\[([^\\[\\]]+)\\]");
+
+  private static final Pattern PARTITION_LIST_PATTERN2 = 
Pattern.compile("\\(([^()]+)\\)");
+
+  private static final String LIST_PARTITION = "LIST";
+  private static final String RANGE_PARTITION = "RANGE";
+  public static final String ID = "PartitionId";
+  public static final String NAME = "PartitionName";
+  public static final String KEY = "PartitionKey";
+  public static final String VALUES_LIST = "List";
+  public static final String VALUES_RANGE = "Range";
+  public static final String VISIBLE_VERSION = "VisibleVersion";
+  public static final String VISIBLE_VERSION_TIME = "VisibleVersionTime";
+  public static final String STATE = "State";
+  public static final String DATA_SIZE = "DataSize";
+  public static final String IS_IN_MEMORY = "IsInMemory";
+
+  // convert Map<String, String> properties to SQL String
+  public static String generatePropertiesSql(Map<String, String> properties) {
+    if (properties == null || properties.isEmpty()) {
+      return "";
+    }
+    StringBuilder sqlBuilder = new StringBuilder(" PROPERTIES (\n");
+    sqlBuilder.append(
+        properties.entrySet().stream()
+            .map(entry -> "\"" + entry.getKey() + "\"=\"" + entry.getValue() + 
"\"")
+            .collect(Collectors.joining(",\n")));
+    sqlBuilder.append("\n)");
+    return sqlBuilder.toString();
+  }
+
+  public static Map<String, String> extractPropertiesFromSql(String 
createTableSql) {
+    Map<String, String> properties = new HashMap<>();
+    String[] lines = createTableSql.split("\n");
+
+    boolean isProperties = false;
+    final String sProperties = "\"(.*)\"\\s*=\\s*\"(.*)\",?";
+    final Pattern patternProperties = Pattern.compile(sProperties);
+
+    for (String line : lines) {
+      if (line.contains("PROPERTIES")) {
+        isProperties = true;
+      }
+
+      if (isProperties) {
+        final Matcher matcherProperties = patternProperties.matcher(line);
+        if (matcherProperties.find()) {
+          final String key = matcherProperties.group(1).trim();
+          String value = matcherProperties.group(2).trim();
+          properties.put(key, value);
+        }
+      }
+    }
+    return properties;
+  }
+
  /**
   * Parses the PARTITION BY clause out of SHOW CREATE TABLE output.
   *
   * @param createTableSql the full DDL text
   * @return the matching list/range transform, or empty when the table is unpartitioned or
   *     parsing fails (failures are logged, never thrown)
   */
  public static Optional<Transform> extractPartitionInfoFromSql(String createTableSql) {
    try {
      String[] lines = createTableSql.split("\n");
      for (String line : lines) {
        Matcher matcher = PARTITION_INFO_PATTERN.matcher(line.trim());
        if (matcher.matches()) {
          String partitionType = matcher.group(1);
          String partitionInfoString = matcher.group(2);
          // Strip backquotes so field names match Gravitino column names.
          String[] columns =
              Arrays.stream(partitionInfoString.split(","))
                  .map(String::trim)
                  .map(s -> s.replace("`", ""))
                  .toArray(String[]::new);
          if (LIST_PARTITION.equals(partitionType)) {
            String[][] filedNames =
                Arrays.stream(columns).map(s -> new String[] {s}).toArray(String[][]::new);
            return Optional.of(Transforms.list(filedNames));
          } else if (RANGE_PARTITION.equals(partitionType)) {
            // Range partitioning uses a single column; only the first is taken.
            return Optional.of(Transforms.range(new String[] {columns[0]}));
          }
        }
      }
      return Optional.empty();
    } catch (Exception e) {
      LOGGER.warn("Failed to extract partition info", e);
      return Optional.empty();
    }
  }
+
  /**
   * Parses the DISTRIBUTED BY clause (HASH/RANDOM, columns, bucket count) from DDL text.
   *
   * @param createTableSql the full DDL text
   * @return the parsed distribution
   * @throws RuntimeException when no DISTRIBUTED BY clause can be found
   */
  public static Distribution extractDistributionInfoFromSql(String createTableSql) {
    Matcher matcher = DISTRIBUTION_INFO_PATTERN.matcher(createTableSql.trim());
    if (matcher.find()) {
      String distributionType = matcher.group(1);

      // For Random distribution, no need to specify distribution columns.
      String distributionColumns = matcher.group(3);
      String[] columns =
          Objects.equals(distributionColumns, null)
              ? new String[] {}
              : Arrays.stream(distributionColumns.split(","))
                  .map(String::trim)
                  // Strip the surrounding backquotes from each column name.
                  .map(f -> f.substring(1, f.length() - 1))
                  .toArray(String[]::new);

      int bucketNum = extractBucketNum(matcher);

      return new Distributions.DistributionImpl.Builder()
          .withStrategy(Strategy.getByName(distributionType))
          .withNumber(bucketNum)
          .withExpressions(
              Arrays.stream(columns)
                  .map(col -> NamedReference.field(new String[] {col}))
                  .toArray(NamedReference[]::new))
          .build();
    }

    throw new RuntimeException("Failed to extract distribution info in sql:" + createTableSql);
  }
+
+  public static String extractTableCommentFromSql(String createTableSql) {
+    Matcher matcher = TABLE_COMMENT_PATTERN.matcher(createTableSql.trim());
+    if (matcher.find()) {
+      return matcher.group(1);
+    }
+    return "";
+  }
+
+  /**
+   * Generate sql fragment that create partition in StarRocks.
+   *
+   * <p>The sql fragment looks like "PARTITION {partitionName} VALUES 
{values}", for example:
+   *
+   * <pre>PARTITION `p20240724` VALUES LESS THAN ("2024-07-24")</pre>
+   *
+   * <pre>PARTITION `p20240724_v1` VALUES IN ("2024-07-24", "v1")</pre>
+   *
+   * @param partition The partition to be created.
+   * @return The partition sql fragment.
+   */
+  public static String generatePartitionSqlFragment(Partition partition) {
+    String partitionSqlFragment = "PARTITION `%s` VALUES %s";
+    if (partition instanceof RangePartition) {
+      return String.format(
+          partitionSqlFragment,
+          partition.name(),
+          generateRangePartitionValues((RangePartition) partition));
+    } else if (partition instanceof ListPartition) {
+      return String.format(
+          partitionSqlFragment,
+          partition.name(),
+          generateListPartitionSqlValues((ListPartition) partition));
+    } else {
+      throw new IllegalArgumentException("Unsupported partition type of 
StarRocks");
+    }
+  }
+
+  private static String generateRangePartitionValues(RangePartition 
rangePartition) {
+    Literal<?> upper = rangePartition.upper();
+    String partitionValues;
+    if (Literals.NULL.equals(upper)) {
+      partitionValues = "LESS THAN MAXVALUE";
+    } else {
+      partitionValues = String.format("LESS THAN (\"%s\")", upper.value());
+    }
+    return partitionValues;
+  }
+
+  private static String generateListPartitionSqlValues(ListPartition 
listPartition) {
+    Literal<?>[][] lists = listPartition.lists();
+    ImmutableList.Builder<String> listValues = ImmutableList.builder();
+    for (Literal<?>[] part : lists) {
+      String values;
+      if (part.length > 1) {
+        values =
+            String.format(
+                "(%s)",
+                Arrays.stream(part)
+                    .map(p -> "\"" + p.value() + "\"")
+                    .collect(Collectors.joining(",")));
+      } else {
+        values = String.format("\"%s\"", part[0].value());
+      }
+      listValues.add(values);
+    }
+    return String.format("IN (%s)", 
listValues.build().stream().collect(Collectors.joining(",")));
+  }
+
+  private static int extractBucketNum(Matcher matcher) {
+    int bucketNum = -1;
+    if (matcher.find(5)) {
+      String bucketValue = matcher.group(5);
+      if (bucketValue == null) {
+        return bucketNum;
+      }
+      // Use -1 to indicate auto bucket.
+      bucketNum = Integer.parseInt(bucketValue);
+    }
+    return bucketNum;
+  }
+
+  public static String toBucketNumberString(int number) {
+    return String.valueOf(number);
+  }
+
  /**
   * Builds a Gravitino {@link Partition} from one row of a StarRocks partition listing result
   * set (presumably {@code SHOW PARTITIONS} output — confirm against the caller).
   *
   * <p>Range partitions are parsed from the {@code VALUES_RANGE} column, list partitions from
   * the {@code VALUES_LIST} column; a handful of StarRocks metadata columns (id, versions,
   * state, data size, in-memory flag) are carried along as partition properties.
   *
   * @param tableName the table the row belongs to; only used in error messages.
   * @param resultSet the current row of the partition listing.
   * @param partitionInfo the table's partitioning transform, used to pick the parsing branch.
   * @param columnTypes column name to Gravitino type, used to type the parsed literal values.
   * @return the parsed range or list partition.
   * @throws SQLException if reading a column from the result set fails.
   * @throws UnsupportedOperationException if the transform is neither range nor list, if a range
   *     partition has multiple key columns, or if the list values use an unrecognized format.
   */
  public static Partition fromStarRocksPartition(
      String tableName, ResultSet resultSet, Transform partitionInfo, Map<String, Type> columnTypes)
      throws SQLException {
    String partitionName = resultSet.getString(NAME);
    String partitionKey = resultSet.getString(KEY);
    String partitionValues;
    // The raw values string comes from a different column depending on the partition style.
    if (partitionInfo instanceof Transforms.RangeTransform) {
      partitionValues = resultSet.getString(VALUES_RANGE);
    } else if (partitionInfo instanceof Transforms.ListTransform) {
      partitionValues = resultSet.getString(VALUES_LIST);
    } else {
      throw new UnsupportedOperationException(
          String.format("%s is not a partitioned table", tableName));
    }
    // Surface StarRocks partition metadata as Gravitino partition properties.
    ImmutableMap.Builder<String, String> propertiesBuilder = ImmutableMap.builder();
    propertiesBuilder.put(ID, resultSet.getString(ID));
    propertiesBuilder.put(VISIBLE_VERSION, resultSet.getString(VISIBLE_VERSION));
    propertiesBuilder.put(VISIBLE_VERSION_TIME, resultSet.getString(VISIBLE_VERSION_TIME));
    propertiesBuilder.put(STATE, resultSet.getString(STATE));
    propertiesBuilder.put(KEY, partitionKey);
    propertiesBuilder.put(DATA_SIZE, resultSet.getString(DATA_SIZE));
    propertiesBuilder.put(IS_IN_MEMORY, resultSet.getString(IS_IN_MEMORY));
    ImmutableMap<String, String> properties = propertiesBuilder.build();

    // Composite partition keys are comma separated, e.g. "col1,col2".
    String[] partitionKeys = partitionKey.split(",");
    if (partitionInfo instanceof Transforms.RangeTransform) {
      if (partitionKeys.length != 1) {
        throw new UnsupportedOperationException(
            "Multi-column range partitioning in StarRocks is not supported yet");
      }
      Type partitionColumnType = columnTypes.get(partitionKeys[0].trim());
      // Bounds default to NULL when the regex finds fewer than two values
      // (NULL is rendered as an open-ended bound elsewhere in this class).
      Literal<?> lower = Literals.NULL;
      Literal<?> upper = Literals.NULL;
      Matcher matcher = PARTITION_TYPE_VALUE_PATTERN.matcher(partitionValues);
      // First regex hit is taken as the lower bound, second (if any) as the upper bound.
      if (matcher.find()) {
        String lowerValue = matcher.group(2);
        lower = Literals.of(lowerValue, partitionColumnType);
        if (matcher.find()) {
          String upperValue = matcher.group(2);
          upper = Literals.of(upperValue, partitionColumnType);
        }
      }
      return Partitions.range(partitionName, upper, lower, properties);
    } else if (partitionInfo instanceof Transforms.ListTransform) {
      ImmutableList.Builder<Literal<?>[]> lists = ImmutableList.builder();
      partitionValues = partitionValues.trim();
      // StarRocks reports list values in two shapes; each branch extracts one value tuple per
      // regex match and types every element by its corresponding partition key column.
      if (partitionValues.startsWith("(") && partitionValues.endsWith(")")) {
        // Parenthesized form, e.g. (("1","a"),("2","b")).
        Matcher matcher = PARTITION_LIST_PATTERN2.matcher(partitionValues);
        while (matcher.find()) {
          String[] values = matcher.group(1).split(",");
          ImmutableList.Builder<Literal<?>> literValues = ImmutableList.builder();
          for (int i = 0; i < values.length; i++) {
            Type partitionColumnType = columnTypes.get(partitionKeys[i].trim());
            literValues.add(
                Literals.of(
                    values[i].trim().replace("\"", "").replace("'", ""), partitionColumnType));
          }
          lists.add(literValues.build().toArray(new Literal<?>[0]));
        }
        return Partitions.list(
            partitionName, lists.build().toArray(new Literal<?>[0][0]), properties);
      } else if (partitionValues.startsWith("[") && partitionValues.endsWith("]")) {
        // Bracketed form, e.g. [("1","a"),("2","b")].
        Matcher matcher = PARTITION_LIST_PATTERN.matcher(partitionValues);
        while (matcher.find()) {
          String[] values = matcher.group(1).split(",");
          ImmutableList.Builder<Literal<?>> literValues = ImmutableList.builder();
          for (int i = 0; i < values.length; i++) {
            Type partitionColumnType = columnTypes.get(partitionKeys[i].trim());
            literValues.add(Literals.of(values[i].replace("\"", ""), partitionColumnType));
          }
          lists.add(literValues.build().toArray(new Literal<?>[0]));
        }
        return Partitions.list(
            partitionName, lists.build().toArray(new Literal<?>[0][0]), properties);
      }
      throw new UnsupportedOperationException(
          String.format("%s is not a partitioned table", tableName));
    } else {
      throw new UnsupportedOperationException(
          String.format("%s is not a partitioned table", tableName));
    }
  }
+}
diff --git 
a/catalogs/catalog-jdbc-starrocks/src/test/java/org/apache/gravitino/catalog/starrocks/utils/TestStarRocksUtils.java
 
b/catalogs/catalog-jdbc-starrocks/src/test/java/org/apache/gravitino/catalog/starrocks/utils/TestStarRocksUtils.java
new file mode 100644
index 0000000000..f1e86abfb9
--- /dev/null
+++ 
b/catalogs/catalog-jdbc-starrocks/src/test/java/org/apache/gravitino/catalog/starrocks/utils/TestStarRocksUtils.java
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.catalog.starrocks.utils;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import org.apache.gravitino.rel.expressions.distributions.Distribution;
+import org.apache.gravitino.rel.expressions.literals.Literal;
+import org.apache.gravitino.rel.expressions.literals.Literals;
+import org.apache.gravitino.rel.expressions.transforms.Transform;
+import org.apache.gravitino.rel.expressions.transforms.Transforms;
+import org.apache.gravitino.rel.partitions.Partition;
+import org.apache.gravitino.rel.partitions.Partitions;
+import org.apache.gravitino.rel.types.Types;
+import org.junit.jupiter.api.Test;
+
+public class TestStarRocksUtils {
+
+  @Test
+  public void testGeneratePropertiesSql() {
+    // Test when properties is null
+    Map<String, String> properties = null;
+    String result = StarRocksUtils.generatePropertiesSql(properties);
+    assertEquals("", result);
+
+    // Test when properties is empty
+    properties = Collections.emptyMap();
+    result = StarRocksUtils.generatePropertiesSql(properties);
+    assertEquals("", result);
+
+    // Test when properties has single entry
+    properties = Collections.singletonMap("key", "value");
+    result = StarRocksUtils.generatePropertiesSql(properties);
+    assertEquals(" PROPERTIES (\n\"key\"=\"value\"\n)", result);
+
+    // Test when properties has multiple entries
+    properties = new HashMap<>();
+    properties.put("key1", "value1");
+    properties.put("key2", "value2");
+
+    String expectedStr = " PROPERTIES 
(\n\"key1\"=\"value1\",\n\"key2\"=\"value2\"\n)";
+
+    result = StarRocksUtils.generatePropertiesSql(properties);
+    assertEquals(expectedStr, result);
+  }
+
+  @Test
+  public void testExtractTablePropertiesFromSql() {
+    // Test when properties is null
+    String createTableSql =
+        "CREATE TABLE `testTable` (\n`testColumn` STRING NOT NULL COMMENT 
'test comment'\n) ENGINE=OLAP\nCOMMENT \"test comment\"";
+    Map<String, String> result = 
StarRocksUtils.extractPropertiesFromSql(createTableSql);
+    assertTrue(result.isEmpty());
+
+    // Test when properties exist
+    createTableSql =
+        "CREATE TABLE `testTable` (\n`testColumn` STRING NOT NULL COMMENT 
'test comment'\n) ENGINE=OLAP\nCOMMENT \"test comment\"\nPROPERTIES 
(\n\"test_property\"=\"test_value\"\n)";
+    result = StarRocksUtils.extractPropertiesFromSql(createTableSql);
+    assertEquals("test_value", result.get("test_property"));
+
+    // Test when multiple properties exist
+    createTableSql =
+        "CREATE TABLE `testTable` (\n`testColumn` STRING NOT NULL COMMENT 
'test comment'\n) ENGINE=OLAP\nCOMMENT \"test comment\"\nPROPERTIES 
(\n\"test_property1\"=\"test_value1\",\n\"test_property2\"=\"test_value2\"\n)";
+    result = StarRocksUtils.extractPropertiesFromSql(createTableSql);
+    assertEquals("test_value1", result.get("test_property1"));
+    assertEquals("test_value2", result.get("test_property2"));
+
+    // test when properties has blank
+    createTableSql =
+        "CREATE DATABASE `test`\nPROPERTIES (\n\"property1\" = 
\"value1\",\n\"comment\"= \"comment\"\n)";
+    result = StarRocksUtils.extractPropertiesFromSql(createTableSql);
+    assertEquals("value1", result.get("property1"));
+    assertEquals("comment", result.get("comment"));
+  }
+
+  @Test
+  public void testExtractPartitionInfoFromSql() {
+    // test range partition
+    String createTableSql =
+        "CREATE TABLE `testTable` (\n`col1` date NOT NULL\n) ENGINE=OLAP\n 
PARTITION BY RANGE(`col1`)\n()\n DISTRIBUTED BY HASH(`col1`) BUCKETS 2";
+    Optional<Transform> transform = 
StarRocksUtils.extractPartitionInfoFromSql(createTableSql);
+    assertTrue(transform.isPresent());
+    assertEquals(Transforms.range(new String[] {"col1"}), transform.get());
+
+    // test list partition
+    createTableSql =
+        "CREATE TABLE `testTable` (\n`col1` int(11) NOT NULL\n) ENGINE=OLAP\n 
PARTITION BY LIST(`col1`)\n()\n DISTRIBUTED BY HASH(`col1`) BUCKETS 2";
+    transform = StarRocksUtils.extractPartitionInfoFromSql(createTableSql);
+    assertTrue(transform.isPresent());
+    assertEquals(Transforms.list(new String[][] {{"col1"}}), transform.get());
+
+    // test multi-column list partition
+    createTableSql =
+        "CREATE TABLE `testTable` (\n`col1` date NOT NULL,\n`col2` int(11) NOT 
NULL\n) ENGINE=OLAP\n PARTITION BY LIST(`col1`, `col2`)\n()\n DISTRIBUTED BY 
HASH(`col1`) BUCKETS 2";
+    transform = StarRocksUtils.extractPartitionInfoFromSql(createTableSql);
+    assertTrue(transform.isPresent());
+    assertEquals(Transforms.list(new String[][] {{"col1"}, {"col2"}}), 
transform.get());
+
+    // test non-partitioned table
+    createTableSql =
+        "CREATE TABLE `testTable` (\n`testColumn` STRING NOT NULL COMMENT 
'test comment'\n) ENGINE=OLAP\nCOMMENT \"test comment\"";
+    transform = StarRocksUtils.extractPartitionInfoFromSql(createTableSql);
+    assertFalse(transform.isPresent());
+
+    createTableSql =
+        "CREATE TABLE `test_partitioned_table_4aec5cea` (\n"
+            + "  `starrocks_col_name1` int(11) NOT NULL COMMENT 
\"col_1_comment\",\n"
+            + "  `starrocks_col_name2` varchar(10) NULL COMMENT 
\"col_2_comment\",\n"
+            + "  `starrocks_col_name3` varchar(10) NULL COMMENT 
\"col_3_comment\",\n"
+            + "  `starrocks_col_name4` date NOT NULL COMMENT 
\"col_4_comment\"\n"
+            + ") ENGINE=OLAP \n"
+            + "DUPLICATE KEY(`starrocks_col_name1`, `starrocks_col_name2`)\n"
+            + "COMMENT \"table_comment_by_gravitino_it (From Gravitino, DO NOT 
EDIT: gravitino.v1.uid4058355477806830448)\"\n"
+            + "PARTITION BY LIST(`starrocks_col_name1`)(\n"
+            + "\n"
+            + ")\n"
+            + "DISTRIBUTED BY HASH(`starrocks_col_name1`) BUCKETS 2 \n"
+            + "PROPERTIES (\n"
+            + "\"compression\" = \"LZ4\",\n"
+            + "\"fast_schema_evolution\" = \"true\",\n"
+            + "\"replicated_storage\" = \"true\",\n"
+            + "\"replication_num\" = \"1\"\n"
+            + ");";
+    transform = StarRocksUtils.extractPartitionInfoFromSql(createTableSql);
+    assertTrue(transform.isPresent());
+    assertEquals(Transforms.list(new String[][] {{"starrocks_col_name1"}}), 
transform.get());
+  }
+
+  @Test
+  public void testGeneratePartitionSqlFragment() {
+    // test range partition
+    Partition partition = Partitions.range("p1", Literals.NULL, Literals.NULL, 
null);
+    String partitionSqlFragment = 
StarRocksUtils.generatePartitionSqlFragment(partition);
+    assertEquals("PARTITION `p1` VALUES LESS THAN MAXVALUE", 
partitionSqlFragment);
+
+    partition =
+        Partitions.range(
+            "p2", Literals.of("2024-07-23", Types.DateType.get()), 
Literals.NULL, null);
+    partitionSqlFragment = 
StarRocksUtils.generatePartitionSqlFragment(partition);
+    assertEquals("PARTITION `p2` VALUES LESS THAN (\"2024-07-23\")", 
partitionSqlFragment);
+
+    partition =
+        Partitions.range(
+            "p3", Literals.of("2024-07-24", Types.DateType.get()), 
Literals.NULL, null);
+    partitionSqlFragment = 
StarRocksUtils.generatePartitionSqlFragment(partition);
+    assertEquals("PARTITION `p3` VALUES LESS THAN (\"2024-07-24\")", 
partitionSqlFragment);
+
+    partition =
+        Partitions.range(
+            "p4", Literals.NULL, Literals.of("2024-07-24", 
Types.DateType.get()), null);
+    partitionSqlFragment = 
StarRocksUtils.generatePartitionSqlFragment(partition);
+    assertEquals("PARTITION `p4` VALUES LESS THAN MAXVALUE", 
partitionSqlFragment);
+
+    // test list partition
+    Literal[][] p5values = {{Literals.of("2024-07-24", Types.DateType.get())}};
+    partition = Partitions.list("p5", p5values, Collections.emptyMap());
+    partitionSqlFragment = 
StarRocksUtils.generatePartitionSqlFragment(partition);
+    assertEquals("PARTITION `p5` VALUES IN (\"2024-07-24\")", 
partitionSqlFragment);
+
+    Literal[][] p6values = {{Literals.integerLiteral(1)}, 
{Literals.integerLiteral(2)}};
+    partition = Partitions.list("p6", p6values, Collections.emptyMap());
+    partitionSqlFragment = 
StarRocksUtils.generatePartitionSqlFragment(partition);
+    assertEquals("PARTITION `p6` VALUES IN (\"1\",\"2\")", 
partitionSqlFragment);
+
+    Literal[][] p7values = {
+      {Literals.integerLiteral(1), Literals.integerLiteral(2)},
+      {Literals.integerLiteral(3), Literals.integerLiteral(4)}
+    };
+    partition = Partitions.list("p7", p7values, Collections.emptyMap());
+    partitionSqlFragment = 
StarRocksUtils.generatePartitionSqlFragment(partition);
+    assertEquals("PARTITION `p7` VALUES IN ((\"1\",\"2\"),(\"3\",\"4\"))", 
partitionSqlFragment);
+  }
+
+  @Test
+  public void testDistributedInfoPattern() {
+    String createTableSql =
+        "CREATE TABLE `testTable` (\n`col1` date NOT NULL\n) ENGINE=OLAP\n 
PARTITION BY RANGE(`col1`)\n()\n DISTRIBUTED BY HASH(`col1`) BUCKETS 2";
+    Distribution distribution = 
StarRocksUtils.extractDistributionInfoFromSql(createTableSql);
+    assertEquals(distribution.number(), 2);
+
+    String createTableSqlWithAuto =
+        "CREATE TABLE `testTable` (\n`col1` date NOT NULL\n) ENGINE=OLAP\n 
PARTITION BY RANGE(`col1`)\n()\n DISTRIBUTED BY HASH(`col1`)";
+    Distribution distribution2 =
+        StarRocksUtils.extractDistributionInfoFromSql(createTableSqlWithAuto);
+    assertEquals(distribution2.number(), -1);
+  }
+}
diff --git 
a/catalogs/catalog-jdbc-starrocks/src/test/resources/log4j2.properties 
b/catalogs/catalog-jdbc-starrocks/src/test/resources/log4j2.properties
new file mode 100644
index 0000000000..c2618539db
--- /dev/null
+++ b/catalogs/catalog-jdbc-starrocks/src/test/resources/log4j2.properties
@@ -0,0 +1,73 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#  http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+# Set to debug or trace if log4j initialization is failing
+status = info
+
+# Name of the configuration
+name = ConsoleLogConfig
+
+# Console appender configuration
+appender.console.type = Console
+appender.console.name = consoleLogger
+appender.console.layout.type = PatternLayout
+appender.console.layout.pattern = %d{yyyy-MM-dd HH:mm:ss} %-5p [%t] %c{1}:%L - 
%m%n
+
+# Log files location
+property.logPath = 
${sys:gravitino.log.path:-build/catalog-jdbc-starrocks-integration-test.log}
+
+# File appender configuration
+appender.file.type = File
+appender.file.name = fileLogger
+appender.file.fileName = ${logPath}
+appender.file.layout.type = PatternLayout
+appender.file.layout.pattern = %d{yyyy-MM-dd HH:mm:ss.SSS} [%t] %-5p %c - %m%n
+
+# Root logger level
+rootLogger.level = info
+
+# Root logger referring to console and file appenders
+rootLogger.appenderRef.stdout.ref = consoleLogger
+rootLogger.appenderRef.file.ref = fileLogger
+
+# File appender configuration for testcontainers
+appender.testcontainersFile.type = File
+appender.testcontainersFile.name = testcontainersLogger
+appender.testcontainersFile.fileName = build/testcontainers.log
+appender.testcontainersFile.layout.type = PatternLayout
+appender.testcontainersFile.layout.pattern = %d{yyyy-MM-dd HH:mm:ss.SSS} [%t] 
%-5p %c - %m%n
+
+# Logger for testcontainers
+logger.testcontainers.name = org.testcontainers
+logger.testcontainers.level = debug
+logger.testcontainers.additivity = false
+logger.testcontainers.appenderRef.file.ref = testcontainersLogger
+
+logger.tc.name = tc
+logger.tc.level = debug
+logger.tc.additivity = false
+logger.tc.appenderRef.file.ref = testcontainersLogger
+
+logger.docker.name = com.github.dockerjava
+logger.docker.level = warn
+logger.docker.additivity = false
+logger.docker.appenderRef.file.ref = testcontainersLogger
+
+logger.http.name = 
com.github.dockerjava.zerodep.shaded.org.apache.hc.client5.http.wire
+logger.http.level = off
\ No newline at end of file

Reply via email to