This is an automated email from the ASF dual-hosted git repository. yuqi4733 pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/gravitino.git
The following commit(s) were added to refs/heads/main by this push: new aefb2740fb [#3302][Sub-Task] StarRocks catalog E2E test (#7792) aefb2740fb is described below commit aefb2740fb3a6a2d30d39e119b88374b35f9e534 Author: Jarvis <jar...@apache.org> AuthorDate: Thu Jul 31 16:02:06 2025 +0800 [#3302][Sub-Task] StarRocks catalog E2E test (#7792) ### What changes were proposed in this pull request? add StarRocks Catalog Implement ### Why are the changes needed? To support StarRocks Catalog. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? By E2E test --- .../gravitino/catalog/jdbc/config/JdbcConfig.java | 3 +- .../integration/test/CatalogStarRocksIT.java | 928 +++++++++++++++++++++ .../catalog/starrocks/operation/TestStarRocks.java | 85 ++ .../operation/TestStarRocksDatabaseOperations.java | 52 ++ .../operation/TestStarRocksTableOperations.java | 583 +++++++++++++ .../TestStarRocksTablePartitionOperations.java | 361 ++++++++ .../src/main/java/org/apache/gravitino/Config.java | 2 +- .../apache/gravitino/config/ConfigConstants.java | 5 +- docs/index.md | 1 + docs/jdbc-starrocks-catalog.md | 197 +++++ .../integration/test/container/ContainerSuite.java | 27 +- .../test/container/StarRocksContainer.java | 135 +++ 12 files changed, 2375 insertions(+), 4 deletions(-) diff --git a/catalogs/catalog-jdbc-common/src/main/java/org/apache/gravitino/catalog/jdbc/config/JdbcConfig.java b/catalogs/catalog-jdbc-common/src/main/java/org/apache/gravitino/catalog/jdbc/config/JdbcConfig.java index dc29343af9..28c6aa9fb4 100644 --- a/catalogs/catalog-jdbc-common/src/main/java/org/apache/gravitino/catalog/jdbc/config/JdbcConfig.java +++ b/catalogs/catalog-jdbc-common/src/main/java/org/apache/gravitino/catalog/jdbc/config/JdbcConfig.java @@ -20,6 +20,7 @@ package org.apache.gravitino.catalog.jdbc.config; import java.util.Map; +import java.util.Objects; import org.apache.commons.lang3.StringUtils; import org.apache.gravitino.Config; import 
org.apache.gravitino.config.ConfigBuilder; @@ -65,7 +66,7 @@ public class JdbcConfig extends Config { .doc("The password of the Jdbc connection") .version(ConfigConstants.VERSION_0_3_0) .stringConf() - .checkValue(StringUtils::isNotBlank, ConfigConstants.NOT_BLANK_ERROR_MSG) + .checkValue(Objects::nonNull, ConfigConstants.NOT_NULL_ERROR_MSG) .create(); public static final ConfigEntry<Integer> POOL_MIN_SIZE = diff --git a/catalogs/catalog-jdbc-starrocks/src/test/java/org/apache/gravitino/catalog/starrocks/integration/test/CatalogStarRocksIT.java b/catalogs/catalog-jdbc-starrocks/src/test/java/org/apache/gravitino/catalog/starrocks/integration/test/CatalogStarRocksIT.java new file mode 100644 index 0000000000..5437f363fe --- /dev/null +++ b/catalogs/catalog-jdbc-starrocks/src/test/java/org/apache/gravitino/catalog/starrocks/integration/test/CatalogStarRocksIT.java @@ -0,0 +1,928 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.gravitino.catalog.starrocks.integration.test; + +import static org.apache.gravitino.integration.test.util.ITUtils.assertPartition; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Maps; +import java.io.IOException; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; +import org.apache.commons.lang3.ArrayUtils; +import org.apache.commons.lang3.StringUtils; +import org.apache.gravitino.Catalog; +import org.apache.gravitino.NameIdentifier; +import org.apache.gravitino.Schema; +import org.apache.gravitino.SupportsSchemas; +import org.apache.gravitino.catalog.jdbc.config.JdbcConfig; +import org.apache.gravitino.client.GravitinoMetalake; +import org.apache.gravitino.exceptions.ConnectionFailedException; +import org.apache.gravitino.exceptions.NoSuchPartitionException; +import org.apache.gravitino.exceptions.NoSuchSchemaException; +import org.apache.gravitino.exceptions.SchemaAlreadyExistsException; +import org.apache.gravitino.integration.test.container.ContainerSuite; +import org.apache.gravitino.integration.test.container.StarRocksContainer; +import org.apache.gravitino.integration.test.util.BaseIT; +import org.apache.gravitino.integration.test.util.GravitinoITUtils; +import org.apache.gravitino.integration.test.util.ITUtils; +import org.apache.gravitino.rel.Column; +import org.apache.gravitino.rel.SupportsPartitions; +import org.apache.gravitino.rel.Table; +import org.apache.gravitino.rel.TableCatalog; +import org.apache.gravitino.rel.TableChange; +import 
org.apache.gravitino.rel.expressions.Expression; +import org.apache.gravitino.rel.expressions.NamedReference; +import org.apache.gravitino.rel.expressions.distributions.Distribution; +import org.apache.gravitino.rel.expressions.distributions.Distributions; +import org.apache.gravitino.rel.expressions.distributions.Strategy; +import org.apache.gravitino.rel.expressions.literals.Literal; +import org.apache.gravitino.rel.expressions.literals.Literals; +import org.apache.gravitino.rel.expressions.sorts.SortOrder; +import org.apache.gravitino.rel.expressions.transforms.Transform; +import org.apache.gravitino.rel.expressions.transforms.Transforms; +import org.apache.gravitino.rel.partitions.ListPartition; +import org.apache.gravitino.rel.partitions.Partition; +import org.apache.gravitino.rel.partitions.Partitions; +import org.apache.gravitino.rel.partitions.RangePartition; +import org.apache.gravitino.rel.types.Types; +import org.apache.gravitino.utils.RandomNameUtils; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Tag; +import org.junit.jupiter.api.Test; +import org.testcontainers.shaded.org.awaitility.Awaitility;
+
+/**
+ * End-to-end integration test for the {@code jdbc-starrocks} catalog provider.
+ *
+ * <p>{@link #startup()} starts a StarRocks container through {@link ContainerSuite} and creates
+ * one metalake, one catalog and one schema. After every test the schema is dropped with cascade
+ * and recreated, so each test starts from an empty schema.
+ */
+@Tag("gravitino-docker-test")
+public class CatalogStarRocksIT extends BaseIT {
+
+  private static final String provider = "jdbc-starrocks";
+
+  private static final String DRIVER_CLASS_NAME = "com.mysql.jdbc.Driver";
+
+  public String metalakeName = GravitinoITUtils.genRandomName("starrocks_it_metalake");
+  public String catalogName = GravitinoITUtils.genRandomName("starrocks_it_catalog");
+  public String schemaName = GravitinoITUtils.genRandomName("starrocks_it_schema");
+  public String tableName = GravitinoITUtils.genRandomName("starrocks_it_table");
+
+  public String table_comment = "table_comment_by_gravitino_it";
+
+  // StarRocks doesn't support schema comment
+  public String schema_comment = null;
+  public String STARROCKS_COL_NAME1 = "starrocks_col_name1";
+  public String STARROCKS_COL_NAME2 = "starrocks_col_name2";
+  public String STARROCKS_COL_NAME3 = "starrocks_col_name3";
+  public String STARROCKS_COL_NAME4 = "starrocks_col_name4";
+
+  // Because the creation of Schema Change is an asynchronous process, we need to wait for a while
+  // For more information, you can refer to the comment in
+  // StarRocksTableOperations.generateAlterTableSql().
+  private static final long MAX_WAIT_IN_SECONDS = 30;
+
+  private static final long WAIT_INTERVAL_IN_SECONDS = 1;
+
+  private static final ContainerSuite containerSuite = ContainerSuite.getInstance();
+  private GravitinoMetalake metalake;
+  private String jdbcUrl;
+
+  protected Catalog catalog;
+
+  /**
+   * Starts the StarRocks container and creates the metalake, catalog and schema shared by all
+   * tests.
+   *
+   * @throws IOException declared for the container start-up path
+   */
+  @BeforeAll
+  public void startup() throws IOException {
+    containerSuite.startStarRocksContainer();
+
+    createMetalake();
+    createCatalog();
+    createSchema();
+  }
+
+  /** Drops the test schema (cascade), then force-drops the catalog and the metalake. */
+  @AfterAll
+  public void stop() {
+    clearTableAndSchema();
+    metalake.dropCatalog(catalogName, true);
+    client.dropMetalake(metalakeName, true);
+  }
+
+  /** Drops and recreates the test schema so that tables never leak from one test to the next. */
+  @AfterEach
+  public void resetSchema() {
+    clearTableAndSchema();
+    createSchema();
+  }
+
+  /** Drops the test schema with {@code cascade = true}, removing any tables it contains. */
+  private void clearTableAndSchema() {
+    catalog.asSchemas().dropSchema(schemaName, true);
+  }
+
+  /** Asserts that no metalake exists yet, then creates and loads the metalake used by the test. */
+  private void createMetalake() {
+    GravitinoMetalake[] gravitinoMetaLakes = client.listMetalakes();
+    assertEquals(0, gravitinoMetaLakes.length);
+
+    client.createMetalake(metalakeName, "comment", Collections.emptyMap());
+    GravitinoMetalake loadMetalake = client.loadMetalake(metalakeName);
+    assertEquals(metalakeName, loadMetalake.name());
+
+    metalake = loadMetalake;
+  }
+
+  /**
+   * Builds the JDBC catalog properties (URL, driver, user name and password) for the container.
+   *
+   * <p>Reads {@link #jdbcUrl}, so it must only be called after {@link #createCatalog()} has
+   * computed it.
+   *
+   * @param password the password to put under {@link JdbcConfig#PASSWORD}
+   * @return a new mutable property map
+   */
+  private Map<String, String> newCatalogProperties(String password) {
+    Map<String, String> catalogProperties = Maps.newHashMap();
+    catalogProperties.put(JdbcConfig.JDBC_URL.getKey(), jdbcUrl);
+    catalogProperties.put(JdbcConfig.JDBC_DRIVER.getKey(), DRIVER_CLASS_NAME);
+    catalogProperties.put(JdbcConfig.USERNAME.getKey(), StarRocksContainer.USER_NAME);
+    catalogProperties.put(JdbcConfig.PASSWORD.getKey(), password);
+    return catalogProperties;
+  }
+
+  /**
+   * Computes the JDBC URL of the StarRocks FE, creates the catalog with an empty password and
+   * verifies that loading it returns an equal catalog.
+   */
+  private void createCatalog() {
+    StarRocksContainer starRocksContainer = containerSuite.getStarRocksContainer();
+
+    jdbcUrl =
+        String.format(
+            "jdbc:mysql://%s:%d/",
+            starRocksContainer.getContainerIpAddress(), StarRocksContainer.FE_MYSQL_PORT);
+
+    Map<String, String> catalogProperties = newCatalogProperties("");
+
+    Catalog createdCatalog =
+        metalake.createCatalog(
+            catalogName,
+            Catalog.Type.RELATIONAL,
+            provider,
+            "starrocks catalog comment",
+            catalogProperties);
+    Catalog loadCatalog = metalake.loadCatalog(catalogName);
+    assertEquals(createdCatalog, loadCatalog);
+
+    catalog = loadCatalog;
+  }
+
+  /** Creates the test schema and checks that it can be loaded back under the same name. */
+  private void createSchema() {
+    NameIdentifier ident = NameIdentifier.of(metalakeName, catalogName, schemaName);
+    Map<String, String> prop = Maps.newHashMap();
+
+    Schema createdSchema = catalog.asSchemas().createSchema(ident.name(), schema_comment, prop);
+    Schema loadSchema = catalog.asSchemas().loadSchema(ident.name());
+    assertEquals(createdSchema.name(), loadSchema.name());
+  }
+
+  /**
+   * Returns the four columns shared by most tests: an integer column, two {@code VARCHAR(10)}
+   * columns and a date column, in that order.
+   */
+  private Column[] createColumns() {
+    Column col1 =
+        Column.of(
+            STARROCKS_COL_NAME1, Types.IntegerType.get(), "col_1_comment", false, false, null);
+    Column col2 = Column.of(STARROCKS_COL_NAME2, Types.VarCharType.of(10), "col_2_comment");
+    Column col3 = Column.of(STARROCKS_COL_NAME3, Types.VarCharType.of(10), "col_3_comment");
+    Column col4 =
+        Column.of(STARROCKS_COL_NAME4, Types.DateType.get(), "col_4_comment", false, false, null);
+    return new Column[] {col1, col2, col3, col4};
+  }
+
+  /** Returns the (empty) table properties used when creating test tables. */
+  private Map<String, String> createTableProperties() {
+    return ImmutableMap.of();
+  }
+
+  /** Returns the default distribution: hash on the first column with 2 buckets. */
+  private Distribution createDistribution() {
+    return Distributions.hash(2, NamedReference.field(STARROCKS_COL_NAME1));
+  }
+
+  /**
+   * Asserts that both creating and dropping a schema with the given name are rejected with an
+   * {@link IllegalArgumentException}.
+   */
+  private void assertIllegalSchemaName(SupportsSchemas schemas, String illegalName) {
+    Map<String, String> properties = new HashMap<>();
+    String comment = "comment";
+
+    assertThrows(
+        IllegalArgumentException.class,
+        () -> schemas.createSchema(illegalName, comment, properties));
+    assertThrows(IllegalArgumentException.class, () -> schemas.dropSchema(illegalName, false));
+  }
+
+  /**
+   * Asserts that both creating and dropping a table with the given name are rejected with an
+   * {@link IllegalArgumentException}. The same string is also used as the single column's name.
+   */
+  private void assertIllegalTableName(String illegalName) {
+    Map<String, String> properties = createTableProperties();
+    TableCatalog tableCatalog = catalog.asTableCatalog();
+    Column[] columns = {Column.of(illegalName, Types.LongType.get(), "id", false, false, null)};
+    NameIdentifier tableIdentifier =
+        NameIdentifier.of(metalakeName, catalogName, schemaName, illegalName);
+
+    assertThrows(
+        IllegalArgumentException.class,
+        () ->
+            tableCatalog.createTable(
+                tableIdentifier,
+                columns,
+                table_comment,
+                properties,
+                Transforms.EMPTY_TRANSFORM,
+                Distributions.NONE,
+                new SortOrder[0],
+                null));
+    assertThrows(
+        IllegalArgumentException.class, () -> catalog.asTableCatalog().dropTable(tableIdentifier));
+  }
+
+  /** Covers list / create / create-duplicate / drop / drop-missing for schemas. */
+  @Test
+  void testStarRocksSchemaBasicOperation() {
+    SupportsSchemas schemas = catalog.asSchemas();
+
+    // test list schemas
+    String[] schemaNames = schemas.listSchemas();
+    assertTrue(Arrays.asList(schemaNames).contains(schemaName));
+
+    // test create schema already exists
+    String testSchemaName = GravitinoITUtils.genRandomName("create_schema_test");
+    NameIdentifier schemaIdent = NameIdentifier.of(metalakeName, catalogName, testSchemaName);
+    schemas.createSchema(schemaIdent.name(), schema_comment, Collections.emptyMap());
+
+    List<String> schemaNameList = Arrays.asList(schemas.listSchemas());
+    assertTrue(schemaNameList.contains(testSchemaName));
+
+    assertThrows(
+        SchemaAlreadyExistsException.class,
+        () -> schemas.createSchema(schemaIdent.name(), schema_comment, Collections.emptyMap()));
+
+    // test drop schema
+    assertTrue(schemas.dropSchema(schemaIdent.name(), false));
+
+    // check schema is deleted
+    // 1. check by load schema
+    assertThrows(NoSuchSchemaException.class, () -> schemas.loadSchema(schemaIdent.name()));
+
+    // 2. check by list schema
+    schemaNameList = Arrays.asList(schemas.listSchemas());
+    assertFalse(schemaNameList.contains(testSchemaName));
+
+    // test drop schema not exists
+    NameIdentifier notExistsSchemaIdent = NameIdentifier.of(metalakeName, catalogName, "no-exits");
+    assertFalse(schemas.dropSchema(notExistsSchemaIdent.name(), false));
+  }
+
+  /** Verifies that a non-empty schema can only be dropped with {@code cascade = true}. */
+  @Test
+  void testDropStarRocksSchema() {
+    // Named differently from the schemaName field to make clear this test uses its own schema.
+    String droppedSchemaName =
+        GravitinoITUtils.genRandomName("starrocks_it_schema_dropped").toLowerCase();
+
+    catalog.asSchemas().createSchema(droppedSchemaName, "", ImmutableMap.of());
+
+    catalog
+        .asTableCatalog()
+        .createTable(
+            NameIdentifier.of(droppedSchemaName, tableName),
+            createColumns(),
+            "Created by Gravitino client",
+            createTableProperties(),
+            Transforms.EMPTY_TRANSFORM,
+            createDistribution(),
+            null);
+
+    // Try to drop a database, and cascade equals to false, it should not be allowed.
+    Throwable excep =
+        assertThrows(
+            RuntimeException.class,
+            () -> catalog.asSchemas().dropSchema(droppedSchemaName, false));
+    assertTrue(excep.getMessage().contains("the value of cascade should be true."));
+
+    // Check the database still exists
+    catalog.asSchemas().loadSchema(droppedSchemaName);
+
+    // Try to drop a database, and cascade equals to true, it should be allowed.
+    assertTrue(catalog.asSchemas().dropSchema(droppedSchemaName, true));
+
+    // Check database has been dropped
+    SupportsSchemas schemas = catalog.asSchemas();
+    assertThrows(NoSuchSchemaException.class, () -> schemas.loadSchema(droppedSchemaName));
+  }
+
+  /** Verifies that SQL-injection-like and over-long schema names are rejected. */
+  @Test
+  void testSchemaWithIllegalName() {
+    SupportsSchemas schemas = catalog.asSchemas();
+    String databaseName = RandomNameUtils.genRandomName("it_db");
+
+    // should throw an exception with string that might contain SQL injection
+    assertIllegalSchemaName(schemas, databaseName + "`; DROP TABLE important_table; -- ");
+    assertIllegalSchemaName(schemas, databaseName + "`; SLEEP(10); -- ");
+    assertIllegalSchemaName(
+        schemas,
+        databaseName
+            + "`; UPDATE Users SET password = 'newpassword' WHERE username = 'admin'; -- ");
+
+    // should throw an exception with input that has more than 64 characters
+    assertIllegalSchemaName(schemas, StringUtils.repeat("a", 65));
+  }
+
+  /** Creates a table, loads it back, renames it and loads it under the new name. */
+  @Test
+  void testStarRocksTableBasicOperation() {
+    // create a table
+    NameIdentifier tableIdentifier = NameIdentifier.of(schemaName, tableName);
+    Column[] columns = createColumns();
+
+    Distribution distribution = createDistribution();
+
+    Map<String, String> properties = createTableProperties();
+    TableCatalog tableCatalog = catalog.asTableCatalog();
+    tableCatalog.createTable(
+        tableIdentifier,
+        columns,
+        table_comment,
+        properties,
+        Transforms.EMPTY_TRANSFORM,
+        distribution,
+        null,
+        null);
+
+    // load table
+    Table loadTable = tableCatalog.loadTable(tableIdentifier);
+    ITUtils.assertionsTableInfo(
+        tableName,
+        table_comment,
+        Arrays.asList(columns),
+        properties,
+        null,
+        Transforms.EMPTY_TRANSFORM,
+        loadTable);
+
+    // rename table
+    String newTableName = GravitinoITUtils.genRandomName("new_table_name");
+    tableCatalog.alterTable(tableIdentifier, TableChange.rename(newTableName));
+    NameIdentifier newTableIdentifier = NameIdentifier.of(schemaName, newTableName);
+    Table renamedTable = tableCatalog.loadTable(newTableIdentifier);
+    ITUtils.assertionsTableInfo(
+        newTableName,
+        table_comment,
+        Arrays.asList(columns),
+        properties,
+        null,
+        Transforms.EMPTY_TRANSFORM,
+        renamedTable);
+  }
+
+  /** Verifies that SQL-injection-like and over-long table names are rejected. */
+  @Test
+  void testStarRocksIllegalTableName() {
+    String table_name = "t123";
+
+    assertIllegalTableName(table_name + "`; DROP TABLE important_table; -- ");
+    assertIllegalTableName(table_name + "`; SLEEP(10); -- ");
+    assertIllegalTableName(
+        table_name + "`; UPDATE Users SET password = 'newpassword' WHERE username = 'admin'; -- ");
+
+    // more than 64 characters
+    assertIllegalTableName(StringUtils.repeat("a", 65));
+  }
+
+  /** Verifies that testing a connection with a wrong password fails with "Access denied". */
+  @Test
+  void testTestConnection() {
+    Map<String, String> catalogProperties = newCatalogProperties("wrong_password");
+
+    Exception exception =
+        assertThrows(
+            ConnectionFailedException.class,
+            () ->
+                metalake.testConnection(
+                    GravitinoITUtils.genRandomName("starrocks_it_catalog"),
+                    Catalog.Type.RELATIONAL,
+                    provider,
+                    "starrocks catalog comment",
+                    catalogProperties));
+    assertTrue(exception.getMessage().contains("Access denied for user"));
+  }
+
+  /**
+   * Alters a column type, adds a column and drops it again. Each change is polled with Awaitility
+   * because schema changes are applied asynchronously (see the note on {@code
+   * MAX_WAIT_IN_SECONDS}).
+   */
+  @Test
+  void testAlterStarRocksTable() {
+    // create a table
+    NameIdentifier tableIdentifier = NameIdentifier.of(schemaName, tableName);
+    Column[] columns = createColumns();
+
+    Distribution distribution = createDistribution();
+
+    Map<String, String> properties = createTableProperties();
+    TableCatalog tableCatalog = catalog.asTableCatalog();
+    tableCatalog.createTable(
+        tableIdentifier,
+        columns,
+        table_comment,
+        properties,
+        Transforms.EMPTY_TRANSFORM,
+        distribution,
+        null,
+        null);
+    Table loadedTable = tableCatalog.loadTable(tableIdentifier);
+
+    ITUtils.assertionsTableInfo(
+        tableName,
+        table_comment,
+        Arrays.asList(columns),
+        properties,
+        null,
+        Transforms.EMPTY_TRANSFORM,
+        loadedTable);
+
+    // Alter column type
+    tableCatalog.alterTable(
+        tableIdentifier,
+        TableChange.updateColumnType(
+            new String[] {STARROCKS_COL_NAME3}, Types.VarCharType.of(255)));
+
+    Awaitility.await()
+        .atMost(MAX_WAIT_IN_SECONDS, TimeUnit.SECONDS)
+        .pollInterval(WAIT_INTERVAL_IN_SECONDS, TimeUnit.SECONDS)
+        .untilAsserted(
+            () ->
+                ITUtils.assertColumn(
+                    Column.of(STARROCKS_COL_NAME3, Types.VarCharType.of(255), "col_3_comment"),
+                    tableCatalog.loadTable(tableIdentifier).columns()[2]));
+
+    // add new column
+    tableCatalog.alterTable(
+        tableIdentifier,
+        TableChange.addColumn(
+            new String[] {"col_5"}, Types.VarCharType.of(255), "col_5_comment", true));
+    Awaitility.await()
+        .atMost(MAX_WAIT_IN_SECONDS, TimeUnit.SECONDS)
+        .pollInterval(WAIT_INTERVAL_IN_SECONDS, TimeUnit.SECONDS)
+        .untilAsserted(
+            () -> assertEquals(5, tableCatalog.loadTable(tableIdentifier).columns().length));
+
+    ITUtils.assertColumn(
+        Column.of("col_5", Types.VarCharType.of(255), "col_5_comment"),
+        tableCatalog.loadTable(tableIdentifier).columns()[4]);
+
+    // change column position
+    // TODO: change column position is unstable, add it later
+
+    // drop column
+    tableCatalog.alterTable(
+        tableIdentifier, TableChange.deleteColumn(new String[] {"col_5"}, true));
+
+    Awaitility.await()
+        .atMost(MAX_WAIT_IN_SECONDS, TimeUnit.SECONDS)
+        .pollInterval(WAIT_INTERVAL_IN_SECONDS, TimeUnit.SECONDS)
+        .untilAsserted(
+            () -> assertEquals(4, tableCatalog.loadTable(tableIdentifier).columns().length));
+  }
+
+  /** Covers list / get / add / drop of partitions on a list-partitioned table. */
+  @Test
+  void testStarRocksTablePartitionOperation() {
+    // create a partitioned table
+    String tableName = GravitinoITUtils.genRandomName("test_partitioned_table");
+    NameIdentifier tableIdentifier = NameIdentifier.of(schemaName, tableName);
+    Column[] columns = createColumns();
+    Distribution distribution = createDistribution();
+    Map<String, String> properties = createTableProperties();
+    Transform[] partitioning = {Transforms.list(new String[][] {{STARROCKS_COL_NAME1}})};
+    TableCatalog tableCatalog = catalog.asTableCatalog();
+    tableCatalog.createTable(
+        tableIdentifier,
+        columns,
+        table_comment,
+        properties,
+        partitioning,
+        distribution,
+        null,
+        null);
+
+    // load table
+    Table loadTable = tableCatalog.loadTable(tableIdentifier);
+    ITUtils.assertionsTableInfo(
+        tableName,
+        table_comment,
+        Arrays.asList(columns),
+        properties,
+        null,
+        partitioning,
+        loadTable);
+
+    // get table partition operations
+    SupportsPartitions tablePartitionOperations = loadTable.supportPartitions();
+
+    // assert partition info when there is no partitions actually
+    String[] emptyPartitionNames = tablePartitionOperations.listPartitionNames();
+    assertEquals(0, emptyPartitionNames.length);
+    Partition[] emptyPartitions = tablePartitionOperations.listPartitions();
+    assertEquals(0, emptyPartitions.length);
+
+    // get non-existing partition
+    assertThrows(NoSuchPartitionException.class, () -> tablePartitionOperations.getPartition("p1"));
+
+    // add partition with incorrect type
+    Partition incorrectType =
+        Partitions.range("p1", Literals.NULL, Literals.NULL, Collections.emptyMap());
+    assertThrows(
+        IllegalArgumentException.class, () -> tablePartitionOperations.addPartition(incorrectType));
+
+    // add partition with incorrect value
+    Partition incorrectValue =
+        Partitions.list(
+            "p1", new Literal[][] {{Literals.NULL, Literals.NULL}}, Collections.emptyMap());
+    assertThrows(
+        IllegalArgumentException.class,
+        () -> tablePartitionOperations.addPartition(incorrectValue));
+
+    // add partition
+    Literal[][] p1Values = {{Literals.integerLiteral(1)}};
+    Literal[][] p2Values = {{Literals.integerLiteral(2)}};
+    Literal[][] p3Values = {{Literals.integerLiteral(3)}};
+
+    ListPartition p1 = Partitions.list("p1", p1Values, Collections.emptyMap());
+    ListPartition p2 = Partitions.list("p2", p2Values, Collections.emptyMap());
+    ListPartition p3 = Partitions.list("p3", p3Values, Collections.emptyMap());
+    ListPartition p1Added = (ListPartition) tablePartitionOperations.addPartition(p1);
+    assertPartition(p1, p1Added);
+    ListPartition p2Added = (ListPartition) tablePartitionOperations.addPartition(p2);
+    assertPartition(p2, p2Added);
+    ListPartition p3Added = (ListPartition) tablePartitionOperations.addPartition(p3);
+    assertPartition(p3, p3Added);
+
+    // check partitions
+    Set<String> partitionNames =
+        Arrays.stream(tablePartitionOperations.listPartitionNames()).collect(Collectors.toSet());
+    assertEquals(3, partitionNames.size());
+    assertTrue(partitionNames.contains("p1"));
+    assertTrue(partitionNames.contains("p2"));
+    assertTrue(partitionNames.contains("p3"));
+
+    Map<String, ListPartition> partitions =
+        Arrays.stream(tablePartitionOperations.listPartitions())
+            .collect(Collectors.toMap(Partition::name, p -> (ListPartition) p));
+    assertEquals(3, partitions.size());
+    assertPartition(p1, partitions.get("p1"));
+    assertPartition(p2, partitions.get("p2"));
+    assertPartition(p3, partitions.get("p3"));
+
+    assertPartition(p1, tablePartitionOperations.getPartition("p1"));
+    assertPartition(p2, tablePartitionOperations.getPartition("p2"));
+    assertPartition(p3, tablePartitionOperations.getPartition("p3"));
+
+    // drop partition
+    assertTrue(tablePartitionOperations.dropPartition("p3"));
+    partitionNames =
+        Arrays.stream(tablePartitionOperations.listPartitionNames()).collect(Collectors.toSet());
+    assertEquals(2, partitionNames.size());
+    assertFalse(partitionNames.contains("p3"));
+    assertThrows(NoSuchPartitionException.class, () -> tablePartitionOperations.getPartition("p3"));
+
+    // drop non-existing partition
+    assertFalse(tablePartitionOperations.dropPartition("p3"));
+  }
+
+  /**
+   * Creates a range-partitioned and a list-partitioned table with pre-assigned partitions and
+   * checks the partitions reported after loading each table.
+   */
+  @Test
+  void testCreatePartitionedTable() {
+    // create a range-partitioned table with assignments
+    String tableName = GravitinoITUtils.genRandomName("test_create_range_partitioned_table");
+    NameIdentifier tableIdentifier = NameIdentifier.of(schemaName, tableName);
+    Column[] columns = createColumns();
+    Distribution distribution = createDistribution();
+
+    Map<String, String> properties = createTableProperties();
+    Literal todayLiteral = Literals.of("2024-07-24", Types.DateType.get());
+    Literal tomorrowLiteral = Literals.of("2024-07-25", Types.DateType.get());
+    RangePartition p1 = Partitions.range("p1", todayLiteral, Literals.NULL, Collections.emptyMap());
+    RangePartition p2 =
+        Partitions.range("p2", tomorrowLiteral, todayLiteral, Collections.emptyMap());
+    RangePartition p3 =
+        Partitions.range("p3", Literals.NULL, Literals.NULL, Collections.emptyMap());
+    Transform[] partitioning = {
+      Transforms.range(new String[] {STARROCKS_COL_NAME4}, new RangePartition[] {p1, p2, p3})
+    };
+    TableCatalog tableCatalog = catalog.asTableCatalog();
+    tableCatalog.createTable(
+        tableIdentifier,
+        columns,
+        table_comment,
+        properties,
+        partitioning,
+        distribution,
+        null,
+        null);
+    Table loadedTable = tableCatalog.loadTable(tableIdentifier);
+    ITUtils.assertionsTableInfo(
+        tableName,
+        table_comment,
+        Arrays.asList(columns),
+        properties,
+        null,
+        new Transform[] {Transforms.range(new String[] {STARROCKS_COL_NAME4})},
+        loadedTable);
+
+    // assert partition info
+    SupportsPartitions tablePartitionOperations = loadedTable.supportPartitions();
+    Map<String, RangePartition> loadedRangePartitions =
+        Arrays.stream(tablePartitionOperations.listPartitions())
+            .collect(Collectors.toMap(Partition::name, p -> (RangePartition) p));
+    // assertEquals (not assertTrue on ==) so that a failure reports the actual size
+    assertEquals(3, loadedRangePartitions.size());
+    assertTrue(loadedRangePartitions.containsKey("p1"));
+    assertPartition(
+        Partitions.range(
+            "p1",
+            todayLiteral,
+            Literals.of("0000-01-01", Types.DateType.get()),
+            Collections.emptyMap()),
+        loadedRangePartitions.get("p1"));
+    assertTrue(loadedRangePartitions.containsKey("p2"));
+    assertPartition(p2, loadedRangePartitions.get("p2"));
+    assertTrue(loadedRangePartitions.containsKey("p3"));
+    assertPartition(
+        Partitions.range(
+            "p3",
+            Literals.of("MAXVALUE", Types.DateType.get()),
+            tomorrowLiteral,
+            Collections.emptyMap()),
+        loadedRangePartitions.get("p3"));
+
+    // create a list-partitioned table with assignments
+    tableName = GravitinoITUtils.genRandomName("test_create_list_partitioned_table");
+    tableIdentifier = NameIdentifier.of(schemaName, tableName);
+    Literal<Integer> integerLiteral1 = Literals.integerLiteral(1);
+    Literal<Integer> integerLiteral2 = Literals.integerLiteral(2);
+    ListPartition p4 =
+        Partitions.list(
+            "p4",
+            new Literal[][] {{integerLiteral1, todayLiteral}, {integerLiteral1, tomorrowLiteral}},
+            Collections.emptyMap());
+    ListPartition p5 =
+        Partitions.list(
+            "p5",
+            new Literal[][] {{integerLiteral2, todayLiteral}, {integerLiteral2, tomorrowLiteral}},
+            Collections.emptyMap());
+    partitioning =
+        new Transform[] {
+          Transforms.list(
+              new String[][] {{STARROCKS_COL_NAME1}, {STARROCKS_COL_NAME4}},
+              new ListPartition[] {p4, p5})
+        };
+    tableCatalog.createTable(
+        tableIdentifier,
+        columns,
+        table_comment,
+        properties,
+        partitioning,
+        distribution,
+        null,
+        null);
+    loadedTable = tableCatalog.loadTable(tableIdentifier);
+    ITUtils.assertionsTableInfo(
+        tableName,
+        table_comment,
+        Arrays.asList(columns),
+        properties,
+        null,
+        new Transform[] {
+          Transforms.list(new String[][] {{STARROCKS_COL_NAME1}, {STARROCKS_COL_NAME4}})
+        },
+        loadedTable);
+
+    // assert partition info
+    tablePartitionOperations = loadedTable.supportPartitions();
+    Map<String, ListPartition> loadedListPartitions =
+        Arrays.stream(tablePartitionOperations.listPartitions())
+            .collect(Collectors.toMap(Partition::name, p -> (ListPartition) p));
+    assertEquals(2, loadedListPartitions.size());
+    assertTrue(loadedListPartitions.containsKey("p4"));
+    assertPartition(p4, loadedListPartitions.get("p4"));
+    assertTrue(loadedListPartitions.containsKey("p5"));
+    assertPartition(p5, loadedListPartitions.get("p5"));
+  }
+
+  /** Verifies that a timestamp-without-time-zone column keeps its type after a round trip. */
+  @Test
+  void testTableWithTimeStampColumn() {
+    // create a table
+    String tableName = GravitinoITUtils.genRandomName("test_table_with_timestamp_column");
+    NameIdentifier tableIdentifier = NameIdentifier.of(schemaName, tableName);
+    Column[] columns = createColumns();
+    columns =
+        ArrayUtils.add(columns, Column.of("timestamp_col", Types.TimestampType.withoutTimeZone()));
+    Distribution distribution = createDistribution();
+
+    Map<String, String> properties = createTableProperties();
+    TableCatalog tableCatalog = catalog.asTableCatalog();
+    tableCatalog.createTable(
+        tableIdentifier,
+        columns,
+        table_comment,
+        properties,
+        Transforms.EMPTY_TRANSFORM,
+        distribution,
+        null,
+        null);
+
+    // load table
+    Table loadTable = tableCatalog.loadTable(tableIdentifier);
+    // Fail with a descriptive assertion error instead of a bare NoSuchElementException.
+    Column timestampColumn =
+        Arrays.stream(loadTable.columns())
+            .filter(c -> "timestamp_col".equals(c.name()))
+            .findFirst()
+            .orElseThrow(
+                () -> new AssertionError("Column timestamp_col not found in the loaded table"));
+
+    assertEquals(Types.TimestampType.withoutTimeZone(), timestampColumn.dataType());
+  }
+
+  /** Verifies that every partition operation is unsupported on a non-partitioned table. */
+  @Test
+  void testNonPartitionedTable() {
+    // create a non-partitioned table
+    String tableName = GravitinoITUtils.genRandomName("test_non_partitioned_table");
+    NameIdentifier tableIdentifier = NameIdentifier.of(schemaName, tableName);
+    Column[] columns = createColumns();
+    Distribution distribution = createDistribution();
+    Map<String, String> properties = createTableProperties();
+    Transform[] partitioning = Transforms.EMPTY_TRANSFORM;
+    TableCatalog tableCatalog = catalog.asTableCatalog();
+    tableCatalog.createTable(
+        tableIdentifier,
+        columns,
+        table_comment,
+        properties,
+        partitioning,
+        distribution,
+        null,
+        null);
+
+    // load table
+    Table loadTable = tableCatalog.loadTable(tableIdentifier);
+    ITUtils.assertionsTableInfo(
+        tableName,
+        table_comment,
+        Arrays.asList(columns),
+        properties,
+        null,
+        partitioning,
+        loadTable);
+
+    // get table partition operations
+    SupportsPartitions tablePartitionOperations = loadTable.supportPartitions();
+
+    assertThrows(
+        UnsupportedOperationException.class, () -> tablePartitionOperations.listPartitionNames());
+
+    assertThrows(
+        UnsupportedOperationException.class, () -> tablePartitionOperations.listPartitions());
+
+    assertThrows(
+        UnsupportedOperationException.class,
+        () ->
+            tablePartitionOperations.addPartition(
+                Partitions.range("p1", Literals.NULL, Literals.NULL, Collections.emptyMap())));
+
+    assertThrows(
+        UnsupportedOperationException.class, () -> tablePartitionOperations.getPartition("p1"));
+
+    assertThrows(
+        UnsupportedOperationException.class, () -> tablePartitionOperations.dropPartition("p1"));
+  }
+
+  /**
+   * Creates one table per distribution and checks that strategy and expressions survive a round
+   * trip. Each table is dropped at the end of its iteration.
+   */
+  @Test
+  void testAllDistribution() {
+    Distribution[] distributions =
+        new Distribution[] {
+          Distributions.even(1, Expression.EMPTY_EXPRESSION),
+          Distributions.hash(1, NamedReference.field(STARROCKS_COL_NAME1)),
+          Distributions.even(10, Expression.EMPTY_EXPRESSION),
+          Distributions.hash(0, NamedReference.field(STARROCKS_COL_NAME1)),
+          Distributions.hash(11, NamedReference.field(STARROCKS_COL_NAME1)),
+          Distributions.hash(
+              12,
+              NamedReference.field(STARROCKS_COL_NAME1),
+              NamedReference.field(STARROCKS_COL_NAME2))
+        };
+
+    for (Distribution distribution : distributions) {
+      String tableName = GravitinoITUtils.genRandomName("test_distribution_table");
+      NameIdentifier tableIdentifier = NameIdentifier.of(schemaName, tableName);
+      Column[] columns = createColumns();
+      Map<String, String> properties = createTableProperties();
+      Transform[] partitioning = Transforms.EMPTY_TRANSFORM;
+      TableCatalog tableCatalog = catalog.asTableCatalog();
+      tableCatalog.createTable(
+          tableIdentifier,
+          columns,
+          table_comment,
+          properties,
+          partitioning,
+          distribution,
+          null,
+          null);
+      // load table
+      Table loadTable = tableCatalog.loadTable(tableIdentifier);
+
+      assertEquals(distribution.strategy(), loadTable.distribution().strategy());
+      Assertions.assertArrayEquals(
+          distribution.expressions(), loadTable.distribution().expressions());
+
+      tableCatalog.dropTable(tableIdentifier);
+    }
+  }
+
+  /**
+   * Creates a table with an auto-bucketed hash distribution and checks that strategy, bucket
+   * number and expressions survive a round trip.
+   */
+  @Test
+  void testAllDistributionWithAuto() {
+    Distribution distribution =
+        Distributions.auto(Strategy.HASH, NamedReference.field(STARROCKS_COL_NAME1));
+
+    String tableName = GravitinoITUtils.genRandomName("test_distribution_table");
+    NameIdentifier tableIdentifier = NameIdentifier.of(schemaName, tableName);
+    Column[] columns = createColumns();
+    Map<String, String> properties = createTableProperties();
+    Transform[] partitioning = Transforms.EMPTY_TRANSFORM;
+    TableCatalog tableCatalog = catalog.asTableCatalog();
+    tableCatalog.createTable(
+        tableIdentifier,
+        columns,
+        table_comment,
+        properties,
+        partitioning,
+        distribution,
+        null,
+        null);
+    // load table
+    Table loadTable = tableCatalog.loadTable(tableIdentifier);
+
+    assertEquals(distribution.strategy(), loadTable.distribution().strategy());
+    assertEquals(distribution.number(), loadTable.distribution().number());
+    Assertions.assertArrayEquals(
+        distribution.expressions(), loadTable.distribution().expressions());
+    tableCatalog.dropTable(tableIdentifier);
+  }
+}
diff --git a/catalogs/catalog-jdbc-starrocks/src/test/java/org/apache/gravitino/catalog/starrocks/operation/TestStarRocks.java b/catalogs/catalog-jdbc-starrocks/src/test/java/org/apache/gravitino/catalog/starrocks/operation/TestStarRocks.java new file mode 100644 index 0000000000..7f5abf97bc --- /dev/null +++ b/catalogs/catalog-jdbc-starrocks/src/test/java/org/apache/gravitino/catalog/starrocks/operation/TestStarRocks.java @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License.
+ */ +package org.apache.gravitino.catalog.starrocks.operation; + +import com.google.common.collect.Maps; +import java.util.Collections; +import java.util.Map; +import org.apache.gravitino.catalog.jdbc.TestJdbc; +import org.apache.gravitino.catalog.jdbc.config.JdbcConfig; +import org.apache.gravitino.catalog.jdbc.utils.DataSourceUtils; +import org.apache.gravitino.catalog.starrocks.converter.StarRocksColumnDefaultValueConverter; +import org.apache.gravitino.catalog.starrocks.converter.StarRocksExceptionConverter; +import org.apache.gravitino.catalog.starrocks.converter.StarRocksTypeConverter; +import org.apache.gravitino.catalog.starrocks.operations.StarRocksDatabaseOperations; +import org.apache.gravitino.catalog.starrocks.operations.StarRocksTableOperations; +import org.apache.gravitino.integration.test.container.ContainerSuite; +import org.apache.gravitino.integration.test.container.StarRocksContainer; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; + +public class TestStarRocks extends TestJdbc { + private static final ContainerSuite containerSuite = ContainerSuite.getInstance(); + protected static final String DRIVER_CLASS_NAME = "com.mysql.jdbc.Driver"; + + @BeforeAll + public static void startup() { + containerSuite.startStarRocksContainer(); + + DATA_SOURCE = DataSourceUtils.createDataSource(getStarRocksCatalogProperties()); + + DATABASE_OPERATIONS = new StarRocksDatabaseOperations(); + TABLE_OPERATIONS = new StarRocksTableOperations(); + JDBC_EXCEPTION_CONVERTER = new StarRocksExceptionConverter(); + DATABASE_OPERATIONS.initialize(DATA_SOURCE, JDBC_EXCEPTION_CONVERTER, Collections.emptyMap()); + TABLE_OPERATIONS.initialize( + DATA_SOURCE, + JDBC_EXCEPTION_CONVERTER, + new StarRocksTypeConverter(), + new StarRocksColumnDefaultValueConverter(), + Collections.emptyMap()); + } + + // Overwrite the stop method to close the data source and stop the container + @AfterAll + public static void stop() { + 
DataSourceUtils.closeDataSource(DATA_SOURCE); + if (null != CONTAINER) { + CONTAINER.stop(); + } + } + + private static Map<String, String> getStarRocksCatalogProperties() { + Map<String, String> catalogProperties = Maps.newHashMap(); + + StarRocksContainer starRocksContainer = containerSuite.getStarRocksContainer(); + + String jdbcUrl = + String.format( + "jdbc:mysql://%s:%d/", + starRocksContainer.getContainerIpAddress(), StarRocksContainer.FE_MYSQL_PORT); + + catalogProperties.put(JdbcConfig.JDBC_URL.getKey(), jdbcUrl); + catalogProperties.put(JdbcConfig.JDBC_DRIVER.getKey(), DRIVER_CLASS_NAME); + catalogProperties.put(JdbcConfig.USERNAME.getKey(), StarRocksContainer.USER_NAME); + catalogProperties.put(JdbcConfig.PASSWORD.getKey(), StarRocksContainer.PASSWORD); + + return catalogProperties; + } +} diff --git a/catalogs/catalog-jdbc-starrocks/src/test/java/org/apache/gravitino/catalog/starrocks/operation/TestStarRocksDatabaseOperations.java b/catalogs/catalog-jdbc-starrocks/src/test/java/org/apache/gravitino/catalog/starrocks/operation/TestStarRocksDatabaseOperations.java new file mode 100644 index 0000000000..288816d86d --- /dev/null +++ b/catalogs/catalog-jdbc-starrocks/src/test/java/org/apache/gravitino/catalog/starrocks/operation/TestStarRocksDatabaseOperations.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.gravitino.catalog.starrocks.operation; + +import java.util.Collections; +import java.util.List; +import org.apache.gravitino.exceptions.SchemaAlreadyExistsException; +import org.apache.gravitino.utils.RandomNameUtils; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Tag; +import org.junit.jupiter.api.Test; + +@Tag("gravitino-docker-test") +public class TestStarRocksDatabaseOperations extends TestStarRocks { + + @Test + public void testBaseOperationDatabase() { + String databaseName = RandomNameUtils.genRandomName("it_db"); + String comment = ""; + // StarRocks can't get properties after set, so the test case can't include properties + testBaseOperation(databaseName, Collections.emptyMap(), comment); + + // recreate database, get exception. 
+ Assertions.assertThrowsExactly( + SchemaAlreadyExistsException.class, + () -> DATABASE_OPERATIONS.create(databaseName, "", Collections.emptyMap())); + + testDropDatabase(databaseName); + } + + @Test + void testListSystemDatabase() { + List<String> databaseNames = DATABASE_OPERATIONS.listDatabases(); + Assertions.assertFalse(databaseNames.contains("information_schema")); + } +} diff --git a/catalogs/catalog-jdbc-starrocks/src/test/java/org/apache/gravitino/catalog/starrocks/operation/TestStarRocksTableOperations.java b/catalogs/catalog-jdbc-starrocks/src/test/java/org/apache/gravitino/catalog/starrocks/operation/TestStarRocksTableOperations.java new file mode 100644 index 0000000000..fc8b804aec --- /dev/null +++ b/catalogs/catalog-jdbc-starrocks/src/test/java/org/apache/gravitino/catalog/starrocks/operation/TestStarRocksTableOperations.java @@ -0,0 +1,583 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.gravitino.catalog.starrocks.operation; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import com.google.common.collect.Maps; +import java.time.LocalDate; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; +import org.apache.gravitino.catalog.jdbc.JdbcColumn; +import org.apache.gravitino.catalog.jdbc.JdbcTable; +import org.apache.gravitino.catalog.jdbc.converter.JdbcTypeConverter; +import org.apache.gravitino.catalog.jdbc.operation.JdbcTablePartitionOperations; +import org.apache.gravitino.catalog.starrocks.converter.StarRocksTypeConverter; +import org.apache.gravitino.catalog.starrocks.operations.StarRocksTablePartitionOperations; +import org.apache.gravitino.integration.test.util.GravitinoITUtils; +import org.apache.gravitino.rel.TableChange; +import org.apache.gravitino.rel.expressions.Expression; +import org.apache.gravitino.rel.expressions.NamedReference; +import org.apache.gravitino.rel.expressions.distributions.Distribution; +import org.apache.gravitino.rel.expressions.distributions.Distributions; +import org.apache.gravitino.rel.expressions.literals.Literal; +import org.apache.gravitino.rel.expressions.literals.Literals; +import org.apache.gravitino.rel.expressions.transforms.Transform; +import org.apache.gravitino.rel.expressions.transforms.Transforms; +import org.apache.gravitino.rel.indexes.Index; +import org.apache.gravitino.rel.indexes.Indexes; +import org.apache.gravitino.rel.partitions.ListPartition; +import org.apache.gravitino.rel.partitions.Partition; +import org.apache.gravitino.rel.partitions.Partitions; +import org.apache.gravitino.rel.partitions.RangePartition; +import org.apache.gravitino.rel.types.Type; +import org.apache.gravitino.rel.types.Types; +import 
org.apache.gravitino.utils.RandomNameUtils; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Tag; +import org.junit.jupiter.api.Test; +import org.testcontainers.shaded.org.awaitility.Awaitility; + +@Tag("gravitino-docker-test") +public class TestStarRocksTableOperations extends TestStarRocks { + private static final JdbcTypeConverter TYPE_CONVERTER = new StarRocksTypeConverter(); + private static final Type VARCHAR_255 = Types.VarCharType.of(255); + private static final Type VARCHAR_1024 = Types.VarCharType.of(1024); + + private static final Type INT = Types.IntegerType.get(); + + private static final Integer DEFAULT_BUCKET_SIZE = 1; + + private static final String databaseName = GravitinoITUtils.genRandomName("starrocks_test_db"); + + // Because the creation of Schema Change is an asynchronous process, we need to wait for a while + // For more information, you can refer to the comment in + // StarRocksTableOperations.generateAlterTableSql(). 
+ private static final long MAX_WAIT_IN_SECONDS = 30; + + private static final long WAIT_INTERVAL_IN_SECONDS = 1; + + @BeforeAll + public static void startup() { + TestStarRocks.startup(); + createDatabase(); + } + + private static void createDatabase() { + DATABASE_OPERATIONS.create(databaseName, "", new HashMap<>()); + } + + private static Map<String, String> createProperties() { + Map<String, String> properties = Maps.newHashMap(); + return properties; + } + + @Test + void testAllDistribution() { + Distribution[] distributions = + new Distribution[] { + Distributions.even(DEFAULT_BUCKET_SIZE, Expression.EMPTY_EXPRESSION), + Distributions.hash(DEFAULT_BUCKET_SIZE, NamedReference.field("col_1")), + Distributions.even(10, Expression.EMPTY_EXPRESSION), + Distributions.hash(0, NamedReference.field("col_1")), + Distributions.hash(11, NamedReference.field("col_1")), + Distributions.hash(12, NamedReference.field("col_1"), NamedReference.field("col_2")) + }; + + for (Distribution distribution : distributions) { + String tableName = GravitinoITUtils.genRandomName("starrocks_basic_test_table"); + String tableComment = "gravitino_table_test_comment"; + List<JdbcColumn> columns = new ArrayList<>(); + JdbcColumn col_1 = + JdbcColumn.builder().withName("col_1").withType(INT).withComment("id").build(); + columns.add(col_1); + JdbcColumn col_2 = + JdbcColumn.builder().withName("col_2").withType(VARCHAR_255).withComment("col_2").build(); + columns.add(col_2); + JdbcColumn col_3 = + JdbcColumn.builder().withName("col_3").withType(VARCHAR_255).withComment("col_3").build(); + columns.add(col_3); + Map<String, String> properties = new HashMap<>(); + Index[] indexes = new Index[] {}; + + // create table + TABLE_OPERATIONS.create( + databaseName, + tableName, + columns.toArray(new JdbcColumn[0]), + tableComment, + createProperties(), + null, + distribution, + indexes); + JdbcTable load = TABLE_OPERATIONS.load(databaseName, tableName); + assertionsTableInfo( + tableName, tableComment, 
columns, properties, indexes, Transforms.EMPTY_TRANSFORM, load); + + Assertions.assertEquals(distribution.strategy(), load.distribution().strategy()); + Assertions.assertArrayEquals(distribution.expressions(), load.distribution().expressions()); + TABLE_OPERATIONS.drop(databaseName, tableName); + } + } + + @Test + public void testBasicTableOperation() { + String tableName = GravitinoITUtils.genRandomName("starrocks_basic_test_table"); + String tableComment = "test_comment"; + List<JdbcColumn> columns = new ArrayList<>(); + JdbcColumn col_1 = + JdbcColumn.builder().withName("col_1").withType(INT).withComment("id").build(); + columns.add(col_1); + JdbcColumn col_2 = + JdbcColumn.builder().withName("col_2").withType(VARCHAR_255).withComment("col_2").build(); + columns.add(col_2); + JdbcColumn col_3 = + JdbcColumn.builder().withName("col_3").withType(VARCHAR_255).withComment("col_3").build(); + columns.add(col_3); + Map<String, String> properties = new HashMap<>(); + + Distribution distribution = + Distributions.hash(DEFAULT_BUCKET_SIZE, NamedReference.field("col_1")); + Index[] indexes = new Index[] {}; + + // create table + TABLE_OPERATIONS.create( + databaseName, + tableName, + columns.toArray(new JdbcColumn[0]), + tableComment, + createProperties(), + null, + distribution, + indexes); + List<String> listTables = TABLE_OPERATIONS.listTables(databaseName); + assertTrue(listTables.contains(tableName)); + JdbcTable load = TABLE_OPERATIONS.load(databaseName, tableName); + assertionsTableInfo( + tableName, tableComment, columns, properties, indexes, Transforms.EMPTY_TRANSFORM, load); + + // rename table + String newName = GravitinoITUtils.genRandomName("new_table"); + Assertions.assertDoesNotThrow(() -> TABLE_OPERATIONS.rename(databaseName, tableName, newName)); + Assertions.assertDoesNotThrow(() -> TABLE_OPERATIONS.load(databaseName, newName)); + + Assertions.assertTrue(TABLE_OPERATIONS.drop(databaseName, newName), "table should be dropped"); + + listTables = 
TABLE_OPERATIONS.listTables(databaseName); + Assertions.assertFalse(listTables.contains(newName)); + + Assertions.assertFalse( + TABLE_OPERATIONS.drop(databaseName, newName), "table should be non-existent"); + } + + @Test + public void testAlterTable() { + String tableName = GravitinoITUtils.genRandomName("starrocks_alter_test_table"); + + String tableComment = "test_comment"; + List<JdbcColumn> columns = new ArrayList<>(); + JdbcColumn col_1 = + JdbcColumn.builder().withName("col_1").withType(INT).withComment("id").build(); + columns.add(col_1); + JdbcColumn col_2 = + JdbcColumn.builder().withName("col_2").withType(VARCHAR_255).withComment("col_2").build(); + columns.add(col_2); + JdbcColumn col_3 = + JdbcColumn.builder().withName("col_3").withType(VARCHAR_255).withComment("col_3").build(); + columns.add(col_3); + Map<String, String> properties = new HashMap<>(); + + Distribution distribution = + Distributions.hash(DEFAULT_BUCKET_SIZE, NamedReference.field("col_1")); + Index[] indexes = new Index[] {}; + + // create table + TABLE_OPERATIONS.create( + databaseName, + tableName, + columns.toArray(new JdbcColumn[0]), + tableComment, + createProperties(), + null, + distribution, + indexes); + JdbcTable load = TABLE_OPERATIONS.load(databaseName, tableName); + assertionsTableInfo( + tableName, tableComment, columns, properties, indexes, Transforms.EMPTY_TRANSFORM, load); + + TABLE_OPERATIONS.alterTable( + databaseName, + tableName, + TableChange.updateColumnType(new String[] {col_3.name()}, VARCHAR_1024)); + + // After modifying the type, check it + columns.clear(); + col_3 = + JdbcColumn.builder() + .withName(col_3.name()) + .withType(VARCHAR_1024) + .withComment(col_3.comment()) + .build(); + columns.add(col_1); + columns.add(col_2); + columns.add(col_3); + + Awaitility.await() + .atMost(MAX_WAIT_IN_SECONDS, TimeUnit.SECONDS) + .pollInterval(WAIT_INTERVAL_IN_SECONDS, TimeUnit.SECONDS) + .untilAsserted( + () -> + assertionsTableInfo( + tableName, + tableComment, + 
columns, + properties, + indexes, + Transforms.EMPTY_TRANSFORM, + TABLE_OPERATIONS.load(databaseName, tableName))); + + // add new column + TABLE_OPERATIONS.alterTable( + databaseName, + tableName, + TableChange.addColumn(new String[] {"col_4"}, VARCHAR_255, "txt4", true)); + + columns.clear(); + JdbcColumn col_4 = + JdbcColumn.builder().withName("col_4").withType(VARCHAR_255).withComment("txt4").build(); + columns.add(col_1); + columns.add(col_2); + columns.add(col_3); + columns.add(col_4); + Awaitility.await() + .atMost(MAX_WAIT_IN_SECONDS, TimeUnit.SECONDS) + .pollInterval(WAIT_INTERVAL_IN_SECONDS, TimeUnit.SECONDS) + .untilAsserted( + () -> + assertionsTableInfo( + tableName, + tableComment, + columns, + properties, + indexes, + Transforms.EMPTY_TRANSFORM, + TABLE_OPERATIONS.load(databaseName, tableName))); + + // change column position + TABLE_OPERATIONS.alterTable( + databaseName, + tableName, + TableChange.updateColumnPosition( + new String[] {"col_3"}, TableChange.ColumnPosition.after("col_4"))); + + columns.clear(); + columns.add(col_1); + columns.add(col_2); + columns.add(col_4); + columns.add(col_3); + Awaitility.await() + .atMost(MAX_WAIT_IN_SECONDS, TimeUnit.SECONDS) + .pollInterval(WAIT_INTERVAL_IN_SECONDS, TimeUnit.SECONDS) + .untilAsserted( + () -> + assertionsTableInfo( + tableName, + tableComment, + columns, + properties, + indexes, + Transforms.EMPTY_TRANSFORM, + TABLE_OPERATIONS.load(databaseName, tableName))); + + // drop column if exist + TABLE_OPERATIONS.alterTable( + databaseName, tableName, TableChange.deleteColumn(new String[] {"col_4"}, true)); + columns.clear(); + columns.add(col_1); + columns.add(col_2); + columns.add(col_3); + Awaitility.await() + .atMost(MAX_WAIT_IN_SECONDS, TimeUnit.SECONDS) + .pollInterval(WAIT_INTERVAL_IN_SECONDS, TimeUnit.SECONDS) + .untilAsserted( + () -> + assertionsTableInfo( + tableName, + tableComment, + columns, + properties, + indexes, + Transforms.EMPTY_TRANSFORM, + TABLE_OPERATIONS.load(databaseName, 
tableName))); + + // delete column that does not exist + IllegalArgumentException illegalArgumentException = + Assertions.assertThrows( + IllegalArgumentException.class, + () -> + TABLE_OPERATIONS.alterTable( + databaseName, + tableName, + TableChange.deleteColumn(new String[] {"col_4"}, false))); + + Assertions.assertEquals( + "Delete column does not exist: col_4", illegalArgumentException.getMessage()); + Assertions.assertDoesNotThrow( + () -> + TABLE_OPERATIONS.alterTable( + databaseName, tableName, TableChange.deleteColumn(new String[] {"col_4"}, true))); + } + + @Test + public void testCreateAllTypeTable() { + String tableName = GravitinoITUtils.genRandomName("all_type_table"); + String tableComment = "test_comment"; + List<JdbcColumn> columns = new ArrayList<>(); + columns.add(JdbcColumn.builder().withName("col_1").withType(Types.IntegerType.get()).build()); + columns.add(JdbcColumn.builder().withName("col_2").withType(Types.BooleanType.get()).build()); + columns.add(JdbcColumn.builder().withName("col_3").withType(Types.ByteType.get()).build()); + columns.add(JdbcColumn.builder().withName("col_4").withType(Types.ShortType.get()).build()); + columns.add(JdbcColumn.builder().withName("col_5").withType(Types.IntegerType.get()).build()); + columns.add(JdbcColumn.builder().withName("col_6").withType(Types.LongType.get()).build()); + columns.add(JdbcColumn.builder().withName("col_7").withType(Types.FloatType.get()).build()); + columns.add(JdbcColumn.builder().withName("col_8").withType(Types.DoubleType.get()).build()); + columns.add( + JdbcColumn.builder().withName("col_9").withType(Types.DecimalType.of(21, 2)).build()); + columns.add(JdbcColumn.builder().withName("col_10").withType(Types.DateType.get()).build()); + columns.add( + JdbcColumn.builder().withName("col_11").withType(Types.FixedCharType.of(10)).build()); + columns.add(JdbcColumn.builder().withName("col_12").withType(Types.VarCharType.of(10)).build()); + 
columns.add(JdbcColumn.builder().withName("col_13").withType(Types.StringType.get()).build()); + columns.add( + JdbcColumn.builder() + .withName("col_14") + .withType(Types.TimestampType.withoutTimeZone()) + .build()); + + Distribution distribution = + Distributions.hash(DEFAULT_BUCKET_SIZE, NamedReference.field("col_1")); + Index[] indexes = new Index[] {}; + // create table + TABLE_OPERATIONS.create( + databaseName, + tableName, + columns.toArray(new JdbcColumn[0]), + tableComment, + createProperties(), + null, + distribution, + indexes); + + JdbcTable load = TABLE_OPERATIONS.load(databaseName, tableName); + assertionsTableInfo( + tableName, + tableComment, + columns, + Collections.emptyMap(), + null, + Transforms.EMPTY_TRANSFORM, + load); + } + + @Test + public void testCreateNotSupportTypeTable() { + String tableName = RandomNameUtils.genRandomName("unsupported_type_table"); + String tableComment = "test_comment"; + List<JdbcColumn> columns = new ArrayList<>(); + List<Type> notSupportType = + Arrays.asList( + Types.FixedType.of(10), + Types.IntervalDayType.get(), + Types.IntervalYearType.get(), + Types.UUIDType.get(), + Types.ListType.of(Types.DateType.get(), true), + Types.MapType.of(Types.StringType.get(), Types.IntegerType.get(), true), + Types.UnionType.of(Types.IntegerType.get()), + Types.StructType.of( + Types.StructType.Field.notNullField("col_1", Types.IntegerType.get()))); + + for (Type type : notSupportType) { + columns.clear(); + columns.add(JdbcColumn.builder().withName("col_1").withType(Types.IntegerType.get()).build()); + columns.add( + JdbcColumn.builder().withName("col_2").withType(type).withNullable(false).build()); + + JdbcColumn[] jdbcCols = columns.toArray(new JdbcColumn[0]); + IllegalArgumentException illegalArgumentException = + Assertions.assertThrows( + IllegalArgumentException.class, + () -> { + TABLE_OPERATIONS.create( + databaseName, + tableName, + jdbcCols, + tableComment, + createProperties(), + null, + 
Distributions.hash(DEFAULT_BUCKET_SIZE, NamedReference.field("col_1")), + Indexes.EMPTY_INDEXES); + }); + Assertions.assertTrue( + illegalArgumentException + .getMessage() + .contains( + String.format( + "Couldn't convert Gravitino type %s to StarRocks type", + type.simpleString()))); + } + } + + @Test + public void testCreatePartitionedTable() { + String tableComment = "partition_table_comment"; + JdbcColumn col1 = + JdbcColumn.builder() + .withName("col_1") + .withType(Types.IntegerType.get()) + .withNullable(false) + .build(); + JdbcColumn col2 = + JdbcColumn.builder().withName("col_2").withType(Types.BooleanType.get()).build(); + JdbcColumn col3 = + JdbcColumn.builder().withName("col_3").withType(Types.DoubleType.get()).build(); + JdbcColumn col4 = + JdbcColumn.builder() + .withName("col_4") + .withType(Types.DateType.get()) + .withNullable(false) + .build(); + List<JdbcColumn> columns = Arrays.asList(col1, col2, col3, col4); + Distribution distribution = + Distributions.hash(DEFAULT_BUCKET_SIZE, NamedReference.field("col_1")); + Index[] indexes = new Index[] {}; + + // create table with range partition + String rangePartitionTableName = GravitinoITUtils.genRandomName("range_partition_table"); + LocalDate today = LocalDate.now(); + LocalDate tomorrow = today.plusDays(1); + Literal<LocalDate> todayLiteral = Literals.dateLiteral(today); + Literal<LocalDate> tomorrowLiteral = Literals.dateLiteral(tomorrow); + RangePartition rangePartition1 = Partitions.range("p1", todayLiteral, Literals.NULL, null); + RangePartition rangePartition2 = Partitions.range("p2", tomorrowLiteral, todayLiteral, null); + RangePartition rangePartition3 = Partitions.range("p3", Literals.NULL, tomorrowLiteral, null); + Transform[] rangePartition = + new Transform[] { + Transforms.range( + new String[] {col4.name()}, + new RangePartition[] {rangePartition1, rangePartition2, rangePartition3}) + }; + TABLE_OPERATIONS.create( + databaseName, + rangePartitionTableName, + columns.toArray(new 
JdbcColumn[] {}), + tableComment, + createProperties(), + rangePartition, + distribution, + indexes); + JdbcTable rangePartitionTable = TABLE_OPERATIONS.load(databaseName, rangePartitionTableName); + assertionsTableInfo( + rangePartitionTableName, + tableComment, + columns, + Collections.emptyMap(), + null, + new Transform[] {Transforms.range(new String[] {col4.name()})}, + rangePartitionTable); + + // assert partition info + JdbcTablePartitionOperations tablePartitionOperations = + new StarRocksTablePartitionOperations( + DATA_SOURCE, rangePartitionTable, JDBC_EXCEPTION_CONVERTER, TYPE_CONVERTER); + Map<String, RangePartition> loadedRangePartitions = + Arrays.stream(tablePartitionOperations.listPartitions()) + .collect(Collectors.toMap(Partition::name, p -> (RangePartition) p)); + assertTrue(loadedRangePartitions.containsKey("p1")); + RangePartition actualP1 = loadedRangePartitions.get("p1"); + assertEquals(todayLiteral, actualP1.upper()); + assertEquals(Literals.of("0000-01-01", Types.DateType.get()), actualP1.lower()); + assertTrue(loadedRangePartitions.containsKey("p2")); + RangePartition actualP2 = loadedRangePartitions.get("p2"); + assertEquals(tomorrowLiteral, actualP2.upper()); + assertEquals(todayLiteral, actualP2.lower()); + assertTrue(loadedRangePartitions.containsKey("p3")); + RangePartition actualP3 = loadedRangePartitions.get("p3"); + assertEquals(Literals.of("MAXVALUE", Types.DateType.get()), actualP3.upper()); + assertEquals(tomorrowLiteral, actualP3.lower()); + + // create table with list partition + String listPartitionTableName = GravitinoITUtils.genRandomName("list_partition_table"); + Literal<Integer> integerLiteral1 = Literals.integerLiteral(1); + Literal<Integer> integerLiteral2 = Literals.integerLiteral(2); + ListPartition listPartition1 = + Partitions.list( + "p1", + new Literal[][] {{integerLiteral1, todayLiteral}, {integerLiteral1, tomorrowLiteral}}, + null); + ListPartition listPartition2 = + Partitions.list( + "p2", + new Literal[][] 
{{integerLiteral2, todayLiteral}, {integerLiteral2, tomorrowLiteral}}, + null); + Transform[] listPartition = + new Transform[] { + Transforms.list( + new String[][] {{col1.name()}, {col4.name()}}, + new ListPartition[] {listPartition1, listPartition2}) + }; + TABLE_OPERATIONS.create( + databaseName, + listPartitionTableName, + columns.toArray(new JdbcColumn[] {}), + tableComment, + createProperties(), + listPartition, + distribution, + indexes); + JdbcTable listPartitionTable = TABLE_OPERATIONS.load(databaseName, listPartitionTableName); + assertionsTableInfo( + listPartitionTableName, + tableComment, + columns, + Collections.emptyMap(), + null, + new Transform[] {Transforms.list(new String[][] {{col1.name()}, {col4.name()}})}, + listPartitionTable); + + // assert partition info + tablePartitionOperations = + new StarRocksTablePartitionOperations( + DATA_SOURCE, listPartitionTable, JDBC_EXCEPTION_CONVERTER, TYPE_CONVERTER); + Map<String, ListPartition> loadedListPartitions = + Arrays.stream(tablePartitionOperations.listPartitions()) + .collect(Collectors.toMap(Partition::name, p -> (ListPartition) p, (p1, p2) -> p2)); + assertTrue(loadedListPartitions.containsKey("p1")); + assertTrue(Arrays.deepEquals(listPartition1.lists(), loadedListPartitions.get("p1").lists())); + assertTrue(loadedListPartitions.containsKey("p2")); + assertTrue(Arrays.deepEquals(listPartition2.lists(), loadedListPartitions.get("p2").lists())); + } +} diff --git a/catalogs/catalog-jdbc-starrocks/src/test/java/org/apache/gravitino/catalog/starrocks/operation/TestStarRocksTablePartitionOperations.java b/catalogs/catalog-jdbc-starrocks/src/test/java/org/apache/gravitino/catalog/starrocks/operation/TestStarRocksTablePartitionOperations.java new file mode 100644 index 0000000000..1a722f9b98 --- /dev/null +++ b/catalogs/catalog-jdbc-starrocks/src/test/java/org/apache/gravitino/catalog/starrocks/operation/TestStarRocksTablePartitionOperations.java @@ -0,0 +1,361 @@ +/* + * Licensed to the Apache 
Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.gravitino.catalog.starrocks.operation; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertThrows; +import static org.junit.Assert.assertTrue; + +import com.google.common.collect.ImmutableMap; +import java.time.LocalDate; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; +import org.apache.gravitino.catalog.jdbc.JdbcColumn; +import org.apache.gravitino.catalog.jdbc.JdbcTable; +import org.apache.gravitino.catalog.jdbc.converter.JdbcTypeConverter; +import org.apache.gravitino.catalog.jdbc.operation.JdbcTablePartitionOperations; +import org.apache.gravitino.catalog.starrocks.converter.StarRocksTypeConverter; +import org.apache.gravitino.catalog.starrocks.operations.StarRocksTablePartitionOperations; +import org.apache.gravitino.exceptions.NoSuchPartitionException; +import org.apache.gravitino.exceptions.PartitionAlreadyExistsException; +import org.apache.gravitino.integration.test.util.GravitinoITUtils; +import 
org.apache.gravitino.rel.expressions.NamedReference; +import org.apache.gravitino.rel.expressions.distributions.Distribution; +import org.apache.gravitino.rel.expressions.distributions.Distributions; +import org.apache.gravitino.rel.expressions.literals.Literal; +import org.apache.gravitino.rel.expressions.literals.Literals; +import org.apache.gravitino.rel.expressions.transforms.Transform; +import org.apache.gravitino.rel.expressions.transforms.Transforms; +import org.apache.gravitino.rel.indexes.Index; +import org.apache.gravitino.rel.partitions.ListPartition; +import org.apache.gravitino.rel.partitions.Partition; +import org.apache.gravitino.rel.partitions.Partitions; +import org.apache.gravitino.rel.partitions.RangePartition; +import org.apache.gravitino.rel.types.Types; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Tag; +import org.junit.jupiter.api.Test; + +@Tag("gravitino-docker-test") +public class TestStarRocksTablePartitionOperations extends TestStarRocks { + private static final String databaseName = GravitinoITUtils.genRandomName("starrocks_test_db"); + private static final Integer DEFAULT_BUCKET_SIZE = 1; + private static final JdbcTypeConverter TYPE_CONVERTER = new StarRocksTypeConverter(); + + @BeforeAll + public static void startup() { + TestStarRocks.startup(); + createDatabase(); + } + + private static void createDatabase() { + DATABASE_OPERATIONS.create(databaseName, "", new HashMap<>()); + } + + private static Map<String, String> createProperties() { + return ImmutableMap.of(); + } + + @Test + public void testRangePartition() { + String tableComment = "range_partitioned_table_comment"; + JdbcColumn col1 = + JdbcColumn.builder() + .withName("col_1") + .withType(Types.IntegerType.get()) + .withNullable(false) + .build(); + JdbcColumn col2 = + JdbcColumn.builder().withName("col_2").withType(Types.BooleanType.get()).build(); + JdbcColumn col3 = + 
JdbcColumn.builder().withName("col_3").withType(Types.DoubleType.get()).build(); + JdbcColumn col4 = + JdbcColumn.builder() + .withName("col_4") + .withType(Types.DateType.get()) + .withNullable(false) + .build(); + List<JdbcColumn> columns = Arrays.asList(col1, col2, col3, col4); + Distribution distribution = + Distributions.hash(DEFAULT_BUCKET_SIZE, NamedReference.field("col_1")); + Index[] indexes = new Index[] {}; + String rangePartitionTableName = GravitinoITUtils.genRandomName("range_partition_table"); + Transform[] rangePartition = new Transform[] {Transforms.range(new String[] {col4.name()})}; + TABLE_OPERATIONS.create( + databaseName, + rangePartitionTableName, + columns.toArray(new JdbcColumn[] {}), + tableComment, + createProperties(), + rangePartition, + distribution, + indexes); + + // assert table info + JdbcTable rangePartitionTable = TABLE_OPERATIONS.load(databaseName, rangePartitionTableName); + assertionsTableInfo( + rangePartitionTableName, + tableComment, + columns, + Collections.emptyMap(), + null, + rangePartition, + rangePartitionTable); + List<String> listTables = TABLE_OPERATIONS.listTables(databaseName); + assertTrue(listTables.contains(rangePartitionTableName)); + + // create Table Partition Operations manually + JdbcTablePartitionOperations tablePartitionOperations = + new StarRocksTablePartitionOperations( + DATA_SOURCE, rangePartitionTable, JDBC_EXCEPTION_CONVERTER, TYPE_CONVERTER); + + // assert partition info when there is no partitions actually + String[] emptyPartitionNames = tablePartitionOperations.listPartitionNames(); + assertEquals(0, emptyPartitionNames.length); + Partition[] emptyPartitions = tablePartitionOperations.listPartitions(); + assertEquals(0, emptyPartitions.length); + + // get non-existing partition + assertThrows(NoSuchPartitionException.class, () -> tablePartitionOperations.getPartition("p1")); + + // add partition with incorrect type + Partition incorrect = + Partitions.list("test_incorrect", new Literal[][] 
{{Literals.NULL}}, null); + IllegalArgumentException exception = + assertThrows( + IllegalArgumentException.class, () -> tablePartitionOperations.addPartition(incorrect)); + assertEquals( + "Table " + + rangePartitionTableName + + " is non-list-partitioned, but trying to add a list partition", + exception.getMessage()); + + // add different kinds of range partitions + LocalDate today = LocalDate.now(); + LocalDate tomorrow = today.plusDays(1); + Literal<LocalDate> todayLiteral = Literals.dateLiteral(today); + Literal<LocalDate> tomorrowLiteral = Literals.dateLiteral(tomorrow); + Partition p1 = Partitions.range("p1", todayLiteral, Literals.NULL, Collections.emptyMap()); + Partition p2 = Partitions.range("p2", tomorrowLiteral, todayLiteral, Collections.emptyMap()); + Partition p3 = Partitions.range("p3", Literals.NULL, tomorrowLiteral, Collections.emptyMap()); + assertEquals(p1, tablePartitionOperations.addPartition(p1)); + assertEquals(p2, tablePartitionOperations.addPartition(p2)); + assertEquals(p3, tablePartitionOperations.addPartition(p3)); + + // add partition with same name + Partition p4 = Partitions.range("p3", Literals.NULL, Literals.NULL, Collections.emptyMap()); + assertThrows( + PartitionAlreadyExistsException.class, () -> tablePartitionOperations.addPartition(p4)); + + // check partitions + Set<String> partitionNames = + Arrays.stream(tablePartitionOperations.listPartitionNames()).collect(Collectors.toSet()); + assertEquals(3, partitionNames.size()); + assertTrue(partitionNames.contains("p1")); + assertTrue(partitionNames.contains("p2")); + assertTrue(partitionNames.contains("p3")); + + Map<String, RangePartition> partitions = + Arrays.stream(tablePartitionOperations.listPartitions()) + .collect(Collectors.toMap(p -> p.name(), p -> (RangePartition) p)); + assertEquals(3, partitions.size()); + RangePartition actualP1 = partitions.get("p1"); + assertEquals(todayLiteral, actualP1.upper()); + assertEquals(Literals.of("0000-01-01", Types.DateType.get()), 
actualP1.lower()); + RangePartition actualP2 = partitions.get("p2"); + assertEquals(tomorrowLiteral, actualP2.upper()); + assertEquals(todayLiteral, actualP2.lower()); + RangePartition actualP3 = partitions.get("p3"); + assertEquals(Literals.of("MAXVALUE", Types.DateType.get()), actualP3.upper()); + assertEquals(tomorrowLiteral, actualP3.lower()); + + actualP1 = (RangePartition) tablePartitionOperations.getPartition("p1"); + assertEquals(todayLiteral, actualP1.upper()); + assertEquals(Literals.of("0000-01-01", Types.DateType.get()), actualP1.lower()); + actualP2 = (RangePartition) tablePartitionOperations.getPartition("p2"); + assertEquals(tomorrowLiteral, actualP2.upper()); + assertEquals(todayLiteral, actualP2.lower()); + actualP3 = (RangePartition) tablePartitionOperations.getPartition("p3"); + assertEquals(Literals.of("MAXVALUE", Types.DateType.get()), actualP3.upper()); + assertEquals(tomorrowLiteral, actualP3.lower()); + + // drop partition + assertTrue(tablePartitionOperations.dropPartition("p3")); + partitionNames = + Arrays.stream(tablePartitionOperations.listPartitionNames()).collect(Collectors.toSet()); + assertEquals(2, partitionNames.size()); + assertFalse(partitionNames.contains("p3")); + assertThrows(NoSuchPartitionException.class, () -> tablePartitionOperations.getPartition("p3")); + + // drop non-existing partition + assertFalse(tablePartitionOperations.dropPartition("p3")); + } + + @Test + public void testListPartition() { + String tableComment = "list_partitioned_table_comment"; + JdbcColumn col1 = + JdbcColumn.builder() + .withName("col_1") + .withType(Types.IntegerType.get()) + .withNullable(false) + .build(); + JdbcColumn col2 = + JdbcColumn.builder().withName("col_2").withType(Types.BooleanType.get()).build(); + JdbcColumn col3 = + JdbcColumn.builder().withName("col_3").withType(Types.DoubleType.get()).build(); + JdbcColumn col4 = + JdbcColumn.builder() + .withName("col_4") + .withType(Types.DateType.get()) + .withNullable(false) + .build(); 
+ List<JdbcColumn> columns = Arrays.asList(col1, col2, col3, col4); + Distribution distribution = + Distributions.hash(DEFAULT_BUCKET_SIZE, NamedReference.field("col_1")); + Index[] indexes = new Index[] {}; + String listPartitionTableName = GravitinoITUtils.genRandomName("list_partition_table"); + Transform[] listPartition = + new Transform[] {Transforms.list(new String[][] {{col1.name()}, {col4.name()}})}; + TABLE_OPERATIONS.create( + databaseName, + listPartitionTableName, + columns.toArray(new JdbcColumn[] {}), + tableComment, + createProperties(), + listPartition, + distribution, + indexes); + + // assert table info + JdbcTable listPartitionTable = TABLE_OPERATIONS.load(databaseName, listPartitionTableName); + assertionsTableInfo( + listPartitionTableName, + tableComment, + columns, + Collections.emptyMap(), + null, + listPartition, + listPartitionTable); + List<String> listTables = TABLE_OPERATIONS.listTables(databaseName); + assertTrue(listTables.contains(listPartitionTableName)); + + // create Table Partition Operations manually + JdbcTablePartitionOperations tablePartitionOperations = + new StarRocksTablePartitionOperations( + DATA_SOURCE, listPartitionTable, JDBC_EXCEPTION_CONVERTER, TYPE_CONVERTER); + + // assert partition info when there is no partitions actually + String[] emptyPartitionNames = tablePartitionOperations.listPartitionNames(); + assertEquals(0, emptyPartitionNames.length); + Partition[] emptyPartitions = tablePartitionOperations.listPartitions(); + assertEquals(0, emptyPartitions.length); + + // get non-existing partition + assertThrows(NoSuchPartitionException.class, () -> tablePartitionOperations.getPartition("p1")); + + // add partition with incorrect type + Partition incorrectType = + Partitions.range("p1", Literals.NULL, Literals.NULL, Collections.emptyMap()); + IllegalArgumentException exception = + assertThrows( + IllegalArgumentException.class, + () -> tablePartitionOperations.addPartition(incorrectType)); + assertEquals( + "Table 
" + + listPartitionTableName + + " is non-range-partitioned, but trying to add a range partition", + exception.getMessage()); + + // add partition with incorrect value + Partition incorrectValue = + Partitions.list("p1", new Literal[][] {{Literals.NULL}}, Collections.emptyMap()); + exception = + assertThrows( + IllegalArgumentException.class, + () -> tablePartitionOperations.addPartition(incorrectValue)); + assertEquals("The number of partitioning columns must be consistent", exception.getMessage()); + + // add different kinds of list partitions + LocalDate today = LocalDate.now(); + LocalDate tomorrow = today.plusDays(1); + Literal<LocalDate> todayLiteral = Literals.dateLiteral(today); + Literal<LocalDate> tomorrowLiteral = Literals.dateLiteral(tomorrow); + Literal[][] p1Values = {{Literals.integerLiteral(1), todayLiteral}}; + Literal[][] p2Values = {{Literals.integerLiteral(2), todayLiteral}}; + Literal[][] p3Values = {{Literals.integerLiteral(1), tomorrowLiteral}}; + Literal[][] p4Values = {{Literals.integerLiteral(2), tomorrowLiteral}}; + Partition p1 = Partitions.list("p1", p1Values, Collections.emptyMap()); + Partition p2 = Partitions.list("p2", p2Values, Collections.emptyMap()); + Partition p3 = Partitions.list("p3", p3Values, Collections.emptyMap()); + Partition p4 = Partitions.list("p4", p4Values, Collections.emptyMap()); + assertEquals(p1, tablePartitionOperations.addPartition(p1)); + assertEquals(p2, tablePartitionOperations.addPartition(p2)); + assertEquals(p3, tablePartitionOperations.addPartition(p3)); + assertEquals(p4, tablePartitionOperations.addPartition(p4)); + + // check partitions + Set<String> partitionNames = + Arrays.stream(tablePartitionOperations.listPartitionNames()).collect(Collectors.toSet()); + assertEquals(4, partitionNames.size()); + assertTrue(partitionNames.contains("p1")); + assertTrue(partitionNames.contains("p2")); + assertTrue(partitionNames.contains("p3")); + assertTrue(partitionNames.contains("p4")); + + Map<String, 
ListPartition> partitions = + Arrays.stream(tablePartitionOperations.listPartitions()) + .collect(Collectors.toMap(p -> p.name(), p -> (ListPartition) p)); + assertEquals(4, partitions.size()); + ListPartition actualP1 = partitions.get("p1"); + assertTrue(Arrays.deepEquals(actualP1.lists(), p1Values)); + ListPartition actualP2 = partitions.get("p2"); + assertTrue(Arrays.deepEquals(actualP2.lists(), p2Values)); + ListPartition actualP3 = partitions.get("p3"); + assertTrue(Arrays.deepEquals(actualP3.lists(), p3Values)); + ListPartition actualP4 = partitions.get("p4"); + assertTrue(Arrays.deepEquals(actualP4.lists(), p4Values)); + + actualP1 = (ListPartition) tablePartitionOperations.getPartition("p1"); + assertTrue(Arrays.deepEquals(actualP1.lists(), p1Values)); + actualP2 = (ListPartition) tablePartitionOperations.getPartition("p2"); + assertTrue(Arrays.deepEquals(actualP2.lists(), p2Values)); + actualP3 = (ListPartition) tablePartitionOperations.getPartition("p3"); + assertTrue(Arrays.deepEquals(actualP3.lists(), p3Values)); + actualP4 = (ListPartition) tablePartitionOperations.getPartition("p4"); + assertTrue(Arrays.deepEquals(actualP4.lists(), p4Values)); + + // drop partition + assertTrue(tablePartitionOperations.dropPartition("p3")); + partitionNames = + Arrays.stream(tablePartitionOperations.listPartitionNames()).collect(Collectors.toSet()); + assertEquals(3, partitionNames.size()); + assertFalse(partitionNames.contains("p3")); + assertThrows(NoSuchPartitionException.class, () -> tablePartitionOperations.getPartition("p3")); + + // drop non-existing partition + assertFalse(tablePartitionOperations.dropPartition("p3")); + } +} diff --git a/core/src/main/java/org/apache/gravitino/Config.java b/core/src/main/java/org/apache/gravitino/Config.java index 8696e42063..3200854db9 100644 --- a/core/src/main/java/org/apache/gravitino/Config.java +++ b/core/src/main/java/org/apache/gravitino/Config.java @@ -198,7 +198,7 @@ public abstract class Config { (k, v) -> { String 
trimmedK = k.trim(); String trimmedV = v.trim(); - if (!trimmedK.isEmpty() && !trimmedV.isEmpty()) { + if (!trimmedK.isEmpty()) { if (predicate.test(trimmedK)) { configMap.put(trimmedK, trimmedV); } diff --git a/core/src/main/java/org/apache/gravitino/config/ConfigConstants.java b/core/src/main/java/org/apache/gravitino/config/ConfigConstants.java index b2b9ed4884..17c08ac0e9 100644 --- a/core/src/main/java/org/apache/gravitino/config/ConfigConstants.java +++ b/core/src/main/java/org/apache/gravitino/config/ConfigConstants.java @@ -29,9 +29,12 @@ public final class ConfigConstants { /** HTTPS Server port, reused by Gravitino server and Iceberg REST server */ public static final String WEBSERVER_HTTPS_PORT = "httpsPort"; - /** The value of messages used to indicate that the configuration is not set. */ + /** The value of messages used to indicate that the configuration is set to an empty value. */ public static final String NOT_BLANK_ERROR_MSG = "The value can't be blank"; + /** The value of messages used to indicate that the configuration is not set. */ + public static final String NOT_NULL_ERROR_MSG = "The value can't be null"; + /** The value of messages used to indicate that the configuration should be a positive number. 
*/ public static final String POSITIVE_NUMBER_ERROR_MSG = "The value must be a positive number"; diff --git a/docs/index.md b/docs/index.md index 1a1bbc236b..3b45c6e871 100644 --- a/docs/index.md +++ b/docs/index.md @@ -85,6 +85,7 @@ Gravitino currently supports the following catalogs: * [**Paimon catalog**](./lakehouse-paimon-catalog.md) * [**PostgreSQL catalog**](./jdbc-postgresql-catalog.md) * [**OceanBase catalog**](./jdbc-oceanbase-catalog.md) +* [**StarRocks catalog**](./jdbc-starrocks-catalog.md) **Fileset catalogs:** diff --git a/docs/jdbc-starrocks-catalog.md b/docs/jdbc-starrocks-catalog.md new file mode 100644 index 0000000000..eecfc7aa6e --- /dev/null +++ b/docs/jdbc-starrocks-catalog.md @@ -0,0 +1,197 @@ +--- +title: "StarRocks catalog" +slug: /jdbc-starrocks-catalog +keywords: +- jdbc +- starrocks +- metadata +license: "This software is licensed under the Apache License version 2." +--- + +import Tabs from '@theme/Tabs'; +import TabItem from '@theme/TabItem'; + +## Introduction + +Apache Gravitino provides the ability to manage [StarRocks](https://www.starrocks.io/) metadata through JDBC connection. + +:::caution +Gravitino saves some system information in table comments, like +`(From Gravitino, DO NOT EDIT: gravitino.v1.uid1078334182909406185)`, please don't change or remove this message. +::: + +## Catalog + +### Catalog capabilities + +- Gravitino catalog corresponds to the StarRocks instance. +- Supports metadata management of StarRocks (3.3.x). +- Supports [column default value](./manage-relational-metadata-using-gravitino.md#table-column-default-value). + +### Catalog properties + +You can pass to a StarRocks data source any property that isn't defined by Gravitino by adding +`gravitino.bypass.` prefix as a catalog property. For example, catalog property +`gravitino.bypass.maxWaitMillis` will pass `maxWaitMillis` to the data source property. 
+ +You can check the relevant data source configuration in +[data source properties](https://commons.apache.org/proper/commons-dbcp/configuration.html) for +more details. + +Besides the [common catalog properties](./gravitino-server-config.md#apache-gravitino-catalog-properties-configuration), the StarRocks catalog has the following properties: + +| Configuration item | Description | Default value | Required | Since Version | +|----------------------|------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|---------------|----------|------------------| +| `jdbc-url` | JDBC URL for connecting to the database. For example, `jdbc:mysql://localhost:9030` | (none) | Yes | 1.0.0 | +| `jdbc-driver` | The driver of the JDBC connection. For example, `com.mysql.jdbc.Driver`. | (none) | Yes | 1.0.0 | +| `jdbc-user` | The JDBC user name. | (none) | Yes | 1.0.0 | +| `jdbc-password` | The JDBC password. | (none) | Yes | 1.0.0 | +| `jdbc.pool.min-size` | The minimum number of connections in the pool. `2` by default. | `2` | No | 1.0.0 | +| `jdbc.pool.max-size` | The maximum number of connections in the pool. `10` by default. | `10` | No | 1.0.0 | +| `jdbc.pool.max-size` | The maximum number of connections in the pool. `10` by default. | `10` | No | 1.0.0 | + + +Before using the StarRocks Catalog, you must download the corresponding JDBC driver to the `catalogs/jdbc-starrocks/libs` directory. +Gravitino doesn't package the JDBC driver for StarRocks due to licensing issues. 
+ +### Driver Version Compatibility + +The StarRocks catalog includes driver version compatibility checks for datetime precision calculation: + +- **MySQL Connector/J versions >= 8.0.16**: Full support for datetime precision calculation +- **MySQL Connector/J versions < 8.0.16**: Limited support - datetime precision calculation returns `null` with a warning log + +This limitation affects the following datetime types: +- `DATETIME(p)` - datetime precision + +When using an unsupported driver version, the system will: +1. Continue to work normally with default precision (0) +2. Log a warning message indicating the driver version limitation +3. Return `null` for precision calculations to avoid incorrect results + +**Example warning log:** +``` +WARN: MySQL driver version mysql-connector-java-8.0.11 is below 8.0.16, +columnSize may not be accurate for precision calculation. +Returning null for DATETIME type precision. Driver version: mysql-connector-java-8.0.11 +``` + +**Recommended driver versions:** +- `mysql-connector-java-8.0.16` or higher + +### Catalog operations + +Refer to [Manage Relational Metadata Using Gravitino](./manage-relational-metadata-using-gravitino.md#catalog-operations) for more details. + +## Schema + +### Schema capabilities + +- Gravitino's schema concept corresponds to the StarRocks database. +- Supports creating schema. +- Supports dropping schema. + +### Schema properties + +Because StarRocks cannot return schema properties after they are set, Gravitino does not currently support setting schema properties. + +### Schema operations + +Please refer to +[Manage Relational Metadata Using Gravitino](./manage-relational-metadata-using-gravitino.md#schema-operations) for more details. + +## Table + +### Table capabilities + +- Gravitino's table concept corresponds to the StarRocks table. +- Supports [column default value](./manage-relational-metadata-using-gravitino.md#table-column-default-value).
+ +#### Table column types + +| Gravitino Type | StarRocks Type | +|----------------|----------------| +| `Boolean` | `Boolean` | +| `Byte` | `TinyInt` | +| `Short` | `SmallInt` | +| `Integer` | `Int` | +| `Long` | `BigInt` | +| `Float` | `Float` | +| `Double` | `Double` | +| `Decimal` | `Decimal` | +| `Date` | `Date` | +| `Timestamp` | `Datetime` | +| `VarChar` | `VarChar` | +| `FixedChar` | `Char` | +| `String` | `String` | +| `Binary` | `Binary` | + + +StarRocks doesn't support the Gravitino `Fixed`, `Timestamp_tz`, `IntervalDay`, `IntervalYear`, `Union`, and `UUID` types. +The data types other than those listed above are mapped to Gravitino's **[Unparsed Type](./manage-relational-metadata-using-gravitino.md#unparsed-type)** that represents an unresolvable data type since 1.0.0. + +:::note +Gravitino cannot load the StarRocks `array`, `map` and `struct` types correctly, because StarRocks doesn't support these types in JDBC. +::: + + +### Table column auto-increment + +Unsupported for now. + +### Table properties + +- StarRocks supports table properties, and you can set them in the table properties. +- Only supports StarRocks table properties and doesn't support user-defined properties. + +### Table indexes + +Unsupported + +### Table partitioning + +The StarRocks catalog supports partitioned tables. +Users can create partitioned tables in the StarRocks catalog with specific partitioning attributes. Pre-assigning partitions when creating StarRocks tables is also supported. +Note that although Gravitino supports several partitioning strategies, StarRocks inherently only supports these two partitioning strategies: + +- `RANGE` +- `LIST` + +:::caution +The `fieldName` specified in the partitioning attributes must be the name of columns defined in the table. +::: + +### Table distribution + +Users can also specify the distribution strategy when creating tables in the StarRocks catalog.
Currently, the StarRocks catalog supports the following distribution strategies: +- `HASH` +- `RANDOM` + +For the `RANDOM` distribution strategy, Gravitino uses `EVEN` to represent it. More information about the distribution strategy defined in Gravitino can be found [here](./table-partitioning-distribution-sort-order-indexes.md#table-distribution). + + +### Table operations + +Please refer to [Manage Relational Metadata Using Gravitino](./manage-relational-metadata-using-gravitino.md#table-operations) for more details. + +#### Alter table operations + +Gravitino supports these table alteration operations: + +- `RenameTable` +- `UpdateComment` +- `AddColumn` +- `DeleteColumn` +- `UpdateColumnType` +- `UpdateColumnPosition` +- `SetProperty` + +Please be aware that: + + - Not all table alteration operations can be processed in batches. + - Schema changes, such as adding/modifying/dropping columns, can be processed in batches. + - The schema alteration in StarRocks is asynchronous. You might get an outdated schema if you + execute a schema query immediately after the alteration. It is recommended to pause briefly + after the schema alteration. Gravitino will add the schema alteration status into + the schema information in the upcoming version to solve this problem. +- StarRocks has limited support for [altering table properties](https://docs.starrocks.io/docs/3.3/sql-reference/sql-statements/table_bucket_part_index/ALTER_TABLE/#modify-table-properties), and it recommends modifying one property at a time.
\ No newline at end of file diff --git a/integration-test-common/src/test/java/org/apache/gravitino/integration/test/container/ContainerSuite.java b/integration-test-common/src/test/java/org/apache/gravitino/integration/test/container/ContainerSuite.java index d2a5ee6152..0127752068 100644 --- a/integration-test-common/src/test/java/org/apache/gravitino/integration/test/container/ContainerSuite.java +++ b/integration-test-common/src/test/java/org/apache/gravitino/integration/test/container/ContainerSuite.java @@ -66,7 +66,7 @@ public class ContainerSuite implements Closeable { private static volatile DorisContainer dorisContainer; private static volatile HiveContainer kerberosHiveContainer; private static volatile HiveContainer sqlBaseHiveContainer; - + private static volatile StarRocksContainer starRocksContainer; private static volatile MySQLContainer mySQLContainer; private static volatile MySQLContainer mySQLVersion5Container; private static volatile Map<PGImageName, PostgreSQLContainer> pgContainerMap = @@ -480,6 +480,27 @@ public class ContainerSuite implements Closeable { } } + public void startStarRocksContainer() { + if (starRocksContainer == null) { + synchronized (ContainerSuite.class) { + if (starRocksContainer == null) { + initIfNecessary(); + // Start StarRocks container + StarRocksContainer.Builder starRocksBuilder = + StarRocksContainer.builder().withNetwork(network); + StarRocksContainer container = closer.register(starRocksBuilder.build()); + try { + container.start(); + } catch (Exception e) { + LOG.error("Failed to start StarRocks container", e); + throw new RuntimeException("Failed to start StarRocks container", e); + } + starRocksContainer = container; + } + } + } + } + public GravitinoLocalStackContainer getLocalStackContainer() { return gravitinoLocalStackContainer; } @@ -556,6 +577,10 @@ public class ContainerSuite implements Closeable { return mySQLVersion5Container; } + public StarRocksContainer getStarRocksContainer() { + return 
starRocksContainer; + } + public PostgreSQLContainer getPostgreSQLContainer() throws NoSuchElementException { return getPostgreSQLContainer(PGImageName.VERSION_13); } diff --git a/integration-test-common/src/test/java/org/apache/gravitino/integration/test/container/StarRocksContainer.java b/integration-test-common/src/test/java/org/apache/gravitino/integration/test/container/StarRocksContainer.java new file mode 100644 index 0000000000..99be7a0c6b --- /dev/null +++ b/integration-test-common/src/test/java/org/apache/gravitino/integration/test/container/StarRocksContainer.java @@ -0,0 +1,135 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */
+package org.apache.gravitino.integration.test.container;
+
+import static java.lang.String.format;
+import static org.testcontainers.shaded.org.awaitility.Awaitility.await;
+
+import com.google.common.collect.ImmutableSet;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.Statement;
+import java.time.Duration;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import org.rnorth.ducttape.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.Network;
+
+/**
+ * Integration-test container for StarRocks. By default it is built from {@link #DEFAULT_IMAGE}
+ * with {@link #FE_HTTP_PORT} and {@link #FE_MYSQL_PORT} exposed.
+ *
+ * <p>{@link #start()} does not return normally until a JDBC probe against the MySQL-protocol port
+ * has found at least one backend that is alive and reports a non-zero total capacity.
+ */
+public class StarRocksContainer extends BaseContainer {
+  public static final Logger LOG = LoggerFactory.getLogger(StarRocksContainer.class);
+
+  public static final String DEFAULT_IMAGE = "starrocks/allin1-ubuntu:3.3-latest";
+  public static final String HOST_NAME = "gravitino-ci-starrocks";
+  public static final String USER_NAME = "root";
+  public static final String PASSWORD = "";
+  public static final int FE_HTTP_PORT = 8030;
+  public static final int FE_MYSQL_PORT = 9030;
+
+  /** Upper bound, in seconds, on how long {@link #checkContainerStatus(int)} keeps polling. */
+  private static final int READINESS_TIMEOUT_SECONDS = 30;
+
+  /** Statement used to list the backends registered with the frontend. */
+  private static final String SHOW_BACKENDS_SQL = "SHOW PROC '/backends';";
+
+  /**
+   * Creates a builder pre-populated with the default image, host name and exposed ports.
+   *
+   * @return a new {@link Builder}
+   */
+  public static Builder builder() {
+    return new Builder();
+  }
+
+  protected StarRocksContainer(
+      String image,
+      String hostName,
+      Set<Integer> ports,
+      Map<String, String> extraHosts,
+      Map<String, String> filesToMount,
+      Map<String, String> envVars,
+      Optional<Network> network) {
+    super(image, hostName, ports, extraHosts, filesToMount, envVars, network);
+  }
+
+  /** Adds a prefixed log consumer and a five-minute startup timeout to the base setup. */
+  @Override
+  protected void setupContainer() {
+    super.setupContainer();
+    withLogConsumer(new PrintingContainerLog(format("%-14s| ", "StarRocksContainer")));
+    withStartupTimeout(Duration.ofMinutes(5));
+  }
+
+  /** Starts the container, then blocks on the readiness probe with a retry limit of 5. */
+  @Override
+  public void start() {
+    super.start();
+    Preconditions.check("StarRocks container startup failed!", checkContainerStatus(5));
+  }
+
+  @Override
+  public void close() {
+    super.close();
+  }
+
+  /**
+   * Polls StarRocks over JDBC until a usable backend is reported.
+   *
+   * <p>The poll interval is {@code 30 / retryLimit} seconds, but never shorter than one second.
+   *
+   * <p>NOTE(review): this method only ever returns {@code true}. When the condition is not met
+   * within the timeout, {@code await()} is expected to throw Awaitility's
+   * {@code ConditionTimeoutException} rather than let the method return {@code false} - confirm
+   * against the Awaitility version shaded into Testcontainers.
+   *
+   * @param retryLimit number of probes to spread over the timeout window; must be positive
+   * @return {@code true} once a usable backend has been seen
+   * @throws IllegalArgumentException if {@code retryLimit} is not positive
+   */
+  @Override
+  protected boolean checkContainerStatus(int retryLimit) {
+    // Reject non-positive limits explicitly instead of failing with "/ by zero" below.
+    if (retryLimit <= 0) {
+      throw new IllegalArgumentException("retryLimit must be positive, but was " + retryLimit);
+    }
+
+    String starRocksJdbcUrl = format("jdbc:mysql://%s:%d/", getContainerIpAddress(), FE_MYSQL_PORT);
+    LOG.info("StarRocks url is {}", starRocksJdbcUrl);
+
+    // Integer division yields 0 for limits above the timeout; keep at least one second between
+    // probes so the loop does not hammer the frontend with back-to-back connections.
+    int pollIntervalSeconds = Math.max(1, READINESS_TIMEOUT_SECONDS / retryLimit);
+
+    await()
+        .atMost(READINESS_TIMEOUT_SECONDS, TimeUnit.SECONDS)
+        .pollInterval(pollIntervalSeconds, TimeUnit.SECONDS)
+        .until(() -> isBackendReady(starRocksJdbcUrl));
+
+    return true;
+  }
+
+  /**
+   * Runs one readiness probe: opens a connection, lists the backends and looks for one that is
+   * alive with a non-zero total capacity.
+   *
+   * @param jdbcUrl JDBC URL of the frontend's MySQL-protocol endpoint
+   * @return {@code true} if such a backend was found; {@code false} if none was found or the probe
+   *     failed for any reason (the failure is logged and the caller polls again)
+   */
+  private boolean isBackendReady(String jdbcUrl) {
+    try (Connection connection = DriverManager.getConnection(jdbcUrl, USER_NAME, PASSWORD);
+        Statement statement = connection.createStatement();
+        ResultSet resultSet = statement.executeQuery(SHOW_BACKENDS_SQL)) {
+      while (resultSet.next()) {
+        String alive = resultSet.getString("Alive");
+        String totalCapacity = resultSet.getString("TotalCapacity");
+        // Only the leading numeric part is parsed; whatever follows the first space is ignored.
+        float totalCapacityFloat = Float.parseFloat(totalCapacity.split(" ")[0]);
+
+        // alive should be true and totalCapacity should not be 0.000
+        if ("true".equalsIgnoreCase(alive) && totalCapacityFloat > 0.0f) {
+          LOG.info("StarRocks container startup success!");
+          return true;
+        }
+      }
+      LOG.info("StarRocks container is not ready yet!");
+    } catch (Exception e) {
+      // Connection refusals and parse failures are expected while the services boot; swallow them
+      // here so that the surrounding await() simply retries.
+      LOG.error(e.getMessage(), e);
+    }
+    return false;
+  }
+
+  /** Builder whose defaults are the StarRocks image, host name and FE ports. */
+  public static class Builder
+      extends BaseContainer.Builder<StarRocksContainer.Builder, StarRocksContainer> {
+    private Builder() {
+      this.image = DEFAULT_IMAGE;
+      this.hostName = HOST_NAME;
+      this.exposePorts = ImmutableSet.of(FE_HTTP_PORT, FE_MYSQL_PORT);
+    }
+
+    @Override
+    public StarRocksContainer build() {
+      return new StarRocksContainer(
+          image, hostName, exposePorts, extraHosts, filesToMount, envVars, network);
+    }
+  }
+}