This is an automated email from the ASF dual-hosted git repository.
yuqi4733 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/gravitino.git
The following commit(s) were added to refs/heads/main by this push:
new 71936c825d [#3302][Sub-Task] StarRocks catalog Table/DB ops (#7738)
71936c825d is described below
commit 71936c825df9c52aec230f9e0f58cb1a291807d0
Author: Jarvis <[email protected]>
AuthorDate: Mon Jul 28 19:30:47 2025 +0800
[#3302][Sub-Task] StarRocks catalog Table/DB ops (#7738)
### What changes were proposed in this pull request?
Add the StarRocks catalog implementation for table and database operations.
### Why are the changes needed?
To support StarRocks Catalog.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Tested with E2E tests, which are in a separate PR:
https://github.com/apache/gravitino/pull/7792
---
.../jdbc/operation/JdbcTableOperations.java | 6 +-
.../StarRocksColumnDefaultValueConverter.java | 69 ++-
.../converter/StarRocksExceptionConverter.java | 52 +-
.../converter/StarRocksTypeConverter.java | 124 ++++-
.../operations/StarRocksDatabaseOperations.java | 63 ++-
.../operations/StarRocksTableOperations.java | 523 ++++++++++++++++++++-
.../catalog/starrocks/utils/StarRocksUtils.java | 350 ++++++++++++++
.../starrocks/utils/TestStarRocksUtils.java | 209 ++++++++
.../src/test/resources/log4j2.properties | 73 +++
9 files changed, 1444 insertions(+), 25 deletions(-)
diff --git
a/catalogs/catalog-jdbc-common/src/main/java/org/apache/gravitino/catalog/jdbc/operation/JdbcTableOperations.java
b/catalogs/catalog-jdbc-common/src/main/java/org/apache/gravitino/catalog/jdbc/operation/JdbcTableOperations.java
index b421694f29..d911fa8e47 100644
---
a/catalogs/catalog-jdbc-common/src/main/java/org/apache/gravitino/catalog/jdbc/operation/JdbcTableOperations.java
+++
b/catalogs/catalog-jdbc-common/src/main/java/org/apache/gravitino/catalog/jdbc/operation/JdbcTableOperations.java
@@ -332,7 +332,7 @@ public abstract class JdbcTableOperations implements
TableOperation {
protected void purgeTable(String databaseName, String tableName) {
LOG.info("Attempting to purge table {} from database {}", tableName,
databaseName);
try (Connection connection = getConnection(databaseName)) {
- JdbcConnectorUtils.executeUpdate(connection,
generatePurgeTableSql(tableName));
+ JdbcConnectorUtils.executeUpdate(connection,
generatePurgeTableSql(databaseName, tableName));
LOG.info("Purge table {} from database {}", tableName, databaseName);
} catch (final SQLException se) {
throw this.exceptionMapper.toGravitinoException(se);
@@ -498,6 +498,10 @@ public abstract class JdbcTableOperations implements
TableOperation {
protected abstract String generatePurgeTableSql(String tableName);
+ /**
+ * Generates the purge SQL for a table, additionally given the name of its database.
+ *
+ * <p>The default implementation ignores {@code databaseName} and delegates to {@link
+ * #generatePurgeTableSql(String)}. {@code purgeTable} calls this overload, so a subclass can
+ * override it to emit a database-qualified statement.
+ *
+ * @param databaseName name of the database containing the table (unused by this default)
+ * @param tableName name of the table to purge
+ * @return the SQL statement to execute for the purge
+ */
+ protected String generatePurgeTableSql(String databaseName, String
tableName) {
+ return generatePurgeTableSql(tableName);
+ }
+
protected abstract String generateAlterTableSql(
String databaseName, String tableName, TableChange... changes);
diff --git
a/catalogs/catalog-jdbc-starrocks/src/main/java/org/apache/gravitino/catalog/starrocks/converter/StarRocksColumnDefaultValueConverter.java
b/catalogs/catalog-jdbc-starrocks/src/main/java/org/apache/gravitino/catalog/starrocks/converter/StarRocksColumnDefaultValueConverter.java
index a1026e0e44..a9f8c5bd86 100644
---
a/catalogs/catalog-jdbc-starrocks/src/main/java/org/apache/gravitino/catalog/starrocks/converter/StarRocksColumnDefaultValueConverter.java
+++
b/catalogs/catalog-jdbc-starrocks/src/main/java/org/apache/gravitino/catalog/starrocks/converter/StarRocksColumnDefaultValueConverter.java
@@ -18,10 +18,29 @@
*/
package org.apache.gravitino.catalog.starrocks.converter;
-import org.apache.commons.lang3.NotImplementedException;
+import static
org.apache.gravitino.catalog.starrocks.converter.StarRocksTypeConverter.BIGINT;
+import static
org.apache.gravitino.catalog.starrocks.converter.StarRocksTypeConverter.CHAR;
+import static
org.apache.gravitino.catalog.starrocks.converter.StarRocksTypeConverter.DATE;
+import static
org.apache.gravitino.catalog.starrocks.converter.StarRocksTypeConverter.DATETIME;
+import static
org.apache.gravitino.catalog.starrocks.converter.StarRocksTypeConverter.DECIMAL;
+import static
org.apache.gravitino.catalog.starrocks.converter.StarRocksTypeConverter.DOUBLE;
+import static
org.apache.gravitino.catalog.starrocks.converter.StarRocksTypeConverter.FLOAT;
+import static
org.apache.gravitino.catalog.starrocks.converter.StarRocksTypeConverter.INT;
+import static
org.apache.gravitino.catalog.starrocks.converter.StarRocksTypeConverter.SMALLINT;
+import static
org.apache.gravitino.catalog.starrocks.converter.StarRocksTypeConverter.TINYINT;
+import static
org.apache.gravitino.catalog.starrocks.converter.StarRocksTypeConverter.VARCHAR;
+import static org.apache.gravitino.rel.Column.DEFAULT_VALUE_NOT_SET;
+import static
org.apache.gravitino.rel.Column.DEFAULT_VALUE_OF_CURRENT_TIMESTAMP;
+
+import java.time.LocalDate;
+import java.time.LocalDateTime;
import
org.apache.gravitino.catalog.jdbc.converter.JdbcColumnDefaultValueConverter;
import org.apache.gravitino.catalog.jdbc.converter.JdbcTypeConverter;
import org.apache.gravitino.rel.expressions.Expression;
+import org.apache.gravitino.rel.expressions.UnparsedExpression;
+import org.apache.gravitino.rel.expressions.literals.Literals;
+import org.apache.gravitino.rel.types.Decimal;
+import org.apache.gravitino.rel.types.Types;
public class StarRocksColumnDefaultValueConverter extends
JdbcColumnDefaultValueConverter {
@@ -31,6 +50,52 @@ public class StarRocksColumnDefaultValueConverter extends
JdbcColumnDefaultValue
String columnDefaultValue,
boolean isExpression,
boolean nullable) {
- throw new NotImplementedException("To be implemented in the future");
+ if (columnDefaultValue == null) {
+ return nullable ? Literals.NULL : DEFAULT_VALUE_NOT_SET;
+ }
+
+ if (columnDefaultValue.equalsIgnoreCase(NULL)) {
+ return Literals.NULL;
+ }
+
+ if (isExpression) {
+ if (columnDefaultValue.equals(CURRENT_TIMESTAMP)) {
+ return DEFAULT_VALUE_OF_CURRENT_TIMESTAMP;
+ }
+ // The parsing of StarRocks expressions is complex, so we do not currently attempt to parse
+ // them.
+ return UnparsedExpression.of(columnDefaultValue);
+ }
+
+ switch (columnType.getTypeName().toLowerCase()) {
+ case TINYINT:
+ return Literals.byteLiteral(Byte.valueOf(columnDefaultValue));
+ case SMALLINT:
+ return Literals.shortLiteral(Short.valueOf(columnDefaultValue));
+ case INT:
+ return Literals.integerLiteral(Integer.valueOf(columnDefaultValue));
+ case BIGINT:
+ return Literals.longLiteral(Long.valueOf(columnDefaultValue));
+ case FLOAT:
+ return Literals.floatLiteral(Float.valueOf(columnDefaultValue));
+ case DOUBLE:
+ return Literals.doubleLiteral(Double.valueOf(columnDefaultValue));
+ case DECIMAL:
+ return Literals.decimalLiteral(
+ Decimal.of(columnDefaultValue, columnType.getColumnSize(),
columnType.getScale()));
+ case DATE:
+ return Literals.dateLiteral(LocalDate.parse(columnDefaultValue,
DATE_TIME_FORMATTER));
+ case DATETIME:
+ return CURRENT_TIMESTAMP.equals(columnDefaultValue)
+ ? DEFAULT_VALUE_OF_CURRENT_TIMESTAMP
+ : Literals.timestampLiteral(
+ LocalDateTime.parse(columnDefaultValue, DATE_TIME_FORMATTER));
+ case VARCHAR:
+ return Literals.of(columnDefaultValue,
Types.VarCharType.of(columnType.getColumnSize()));
+ case CHAR:
+ return Literals.of(columnDefaultValue,
Types.FixedCharType.of(columnType.getColumnSize()));
+ default:
+ throw new IllegalArgumentException("Unknown data columnType for
literal: " + columnType);
+ }
}
}
diff --git
a/catalogs/catalog-jdbc-starrocks/src/main/java/org/apache/gravitino/catalog/starrocks/converter/StarRocksExceptionConverter.java
b/catalogs/catalog-jdbc-starrocks/src/main/java/org/apache/gravitino/catalog/starrocks/converter/StarRocksExceptionConverter.java
index c5fe992532..889ba3bcf8 100644
---
a/catalogs/catalog-jdbc-starrocks/src/main/java/org/apache/gravitino/catalog/starrocks/converter/StarRocksExceptionConverter.java
+++
b/catalogs/catalog-jdbc-starrocks/src/main/java/org/apache/gravitino/catalog/starrocks/converter/StarRocksExceptionConverter.java
@@ -18,17 +18,65 @@
*/
package org.apache.gravitino.catalog.starrocks.converter;
+import com.google.common.annotations.VisibleForTesting;
import java.sql.SQLException;
import org.apache.gravitino.catalog.jdbc.converter.JdbcExceptionConverter;
+import org.apache.gravitino.exceptions.ConnectionFailedException;
import org.apache.gravitino.exceptions.GravitinoRuntimeException;
+import org.apache.gravitino.exceptions.NoSuchColumnException;
+import org.apache.gravitino.exceptions.NoSuchPartitionException;
+import org.apache.gravitino.exceptions.NoSuchSchemaException;
+import org.apache.gravitino.exceptions.NoSuchTableException;
+import org.apache.gravitino.exceptions.PartitionAlreadyExistsException;
+import org.apache.gravitino.exceptions.SchemaAlreadyExistsException;
+import org.apache.gravitino.exceptions.TableAlreadyExistsException;
+import org.apache.gravitino.exceptions.UnauthorizedException;
/** Exception converter to Apache Gravitino exception for StarRocks. */
public class StarRocksExceptionConverter extends JdbcExceptionConverter {
+ // see https://docs.starrocks.io/docs/3.3/sql-reference/Error_code/
+ @VisibleForTesting static final int CODE_DATABASE_EXISTS = 1007;
+ static final int CODE_TABLE_EXISTS = 1050;
+ static final int CODE_DATABASE_NOT_EXISTS = 1008;
+ static final int CODE_UNKNOWN_DATABASE = 1049;
+ static final int CODE_UNKNOWN_DATABASE_2 = 5501;
+ static final int CODE_NO_SUCH_TABLE = 1051;
+ static final int CODE_NO_SUCH_TABLE_2 = 5502;
+ static final int CODE_UNAUTHORIZED = 1045;
+ static final int CODE_NO_SUCH_COLUMN = 1054;
+ static final int CODE_DELETE_NON_EXISTING_PARTITION = 1507;
+ static final int CODE_PARTITION_ALREADY_EXISTS = 1517;
+
+ /**
+ * Converts a {@link SQLException} into the matching Gravitino exception.
+ *
+ * <p>The mapping is driven by {@link SQLException#getErrorCode()} using the {@code CODE_*}
+ * constants of this class. When no code matches, a message containing "Access denied" is mapped
+ * to {@link ConnectionFailedException}; anything else becomes a plain {@link
+ * GravitinoRuntimeException}. The original exception is always passed along as the cause, and the
+ * result is returned rather than thrown.
+ *
+ * @param se the SQL exception to convert
+ * @return the Gravitino exception wrapping {@code se}
+ */
@SuppressWarnings("FormatStringAnnotation")
@Override
public GravitinoRuntimeException toGravitinoException(SQLException se) {
- throw new GravitinoRuntimeException(
- String.format("StarRocks exception: %s", se.getMessage()), se);
+ int errorCode = se.getErrorCode();
+ switch (errorCode) {
+ case CODE_DATABASE_EXISTS:
+ return new SchemaAlreadyExistsException(se, se.getMessage());
+ case CODE_TABLE_EXISTS:
+ return new TableAlreadyExistsException(se, se.getMessage());
+ case CODE_DATABASE_NOT_EXISTS:
+ case CODE_UNKNOWN_DATABASE:
+ case CODE_UNKNOWN_DATABASE_2:
+ return new NoSuchSchemaException(se, se.getMessage());
+ case CODE_NO_SUCH_TABLE:
+ case CODE_NO_SUCH_TABLE_2:
+ return new NoSuchTableException(se, se.getMessage());
+ case CODE_UNAUTHORIZED:
+ return new UnauthorizedException(se, se.getMessage());
+ case CODE_NO_SUCH_COLUMN:
+ return new NoSuchColumnException(se, se.getMessage());
+ case CODE_DELETE_NON_EXISTING_PARTITION:
+ return new NoSuchPartitionException(se, se.getMessage());
+ case CODE_PARTITION_ALREADY_EXISTS:
+ return new PartitionAlreadyExistsException(se, se.getMessage());
+ default:
+ // No known error code matched: fall back to inspecting the message text.
+ if (se.getMessage() != null && se.getMessage().contains("Access
denied")) {
+ return new ConnectionFailedException(se, se.getMessage());
+ }
+ return new GravitinoRuntimeException(se, se.getMessage());
+ }
}
}
diff --git
a/catalogs/catalog-jdbc-starrocks/src/main/java/org/apache/gravitino/catalog/starrocks/converter/StarRocksTypeConverter.java
b/catalogs/catalog-jdbc-starrocks/src/main/java/org/apache/gravitino/catalog/starrocks/converter/StarRocksTypeConverter.java
index 781394348c..1adebd7cf6 100644
---
a/catalogs/catalog-jdbc-starrocks/src/main/java/org/apache/gravitino/catalog/starrocks/converter/StarRocksTypeConverter.java
+++
b/catalogs/catalog-jdbc-starrocks/src/main/java/org/apache/gravitino/catalog/starrocks/converter/StarRocksTypeConverter.java
@@ -18,19 +18,137 @@
*/
package org.apache.gravitino.catalog.starrocks.converter;
-import org.apache.commons.lang3.NotImplementedException;
import org.apache.gravitino.catalog.jdbc.converter.JdbcTypeConverter;
import org.apache.gravitino.rel.types.Type;
+import org.apache.gravitino.rel.types.Types;
/** Type converter for StarRocks. */
public class StarRocksTypeConverter extends JdbcTypeConverter {
+
+ static final String BIGINT = "bigint";
+ static final String BOOLEAN = "boolean";
+ static final String DECIMAL = "decimal";
+ static final String DOUBLE = "double";
+ static final String FLOAT = "float";
+ static final String INT = "int";
+ static final String LARGEINT = "largeint";
+ static final String SMALLINT = "smallint";
+ static final String TINYINT = "tinyint";
+
+ static final String BINARY = "binary";
+ static final String VARBINARY = "varbinary";
+ static final String CHAR = "char";
+ static final String STRING = "string";
+ static final String VARCHAR = "varchar";
+
+ static final String DATE = "date";
+ static final String DATETIME = "datetime";
+
+ static final String ARRAY = "array";
+ static final String JSON = "json";
+ static final String MAP = "map";
+ static final String STRUCT = "struct";
+
+ static final String BITMAP = "bitmap";
+ static final String HLL = "hll";
+
+ static final String BIT = "BIT";
+
+ /**
+ * Maps a JDBC-reported column type to a Gravitino {@link Type}.
+ *
+ * <p>The type name is lower-cased and matched against the type-name constants of this class.
+ * Names that match none of them are returned as {@link Types.ExternalType}, except the special
+ * case handled in the {@code default} branch.
+ *
+ * @param typeBean the JDBC type description (name, column size, scale)
+ * @return the corresponding Gravitino type
+ */
@Override
public Type toGravitino(JdbcTypeBean typeBean) {
- throw new NotImplementedException("To be implemented in the future");
+ switch (typeBean.getTypeName().toLowerCase()) {
+ case BIGINT:
+ return Types.LongType.get();
+ case BOOLEAN:
+ return Types.BooleanType.get();
+ case DECIMAL:
+ return Types.DecimalType.of(typeBean.getColumnSize(),
typeBean.getScale());
+ case DOUBLE:
+ return Types.DoubleType.get();
+ case FLOAT:
+ return Types.FloatType.get();
+ case INT:
+ return Types.IntegerType.get();
+ case SMALLINT:
+ return Types.ShortType.get();
+ case TINYINT:
+ return Types.ByteType.get();
+ case BINARY:
+ case VARBINARY:
+ return Types.BinaryType.get();
+ case CHAR:
+ return Types.FixedCharType.of(typeBean.getColumnSize());
+ case STRING:
+ return Types.StringType.get();
+ case VARCHAR:
+ // NOTE(review): 65533 is presumably the length StarRocks reports for STRING columns, so
+ // such a VARCHAR is surfaced as a Gravitino string type - confirm.
+ if (typeBean.getColumnSize() == 65533) {
+ return Types.StringType.get();
+ }
+ return Types.VarCharType.of(typeBean.getColumnSize());
+ case DATE:
+ return Types.DateType.get();
+ case DATETIME:
+ return Types.TimestampType.withoutTimeZone();
+ default:
+ // The switch key is lower-cased, but this check compares the raw type name, so only an
+ // upper-case "BIT" with size 1 and scale 0 is treated as a boolean.
+ if (typeBean.getTypeName().equals("BIT")
+ && typeBean.getColumnSize() == 1
+ && typeBean.getScale() == 0) {
+ return Types.BooleanType.get();
+ }
+ return Types.ExternalType.of(typeBean.getTypeName());
+ }
}
+ /**
+ * Maps a Gravitino {@link Type} to the StarRocks type string used in DDL.
+ *
+ * <p>Decimal, fixed-char and varchar types carry their precision/scale or length in the result.
+ * Fixed-char lengths must be within 1..255 and varchar lengths within 1..1048576. Any {@link
+ * Types.TimestampType} instance is mapped to {@code datetime}.
+ *
+ * @param type the Gravitino type to convert
+ * @return the StarRocks type string
+ * @throws IllegalArgumentException if a char/varchar length is out of range or the type has no
+ *     mapping here
+ */
@Override
public String fromGravitino(Type type) {
- throw new NotImplementedException("To be implemented in the future");
+ if (type instanceof Types.LongType) {
+ return BIGINT;
+ } else if (type instanceof Types.BooleanType) {
+ return BOOLEAN;
+ } else if (type instanceof Types.DecimalType) {
+ return DECIMAL
+ + "("
+ + ((Types.DecimalType) type).precision()
+ + ","
+ + ((Types.DecimalType) type).scale()
+ + ")";
+ } else if (type instanceof Types.DoubleType) {
+ return DOUBLE;
+ } else if (type instanceof Types.FloatType) {
+ return FLOAT;
+ } else if (type instanceof Types.IntegerType) {
+ return INT;
+ } else if (type instanceof Types.ShortType) {
+ return SMALLINT;
+ } else if (type instanceof Types.ByteType) {
+ return TINYINT;
+ } else if (type instanceof Types.BinaryType) {
+ return BINARY;
+ } else if (type instanceof Types.FixedCharType) {
+ int length = ((Types.FixedCharType) type).length();
+ if (length < 1 || length > 255) {
+ throw new IllegalArgumentException(
+ String.format(
+ "Type %s is invalid, length should be between 1 and 255",
type.simpleString()));
+ }
+
+ return CHAR + "(" + ((Types.FixedCharType) type).length() + ")";
+ } else if (type instanceof Types.StringType) {
+ return STRING;
+ } else if (type instanceof Types.VarCharType) {
+ int length = ((Types.VarCharType) type).length();
+ if (length < 1 || length > 1048576) {
+ throw new IllegalArgumentException(
+ String.format(
+ "Type %s is invalid, length should be between 1 and 1048576",
type.simpleString()));
+ }
+ return VARCHAR + "(" + ((Types.VarCharType) type).length() + ")";
+ } else if (type instanceof Types.DateType) {
+ return DATE;
+ } else if (type instanceof Types.TimestampType) {
+ return DATETIME;
+ }
+ // None of the supported Gravitino types matched.
+ throw new IllegalArgumentException(
+ String.format("Couldn't convert Gravitino type %s to StarRocks type",
type.simpleString()));
}
}
diff --git
a/catalogs/catalog-jdbc-starrocks/src/main/java/org/apache/gravitino/catalog/starrocks/operations/StarRocksDatabaseOperations.java
b/catalogs/catalog-jdbc-starrocks/src/main/java/org/apache/gravitino/catalog/starrocks/operations/StarRocksDatabaseOperations.java
index 85607ed33c..4e7eda3b63 100644
---
a/catalogs/catalog-jdbc-starrocks/src/main/java/org/apache/gravitino/catalog/starrocks/operations/StarRocksDatabaseOperations.java
+++
b/catalogs/catalog-jdbc-starrocks/src/main/java/org/apache/gravitino/catalog/starrocks/operations/StarRocksDatabaseOperations.java
@@ -19,34 +19,87 @@
package org.apache.gravitino.catalog.starrocks.operations;
import com.google.common.collect.ImmutableSet;
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.Collections;
+import java.util.List;
import java.util.Map;
import java.util.Set;
-import org.apache.commons.lang3.NotImplementedException;
import org.apache.gravitino.catalog.jdbc.JdbcSchema;
import org.apache.gravitino.catalog.jdbc.operation.JdbcDatabaseOperations;
+import org.apache.gravitino.catalog.starrocks.utils.StarRocksUtils;
import org.apache.gravitino.exceptions.NoSuchSchemaException;
+import org.apache.gravitino.meta.AuditInfo;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/** Database operations for StarRocks. */
public class StarRocksDatabaseOperations extends JdbcDatabaseOperations {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(StarRocksDatabaseOperations.class);
+
+ /**
+ * Builds the {@code CREATE DATABASE} statement for the given database.
+ *
+ * <p>The statement consists of the back-quoted database name followed, on a new line, by the
+ * output of {@code StarRocksUtils.generatePropertiesSql(properties)}. The {@code comment}
+ * argument is not used by this method. The generated DDL is logged at INFO level.
+ *
+ * @param databaseName name of the database to create
+ * @param comment database comment (ignored here)
+ * @param properties database properties to render into the statement
+ * @return the generated DDL
+ */
@Override
public String generateCreateDatabaseSql(
String databaseName, String comment, Map<String, String> properties) {
- throw new NotImplementedException("To be implemented in the future");
+ StringBuilder sqlBuilder = new StringBuilder();
+ sqlBuilder.append(String.format("CREATE DATABASE `%s`", databaseName));
+
+ // Append properties
+ sqlBuilder.append("\n");
+ sqlBuilder.append(StarRocksUtils.generatePropertiesSql(properties));
+
+ String ddl = sqlBuilder.toString();
+ LOG.info("Generated create database:{} sql: {}", databaseName, ddl);
+ return ddl;
}
+ /**
+ * Builds the {@code DROP DATABASE} statement for the given database.
+ *
+ * <p>With {@code cascade} the statement gets a {@code FORCE} suffix and is returned immediately.
+ * Without it, this method first runs {@code SHOW TABLES IN} against the data source and refuses
+ * to generate the statement if the database still contains a table. Note that this makes SQL
+ * generation perform a query of its own.
+ *
+ * @param databaseName name of the database to drop
+ * @param cascade whether a non-empty database may be dropped
+ * @return the generated DDL
+ * @throws IllegalStateException if {@code cascade} is false and the database is not empty
+ */
@Override
public String generateDropDatabaseSql(String databaseName, boolean cascade) {
- throw new NotImplementedException("To be implemented in the future");
+ StringBuilder sqlBuilder = new StringBuilder();
+ sqlBuilder.append(String.format("DROP DATABASE `%s`", databaseName));
+ if (cascade) {
+ sqlBuilder.append(" FORCE");
+ return sqlBuilder.toString();
+ }
+ // Non-cascading drop: verify the database is empty before emitting the statement.
+ String query = String.format("SHOW TABLES IN `%s`", databaseName);
+ try (final Connection connection = this.dataSource.getConnection();
+ Statement statement = connection.createStatement();
+ ResultSet resultSet = statement.executeQuery(query)) {
+ if (resultSet.next()) {
+ throw new IllegalStateException(
+ String.format(
+ "Database %s is not empty, the value of cascade should be
true.", databaseName));
+ }
+ } catch (SQLException sqlException) {
+ throw this.exceptionMapper.toGravitinoException(sqlException);
+ }
+ return sqlBuilder.toString();
}
+ /**
+ * Loads the schema with the given name.
+ *
+ * <p>Existence is checked by an exact, case-sensitive match against {@code listDatabases()}. The
+ * returned schema always has an empty comment, empty properties and empty audit info.
+ *
+ * @param databaseName name of the database to load
+ * @return the schema describing the database
+ * @throws NoSuchSchemaException if no database with that exact name is listed
+ */
@Override
public JdbcSchema load(String databaseName) throws NoSuchSchemaException {
- throw new NotImplementedException("To be implemented in the future");
+ List<String> allDatabases = listDatabases();
+ String dbName =
+ allDatabases.stream()
+ .filter(db -> db.equals(databaseName))
+ .findFirst()
+ .orElseThrow(
+ () -> new NoSuchSchemaException("Database %s could not be
found", databaseName));
+ // StarRocks supports setting database properties, but they cannot be read back afterwards; see
+ // https://docs.starrocks.io/docs/3.3/sql-reference/sql-statements/Database/SHOW_CREATE_DATABASE/
+ return JdbcSchema.builder()
+ .withName(dbName)
+ .withComment("")
+ .withProperties(Collections.emptyMap())
+ .withAuditInfo(AuditInfo.EMPTY)
+ .build();
}
+ /**
+ * Reports whether schema comments are supported.
+ *
+ * @return always {@code false}; {@code load} in this class likewise returns an empty comment
+ */
@Override
protected boolean supportSchemaComment() {
- return true;
+ return false;
}
@Override
diff --git
a/catalogs/catalog-jdbc-starrocks/src/main/java/org/apache/gravitino/catalog/starrocks/operations/StarRocksTableOperations.java
b/catalogs/catalog-jdbc-starrocks/src/main/java/org/apache/gravitino/catalog/starrocks/operations/StarRocksTableOperations.java
index 11ae78b758..17825aefe7 100644
---
a/catalogs/catalog-jdbc-starrocks/src/main/java/org/apache/gravitino/catalog/starrocks/operations/StarRocksTableOperations.java
+++
b/catalogs/catalog-jdbc-starrocks/src/main/java/org/apache/gravitino/catalog/starrocks/operations/StarRocksTableOperations.java
@@ -18,24 +18,70 @@
*/
package org.apache.gravitino.catalog.starrocks.operations;
+import static org.apache.gravitino.rel.Column.DEFAULT_VALUE_NOT_SET;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
import java.sql.Connection;
+import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
-import org.apache.commons.lang3.NotImplementedException;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
+import org.apache.commons.lang3.ArrayUtils;
+import org.apache.commons.lang3.BooleanUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.gravitino.StringIdentifier;
import org.apache.gravitino.catalog.jdbc.JdbcColumn;
import org.apache.gravitino.catalog.jdbc.JdbcTable;
import org.apache.gravitino.catalog.jdbc.operation.JdbcTableOperations;
import
org.apache.gravitino.catalog.jdbc.operation.JdbcTablePartitionOperations;
+import org.apache.gravitino.catalog.starrocks.utils.StarRocksUtils;
+import org.apache.gravitino.exceptions.NoSuchColumnException;
+import org.apache.gravitino.exceptions.NoSuchTableException;
+import org.apache.gravitino.rel.Column;
import org.apache.gravitino.rel.TableChange;
import org.apache.gravitino.rel.expressions.distributions.Distribution;
+import org.apache.gravitino.rel.expressions.distributions.Distributions;
+import org.apache.gravitino.rel.expressions.distributions.Strategy;
+import org.apache.gravitino.rel.expressions.literals.Literal;
import org.apache.gravitino.rel.expressions.transforms.Transform;
+import org.apache.gravitino.rel.expressions.transforms.Transforms;
import org.apache.gravitino.rel.indexes.Index;
+import org.apache.gravitino.rel.indexes.Indexes;
+import org.apache.gravitino.rel.partitions.ListPartition;
+import org.apache.gravitino.rel.partitions.RangePartition;
/** Table operations for StarRocks. */
public class StarRocksTableOperations extends JdbcTableOperations {
+ private static final String BACK_QUOTE = "`";
+ private static final String AUTO_INCREMENT = "AUTO_INCREMENT";
+ private static final String NEW_LINE = "\n";
+
+ private static final Set<String> SUPPORTED_MODIFY_PROPERTIES =
+ new HashSet<>(
+ Arrays.asList(
+ "replication_num",
+ "default.replication_num",
+ "default.storage_medium",
+ "enable_persistent_index",
+ "bloom_filter_columns",
+ "colocate_with",
+ "bucket_size",
+ "base_compaction_forbidden_time_ranges"));
+ private static final String
SUPPORTED_MODIFY_PROPERTIES_PREFIX_DYNAMIC_PARTITION =
+ "dynamic_partition.";
+ private static final String SUPPORTED_MODIFY_PROPERTIES_PREFIX_BINLOG =
"binlog.";
+
@Override
public JdbcTablePartitionOperations
createJdbcTablePartitionOperations(JdbcTable loadedTable) {
return new StarRocksTablePartitionOperations(
@@ -51,59 +97,512 @@ public class StarRocksTableOperations extends
JdbcTableOperations {
Transform[] partitioning,
Distribution distribution,
Index[] indexes) {
- throw new NotImplementedException("To be implemented in the future");
+
+ StringBuilder sqlBuilder = new StringBuilder();
+ sqlBuilder.append(String.format("CREATE TABLE `%s` ( \n", tableName));
+ // Add columns
+ sqlBuilder.append(
+ Arrays.stream(columns)
+ .map(
+ column -> {
+ StringBuilder columnsSql = new StringBuilder();
+ columnsSql
+ .append(SPACE)
+ .append(BACK_QUOTE)
+ .append(column.name())
+ .append(BACK_QUOTE);
+ appendColumnDefinition(column, columnsSql);
+ return columnsSql.toString();
+ })
+ .collect(Collectors.joining(",\n")));
+ sqlBuilder.append(")\n");
+ if (StringUtils.isNotEmpty(comment)) {
+ comment = StringIdentifier.addToComment(StringIdentifier.DUMMY_ID,
comment);
+ sqlBuilder.append(" COMMENT \"").append(comment).append("\"");
+ }
+
+ appendPartitionSql(partitioning, columns, sqlBuilder);
+ addDistributionSql(distribution, sqlBuilder);
+ addPropertiesSql(properties, sqlBuilder);
+
+ // Return the generated SQL statement
+ String result = sqlBuilder.toString();
+
+ LOG.info("Generated create table:{} sql: {}", tableName, result);
+ return result;
+ }
+
+  /**
+   * Builds a single {@code ALTER TABLE} statement applying all given changes, in order.
+   *
+   * <p>Each supported change contributes one clause; the clauses are joined with {@code ",\n"}.
+   * Column-level changes and table renames obtain the table through {@code getOrCreateTable},
+   * which is handed the instance returned by the previous call.
+   *
+   * <p>At most one {@link TableChange.SetProperty} is accepted per call, and its property must be
+   * either listed in {@code SUPPORTED_MODIFY_PROPERTIES} or start with one of the supported
+   * prefixes ({@code dynamic_partition.} or {@code binlog.}).
+   *
+   * @param databaseName name of the database containing the table
+   * @param tableName name of the table to alter
+   * @param changes the changes to apply
+   * @return the generated {@code ALTER TABLE} statement
+   * @throws IllegalArgumentException for {@link TableChange.RemoveProperty}, for a second {@link
+   *     TableChange.SetProperty}, for a property that cannot be modified, or for an unsupported
+   *     change type
+   */
+  @Override
+  protected String generateAlterTableSql(
+      String databaseName, String tableName, TableChange... changes) {
+    JdbcTable lazyLoadTable = null;
+    List<String> alterSql = new ArrayList<>();
+    boolean hasSetPropertyChange = false;
+    for (TableChange change : changes) {
+      if (change instanceof TableChange.AddColumn) {
+        TableChange.AddColumn addColumn = (TableChange.AddColumn) change;
+        lazyLoadTable = getOrCreateTable(databaseName, tableName, lazyLoadTable);
+        alterSql.add(addColumnFieldDefinition(addColumn));
+      } else if (change instanceof TableChange.DeleteColumn) {
+        TableChange.DeleteColumn deleteColumn = (TableChange.DeleteColumn) change;
+        lazyLoadTable = getOrCreateTable(databaseName, tableName, lazyLoadTable);
+        alterSql.add(deleteColumnFieldDefinition(deleteColumn, lazyLoadTable));
+      } else if (change instanceof TableChange.RemoveProperty) {
+        throw new IllegalArgumentException("Remove property is not supported yet.");
+      } else if (change instanceof TableChange.RenameColumn) {
+        TableChange.RenameColumn renameColumn = (TableChange.RenameColumn) change;
+        lazyLoadTable = getOrCreateTable(databaseName, tableName, lazyLoadTable);
+        alterSql.add(renameColumnDefinition(renameColumn, lazyLoadTable));
+      } else if (change instanceof TableChange.RenameTable) {
+        TableChange.RenameTable renameTable = (TableChange.RenameTable) change;
+        lazyLoadTable = getOrCreateTable(databaseName, tableName, lazyLoadTable);
+        alterSql.add(renameTableDefinition(renameTable, lazyLoadTable));
+      } else if (change instanceof TableChange.UpdateColumnPosition) {
+        TableChange.UpdateColumnPosition updateColumnPosition =
+            (TableChange.UpdateColumnPosition) change;
+        lazyLoadTable = getOrCreateTable(databaseName, tableName, lazyLoadTable);
+        alterSql.add(updateColumnPositionFieldDefinition(updateColumnPosition, lazyLoadTable));
+      } else if (change instanceof TableChange.UpdateColumnType) {
+        TableChange.UpdateColumnType updateColumnType = (TableChange.UpdateColumnType) change;
+        lazyLoadTable = getOrCreateTable(databaseName, tableName, lazyLoadTable);
+        alterSql.add(updateColumnTypeFieldDefinition(updateColumnType, lazyLoadTable));
+      } else if (change instanceof TableChange.UpdateComment) {
+        TableChange.UpdateComment updateComment = (TableChange.UpdateComment) change;
+        String newComment = updateComment.getNewComment();
+        alterSql.add("MODIFY COMMENT \"" + newComment + "\"");
+      } else if (change instanceof TableChange.SetProperty) {
+        if (hasSetPropertyChange) {
+          throw new IllegalArgumentException(
+              "StarRocks suggest modify one property at a time, please split it to multiple request.");
+        }
+        TableChange.SetProperty setProperty = (TableChange.SetProperty) change;
+        String property = setProperty.getProperty();
+        // A property is modifiable when it is explicitly listed OR falls under a supported
+        // prefix. The previous check required the property to start with BOTH prefixes at once,
+        // which is impossible, so unsupported properties were never rejected.
+        boolean modifiable =
+            SUPPORTED_MODIFY_PROPERTIES.contains(property)
+                || property.startsWith(SUPPORTED_MODIFY_PROPERTIES_PREFIX_DYNAMIC_PARTITION)
+                || property.startsWith(SUPPORTED_MODIFY_PROPERTIES_PREFIX_BINLOG);
+        if (!modifiable) {
+          throw new IllegalArgumentException(
+              "Current StarRocks not support modify this table property " + property);
+        }
+        alterSql.add(generateTableProperties(setProperty));
+        hasSetPropertyChange = true;
+      } else {
+        throw new IllegalArgumentException(
+            "Unsupported table change type : " + change.getClass().getName());
+      }
+    }
+
+    String result = "ALTER TABLE `" + tableName + "`\n" + String.join(",\n", alterSql) + ";";
+    LOG.info("Generated alter table:{}.{} sql: {}", databaseName, tableName, result);
+    return result;
}
+ /**
+ * Reads the auto-increment flag of the column at the result set's current row.
+ *
+ * @param resultSet result set exposing an {@code IS_AUTOINCREMENT} column
+ * @return {@code true} if that column holds "YES" (case-insensitive), {@code false} otherwise,
+ *     including when it is {@code null}
+ * @throws SQLException if the column cannot be read
+ */
@Override
protected boolean getAutoIncrementInfo(ResultSet resultSet) throws
SQLException {
- throw new NotImplementedException("To be implemented in the future");
+ return "YES".equalsIgnoreCase(resultSet.getString("IS_AUTOINCREMENT"));
}
+ /**
+ * Reads the table's properties from the output of {@code SHOW CREATE TABLE}.
+ *
+ * <p>The "Create Table" column of every returned row is concatenated and handed to {@code
+ * StarRocksUtils.extractPropertiesFromSql}.
+ *
+ * @param connection connection on which the statement is executed
+ * @param tableName name of the table (not database-qualified in the query)
+ * @return an unmodifiable view of the extracted properties
+ * @throws NoSuchTableException if the statement yields no DDL text
+ * @throws SQLException if the query fails
+ */
@Override
protected Map<String, String> getTableProperties(Connection connection,
String tableName)
throws SQLException {
- throw new NotImplementedException("To be implemented in the future");
+
+ String showCreateTableSQL = String.format("SHOW CREATE TABLE `%s`",
tableName);
+
+ StringBuilder createTableSqlSb = new StringBuilder();
+ try (Statement statement = connection.createStatement();
+ ResultSet resultSet = statement.executeQuery(showCreateTableSQL)) {
+ while (resultSet.next()) {
+ createTableSqlSb.append(resultSet.getString("Create Table"));
+ }
+ }
+
+ String createTableSql = createTableSqlSb.toString();
+
+ // An empty DDL text is treated as "table not found".
+ if (StringUtils.isEmpty(createTableSql)) {
+ throw new NoSuchTableException(
+ "Table %s does not exist in %s.", tableName,
connection.getCatalog());
+ }
+
+ return
Collections.unmodifiableMap(StarRocksUtils.extractPropertiesFromSql(createTableSql));
}
+ /**
+ * Lists the table's indexes using {@code SHOW INDEX}.
+ *
+ * <p>Each returned row becomes one single-column index built from its {@code Key_name} and {@code
+ * Column_name} values. A {@link SQLException} raised while querying is converted through the
+ * exception mapper instead of being propagated.
+ *
+ * <p>NOTE(review): every row is reported as {@code PRIMARY_KEY} and rows sharing a key name are
+ * not merged into one multi-column index - confirm this is intended.
+ *
+ * @param connection connection on which the statement is executed
+ * @param databaseName name of the database containing the table
+ * @param tableName name of the table
+ * @return the indexes, one per result row
+ */
@Override
protected List<Index> getIndexes(Connection connection, String databaseName,
String tableName)
throws SQLException {
- throw new NotImplementedException("To be implemented in the future");
+ String sql = String.format("SHOW INDEX FROM `%s` FROM `%s`", tableName,
databaseName);
+
+ // get Indexes from SQL
+ try (PreparedStatement preparedStatement =
connection.prepareStatement(sql);
+ ResultSet resultSet = preparedStatement.executeQuery()) {
+
+ List<Index> indexes = new ArrayList<>();
+ while (resultSet.next()) {
+ String indexName = resultSet.getString("Key_name");
+ String columnName = resultSet.getString("Column_name");
+ indexes.add(
+ Indexes.of(Index.IndexType.PRIMARY_KEY, indexName, new String[][]
{{columnName}}));
+ }
+ return indexes;
+ } catch (SQLException e) {
+ throw exceptionMapper.toGravitinoException(e);
+ }
}
+ /**
+ * Derives the table's partitioning from the output of {@code SHOW CREATE TABLE}.
+ *
+ * <p>Only the first result row is read; if there is none, an empty string is parsed. The DDL is
+ * handed to {@code StarRocksUtils.extractPartitionInfoFromSql}.
+ *
+ * @param connection connection on which the statement is executed
+ * @param databaseName name of the database (not used in the query)
+ * @param tableName name of the table
+ * @return a one-element array with the extracted transform, or {@code
+ *     Transforms.EMPTY_TRANSFORM} when none is found
+ * @throws SQLException if the query fails
+ */
@Override
protected Transform[] getTablePartitioning(
Connection connection, String databaseName, String tableName) throws
SQLException {
- throw new NotImplementedException("To be implemented in the future");
+ String showCreateTableSql = String.format("SHOW CREATE TABLE `%s`",
tableName);
+ try (Statement statement = connection.createStatement();
+ ResultSet result = statement.executeQuery(showCreateTableSql)) {
+ StringBuilder createTableSql = new StringBuilder();
+ if (result.next()) {
+ createTableSql.append(result.getString("Create Table"));
+ }
+ Optional<Transform> transform =
+
StarRocksUtils.extractPartitionInfoFromSql(createTableSql.toString());
+ return transform.map(t -> new Transform[]
{t}).orElse(Transforms.EMPTY_TRANSFORM);
+ }
}
+ /**
+ * Sets the table comment on the builder from the output of {@code SHOW CREATE TABLE}.
+ *
+ * <p>Only the first result row is read; if there is none, an empty string is parsed. The comment
+ * is obtained via {@code StarRocksUtils.extractTableCommentFromSql}.
+ *
+ * @param connection connection on which the statement is executed
+ * @param databaseName name of the database (not used in the query)
+ * @param tableName name of the table
+ * @param tableBuilder builder that receives the extracted comment
+ * @throws SQLException if the query fails
+ */
@Override
protected void correctJdbcTableFields(
Connection connection, String databaseName, String tableName,
JdbcTable.Builder tableBuilder)
throws SQLException {
- throw new NotImplementedException("To be implemented in the future");
+ String showCreateTableSql = String.format("SHOW CREATE TABLE `%s`",
tableName);
+ try (Statement statement = connection.createStatement();
+ ResultSet result = statement.executeQuery(showCreateTableSql)) {
+ StringBuilder createTableSql = new StringBuilder();
+ if (result.next()) {
+ createTableSql.append(result.getString("Create Table"));
+ }
+ String tableComment =
StarRocksUtils.extractTableCommentFromSql(createTableSql.toString());
+ tableBuilder.withComment(tableComment);
+ }
}
+ /**
+ * Builds the {@code ALTER TABLE ... RENAME} statement for a table.
+ *
+ * @param oldTableName current table name
+ * @param newTableName new table name
+ * @return the rename statement, with both names back-quoted
+ */
@Override
protected String generateRenameTableSql(String oldTableName, String
newTableName) {
- throw new NotImplementedException("To be implemented in the future");
+ return String.format("ALTER TABLE `%s` RENAME `%s`", oldTableName,
newTableName);
}
@Override
protected String generatePurgeTableSql(String tableName) {
+ // Not expected to be called: this class also overrides
+ // generatePurgeTableSql(String databaseName, String tableName).
throw new UnsupportedOperationException(
"StarRocks does not support purge table in Gravitino, please use drop
table");
}
@Override
- protected String generateAlterTableSql(
- String databaseName, String tableName, TableChange... changes) {
- throw new NotImplementedException("To be implemented in the future");
+ protected String generatePurgeTableSql(String databaseName, String tableName) {
+   // NOTE(review): purge is rendered as TRUNCATE TABLE on the qualified name; confirm this is
+   // the intended purge semantics.
+   return "TRUNCATE TABLE `" + databaseName + "`.`" + tableName + "`";
}
+ /**
+  * Reads the table's distribution by parsing the text returned by {@code SHOW CREATE TABLE}.
+  *
+  * @param connection JDBC connection the statement is executed on
+  * @param databaseName database name; not used when building the statement
+  * @param tableName table whose DDL is inspected
+  * @return the distribution parsed from the DDL
+  * @throws SQLException if the statement fails or returns no row for the table
+  */
@Override
protected Distribution getDistributionInfo(
    Connection connection, String databaseName, String tableName) throws SQLException {
- throw new NotImplementedException("To be implemented in the future");
+
+   String showCreateTableSql = String.format("SHOW CREATE TABLE `%s`", tableName);
+   try (Statement statement = connection.createStatement();
+       ResultSet result = statement.executeQuery(showCreateTableSql)) {
+     // Check the cursor like the sibling SHOW CREATE TABLE readers do, so an empty result is
+     // reported clearly instead of failing inside getString.
+     if (!result.next()) {
+       throw new SQLException(
+           String.format("SHOW CREATE TABLE returned no rows for table `%s`", tableName));
+     }
+     String createTableSyntax = result.getString("Create Table");
+     return StarRocksUtils.extractDistributionInfoFromSql(createTableSyntax);
+   }
+ }
+
+ /**
+  * Appends the definition that follows a column name: type, nullability, optional default,
+  * optional auto-increment marker and optional comment.
+  *
+  * @param column column whose attributes are rendered
+  * @param sqlBuilder builder to append to
+  * @return the same {@code sqlBuilder} instance
+  */
+ public StringBuilder appendColumnDefinition(JdbcColumn column, StringBuilder sqlBuilder) {
+   // Data type, surrounded by separators.
+   sqlBuilder.append(SPACE);
+   sqlBuilder.append(typeConverter.fromGravitino(column.dataType()));
+   sqlBuilder.append(SPACE);
+
+   // Nullability is always spelled out explicitly.
+   sqlBuilder.append(column.nullable() ? "NULL " : "NOT NULL ");
+
+   // DEFAULT clause only when a default value was set.
+   if (!DEFAULT_VALUE_NOT_SET.equals(column.defaultValue())) {
+     sqlBuilder.append("DEFAULT ");
+     sqlBuilder.append(columnDefaultValueConverter.fromGravitino(column.defaultValue()));
+     sqlBuilder.append(SPACE);
+   }
+
+   if (column.autoIncrement()) {
+     sqlBuilder.append(AUTO_INCREMENT).append(" ");
+   }
+
+   // The comment text is embedded as-is between single quotes.
+   String comment = column.comment();
+   if (StringUtils.isNotEmpty(comment)) {
+     sqlBuilder.append("COMMENT '").append(comment).append("' ");
+   }
+   return sqlBuilder;
+ }
+
+ /**
+  * Appends the PARTITION BY clause for the given partitioning, if any.
+  *
+  * @param partitioning partition transforms; null or empty appends nothing, more than one is
+  *     rejected
+  * @param columns table columns, used to validate the partition fields
+  * @param sqlBuilder builder to append to
+  * @throws IllegalArgumentException if the transform is neither a range nor a list transform
+  */
+ private static void appendPartitionSql(
+     Transform[] partitioning, JdbcColumn[] columns, StringBuilder sqlBuilder) {
+   if (ArrayUtils.isEmpty(partitioning)) {
+     return;
+   }
+   Preconditions.checkArgument(
+       partitioning.length == 1, "Composite partition type is not supported");
+
+   Set<String> knownColumns =
+       Arrays.stream(columns).map(JdbcColumn::name).collect(Collectors.toSet());
+   Transform partition = partitioning[0];
+
+   StringBuilder partitionClause;
+   if (partition instanceof Transforms.RangeTransform) {
+     // We do not support multi-column range partitioning in StarRocks for now
+     partitionClause =
+         generateRangePartitionSql((Transforms.RangeTransform) partition, knownColumns);
+   } else if (partition instanceof Transforms.ListTransform) {
+     partitionClause =
+         generateListPartitionSql((Transforms.ListTransform) partition, knownColumns);
+   } else {
+     throw new IllegalArgumentException("Unsupported partition type of StarRocks");
+   }
+
+   sqlBuilder.append(partitionClause);
+ }
+
+ /**
+  * Renders a {@code PARTITION BY RANGE(...)} clause together with its pre-assigned partitions.
+  *
+  * @param rangePartition range transform; must reference a single, non-nested field
+  * @param columnNames names of the table's columns
+  * @return a builder holding the clause, starting with a line break
+  */
+ private static StringBuilder generateRangePartitionSql(
+     Transforms.RangeTransform rangePartition, Set<String> columnNames) {
+   String[] fieldPath = rangePartition.fieldName();
+   Preconditions.checkArgument(
+       fieldPath.length == 1, "StarRocks partition does not support nested field");
+   Preconditions.checkArgument(
+       columnNames.contains(fieldPath[0]), "The partition field must be one of the columns");
+
+   StringBuilder clause = new StringBuilder(NEW_LINE);
+   clause
+       .append(" PARTITION BY RANGE(`" + fieldPath[0] + "`)")
+       .append(NEW_LINE)
+       .append("(");
+
+   // Pre-assigned partitions, one per line, separated by commas.
+   RangePartition[] assignments = rangePartition.assignments();
+   if (!ArrayUtils.isEmpty(assignments)) {
+     clause.append(NEW_LINE);
+     for (int i = 0; i < assignments.length; i++) {
+       if (i > 0) {
+         clause.append("," + NEW_LINE);
+       }
+       clause.append(StarRocksUtils.generatePartitionSqlFragment(assignments[i]));
+     }
+   }
+
+   return clause.append(NEW_LINE).append(")");
+ }
+
+ /**
+  * Renders a {@code PARTITION BY LIST(...)} clause together with its pre-assigned partitions.
+  *
+  * @param listPartition list transform; every field must be a single, non-nested column
+  * @param columnNames names of the table's columns
+  * @return a builder holding the clause, starting with a line break
+  */
+ private static StringBuilder generateListPartitionSql(
+     Transforms.ListTransform listPartition, Set<String> columnNames) {
+   String[][] fieldNames = listPartition.fieldNames();
+   List<String> quotedColumns = new ArrayList<>();
+   for (String[] fieldName : fieldNames) {
+     Preconditions.checkArgument(
+         fieldName.length == 1, "StarRocks partition does not support nested field");
+     Preconditions.checkArgument(
+         columnNames.contains(fieldName[0]), "The partition field must be one of the columns");
+     quotedColumns.add(BACK_QUOTE + fieldName[0] + BACK_QUOTE);
+   }
+
+   StringBuilder clause = new StringBuilder(NEW_LINE);
+   clause
+       .append(" PARTITION BY LIST(" + String.join(",", quotedColumns) + ")")
+       .append(NEW_LINE)
+       .append("(");
+
+   // Pre-assigned partitions: each value tuple must match the number of partition columns.
+   ListPartition[] assignments = listPartition.assignments();
+   if (!ArrayUtils.isEmpty(assignments)) {
+     List<String> fragments = new ArrayList<>();
+     for (ListPartition assignment : assignments) {
+       Literal<?>[][] lists = assignment.lists();
+       Preconditions.checkArgument(
+           lists.length > 0, "The number of values in list partition must be greater than 0");
+       Preconditions.checkArgument(
+           Arrays.stream(lists).allMatch(tuple -> tuple.length == fieldNames.length),
+           "The number of partitioning columns must be consistent");
+       fragments.add(StarRocksUtils.generatePartitionSqlFragment(assignment));
+     }
+     clause.append(NEW_LINE).append(String.join("," + NEW_LINE, fragments));
+   }
+
+   return clause.append(NEW_LINE).append(")");
+ }
+
+ /**
+  * Appends the {@code DISTRIBUTED BY} clause for the given distribution.
+  *
+  * @param distribution distribution to render; null or strategy NONE appends nothing
+  * @param sqlBuilder builder to append to
+  * @throws IllegalArgumentException if the strategy is neither HASH nor EVEN, since otherwise a
+  *     BUCKETS suffix could be emitted without any DISTRIBUTED BY clause in front of it
+  */
+ private static void addDistributionSql(Distribution distribution, StringBuilder sqlBuilder) {
+   if (distribution == null || distribution.strategy() == Strategy.NONE) {
+     return;
+   }
+   Strategy strategy = distribution.strategy();
+   if (strategy == Strategy.HASH) {
+     sqlBuilder.append(NEW_LINE).append(" DISTRIBUTED BY HASH(");
+     sqlBuilder.append(
+         Arrays.stream(distribution.expressions())
+             .map(column -> BACK_QUOTE + column.toString() + BACK_QUOTE)
+             .collect(Collectors.joining(", ")));
+     sqlBuilder.append(")");
+   } else if (strategy == Strategy.EVEN) {
+     sqlBuilder.append(NEW_LINE).append(" DISTRIBUTED BY ").append("RANDOM");
+   } else {
+     throw new IllegalArgumentException(
+         "Unsupported distribution strategy of StarRocks: " + strategy);
+   }
+   // The bucket count is only written when it was set explicitly.
+   if (distribution.number() != Distributions.AUTO) {
+     sqlBuilder
+         .append(" BUCKETS ")
+         .append(StarRocksUtils.toBucketNumberString(distribution.number()));
+   }
+ }
+
+ /**
+  * Appends a PROPERTIES clause on a new line when there is at least one property.
+  *
+  * @param properties table properties; null or empty appends nothing
+  * @param sqlBuilder builder to append to
+  */
+ private static void addPropertiesSql(Map<String, String> properties, StringBuilder sqlBuilder) {
+   boolean hasProperties = properties != null && !properties.isEmpty();
+   if (hasProperties) {
+     sqlBuilder.append("\n");
+     sqlBuilder.append(StarRocksUtils.generatePropertiesSql(properties));
+   }
+ }
+
+ /**
+  * Builds the {@code ADD COLUMN} fragment for an add-column change: name, type, optional comment
+  * and optional position.
+  *
+  * @param addColumn the change describing the new column
+  * @return the fragment to place after {@code ALTER TABLE ...}
+  * @throws UnsupportedOperationException if the field name is nested
+  * @throws IllegalArgumentException if the position is of an unknown kind
+  */
+ private String addColumnFieldDefinition(TableChange.AddColumn addColumn) {
+   String sqlType = typeConverter.fromGravitino(addColumn.getDataType());
+   String[] fieldPath = addColumn.fieldName();
+   if (fieldPath.length > 1) {
+     throw new UnsupportedOperationException("StarRocks does not support nested column names.");
+   }
+
+   StringBuilder ddl = new StringBuilder("ADD COLUMN ");
+   ddl.append(BACK_QUOTE)
+       .append(fieldPath[0])
+       .append(BACK_QUOTE)
+       .append(SPACE)
+       .append(sqlType)
+       .append(SPACE);
+
+   // Comment, when one was given.
+   String comment = addColumn.getComment();
+   if (StringUtils.isNotEmpty(comment)) {
+     ddl.append("COMMENT '").append(comment).append("' ");
+   }
+
+   // Position: FIRST, AFTER `col`, or nothing for the default position.
+   if (addColumn.getPosition() instanceof TableChange.After) {
+     TableChange.After after = (TableChange.After) addColumn.getPosition();
+     ddl.append("AFTER ").append(BACK_QUOTE).append(after.getColumn()).append(BACK_QUOTE);
+   } else if (addColumn.getPosition() instanceof TableChange.First) {
+     ddl.append("FIRST");
+   } else if (!(addColumn.getPosition() instanceof TableChange.Default)) {
+     throw new IllegalArgumentException("Invalid column position.");
+   }
+   return ddl.toString();
+ }
+
+ /**
+  * Builds the {@code DROP COLUMN} fragment for a delete-column change.
+  *
+  * @param deleteColumn the change naming the column to drop
+  * @param jdbcTable the loaded table, used to check that the column exists
+  * @return the fragment, or an empty string when the column is missing and {@code ifExists} is
+  *     true
+  * @throws UnsupportedOperationException if the field name is nested
+  * @throws IllegalArgumentException if the column is missing and {@code ifExists} is not true
+  */
+ private String deleteColumnFieldDefinition(
+     TableChange.DeleteColumn deleteColumn, JdbcTable jdbcTable) {
+   String[] fieldPath = deleteColumn.fieldName();
+   if (fieldPath.length > 1) {
+     throw new UnsupportedOperationException("StarRocks does not support nested column names.");
+   }
+   String col = fieldPath[0];
+   try {
+     // The lookup is only an existence check; its result is not needed.
+     getJdbcColumnFromTable(jdbcTable, col);
+     return "DROP COLUMN " + BACK_QUOTE + col + BACK_QUOTE;
+   } catch (NoSuchColumnException ex) {
+     if (!BooleanUtils.isTrue(deleteColumn.getIfExists())) {
+       throw new IllegalArgumentException("Delete column does not exist: " + col);
+     }
+     return "";
+   }
+ }
+
+ /**
+  * Builds the {@code RENAME COLUMN} fragment for a rename-column change.
+  *
+  * <p>Both identifiers are back-quoted, matching the ADD/DROP/MODIFY COLUMN fragments built by
+  * this class.
+  *
+  * @param renameColumn the change carrying the old field name and the new name
+  * @param jdbcTable the loaded table, used to validate both names
+  * @return the fragment to place after {@code ALTER TABLE ...}
+  * @throws UnsupportedOperationException if the field name is nested
+  * @throws IllegalArgumentException if the old column does not exist or the new name is already
+  *     taken
+  */
+ private String renameColumnDefinition(
+     TableChange.RenameColumn renameColumn, JdbcTable jdbcTable) {
+   if (renameColumn.fieldName().length > 1) {
+     throw new UnsupportedOperationException("StarRocks does not support nested column names.");
+   }
+   String oldColName = renameColumn.fieldName()[0];
+   String newColName = renameColumn.getNewName();
+   try {
+     getJdbcColumnFromTable(jdbcTable, oldColName);
+   } catch (NoSuchColumnException ex) {
+     // Keep the lookup failure as the cause so the origin is not lost.
+     throw new IllegalArgumentException("Original column does not exist: " + oldColName, ex);
+   }
+   try {
+     getJdbcColumnFromTable(jdbcTable, newColName);
+   } catch (NoSuchColumnException ex) {
+     // The new name is free, so the rename can proceed.
+     return "RENAME COLUMN "
+         + BACK_QUOTE
+         + oldColName
+         + BACK_QUOTE
+         + " TO "
+         + BACK_QUOTE
+         + newColName
+         + BACK_QUOTE;
+   }
+   throw new IllegalArgumentException("Column already exists: " + newColName);
+ }
+
+ /**
+  * Builds the {@code RENAME} fragment for a rename-table change.
+  *
+  * <p>The new name is back-quoted, consistent with {@code generateRenameTableSql}.
+  *
+  * @param renameTable the change carrying the new table name
+  * @param jdbcTable the loaded table, whose database is searched for a name clash
+  * @return the fragment to place after {@code ALTER TABLE ...}
+  * @throws IllegalArgumentException if a table with the new name can already be loaded
+  */
+ private String renameTableDefinition(TableChange.RenameTable renameTable, JdbcTable jdbcTable) {
+   String newName = renameTable.getNewName();
+   try {
+     load(jdbcTable.databaseName(), newName);
+   } catch (NoSuchTableException ex) {
+     // No table with the new name exists, so the rename can proceed.
+     return "RENAME " + BACK_QUOTE + newName + BACK_QUOTE;
+   }
+   throw new IllegalArgumentException("Table already exists: " + newName);
+ }
+
+ /**
+  * Builds the {@code set ( "key" = "value" )} fragment for a set-property change.
+  *
+  * @param setProperty the change carrying the property name and value
+  * @return the fragment to place after {@code ALTER TABLE ...}
+  */
+ private String generateTableProperties(TableChange.SetProperty setProperty) {
+   String key = setProperty.getProperty();
+   String value = setProperty.getValue();
+   return "set ( \"" + key + "\" = \"" + value + "\" )";
+ }
+
+ /**
+  * Builds the {@code MODIFY COLUMN} fragment that moves a column to a new position.
+  *
+  * <p>The column's existing definition is re-emitted, followed by FIRST or AFTER. For the
+  * default position the column is placed after the table's last column. Referenced column names
+  * are back-quoted in every branch.
+  *
+  * @param updateColumnPosition the change carrying the field name and target position
+  * @param jdbcTable the loaded table the column definition is read from
+  * @return the fragment to place after {@code ALTER TABLE ...}
+  * @throws UnsupportedOperationException if the field name is nested
+  */
+ private String updateColumnPositionFieldDefinition(
+     TableChange.UpdateColumnPosition updateColumnPosition, JdbcTable jdbcTable) {
+   if (updateColumnPosition.fieldName().length > 1) {
+     throw new UnsupportedOperationException("StarRocks does not support nested column names.");
+   }
+   String col = updateColumnPosition.fieldName()[0];
+   JdbcColumn column = getJdbcColumnFromTable(jdbcTable, col);
+   StringBuilder columnDefinition = new StringBuilder();
+   columnDefinition.append("MODIFY COLUMN ").append(BACK_QUOTE).append(col).append(BACK_QUOTE);
+   appendColumnDefinition(column, columnDefinition);
+   if (updateColumnPosition.getPosition() instanceof TableChange.First) {
+     columnDefinition.append("FIRST");
+   } else if (updateColumnPosition.getPosition() instanceof TableChange.After) {
+     TableChange.After afterPosition = (TableChange.After) updateColumnPosition.getPosition();
+     columnDefinition
+         .append("AFTER ")
+         .append(BACK_QUOTE)
+         .append(afterPosition.getColumn())
+         .append(BACK_QUOTE);
+   } else {
+     // Default position: move the column after whatever column is currently last.
+     Arrays.stream(jdbcTable.columns())
+         .reduce((column1, column2) -> column2)
+         .map(Column::name)
+         .ifPresent(
+             s ->
+                 columnDefinition
+                     .append("AFTER ")
+                     .append(BACK_QUOTE)
+                     .append(s)
+                     .append(BACK_QUOTE));
+   }
+   return columnDefinition.toString();
+ }
+
+ /**
+  * Builds the {@code MODIFY COLUMN} fragment that changes a column's data type.
+  *
+  * <p>Comment, nullability and auto-increment are copied from the existing column; the default
+  * value is reset to "not set".
+  *
+  * @param updateColumnType the change carrying the field name and new data type
+  * @param jdbcTable the loaded table the existing column is read from
+  * @return the fragment to place after {@code ALTER TABLE ...}
+  * @throws UnsupportedOperationException if the field name is nested
+  */
+ private String updateColumnTypeFieldDefinition(
+     TableChange.UpdateColumnType updateColumnType, JdbcTable jdbcTable) {
+   String[] fieldPath = updateColumnType.fieldName();
+   if (fieldPath.length > 1) {
+     throw new UnsupportedOperationException("StarRocks does not support nested column names.");
+   }
+   String col = fieldPath[0];
+   JdbcColumn existing = getJdbcColumnFromTable(jdbcTable, col);
+
+   // Same column, new type; the previous default is deliberately not carried over.
+   JdbcColumn retyped =
+       JdbcColumn.builder()
+           .withName(col)
+           .withType(updateColumnType.getNewDataType())
+           .withComment(existing.comment())
+           .withDefaultValue(DEFAULT_VALUE_NOT_SET)
+           .withNullable(existing.nullable())
+           .withAutoIncrement(existing.autoIncrement())
+           .build();
+
+   StringBuilder ddl = new StringBuilder("MODIFY COLUMN " + BACK_QUOTE + col + BACK_QUOTE);
+   return appendColumnDefinition(retyped, ddl).toString();
}
}
diff --git
a/catalogs/catalog-jdbc-starrocks/src/main/java/org/apache/gravitino/catalog/starrocks/utils/StarRocksUtils.java
b/catalogs/catalog-jdbc-starrocks/src/main/java/org/apache/gravitino/catalog/starrocks/utils/StarRocksUtils.java
new file mode 100644
index 0000000000..75edc0971f
--- /dev/null
+++
b/catalogs/catalog-jdbc-starrocks/src/main/java/org/apache/gravitino/catalog/starrocks/utils/StarRocksUtils.java
@@ -0,0 +1,350 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.catalog.starrocks.utils;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+import org.apache.gravitino.rel.expressions.NamedReference;
+import org.apache.gravitino.rel.expressions.distributions.Distribution;
+import org.apache.gravitino.rel.expressions.distributions.Distributions;
+import org.apache.gravitino.rel.expressions.distributions.Strategy;
+import org.apache.gravitino.rel.expressions.literals.Literal;
+import org.apache.gravitino.rel.expressions.literals.Literals;
+import org.apache.gravitino.rel.expressions.transforms.Transform;
+import org.apache.gravitino.rel.expressions.transforms.Transforms;
+import org.apache.gravitino.rel.partitions.ListPartition;
+import org.apache.gravitino.rel.partitions.Partition;
+import org.apache.gravitino.rel.partitions.Partitions;
+import org.apache.gravitino.rel.partitions.RangePartition;
+import org.apache.gravitino.rel.types.Type;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Static helpers that convert between StarRocks SQL text and Gravitino table metadata: rendering
+ * PROPERTIES and PARTITION fragments, and parsing properties, partitioning, distribution and the
+ * table comment out of a {@code CREATE TABLE} statement.
+ */
+public class StarRocksUtils {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(StarRocksUtils.class);
+
+  // Group 1: partition kind (LIST or RANGE); group 2: the raw column list.
+  private static final Pattern PARTITION_INFO_PATTERN =
+      Pattern.compile("PARTITION BY \\b(LIST|RANGE)\\b\\((.+)\\)\\s*\\(?");
+
+  // Group 1: HASH or RANDOM; group 3: column list (absent for RANDOM); group 5: bucket count.
+  private static final Pattern DISTRIBUTION_INFO_PATTERN =
+      Pattern.compile(
+          "DISTRIBUTED BY\\s+(HASH|RANDOM)\\s*(\\(([^)]+)\\))?\\s*(BUCKETS\\s+(\\d+))?");
+
+  // Group 1: the comment text preceding the "(From Gravitino, ...)" suffix.
+  private static final Pattern TABLE_COMMENT_PATTERN =
+      Pattern.compile("COMMENT\\s*\"([^\\(]+?)\\s*\\(From Gravitino,.*\\)\"");
+
+  private static final String PARTITION_TYPE_VALUE_PATTERN_STRING =
+      "types: \\[([^\\]]+)\\]; keys: \\[([^\\]]+)\\];";
+  private static final Pattern PARTITION_TYPE_VALUE_PATTERN =
+      Pattern.compile(PARTITION_TYPE_VALUE_PATTERN_STRING);
+
+  // Innermost [...] group of a list-partition value string.
+  private static final Pattern PARTITION_LIST_PATTERN = Pattern.compile("\\[([^\\[\\]]+)\\]");
+
+  // Innermost (...) group of a list-partition value string.
+  private static final Pattern PARTITION_LIST_PATTERN2 = Pattern.compile("\\(([^()]+)\\)");
+
+  // One `"key" = "value"` line of a PROPERTIES clause, with an optional trailing comma.
+  private static final Pattern PROPERTY_ENTRY_PATTERN =
+      Pattern.compile("\"(.*)\"\\s*=\\s*\"(.*)\",?");
+
+  private static final String LIST_PARTITION = "LIST";
+  private static final String RANGE_PARTITION = "RANGE";
+
+  // Column labels read from the partition listing result set in fromStarRocksPartition.
+  public static final String ID = "PartitionId";
+  public static final String NAME = "PartitionName";
+  public static final String KEY = "PartitionKey";
+  public static final String VALUES_LIST = "List";
+  public static final String VALUES_RANGE = "Range";
+  public static final String VISIBLE_VERSION = "VisibleVersion";
+  public static final String VISIBLE_VERSION_TIME = "VisibleVersionTime";
+  public static final String STATE = "State";
+  public static final String DATA_SIZE = "DataSize";
+  public static final String IS_IN_MEMORY = "IsInMemory";
+
+  /**
+   * Renders a property map as a {@code PROPERTIES (...)} clause, one {@code "key"="value"} entry
+   * per line.
+   *
+   * @param properties the properties to render
+   * @return the clause, or an empty string when {@code properties} is null or empty
+   */
+  public static String generatePropertiesSql(Map<String, String> properties) {
+    if (properties == null || properties.isEmpty()) {
+      return "";
+    }
+    StringBuilder sqlBuilder = new StringBuilder(" PROPERTIES (\n");
+    sqlBuilder.append(
+        properties.entrySet().stream()
+            .map(entry -> "\"" + entry.getKey() + "\"=\"" + entry.getValue() + "\"")
+            .collect(Collectors.joining(",\n")));
+    sqlBuilder.append("\n)");
+    return sqlBuilder.toString();
+  }
+
+  /**
+   * Extracts the {@code "key" = "value"} entries that appear on or after the first line
+   * containing {@code PROPERTIES}.
+   *
+   * @param createTableSql the statement text, split on line breaks
+   * @return the parsed properties with keys and values trimmed; empty when none are found
+   */
+  public static Map<String, String> extractPropertiesFromSql(String createTableSql) {
+    Map<String, String> properties = new HashMap<>();
+    boolean inProperties = false;
+
+    for (String line : createTableSql.split("\n")) {
+      // Once the PROPERTIES keyword has been seen, every later line is scanned as well.
+      if (line.contains("PROPERTIES")) {
+        inProperties = true;
+      }
+      if (!inProperties) {
+        continue;
+      }
+      Matcher entry = PROPERTY_ENTRY_PATTERN.matcher(line);
+      if (entry.find()) {
+        properties.put(entry.group(1).trim(), entry.group(2).trim());
+      }
+    }
+    return properties;
+  }
+
+  /**
+   * Finds the {@code PARTITION BY LIST|RANGE(...)} line of a statement and converts it to a
+   * transform.
+   *
+   * <p>A LIST clause yields a list transform over all named columns; a RANGE clause yields a
+   * range transform over the first named column only. Any exception raised while parsing is
+   * logged and treated as "no partitioning".
+   *
+   * @param createTableSql the statement text, examined line by line
+   * @return the parsed transform, or empty when no line matches or parsing fails
+   */
+  public static Optional<Transform> extractPartitionInfoFromSql(String createTableSql) {
+    try {
+      String[] lines = createTableSql.split("\n");
+      for (String line : lines) {
+        Matcher matcher = PARTITION_INFO_PATTERN.matcher(line.trim());
+        if (matcher.matches()) {
+          String partitionType = matcher.group(1);
+          String partitionInfoString = matcher.group(2);
+          // Column names arrive comma-separated and possibly back-quoted.
+          String[] columns =
+              Arrays.stream(partitionInfoString.split(","))
+                  .map(String::trim)
+                  .map(s -> s.replace("`", ""))
+                  .toArray(String[]::new);
+          if (LIST_PARTITION.equals(partitionType)) {
+            String[][] fieldNames =
+                Arrays.stream(columns).map(s -> new String[] {s}).toArray(String[][]::new);
+            return Optional.of(Transforms.list(fieldNames));
+          } else if (RANGE_PARTITION.equals(partitionType)) {
+            return Optional.of(Transforms.range(new String[] {columns[0]}));
+          }
+        }
+      }
+      return Optional.empty();
+    } catch (Exception e) {
+      // Deliberately best-effort: a parse failure is reported as "not partitioned".
+      LOGGER.warn("Failed to extract partition info", e);
+      return Optional.empty();
+    }
+  }
+
+  /**
+   * Parses the {@code DISTRIBUTED BY} clause of a statement.
+   *
+   * @param createTableSql the statement text
+   * @return the distribution with its strategy, bucket number and column references
+   * @throws RuntimeException if no {@code DISTRIBUTED BY} clause is found
+   */
+  public static Distribution extractDistributionInfoFromSql(String createTableSql) {
+    Matcher matcher = DISTRIBUTION_INFO_PATTERN.matcher(createTableSql.trim());
+    if (matcher.find()) {
+      String distributionType = matcher.group(1);
+
+      // For Random distribution, no need to specify distribution columns.
+      String distributionColumns = matcher.group(3);
+      String[] columns =
+          distributionColumns == null
+              ? new String[] {}
+              : Arrays.stream(distributionColumns.split(","))
+                  .map(String::trim)
+                  // Drops the first and last character of each name (the quoting characters).
+                  .map(f -> f.substring(1, f.length() - 1))
+                  .toArray(String[]::new);
+
+      int bucketNum = extractBucketNum(matcher);
+
+      return new Distributions.DistributionImpl.Builder()
+          .withStrategy(Strategy.getByName(distributionType))
+          .withNumber(bucketNum)
+          .withExpressions(
+              Arrays.stream(columns)
+                  .map(col -> NamedReference.field(new String[] {col}))
+                  .toArray(NamedReference[]::new))
+          .build();
+    }
+
+    throw new RuntimeException("Failed to extract distribution info in sql:" + createTableSql);
+  }
+
+  /**
+   * Extracts the table comment that precedes the {@code (From Gravitino, ...)} suffix.
+   *
+   * @param createTableSql the statement text
+   * @return the comment text, or an empty string when the pattern does not match
+   */
+  public static String extractTableCommentFromSql(String createTableSql) {
+    Matcher matcher = TABLE_COMMENT_PATTERN.matcher(createTableSql.trim());
+    if (matcher.find()) {
+      return matcher.group(1);
+    }
+    return "";
+  }
+
+  /**
+   * Generate sql fragment that create partition in StarRocks.
+   *
+   * <p>The sql fragment looks like "PARTITION {partitionName} VALUES {values}", for example:
+   *
+   * <pre>PARTITION `p20240724` VALUES LESS THAN ("2024-07-24")</pre>
+   *
+   * <pre>PARTITION `p20240724_v1` VALUES IN ("2024-07-24", "v1")</pre>
+   *
+   * @param partition The partition to be created.
+   * @return The partition sql fragment.
+   * @throws IllegalArgumentException if the partition is neither a range nor a list partition
+   */
+  public static String generatePartitionSqlFragment(Partition partition) {
+    String partitionSqlFragment = "PARTITION `%s` VALUES %s";
+    if (partition instanceof RangePartition) {
+      return String.format(
+          partitionSqlFragment,
+          partition.name(),
+          generateRangePartitionValues((RangePartition) partition));
+    } else if (partition instanceof ListPartition) {
+      return String.format(
+          partitionSqlFragment,
+          partition.name(),
+          generateListPartitionSqlValues((ListPartition) partition));
+    } else {
+      throw new IllegalArgumentException("Unsupported partition type of StarRocks");
+    }
+  }
+
+  /** Renders the upper bound of a range partition; a NULL upper bound becomes MAXVALUE. */
+  private static String generateRangePartitionValues(RangePartition rangePartition) {
+    Literal<?> upper = rangePartition.upper();
+    String partitionValues;
+    if (Literals.NULL.equals(upper)) {
+      partitionValues = "LESS THAN MAXVALUE";
+    } else {
+      partitionValues = String.format("LESS THAN (\"%s\")", upper.value());
+    }
+    return partitionValues;
+  }
+
+  /**
+   * Renders the {@code IN (...)} value list of a list partition. Multi-column tuples are wrapped
+   * in parentheses; single-column values are written bare.
+   */
+  private static String generateListPartitionSqlValues(ListPartition listPartition) {
+    Literal<?>[][] lists = listPartition.lists();
+    ImmutableList.Builder<String> listValues = ImmutableList.builder();
+    for (Literal<?>[] part : lists) {
+      String values;
+      if (part.length > 1) {
+        values =
+            String.format(
+                "(%s)",
+                Arrays.stream(part)
+                    .map(p -> "\"" + p.value() + "\"")
+                    .collect(Collectors.joining(",")));
+      } else {
+        values = String.format("\"%s\"", part[0].value());
+      }
+      listValues.add(values);
+    }
+    return String.format("IN (%s)", listValues.build().stream().collect(Collectors.joining(",")));
+  }
+
+  /**
+   * Reads the bucket count (capture group 5) from a matcher that has already matched the
+   * distribution pattern.
+   *
+   * <p>The group is read from the current match. Calling {@code find(int)} here would reset the
+   * matcher and search again from that character offset instead of testing the group.
+   *
+   * @param matcher a matcher positioned on a successful distribution match
+   * @return the declared bucket count, or {@code Distributions.AUTO} when none was declared
+   */
+  private static int extractBucketNum(Matcher matcher) {
+    String bucketValue = matcher.group(5);
+    if (bucketValue == null) {
+      return Distributions.AUTO;
+    }
+    return Integer.parseInt(bucketValue);
+  }
+
+  /** Converts a bucket number to the text written after {@code BUCKETS}. */
+  public static String toBucketNumberString(int number) {
+    return String.valueOf(number);
+  }
+
+  /**
+   * Converts the current row of a partition listing into a Gravitino partition.
+   *
+   * <p>The row must expose the columns named by the public constants of this class. Range
+   * partitions support a single partition key. List partition values are accepted in a
+   * parenthesised or a bracketed form.
+   *
+   * @param tableName table name, used in error messages only
+   * @param resultSet result set positioned on the row to convert
+   * @param partitionInfo the table's partitioning; selects range or list handling
+   * @param columnTypes column name to type, used to type the partition literals
+   * @return the range or list partition described by the row
+   * @throws SQLException if a column cannot be read from {@code resultSet}
+   * @throws UnsupportedOperationException if the transform is neither range nor list, if a range
+   *     partition has more than one key, or if the list values are in an unrecognised form
+   */
+  public static Partition fromStarRocksPartition(
+      String tableName, ResultSet resultSet, Transform partitionInfo, Map<String, Type> columnTypes)
+      throws SQLException {
+    String partitionName = resultSet.getString(NAME);
+    String partitionKey = resultSet.getString(KEY);
+    String partitionValues;
+    if (partitionInfo instanceof Transforms.RangeTransform) {
+      partitionValues = resultSet.getString(VALUES_RANGE);
+    } else if (partitionInfo instanceof Transforms.ListTransform) {
+      partitionValues = resultSet.getString(VALUES_LIST);
+    } else {
+      throw new UnsupportedOperationException(
+          String.format("%s is not a partitioned table", tableName));
+    }
+    ImmutableMap.Builder<String, String> propertiesBuilder = ImmutableMap.builder();
+    propertiesBuilder.put(ID, resultSet.getString(ID));
+    propertiesBuilder.put(VISIBLE_VERSION, resultSet.getString(VISIBLE_VERSION));
+    propertiesBuilder.put(VISIBLE_VERSION_TIME, resultSet.getString(VISIBLE_VERSION_TIME));
+    propertiesBuilder.put(STATE, resultSet.getString(STATE));
+    propertiesBuilder.put(KEY, partitionKey);
+    propertiesBuilder.put(DATA_SIZE, resultSet.getString(DATA_SIZE));
+    propertiesBuilder.put(IS_IN_MEMORY, resultSet.getString(IS_IN_MEMORY));
+    ImmutableMap<String, String> properties = propertiesBuilder.build();
+
+    String[] partitionKeys = partitionKey.split(",");
+    if (partitionInfo instanceof Transforms.RangeTransform) {
+      if (partitionKeys.length != 1) {
+        throw new UnsupportedOperationException(
+            "Multi-column range partitioning in StarRocks is not supported yet");
+      }
+      Type partitionColumnType = columnTypes.get(partitionKeys[0].trim());
+      Literal<?> lower = Literals.NULL;
+      Literal<?> upper = Literals.NULL;
+      // The first "types/keys" occurrence is the lower bound, the second the upper bound.
+      Matcher matcher = PARTITION_TYPE_VALUE_PATTERN.matcher(partitionValues);
+      if (matcher.find()) {
+        String lowerValue = matcher.group(2);
+        lower = Literals.of(lowerValue, partitionColumnType);
+        if (matcher.find()) {
+          String upperValue = matcher.group(2);
+          upper = Literals.of(upperValue, partitionColumnType);
+        }
+      }
+      return Partitions.range(partitionName, upper, lower, properties);
+    } else if (partitionInfo instanceof Transforms.ListTransform) {
+      ImmutableList.Builder<Literal<?>[]> lists = ImmutableList.builder();
+      partitionValues = partitionValues.trim();
+      if (partitionValues.startsWith("(") && partitionValues.endsWith(")")) {
+        // Parenthesised form: values are trimmed and stripped of double and single quotes.
+        Matcher matcher = PARTITION_LIST_PATTERN2.matcher(partitionValues);
+        while (matcher.find()) {
+          String[] values = matcher.group(1).split(",");
+          ImmutableList.Builder<Literal<?>> literValues = ImmutableList.builder();
+          for (int i = 0; i < values.length; i++) {
+            Type partitionColumnType = columnTypes.get(partitionKeys[i].trim());
+            literValues.add(
+                Literals.of(
+                    values[i].trim().replace("\"", "").replace("'", ""), partitionColumnType));
+          }
+          lists.add(literValues.build().toArray(new Literal<?>[0]));
+        }
+        return Partitions.list(
+            partitionName, lists.build().toArray(new Literal<?>[0][0]), properties);
+      } else if (partitionValues.startsWith("[") && partitionValues.endsWith("]")) {
+        // Bracketed form: only double quotes are stripped; values are not trimmed.
+        Matcher matcher = PARTITION_LIST_PATTERN.matcher(partitionValues);
+        while (matcher.find()) {
+          String[] values = matcher.group(1).split(",");
+          ImmutableList.Builder<Literal<?>> literValues = ImmutableList.builder();
+          for (int i = 0; i < values.length; i++) {
+            Type partitionColumnType = columnTypes.get(partitionKeys[i].trim());
+            literValues.add(Literals.of(values[i].replace("\"", ""), partitionColumnType));
+          }
+          lists.add(literValues.build().toArray(new Literal<?>[0]));
+        }
+        return Partitions.list(
+            partitionName, lists.build().toArray(new Literal<?>[0][0]), properties);
+      }
+      throw new UnsupportedOperationException(
+          String.format("%s is not a partitioned table", tableName));
+    } else {
+      throw new UnsupportedOperationException(
+          String.format("%s is not a partitioned table", tableName));
+    }
+  }
+}
diff --git
a/catalogs/catalog-jdbc-starrocks/src/test/java/org/apache/gravitino/catalog/starrocks/utils/TestStarRocksUtils.java
b/catalogs/catalog-jdbc-starrocks/src/test/java/org/apache/gravitino/catalog/starrocks/utils/TestStarRocksUtils.java
new file mode 100644
index 0000000000..f1e86abfb9
--- /dev/null
+++
b/catalogs/catalog-jdbc-starrocks/src/test/java/org/apache/gravitino/catalog/starrocks/utils/TestStarRocksUtils.java
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.catalog.starrocks.utils;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import org.apache.gravitino.rel.expressions.distributions.Distribution;
+import org.apache.gravitino.rel.expressions.literals.Literal;
+import org.apache.gravitino.rel.expressions.literals.Literals;
+import org.apache.gravitino.rel.expressions.transforms.Transform;
+import org.apache.gravitino.rel.expressions.transforms.Transforms;
+import org.apache.gravitino.rel.partitions.Partition;
+import org.apache.gravitino.rel.partitions.Partitions;
+import org.apache.gravitino.rel.types.Types;
+import org.junit.jupiter.api.Test;
+
+public class TestStarRocksUtils {
+
+  /**
+   * Checks the text returned by {@code StarRocksUtils.generatePropertiesSql} for a null map, an
+   * empty map, a single-entry map and a two-entry map.
+   */
+  @Test
+  public void testGeneratePropertiesSql() {
+    // A null map is expected to render as an empty string.
+    Map<String, String> absent = null;
+    assertEquals("", StarRocksUtils.generatePropertiesSql(absent));
+
+    // An empty map is expected to render as an empty string as well.
+    Map<String, String> empty = Collections.emptyMap();
+    assertEquals("", StarRocksUtils.generatePropertiesSql(empty));
+
+    // One entry: a PROPERTIES clause wrapping a single quoted key/value pair.
+    Map<String, String> single = Collections.singletonMap("key", "value");
+    assertEquals(
+        " PROPERTIES (\n\"key\"=\"value\"\n)", StarRocksUtils.generatePropertiesSql(single));
+
+    // Two entries: the pairs are separated by a comma followed by a newline.
+    Map<String, String> pair = new HashMap<>();
+    pair.put("key1", "value1");
+    pair.put("key2", "value2");
+    assertEquals(
+        " PROPERTIES (\n\"key1\"=\"value1\",\n\"key2\"=\"value2\"\n)",
+        StarRocksUtils.generatePropertiesSql(pair));
+  }
+
+  /**
+   * Checks that {@code StarRocksUtils.extractPropertiesFromSql} returns the key/value pairs of the
+   * PROPERTIES clause of a DDL statement, and an empty map when the clause is missing.
+   */
+  @Test
+  public void testExtractTablePropertiesFromSql() {
+    // CREATE TABLE statement shared by the first three scenarios; it has no PROPERTIES clause.
+    final String ddlWithoutProperties =
+        "CREATE TABLE `testTable` (\n`testColumn` STRING NOT NULL COMMENT 'test comment'\n)"
+            + " ENGINE=OLAP\nCOMMENT \"test comment\"";
+
+    // No PROPERTIES clause: nothing is extracted.
+    Map<String, String> none = StarRocksUtils.extractPropertiesFromSql(ddlWithoutProperties);
+    assertTrue(none.isEmpty());
+
+    // A single property.
+    Map<String, String> single =
+        StarRocksUtils.extractPropertiesFromSql(
+            ddlWithoutProperties + "\nPROPERTIES (\n\"test_property\"=\"test_value\"\n)");
+    assertEquals("test_value", single.get("test_property"));
+
+    // Two properties separated by a comma and a newline.
+    Map<String, String> pair =
+        StarRocksUtils.extractPropertiesFromSql(
+            ddlWithoutProperties
+                + "\nPROPERTIES (\n\"test_property1\"=\"test_value1\","
+                + "\n\"test_property2\"=\"test_value2\"\n)");
+    assertEquals("test_value1", pair.get("test_property1"));
+    assertEquals("test_value2", pair.get("test_property2"));
+
+    // Blanks around the '=' sign must not end up in the extracted keys or values.
+    Map<String, String> spaced =
+        StarRocksUtils.extractPropertiesFromSql(
+            "CREATE DATABASE `test`\nPROPERTIES (\n\"property1\" = \"value1\","
+                + "\n\"comment\"= \"comment\"\n)");
+    assertEquals("value1", spaced.get("property1"));
+    assertEquals("comment", spaced.get("comment"));
+  }
+
+  /**
+   * Checks the partition transform that {@code StarRocksUtils.extractPartitionInfoFromSql} derives
+   * from CREATE TABLE statements: RANGE, single- and multi-column LIST, and no partitioning.
+   */
+  @Test
+  public void testExtractPartitionInfoFromSql() {
+    // RANGE partitioning on one column.
+    Optional<Transform> rangeTransform =
+        StarRocksUtils.extractPartitionInfoFromSql(
+            "CREATE TABLE `testTable` (\n`col1` date NOT NULL\n) ENGINE=OLAP\n"
+                + " PARTITION BY RANGE(`col1`)\n()\n DISTRIBUTED BY HASH(`col1`) BUCKETS 2");
+    assertTrue(rangeTransform.isPresent());
+    assertEquals(Transforms.range(new String[] {"col1"}), rangeTransform.get());
+
+    // LIST partitioning on one column.
+    Optional<Transform> listTransform =
+        StarRocksUtils.extractPartitionInfoFromSql(
+            "CREATE TABLE `testTable` (\n`col1` int(11) NOT NULL\n) ENGINE=OLAP\n"
+                + " PARTITION BY LIST(`col1`)\n()\n DISTRIBUTED BY HASH(`col1`) BUCKETS 2");
+    assertTrue(listTransform.isPresent());
+    assertEquals(Transforms.list(new String[][] {{"col1"}}), listTransform.get());
+
+    // LIST partitioning on two columns.
+    Optional<Transform> multiColumnListTransform =
+        StarRocksUtils.extractPartitionInfoFromSql(
+            "CREATE TABLE `testTable` (\n`col1` date NOT NULL,\n`col2` int(11) NOT NULL\n)"
+                + " ENGINE=OLAP\n PARTITION BY LIST(`col1`, `col2`)\n()\n"
+                + " DISTRIBUTED BY HASH(`col1`) BUCKETS 2");
+    assertTrue(multiColumnListTransform.isPresent());
+    assertEquals(
+        Transforms.list(new String[][] {{"col1"}, {"col2"}}), multiColumnListTransform.get());
+
+    // A table without a PARTITION BY clause yields no transform.
+    Optional<Transform> noTransform =
+        StarRocksUtils.extractPartitionInfoFromSql(
+            "CREATE TABLE `testTable` (\n`testColumn` STRING NOT NULL COMMENT 'test comment'\n)"
+                + " ENGINE=OLAP\nCOMMENT \"test comment\"");
+    assertFalse(noTransform.isPresent());
+
+    // A longer statement with key, comment, distribution and properties clauses around the
+    // LIST partition clause. The lines are joined with '\n'; the empty element reproduces the
+    // blank line inside the partition parentheses.
+    String fullDdl =
+        String.join(
+            "\n",
+            "CREATE TABLE `test_partitioned_table_4aec5cea` (",
+            " `starrocks_col_name1` int(11) NOT NULL COMMENT \"col_1_comment\",",
+            " `starrocks_col_name2` varchar(10) NULL COMMENT \"col_2_comment\",",
+            " `starrocks_col_name3` varchar(10) NULL COMMENT \"col_3_comment\",",
+            " `starrocks_col_name4` date NOT NULL COMMENT \"col_4_comment\"",
+            ") ENGINE=OLAP ",
+            "DUPLICATE KEY(`starrocks_col_name1`, `starrocks_col_name2`)",
+            "COMMENT \"table_comment_by_gravitino_it (From Gravitino, DO NOT EDIT: "
+                + "gravitino.v1.uid4058355477806830448)\"",
+            "PARTITION BY LIST(`starrocks_col_name1`)(",
+            "",
+            ")",
+            "DISTRIBUTED BY HASH(`starrocks_col_name1`) BUCKETS 2 ",
+            "PROPERTIES (",
+            "\"compression\" = \"LZ4\",",
+            "\"fast_schema_evolution\" = \"true\",",
+            "\"replicated_storage\" = \"true\",",
+            "\"replication_num\" = \"1\"",
+            ");");
+    Optional<Transform> fullDdlTransform = StarRocksUtils.extractPartitionInfoFromSql(fullDdl);
+    assertTrue(fullDdlTransform.isPresent());
+    assertEquals(
+        Transforms.list(new String[][] {{"starrocks_col_name1"}}), fullDdlTransform.get());
+  }
+
+  /**
+   * Checks the {@code PARTITION ... VALUES ...} fragment produced by {@code
+   * StarRocksUtils.generatePartitionSqlFragment} for range and list partitions.
+   *
+   * <p>Range cases: the fragment is {@code LESS THAN MAXVALUE} when the second argument of {@code
+   * Partitions.range} is {@code Literals.NULL}, and {@code LESS THAN ("<value>")} otherwise; the
+   * third argument does not show up in the expected text.
+   *
+   * <p>List cases: single-column value lists are rendered as a flat, comma-separated list, while
+   * multi-column value lists are rendered as parenthesised tuples.
+   */
+  @Test
+  public void testGeneratePartitionSqlFragment() {
+    // Range partition with both bounds unset.
+    Partition partition = Partitions.range("p1", Literals.NULL, Literals.NULL, null);
+    assertEquals(
+        "PARTITION `p1` VALUES LESS THAN MAXVALUE",
+        StarRocksUtils.generatePartitionSqlFragment(partition));
+
+    // Range partitions with only the second argument set.
+    partition =
+        Partitions.range(
+            "p2", Literals.of("2024-07-23", Types.DateType.get()), Literals.NULL, null);
+    assertEquals(
+        "PARTITION `p2` VALUES LESS THAN (\"2024-07-23\")",
+        StarRocksUtils.generatePartitionSqlFragment(partition));
+
+    partition =
+        Partitions.range(
+            "p3", Literals.of("2024-07-24", Types.DateType.get()), Literals.NULL, null);
+    assertEquals(
+        "PARTITION `p3` VALUES LESS THAN (\"2024-07-24\")",
+        StarRocksUtils.generatePartitionSqlFragment(partition));
+
+    // Range partition with only the third argument set: still rendered as MAXVALUE.
+    partition =
+        Partitions.range(
+            "p4", Literals.NULL, Literals.of("2024-07-24", Types.DateType.get()), null);
+    assertEquals(
+        "PARTITION `p4` VALUES LESS THAN MAXVALUE",
+        StarRocksUtils.generatePartitionSqlFragment(partition));
+
+    // List partitions. The arrays are declared as Literal<?>[][] rather than the raw
+    // Literal[][], so no unchecked conversion is needed when calling Partitions.list.
+    Literal<?>[][] p5values = {{Literals.of("2024-07-24", Types.DateType.get())}};
+    partition = Partitions.list("p5", p5values, Collections.emptyMap());
+    assertEquals(
+        "PARTITION `p5` VALUES IN (\"2024-07-24\")",
+        StarRocksUtils.generatePartitionSqlFragment(partition));
+
+    // Two single-column entries.
+    Literal<?>[][] p6values = {{Literals.integerLiteral(1)}, {Literals.integerLiteral(2)}};
+    partition = Partitions.list("p6", p6values, Collections.emptyMap());
+    assertEquals(
+        "PARTITION `p6` VALUES IN (\"1\",\"2\")",
+        StarRocksUtils.generatePartitionSqlFragment(partition));
+
+    // Two two-column entries.
+    Literal<?>[][] p7values = {
+      {Literals.integerLiteral(1), Literals.integerLiteral(2)},
+      {Literals.integerLiteral(3), Literals.integerLiteral(4)}
+    };
+    partition = Partitions.list("p7", p7values, Collections.emptyMap());
+    assertEquals(
+        "PARTITION `p7` VALUES IN ((\"1\",\"2\"),(\"3\",\"4\"))",
+        StarRocksUtils.generatePartitionSqlFragment(partition));
+  }
+
+  /**
+   * Checks the bucket number that {@code StarRocksUtils.extractDistributionInfoFromSql} reads from
+   * the {@code DISTRIBUTED BY HASH(...)} clause: the explicit {@code BUCKETS} value when present,
+   * and {@code -1} when the clause carries no {@code BUCKETS} count.
+   */
+  @Test
+  public void testDistributedInfoPattern() {
+    String createTableSql =
+        "CREATE TABLE `testTable` (\n`col1` date NOT NULL\n) ENGINE=OLAP\n"
+            + " PARTITION BY RANGE(`col1`)\n()\n DISTRIBUTED BY HASH(`col1`) BUCKETS 2";
+    Distribution distribution = StarRocksUtils.extractDistributionInfoFromSql(createTableSql);
+    // assertEquals takes (expected, actual); keeping that order makes a failure message report
+    // the right value as "expected".
+    assertEquals(2, distribution.number());
+
+    String createTableSqlWithAuto =
+        "CREATE TABLE `testTable` (\n`col1` date NOT NULL\n) ENGINE=OLAP\n"
+            + " PARTITION BY RANGE(`col1`)\n()\n DISTRIBUTED BY HASH(`col1`)";
+    Distribution distribution2 =
+        StarRocksUtils.extractDistributionInfoFromSql(createTableSqlWithAuto);
+    assertEquals(-1, distribution2.number());
+  }
+}
diff --git
a/catalogs/catalog-jdbc-starrocks/src/test/resources/log4j2.properties
b/catalogs/catalog-jdbc-starrocks/src/test/resources/log4j2.properties
new file mode 100644
index 0000000000..c2618539db
--- /dev/null
+++ b/catalogs/catalog-jdbc-starrocks/src/test/resources/log4j2.properties
@@ -0,0 +1,73 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+# Set to debug or trace if log4j initialization is failing
+status = info
+
+# Name of the configuration
+name = ConsoleLogConfig
+
+# Console appender configuration
+appender.console.type = Console
+appender.console.name = consoleLogger
+appender.console.layout.type = PatternLayout
+appender.console.layout.pattern = %d{yyyy-MM-dd HH:mm:ss} %-5p [%t] %c{1}:%L -
%m%n
+
+# Log files location
+property.logPath =
${sys:gravitino.log.path:-build/catalog-jdbc-starrocks-integration-test.log}
+
+# File appender configuration
+appender.file.type = File
+appender.file.name = fileLogger
+appender.file.fileName = ${logPath}
+appender.file.layout.type = PatternLayout
+appender.file.layout.pattern = %d{yyyy-MM-dd HH:mm:ss.SSS} [%t] %-5p %c - %m%n
+
+# Root logger level
+rootLogger.level = info
+
+# Root logger referring to console and file appenders
+rootLogger.appenderRef.stdout.ref = consoleLogger
+rootLogger.appenderRef.file.ref = fileLogger
+
+# File appender configuration for testcontainers
+appender.testcontainersFile.type = File
+appender.testcontainersFile.name = testcontainersLogger
+appender.testcontainersFile.fileName = build/testcontainers.log
+appender.testcontainersFile.layout.type = PatternLayout
+appender.testcontainersFile.layout.pattern = %d{yyyy-MM-dd HH:mm:ss.SSS} [%t]
%-5p %c - %m%n
+
+# Logger for testcontainers
+logger.testcontainers.name = org.testcontainers
+logger.testcontainers.level = debug
+logger.testcontainers.additivity = false
+logger.testcontainers.appenderRef.file.ref = testcontainersLogger
+
+logger.tc.name = tc
+logger.tc.level = debug
+logger.tc.additivity = false
+logger.tc.appenderRef.file.ref = testcontainersLogger
+
+logger.docker.name = com.github.dockerjava
+logger.docker.level = warn
+logger.docker.additivity = false
+logger.docker.appenderRef.file.ref = testcontainersLogger
+
+logger.http.name =
com.github.dockerjava.zerodep.shaded.org.apache.hc.client5.http.wire
+logger.http.level = off
\ No newline at end of file