Copilot commented on code in PR #9878:
URL: https://github.com/apache/gravitino/pull/9878#discussion_r2764622933
##########
catalogs-contrib/catalog-jdbc-clickhouse/src/main/java/org/apache/gravitino/catalog/clickhouse/operations/ClickHouseTableOperations.java:
##########
@@ -38,19 +126,920 @@ protected String generateCreateTableSql(
Distribution distribution,
Index[] indexes) {
throw new UnsupportedOperationException(
- "ClickHouseTableOperations.generateCreateTableSql is not implemented
yet.");
+ "generateCreateTableSql with out sortOrders in clickhouse is not
supported");
+ }
+
+ @Override
+ protected String generateCreateTableSql(
+ String tableName,
+ JdbcColumn[] columns,
+ String comment,
+ Map<String, String> properties,
+ Transform[] partitioning,
+ Distribution distribution,
+ Index[] indexes,
+ SortOrder[] sortOrders) {
+
+ Preconditions.checkArgument(
+ Distributions.NONE.equals(distribution), "ClickHouse does not support
distribution");
+
+ StringBuilder sqlBuilder = new StringBuilder();
+
+ Map<String, String> notNullProperties =
+ MapUtils.isNotEmpty(properties) ? properties : Collections.emptyMap();
+
+ // Add Create table clause
+ boolean onCluster = appendCreateTableClause(notNullProperties, sqlBuilder,
tableName);
+
+ // Add columns
+ buildColumnsDefinition(columns, sqlBuilder);
+
+ // Index definition
+ appendIndexesSql(indexes, sqlBuilder);
+
+ sqlBuilder.append("\n)");
+
+ // Extract engine from properties
+ ClickHouseTablePropertiesMetadata.ENGINE engine =
+ appendTableEngine(notNullProperties, sqlBuilder, onCluster);
+
+ appendOrderBy(sortOrders, sqlBuilder, engine);
+
+ appendPartitionClause(partitioning, sqlBuilder, engine);
+
+ // Add table comment if specified
+ if (StringUtils.isNotEmpty(comment)) {
+ String escapedComment = comment.replace("'", "''");
+ sqlBuilder.append(" COMMENT '%s'".formatted(escapedComment));
+ }
+
+ // Add setting clause if specified, clickhouse only supports predefine
settings
+ appendTableProperties(notNullProperties, sqlBuilder);
+
+ // Return the generated SQL statement
+ String result = sqlBuilder.toString();
+
+ LOG.info("Generated create table:{} sql: {}", tableName, result);
+ return result;
+ }
+
+ /**
+ * Append CREATE TABLE clause. If cluster name && on-cluster is specified in
properties, append ON
+ * CLUSTER clause.
+ *
+ * @param properties Table properties
+ * @param sqlBuilder SQL builder
+ * @return true if ON CLUSTER clause is appended, false otherwise
+ */
+ private boolean appendCreateTableClause(
+ Map<String, String> properties, StringBuilder sqlBuilder, String
tableName) {
+ String clusterName = properties.get(CLUSTER_NAME);
+ String onClusterValue = properties.get(ON_CLUSTER);
+
+ boolean onCluster =
+ StringUtils.isNotBlank(clusterName)
+ && StringUtils.isNotBlank(onClusterValue)
+ &&
Boolean.TRUE.equals(BooleanUtils.toBooleanObject(onClusterValue));
+
+ if (onCluster) {
+ sqlBuilder.append(
+ "CREATE TABLE %s ON CLUSTER `%s`
(\n".formatted(quoteIdentifier(tableName), clusterName));
+ } else {
+ sqlBuilder.append("CREATE TABLE %s
(\n".formatted(quoteIdentifier(tableName)));
+ }
+
+ return onCluster;
+ }
+
+ private static void appendTableProperties(
+ Map<String, String> properties, StringBuilder sqlBuilder) {
+ if (MapUtils.isEmpty(properties)) {
+ return;
+ }
+
+ Map<String, String> settingMap =
+ properties.entrySet().stream()
+ .filter(entry -> entry.getKey().startsWith(SETTINGS_PREFIX))
+ .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
+ if (MapUtils.isEmpty(settingMap)) {
+ return;
+ }
+
+ String settings =
+ settingMap.entrySet().stream()
+ .map(
+ entry ->
+ entry.getKey().substring(SETTINGS_PREFIX.length()) + " = "
+ entry.getValue())
+ .collect(Collectors.joining(",\n ", " \n SETTINGS ", ""));
+ sqlBuilder.append(settings);
+ }
+
+ private static void appendOrderBy(
+ SortOrder[] sortOrders,
+ StringBuilder sqlBuilder,
+ ClickHouseTablePropertiesMetadata.ENGINE engine) {
+ // ClickHouse requires ORDER BY clause for some engines, and currently
only mergeTree family
+ // requires ORDER BY clause.
+ boolean requireOrderBy = engine.isRequireOrderBy();
+ if (!requireOrderBy) {
+ if (ArrayUtils.isNotEmpty(sortOrders)) {
+ throw new UnsupportedOperationException(
+ "ORDER BY clause is not supported for engine: " +
engine.getValue());
+ }
+
+ // No need to add order by clause
+ return;
+ }
+
+ if (ArrayUtils.isEmpty(sortOrders)) {
+ throw new IllegalArgumentException(
+ "ORDER BY clause is required for engine: " + engine.getValue());
+ }
+
+ if (sortOrders.length > 1) {
+ throw new UnsupportedOperationException(
+ "Currently ClickHouse does not support sortOrders with more than 1
element");
+ }
+
+ NullOrdering nullOrdering = sortOrders[0].nullOrdering();
+ SortDirection sortDirection = sortOrders[0].direction();
+ if (nullOrdering != null && sortDirection != null) {
+ // ClickHouse does not support NULLS FIRST/LAST now.
+ LOG.warn(
+ "ClickHouse currently does not support nullOrdering: {}, and will
ignore it",
+ nullOrdering);
+ }
+
+ sqlBuilder.append("\n ORDER BY
`%s`\n".formatted(sortOrders[0].expression()));
+ }
+
+ private ClickHouseTablePropertiesMetadata.ENGINE appendTableEngine(
+ Map<String, String> properties, StringBuilder sqlBuilder, boolean
onCluster) {
+ ClickHouseTablePropertiesMetadata.ENGINE engine =
ENGINE_PROPERTY_ENTRY.getDefaultValue();
+ if (MapUtils.isNotEmpty(properties)) {
+ String userSetEngine = properties.get(CLICKHOUSE_ENGINE_KEY);
+ if (StringUtils.isNotEmpty(userSetEngine)) {
+ engine =
ClickHouseTablePropertiesMetadata.ENGINE.fromString(userSetEngine);
+ }
+ }
+
+ if (engine == ENGINE.DISTRIBUTED) {
+ if (!onCluster) {
+ throw new IllegalArgumentException(
+ "ENGINE = DISTRIBUTED requires ON CLUSTER clause to be
specified.");
+ }
+
+ // Check properties
+ String clusterName = properties.get(CLUSTER_NAME);
+ String remoteDatabase = properties.get(CLUSTER_REMOTE_DATABASE);
+ String remoteTable = properties.get(CLUSTER_REMOTE_TABLE);
+ String shardingKey = properties.get(CLUSTER_SHARDING_KEY);
+
+ Preconditions.checkArgument(
+ StringUtils.isNotBlank(clusterName),
+ "Cluster name must be specified when engine is Distributed");
+ Preconditions.checkArgument(
+ StringUtils.isNotBlank(remoteDatabase),
+ "Remote database must be specified for Distributed");
+ Preconditions.checkArgument(
+ StringUtils.isNotBlank(remoteTable), "Remote table must be specified
for Distributed");
+ Preconditions.checkArgument(
+ StringUtils.isNotBlank(shardingKey), "Sharding key must be specified
for Distributed");
+
+ sqlBuilder.append(
+ "\n ENGINE = %s(`%s`,`%s`,`%s`,%s)"
+ .formatted(
+ ENGINE.DISTRIBUTED.getValue(),
+ clusterName,
+ remoteDatabase,
+ remoteTable,
+ shardingKey));
+ return engine;
+ }
+
+ // Now check if engine is distributed, we need to check the remote
database and table properties
+
+ sqlBuilder.append("\n ENGINE = %s".formatted(engine.getValue()));
+ return engine;
+ }
+
+ private void appendPartitionClause(
+ Transform[] partitioning,
+ StringBuilder sqlBuilder,
+ ClickHouseTablePropertiesMetadata.ENGINE engine) {
+ if (ArrayUtils.isEmpty(partitioning)) {
+ return;
+ }
+
+ if (!engine.acceptPartition()) {
+ throw new UnsupportedOperationException(
+ "Partitioning is only supported for MergeTree family engines");
+ }
+
+ List<String> partitionExprs =
+
Arrays.stream(partitioning).map(this::toPartitionExpression).collect(Collectors.toList());
+ String partitionExpr =
+ partitionExprs.size() == 1
+ ? partitionExprs.get(0)
+ : "tuple(" + String.join(", ", partitionExprs) + ")";
+ sqlBuilder.append("\n PARTITION BY ").append(partitionExpr);
+ }
+
+ private String toPartitionExpression(Transform transform) {
+ Preconditions.checkArgument(transform != null, "Partition transform cannot
be null");
+ Preconditions.checkArgument(
+ StringUtils.equalsIgnoreCase(transform.name(),
Transforms.NAME_OF_IDENTITY),
+ "Unsupported partition transform: " + transform.name());
+ Preconditions.checkArgument(
+ transform.arguments().length == 1
+ && transform.arguments()[0] instanceof NamedReference
+ && ((NamedReference) transform.arguments()[0]).fieldName().length
== 1,
+ "ClickHouse only supports single column identity partitioning");
+
+ String fieldName =
+ ((NamedReference) transform.arguments()[0]).fieldName()[0]; // already
validated
+ return quoteIdentifier(fieldName);
+ }
+
+ private void buildColumnsDefinition(JdbcColumn[] columns, StringBuilder
sqlBuilder) {
+ for (int i = 0; i < columns.length; i++) {
+ JdbcColumn column = columns[i];
+ sqlBuilder.append(" %s".formatted(quoteIdentifier(column.name())));
+
+ appendColumnDefinition(column, sqlBuilder);
+ // Add a comma for the next column, unless it's the last one
+ if (i < columns.length - 1) {
+ sqlBuilder.append(",\n");
+ }
+ }
+ }
+
+ /**
+ * ClickHouse supports primary key and data skipping indexes.
+ *
+ * <p>This method will not check the validity of the indexes. For
ClickHouse, the primary key must
+ * be a subset of the order by columns. We will leave the underlying
clickhouse to validate it.
+ */
+ private void appendIndexesSql(Index[] indexes, StringBuilder sqlBuilder) {
+ if (ArrayUtils.isEmpty(indexes)) {
+ return;
+ }
+
+ for (Index index : indexes) {
+ String fieldStr = getIndexFieldStr(index.fieldNames());
+ sqlBuilder.append(",\n");
+ switch (index.type()) {
+ case PRIMARY_KEY:
+ if (null != index.name()
+ && !StringUtils.equalsIgnoreCase(index.name(),
Indexes.DEFAULT_PRIMARY_KEY_NAME)) {
+ LOG.warn(
+ "Primary key name must be PRIMARY in ClickHouse, the name {}
will be ignored.",
+ index.name());
+ }
+ // fieldStr already quoted in getIndexFieldStr
+ sqlBuilder.append(" PRIMARY KEY (").append(fieldStr).append(")");
+ break;
+ case DATA_SKIPPING_MINMAX:
+ Preconditions.checkArgument(
+ StringUtils.isNotBlank(index.name()), "Data skipping index name
must not be blank");
+ // The GRANULARITY value is always 1 here currently.
+ sqlBuilder.append(
+ " INDEX %s %s TYPE minmax GRANULARITY 1"
+ .formatted(quoteIdentifier(index.name()), fieldStr));
+ break;
+ case DATA_SKIPPING_BLOOM_FILTER:
+ // The GRANULARITY value is always 3 here currently.
+ Preconditions.checkArgument(
+ StringUtils.isNotBlank(index.name()), "Data skipping index name
must not be blank");
+ sqlBuilder.append(
+ " INDEX %s %s TYPE bloom_filter GRANULARITY 3"
+ .formatted(quoteIdentifier(index.name()), fieldStr));
+ break;
+ default:
+ throw new IllegalArgumentException(
+ "Gravitino Clickhouse doesn't support index : " + index.type());
+ }
+ }
+ }
+
+ @Override
+ protected boolean getAutoIncrementInfo(ResultSet resultSet) throws
SQLException {
+ return "YES".equalsIgnoreCase(resultSet.getString("IS_AUTOINCREMENT"));
+ }
+
+ @Override
+ public void alterTable(String databaseName, String tableName, TableChange...
changes)
+ throws NoSuchTableException {
+ LOG.info("Attempting to alter table {} from database {}", tableName,
databaseName);
+ try (Connection connection = getConnection(databaseName)) {
+ for (TableChange change : changes) {
+ String sql = generateAlterTableSql(databaseName, tableName, change);
+ if (StringUtils.isEmpty(sql)) {
+ LOG.info("No changes to alter table {} from database {}", tableName,
databaseName);
+ return;
+ }
+ JdbcConnectorUtils.executeUpdate(connection, sql);
+ }
+ LOG.info("Alter table {} from database {}", tableName, databaseName);
+ } catch (final SQLException se) {
+ throw this.exceptionMapper.toGravitinoException(se);
+ }
+ }
+
+ @Override
+ protected Map<String, String> getTableProperties(Connection connection,
String tableName)
+ throws SQLException {
+ try (PreparedStatement statement =
+ connection.prepareStatement("select * from system.tables where name =
? ")) {
+ statement.setString(1, tableName);
+ try (ResultSet resultSet = statement.executeQuery()) {
+ while (resultSet.next()) {
+ String name = resultSet.getString("name");
+ if (Objects.equals(name, tableName)) {
+ return Collections.unmodifiableMap(
+ new HashMap<String, String>() {
+ {
+ put(COMMENT, resultSet.getString(COMMENT));
+ put(GRAVITINO_ENGINE_KEY,
resultSet.getString(CLICKHOUSE_ENGINE_KEY));
+ }
+ });
+ }
+ }
+
+ throw new NoSuchTableException(
+ "Table %s does not exist in %s.", tableName,
connection.getCatalog());
+ }
+ }
+ }
+
+ @Override
+ protected Transform[] getTablePartitioning(
+ Connection connection, String databaseName, String tableName) throws
SQLException {
+ try (PreparedStatement statement =
+ connection.prepareStatement(
+ "SELECT partition_key FROM system.tables WHERE database = ? AND
name = ?")) {
+ statement.setString(1, databaseName);
+ statement.setString(2, tableName);
+ try (ResultSet resultSet = statement.executeQuery()) {
+ if (resultSet.next()) {
+ String partitionKey = resultSet.getString("partition_key");
+ try {
+ return parsePartitioning(partitionKey);
+ } catch (IllegalArgumentException e) {
+ LOG.warn(
+ "Skip unsupported partition expression {} for {}.{}",
+ partitionKey,
+ databaseName,
+ tableName);
+ return Transforms.EMPTY_TRANSFORM;
+ }
+ }
+ }
+ }
+
+ return Transforms.EMPTY_TRANSFORM;
+ }
+
+ protected ResultSet getTables(Connection connection) throws SQLException {
+ final DatabaseMetaData metaData = connection.getMetaData();
+ String catalogName = connection.getCatalog();
+ String schemaName = connection.getSchema();
+ // CK tables include : DICTIONARY", "LOG TABLE", "MEMORY TABLE",
+ // "REMOTE TABLE", "TABLE", "VIEW", "SYSTEM TABLE", "TEMPORARY TABLE
+ return metaData.getTables(catalogName, schemaName, null, new String[]
{"TABLE"});
}
@Override
protected String generatePurgeTableSql(String tableName) {
throw new UnsupportedOperationException(
- "ClickHouseTableOperations.generatePurgeTableSql is not implemented
yet.");
+ "ClickHouse does not support purge table in Gravitino, please use drop
table");
}
@Override
protected String generateAlterTableSql(
String databaseName, String tableName, TableChange... changes) {
- throw new UnsupportedOperationException(
- "ClickHouseTableOperations.generateAlterTableSql is not implemented
yet.");
+ // Not all operations require the original table information, so lazy
loading is used here
+ JdbcTable lazyLoadTable = null;
+ TableChange.UpdateComment updateComment = null;
+ List<TableChange.SetProperty> setProperties = new ArrayList<>();
+ List<String> alterSql = new ArrayList<>();
+
+ for (TableChange change : changes) {
+ if (change instanceof TableChange.UpdateComment) {
+ updateComment = (TableChange.UpdateComment) change;
+
+ } else if (change instanceof TableChange.SetProperty setProperty) {
+ // The set attribute needs to be added at the end.
+ setProperties.add(setProperty);
+
+ } else if (change instanceof TableChange.RemoveProperty) {
+ // Clickhouse does not support deleting table attributes, it can be
replaced by Set Property
+ throw new IllegalArgumentException("Remove property is not supported
yet");
+
+ } else if (change instanceof TableChange.AddColumn addColumn) {
+ lazyLoadTable = getOrCreateTable(databaseName, tableName,
lazyLoadTable);
+ alterSql.add(addColumnFieldDefinition(addColumn));
+
+ } else if (change instanceof TableChange.RenameColumn renameColumn) {
+ lazyLoadTable = getOrCreateTable(databaseName, tableName,
lazyLoadTable);
+ alterSql.add(renameColumnFieldDefinition(renameColumn));
+
+ } else if (change instanceof TableChange.UpdateColumnDefaultValue
updateColumnDefaultValue) {
+ lazyLoadTable = getOrCreateTable(databaseName, tableName,
lazyLoadTable);
+ alterSql.add(
+ updateColumnDefaultValueFieldDefinition(updateColumnDefaultValue,
lazyLoadTable));
+
+ } else if (change instanceof TableChange.UpdateColumnType
updateColumnType) {
+ lazyLoadTable = getOrCreateTable(databaseName, tableName,
lazyLoadTable);
+ alterSql.add(updateColumnTypeFieldDefinition(updateColumnType,
lazyLoadTable));
+
+ } else if (change instanceof TableChange.UpdateColumnComment
updateColumnComment) {
+ lazyLoadTable = getOrCreateTable(databaseName, tableName,
lazyLoadTable);
+ alterSql.add(updateColumnCommentFieldDefinition(updateColumnComment,
lazyLoadTable));
+
+ } else if (change instanceof TableChange.UpdateColumnPosition
updateColumnPosition) {
+ lazyLoadTable = getOrCreateTable(databaseName, tableName,
lazyLoadTable);
+ alterSql.add(updateColumnPositionFieldDefinition(updateColumnPosition,
lazyLoadTable));
+
+ } else if (change instanceof TableChange.DeleteColumn deleteColumn) {
+ lazyLoadTable = getOrCreateTable(databaseName, tableName,
lazyLoadTable);
+ String deleteColSql = deleteColumnFieldDefinition(deleteColumn,
lazyLoadTable);
+
+ if (StringUtils.isNotEmpty(deleteColSql)) {
+ alterSql.add(deleteColSql);
+ }
+
+ } else if (change instanceof TableChange.UpdateColumnNullability) {
+ lazyLoadTable = getOrCreateTable(databaseName, tableName,
lazyLoadTable);
+ alterSql.add(
+ updateColumnNullabilityDefinition(
+ (TableChange.UpdateColumnNullability) change, lazyLoadTable));
+
+ } else if (change instanceof TableChange.DeleteIndex) {
+ lazyLoadTable = getOrCreateTable(databaseName, tableName,
lazyLoadTable);
+ alterSql.add(deleteIndexDefinition(lazyLoadTable,
(TableChange.DeleteIndex) change));
+
+ } else if (change instanceof TableChange.UpdateColumnAutoIncrement) {
+ lazyLoadTable = getOrCreateTable(databaseName, tableName,
lazyLoadTable);
+ alterSql.add(
+ updateColumnAutoIncrementDefinition(
+ lazyLoadTable, (TableChange.UpdateColumnAutoIncrement)
change));
+
+ } else {
+ throw new IllegalArgumentException(
+ "Unsupported table change type: " + change.getClass().getName());
+ }
+ }
+
+ if (!setProperties.isEmpty()) {
+ alterSql.add(generateAlterTableProperties(setProperties));
+ }
+
+ // Last modified comment
+ if (null != updateComment) {
+ String newComment = updateComment.getNewComment();
+ if (null == StringIdentifier.fromComment(newComment)) {
+ // Detect and add Gravitino id.
+ JdbcTable jdbcTable = getOrCreateTable(databaseName, tableName,
lazyLoadTable);
+ StringIdentifier identifier =
StringIdentifier.fromComment(jdbcTable.comment());
+ if (null != identifier) {
+ newComment = StringIdentifier.addToComment(identifier, newComment);
+ }
+ }
+ alterSql.add(" MODIFY COMMENT '%s'".formatted(newComment));
Review Comment:
Table comment alteration builds `MODIFY COMMENT '%s'` without escaping
single quotes in `newComment`, which breaks the generated SQL and can be
exploited if the comment is user-controlled. Escape quotes the same way
`generateCreateTableSql` already does for table comments (`comment.replace("'", "''")`)
before embedding, or use a parameterized approach if the driver supports it.
##########
catalogs-contrib/catalog-jdbc-clickhouse/src/main/java/org/apache/gravitino/catalog/clickhouse/operations/ClickHouseTableOperations.java:
##########
@@ -18,16 +18,104 @@
*/
package org.apache.gravitino.catalog.clickhouse.operations;
+import static
org.apache.gravitino.catalog.clickhouse.ClickHouseConstants.CLUSTER_NAME;
+import static
org.apache.gravitino.catalog.clickhouse.ClickHouseConstants.CLUSTER_REMOTE_DATABASE;
+import static
org.apache.gravitino.catalog.clickhouse.ClickHouseConstants.CLUSTER_REMOTE_TABLE;
+import static
org.apache.gravitino.catalog.clickhouse.ClickHouseConstants.CLUSTER_SHARDING_KEY;
+import static
org.apache.gravitino.catalog.clickhouse.ClickHouseConstants.ON_CLUSTER;
+import static
org.apache.gravitino.catalog.clickhouse.ClickHouseConstants.SETTINGS_PREFIX;
+import static
org.apache.gravitino.catalog.clickhouse.ClickHouseTablePropertiesMetadata.CLICKHOUSE_ENGINE_KEY;
+import static
org.apache.gravitino.catalog.clickhouse.ClickHouseTablePropertiesMetadata.ENGINE_PROPERTY_ENTRY;
+import static
org.apache.gravitino.catalog.clickhouse.ClickHouseTablePropertiesMetadata.GRAVITINO_ENGINE_KEY;
+import static org.apache.gravitino.rel.Column.DEFAULT_VALUE_NOT_SET;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import java.sql.Connection;
+import java.sql.DatabaseMetaData;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
import java.util.Map;
+import java.util.Objects;
+import java.util.stream.Collectors;
+import org.apache.commons.collections4.CollectionUtils;
+import org.apache.commons.collections4.MapUtils;
+import org.apache.commons.lang3.ArrayUtils;
+import org.apache.commons.lang3.BooleanUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.gravitino.StringIdentifier;
+import
org.apache.gravitino.catalog.clickhouse.ClickHouseTablePropertiesMetadata;
+import
org.apache.gravitino.catalog.clickhouse.ClickHouseTablePropertiesMetadata.ENGINE;
import org.apache.gravitino.catalog.jdbc.JdbcColumn;
+import org.apache.gravitino.catalog.jdbc.JdbcTable;
import org.apache.gravitino.catalog.jdbc.operation.JdbcTableOperations;
+import org.apache.gravitino.catalog.jdbc.utils.JdbcConnectorUtils;
+import org.apache.gravitino.exceptions.NoSuchColumnException;
+import org.apache.gravitino.exceptions.NoSuchTableException;
+import org.apache.gravitino.rel.Column;
import org.apache.gravitino.rel.TableChange;
+import org.apache.gravitino.rel.expressions.NamedReference;
import org.apache.gravitino.rel.expressions.distributions.Distribution;
+import org.apache.gravitino.rel.expressions.distributions.Distributions;
+import org.apache.gravitino.rel.expressions.sorts.NullOrdering;
+import org.apache.gravitino.rel.expressions.sorts.SortDirection;
+import org.apache.gravitino.rel.expressions.sorts.SortOrder;
import org.apache.gravitino.rel.expressions.transforms.Transform;
+import org.apache.gravitino.rel.expressions.transforms.Transforms;
import org.apache.gravitino.rel.indexes.Index;
+import org.apache.gravitino.rel.indexes.Indexes;
+import org.apache.gravitino.rel.types.Types;
public class ClickHouseTableOperations extends JdbcTableOperations {
+ private static final String CLICKHOUSE_NOT_SUPPORT_NESTED_COLUMN_MSG =
+ "Clickhouse does not support nested column names.";
+
+ private static final String QUERY_INDEXES_SQL =
+ """
+ SELECT NULL AS TABLE_CAT,
+ system.tables.database AS TABLE_SCHEM,
+ system.tables.name AS TABLE_NAME,
+ trim(c.1) AS COLUMN_NAME,
+ c.2 AS KEY_SEQ,
+ 'PRIMARY' AS PK_NAME
+ FROM system.tables
+ ARRAY JOIN arrayZip(splitByChar(',', primary_key),
arrayEnumerate(splitByChar(',', primary_key))) as c
+ WHERE system.tables.primary_key <> ''
+ AND system.tables.database = '%s'
+ AND system.tables.name = '%s'
+ ORDER BY COLUMN_NAME
Review Comment:
`QUERY_INDEXES_SQL` interpolates `databaseName`/`tableName` directly into
the SQL string (via `%s`), which bypasses prepared-statement parameterization
and can allow SQL injection / broken queries when names contain quotes. Use
`... database = ? AND name = ?` and bind parameters (or otherwise safely escape
identifiers/values).
##########
catalogs/catalog-jdbc-common/src/main/java/org/apache/gravitino/catalog/jdbc/operation/JdbcDatabaseOperations.java:
##########
@@ -69,6 +69,10 @@ public void create(String databaseName, String comment,
Map<String, String> prop
}
try (final Connection connection = getConnection()) {
+ if (connection.getCatalog().equals(databaseName)) {
+ connection.setCatalog(createSysDatabaseNameSet().iterator().next());
+ }
Review Comment:
`connection.getCatalog().equals(databaseName)` can throw an NPE because
`getCatalog()` may return null (permitted by the JDBC spec for some drivers). Use
`Objects.equals(connection.getCatalog(), databaseName)` or add a null check before
calling `.equals`. Also note the fallback on the next line:
`createSysDatabaseNameSet().iterator().next()` would throw `NoSuchElementException`
if that set were ever empty — presumably it never is, but worth confirming.
##########
integration-test-common/src/test/java/org/apache/gravitino/integration/test/container/ClickHouseContainer.java:
##########
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.integration.test.container;
+
+import static java.lang.String.format;
+
+import com.google.common.collect.ImmutableSet;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.time.Duration;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.gravitino.integration.test.util.TestDatabaseName;
+import org.awaitility.Awaitility;
+import org.rnorth.ducttape.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.Network;
+
+public class ClickHouseContainer extends BaseContainer {
+ public static final Logger LOG =
LoggerFactory.getLogger(ClickHouseContainer.class);
+
+ public static final String DEFAULT_IMAGE = "clickhouse:24.8.14";
+ public static final String HOST_NAME = "gravitino-ci-clickhouse";
+ public static final int CLICKHOUSE_PORT = 8123;
+ public static final String USER_NAME = "default";
+ public static final String PASSWORD = "default";
+
+ public static Builder builder() {
+ return new Builder();
+ }
+
+ protected ClickHouseContainer(
+ String image,
+ String hostName,
+ Set<Integer> ports,
+ Map<String, String> extraHosts,
+ Map<String, String> filesToMount,
+ Map<String, String> envVars,
+ Optional<Network> network) {
+ super(image, hostName, ports, extraHosts, filesToMount, envVars, network);
+ }
+
+ @Override
+ protected void setupContainer() {
+ super.setupContainer();
+ withLogConsumer(new PrintingContainerLog(format("%-14s| ",
"clickHouseContainer")));
+ }
+
+ @Override
+ public void start() {
+ super.start();
+ Preconditions.check("clickHouse container startup failed!",
checkContainerStatus(5));
+ }
+
+ @Override
+ protected boolean checkContainerStatus(int retryLimit) {
+ Awaitility.await()
+ .atMost(java.time.Duration.ofMinutes(2))
+ .pollInterval(Duration.ofSeconds(5))
+ .until(
+ () -> {
+ try (Connection connection =
+ DriverManager.getConnection(getJdbcUrl(), USER_NAME,
getPassword());
+ Statement statement = connection.createStatement()) {
+ // Simple health check query; ClickHouse should respond if it
is ready.
+ statement.execute("SELECT 1");
+ LOG.info("clickHouse container is healthy");
+ return true;
+ } catch (SQLException e) {
+ LOG.warn(
+ "Failed to connect to clickHouse container during
Awaitility health check: {}",
+ e.getMessage(),
+ e);
+ return false;
+ }
+ });
+
+ return true;
+ }
+
+ public void createDatabase(TestDatabaseName testDatabaseName) {
+ String clickHouseJdbcUrl =
+ StringUtils.substring(
+ getJdbcUrl(testDatabaseName), 0,
getJdbcUrl(testDatabaseName).lastIndexOf("/"));
+
+ // Use the default username and password to connect and create the
database;
+ // any non-default password must be configured via Gravitino catalog
properties.
+ try (Connection connection =
+ DriverManager.getConnection(clickHouseJdbcUrl, USER_NAME,
getPassword());
+ Statement statement = connection.createStatement()) {
+
+ String query = String.format("CREATE DATABASE IF NOT EXISTS %s;",
testDatabaseName);
+ // FIXME: String, which is used in SQL, can be unsafe
+ statement.execute(query);
+ LOG.info(
Review Comment:
`createDatabase` builds SQL with an unquoted database name and leaves a
`FIXME` about SQL safety. Even in tests, prefer quoting/validating identifiers
(e.g., backticks + escaping) and remove `FIXME` (or replace with an issue
reference) to avoid leaving unresolved unsafe patterns.
##########
catalogs-contrib/catalog-jdbc-clickhouse/src/main/java/org/apache/gravitino/catalog/clickhouse/operations/ClickHouseTableOperations.java:
##########
@@ -38,19 +126,920 @@ protected String generateCreateTableSql(
Distribution distribution,
Index[] indexes) {
throw new UnsupportedOperationException(
- "ClickHouseTableOperations.generateCreateTableSql is not implemented
yet.");
+ "generateCreateTableSql with out sortOrders in clickhouse is not
supported");
+ }
+
  /**
   * Generates the ClickHouse CREATE TABLE statement from the Gravitino table definition.
   *
   * <p>Clauses are appended in order: CREATE TABLE [ON CLUSTER], column definitions, index
   * definitions, ENGINE, ORDER BY, PARTITION BY, COMMENT and SETTINGS.
   *
   * @param tableName table to create; quoted when the CREATE clause is appended
   * @param columns column definitions to render
   * @param comment optional table comment; single quotes are escaped by doubling
   * @param properties table properties (engine, cluster and "settings."-prefixed entries); may be
   *     null or empty
   * @param partitioning identity partition transforms; MergeTree-family engines only
   * @param distribution must be {@code Distributions.NONE} — ClickHouse has no Gravitino-style
   *     distribution
   * @param indexes primary-key and data-skipping index definitions
   * @param sortOrders ORDER BY specification; required by MergeTree-family engines
   * @return the complete CREATE TABLE SQL text
   */
  @Override
  protected String generateCreateTableSql(
      String tableName,
      JdbcColumn[] columns,
      String comment,
      Map<String, String> properties,
      Transform[] partitioning,
      Distribution distribution,
      Index[] indexes,
      SortOrder[] sortOrders) {

    Preconditions.checkArgument(
        Distributions.NONE.equals(distribution), "ClickHouse does not support distribution");

    StringBuilder sqlBuilder = new StringBuilder();

    // Normalize a null/empty property map so later helpers need no null checks.
    Map<String, String> notNullProperties =
        MapUtils.isNotEmpty(properties) ? properties : Collections.emptyMap();

    // Add Create table clause; returns whether an ON CLUSTER clause was emitted,
    // which the engine clause later depends on (Distributed requires it).
    boolean onCluster = appendCreateTableClause(notNullProperties, sqlBuilder, tableName);

    // Add columns
    buildColumnsDefinition(columns, sqlBuilder);

    // Index definition
    appendIndexesSql(indexes, sqlBuilder);

    sqlBuilder.append("\n)");

    // Extract engine from properties
    ClickHouseTablePropertiesMetadata.ENGINE engine =
        appendTableEngine(notNullProperties, sqlBuilder, onCluster);

    appendOrderBy(sortOrders, sqlBuilder, engine);

    appendPartitionClause(partitioning, sqlBuilder, engine);

    // Add table comment if specified; single quotes are doubled for SQL escaping.
    if (StringUtils.isNotEmpty(comment)) {
      String escapedComment = comment.replace("'", "''");
      sqlBuilder.append(" COMMENT '%s'".formatted(escapedComment));
    }

    // Add setting clause if specified, clickhouse only supports predefined settings
    appendTableProperties(notNullProperties, sqlBuilder);

    // Return the generated SQL statement
    String result = sqlBuilder.toString();

    LOG.info("Generated create table:{} sql: {}", tableName, result);
    return result;
  }
+
+ /**
+ * Append CREATE TABLE clause. If cluster name && on-cluster is specified in
properties, append ON
+ * CLUSTER clause.
+ *
+ * @param properties Table properties
+ * @param sqlBuilder SQL builder
+ * @return true if ON CLUSTER clause is appended, false otherwise
+ */
+ private boolean appendCreateTableClause(
+ Map<String, String> properties, StringBuilder sqlBuilder, String
tableName) {
+ String clusterName = properties.get(CLUSTER_NAME);
+ String onClusterValue = properties.get(ON_CLUSTER);
+
+ boolean onCluster =
+ StringUtils.isNotBlank(clusterName)
+ && StringUtils.isNotBlank(onClusterValue)
+ &&
Boolean.TRUE.equals(BooleanUtils.toBooleanObject(onClusterValue));
+
+ if (onCluster) {
+ sqlBuilder.append(
+ "CREATE TABLE %s ON CLUSTER `%s`
(\n".formatted(quoteIdentifier(tableName), clusterName));
+ } else {
+ sqlBuilder.append("CREATE TABLE %s
(\n".formatted(quoteIdentifier(tableName)));
+ }
+
+ return onCluster;
+ }
+
+ private static void appendTableProperties(
+ Map<String, String> properties, StringBuilder sqlBuilder) {
+ if (MapUtils.isEmpty(properties)) {
+ return;
+ }
+
+ Map<String, String> settingMap =
+ properties.entrySet().stream()
+ .filter(entry -> entry.getKey().startsWith(SETTINGS_PREFIX))
+ .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
+ if (MapUtils.isEmpty(settingMap)) {
+ return;
+ }
+
+ String settings =
+ settingMap.entrySet().stream()
+ .map(
+ entry ->
+ entry.getKey().substring(SETTINGS_PREFIX.length()) + " = "
+ entry.getValue())
+ .collect(Collectors.joining(",\n ", " \n SETTINGS ", ""));
+ sqlBuilder.append(settings);
+ }
+
+ private static void appendOrderBy(
+ SortOrder[] sortOrders,
+ StringBuilder sqlBuilder,
+ ClickHouseTablePropertiesMetadata.ENGINE engine) {
+ // ClickHouse requires ORDER BY clause for some engines, and currently
only mergeTree family
+ // requires ORDER BY clause.
+ boolean requireOrderBy = engine.isRequireOrderBy();
+ if (!requireOrderBy) {
+ if (ArrayUtils.isNotEmpty(sortOrders)) {
+ throw new UnsupportedOperationException(
+ "ORDER BY clause is not supported for engine: " +
engine.getValue());
+ }
+
+ // No need to add order by clause
+ return;
+ }
+
+ if (ArrayUtils.isEmpty(sortOrders)) {
+ throw new IllegalArgumentException(
+ "ORDER BY clause is required for engine: " + engine.getValue());
+ }
+
+ if (sortOrders.length > 1) {
+ throw new UnsupportedOperationException(
+ "Currently ClickHouse does not support sortOrders with more than 1
element");
+ }
+
+ NullOrdering nullOrdering = sortOrders[0].nullOrdering();
+ SortDirection sortDirection = sortOrders[0].direction();
+ if (nullOrdering != null && sortDirection != null) {
+ // ClickHouse does not support NULLS FIRST/LAST now.
+ LOG.warn(
+ "ClickHouse currently does not support nullOrdering: {}, and will
ignore it",
+ nullOrdering);
+ }
+
+ sqlBuilder.append("\n ORDER BY
`%s`\n".formatted(sortOrders[0].expression()));
+ }
+
  /**
   * Resolves the table engine from properties (falling back to the catalog default) and appends
   * the ENGINE clause; the Distributed engine additionally requires cluster-related properties
   * and a preceding ON CLUSTER clause.
   *
   * @param properties table properties, may be empty
   * @param sqlBuilder builder receiving the ENGINE clause
   * @param onCluster whether an ON CLUSTER clause was emitted earlier
   * @return the resolved engine, consulted by later ORDER BY / PARTITION BY clauses
   */
  private ClickHouseTablePropertiesMetadata.ENGINE appendTableEngine(
      Map<String, String> properties, StringBuilder sqlBuilder, boolean onCluster) {
    ClickHouseTablePropertiesMetadata.ENGINE engine = ENGINE_PROPERTY_ENTRY.getDefaultValue();
    if (MapUtils.isNotEmpty(properties)) {
      String userSetEngine = properties.get(CLICKHOUSE_ENGINE_KEY);
      if (StringUtils.isNotEmpty(userSetEngine)) {
        engine = ClickHouseTablePropertiesMetadata.ENGINE.fromString(userSetEngine);
      }
    }

    if (engine == ENGINE.DISTRIBUTED) {
      if (!onCluster) {
        throw new IllegalArgumentException(
            "ENGINE = DISTRIBUTED requires ON CLUSTER clause to be specified.");
      }

      // Distributed tables must name the cluster, the remote database/table and a sharding key.
      String clusterName = properties.get(CLUSTER_NAME);
      String remoteDatabase = properties.get(CLUSTER_REMOTE_DATABASE);
      String remoteTable = properties.get(CLUSTER_REMOTE_TABLE);
      String shardingKey = properties.get(CLUSTER_SHARDING_KEY);

      Preconditions.checkArgument(
          StringUtils.isNotBlank(clusterName),
          "Cluster name must be specified when engine is Distributed");
      Preconditions.checkArgument(
          StringUtils.isNotBlank(remoteDatabase),
          "Remote database must be specified for Distributed");
      Preconditions.checkArgument(
          StringUtils.isNotBlank(remoteTable), "Remote table must be specified for Distributed");
      Preconditions.checkArgument(
          StringUtils.isNotBlank(shardingKey), "Sharding key must be specified for Distributed");

      sqlBuilder.append(
          "\n ENGINE = %s(`%s`,`%s`,`%s`,%s)"
              .formatted(
                  ENGINE.DISTRIBUTED.getValue(),
                  clusterName,
                  remoteDatabase,
                  remoteTable,
                  shardingKey));
      return engine;
    }

    // Non-distributed engines carry no extra arguments in the ENGINE clause.
    sqlBuilder.append("\n ENGINE = %s".formatted(engine.getValue()));
    return engine;
  }
+
+ private void appendPartitionClause(
+ Transform[] partitioning,
+ StringBuilder sqlBuilder,
+ ClickHouseTablePropertiesMetadata.ENGINE engine) {
+ if (ArrayUtils.isEmpty(partitioning)) {
+ return;
+ }
+
+ if (!engine.acceptPartition()) {
+ throw new UnsupportedOperationException(
+ "Partitioning is only supported for MergeTree family engines");
+ }
+
+ List<String> partitionExprs =
+
Arrays.stream(partitioning).map(this::toPartitionExpression).collect(Collectors.toList());
+ String partitionExpr =
+ partitionExprs.size() == 1
+ ? partitionExprs.get(0)
+ : "tuple(" + String.join(", ", partitionExprs) + ")";
+ sqlBuilder.append("\n PARTITION BY ").append(partitionExpr);
+ }
+
+ private String toPartitionExpression(Transform transform) {
+ Preconditions.checkArgument(transform != null, "Partition transform cannot
be null");
+ Preconditions.checkArgument(
+ StringUtils.equalsIgnoreCase(transform.name(),
Transforms.NAME_OF_IDENTITY),
+ "Unsupported partition transform: " + transform.name());
+ Preconditions.checkArgument(
+ transform.arguments().length == 1
+ && transform.arguments()[0] instanceof NamedReference
+ && ((NamedReference) transform.arguments()[0]).fieldName().length
== 1,
+ "ClickHouse only supports single column identity partitioning");
+
+ String fieldName =
+ ((NamedReference) transform.arguments()[0]).fieldName()[0]; // already
validated
+ return quoteIdentifier(fieldName);
+ }
+
+ private void buildColumnsDefinition(JdbcColumn[] columns, StringBuilder
sqlBuilder) {
+ for (int i = 0; i < columns.length; i++) {
+ JdbcColumn column = columns[i];
+ sqlBuilder.append(" %s".formatted(quoteIdentifier(column.name())));
+
+ appendColumnDefinition(column, sqlBuilder);
+ // Add a comma for the next column, unless it's the last one
+ if (i < columns.length - 1) {
+ sqlBuilder.append(",\n");
+ }
+ }
+ }
+
+ /**
+ * ClickHouse supports primary key and data skipping indexes.
+ *
+ * <p>This method will not check the validity of the indexes. For
ClickHouse, the primary key must
+ * be a subset of the order by columns. We will leave the underlying
clickhouse to validate it.
+ */
+ private void appendIndexesSql(Index[] indexes, StringBuilder sqlBuilder) {
+ if (ArrayUtils.isEmpty(indexes)) {
+ return;
+ }
+
+ for (Index index : indexes) {
+ String fieldStr = getIndexFieldStr(index.fieldNames());
+ sqlBuilder.append(",\n");
+ switch (index.type()) {
+ case PRIMARY_KEY:
+ if (null != index.name()
+ && !StringUtils.equalsIgnoreCase(index.name(),
Indexes.DEFAULT_PRIMARY_KEY_NAME)) {
+ LOG.warn(
+ "Primary key name must be PRIMARY in ClickHouse, the name {}
will be ignored.",
+ index.name());
+ }
+ // fieldStr already quoted in getIndexFieldStr
+ sqlBuilder.append(" PRIMARY KEY (").append(fieldStr).append(")");
+ break;
+ case DATA_SKIPPING_MINMAX:
+ Preconditions.checkArgument(
+ StringUtils.isNotBlank(index.name()), "Data skipping index name
must not be blank");
+ // The GRANULARITY value is always 1 here currently.
+ sqlBuilder.append(
+ " INDEX %s %s TYPE minmax GRANULARITY 1"
+ .formatted(quoteIdentifier(index.name()), fieldStr));
+ break;
+ case DATA_SKIPPING_BLOOM_FILTER:
+ // The GRANULARITY value is always 3 here currently.
+ Preconditions.checkArgument(
+ StringUtils.isNotBlank(index.name()), "Data skipping index name
must not be blank");
+ sqlBuilder.append(
+ " INDEX %s %s TYPE bloom_filter GRANULARITY 3"
+ .formatted(quoteIdentifier(index.name()), fieldStr));
+ break;
+ default:
+ throw new IllegalArgumentException(
+ "Gravitino Clickhouse doesn't support index : " + index.type());
+ }
+ }
+ }
+
+ @Override
+ protected boolean getAutoIncrementInfo(ResultSet resultSet) throws
SQLException {
+ return "YES".equalsIgnoreCase(resultSet.getString("IS_AUTOINCREMENT"));
+ }
+
+ @Override
+ public void alterTable(String databaseName, String tableName, TableChange...
changes)
+ throws NoSuchTableException {
+ LOG.info("Attempting to alter table {} from database {}", tableName,
databaseName);
+ try (Connection connection = getConnection(databaseName)) {
+ for (TableChange change : changes) {
+ String sql = generateAlterTableSql(databaseName, tableName, change);
+ if (StringUtils.isEmpty(sql)) {
+ LOG.info("No changes to alter table {} from database {}", tableName,
databaseName);
+ return;
+ }
+ JdbcConnectorUtils.executeUpdate(connection, sql);
+ }
+ LOG.info("Alter table {} from database {}", tableName, databaseName);
+ } catch (final SQLException se) {
+ throw this.exceptionMapper.toGravitinoException(se);
+ }
+ }
+
+ @Override
+ protected Map<String, String> getTableProperties(Connection connection,
String tableName)
+ throws SQLException {
+ try (PreparedStatement statement =
+ connection.prepareStatement("select * from system.tables where name =
? ")) {
+ statement.setString(1, tableName);
Review Comment:
`getTableProperties` queries `system.tables` by `name` only; if multiple
databases contain the same table name, this can return properties for the wrong
table. Add a `database = ?` predicate (using `connection.getCatalog()` /
current DB) and bind it along with `name`.
```suggestion
connection.prepareStatement(
"select * from system.tables where database = ? and name = ? "))
{
String databaseName = connection.getCatalog();
statement.setString(1, databaseName);
statement.setString(2, tableName);
```
##########
integration-test-common/src/test/java/org/apache/gravitino/integration/test/container/ClickHouseContainer.java:
##########
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.integration.test.container;
+
import static java.lang.String.format;

import com.google.common.collect.ImmutableSet;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Statement;
import java.time.Duration;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import org.apache.commons.lang3.StringUtils;
import org.apache.gravitino.integration.test.util.TestDatabaseName;
import org.awaitility.Awaitility;
import org.awaitility.core.ConditionTimeoutException;
import org.rnorth.ducttape.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.containers.Network;
+
+public class ClickHouseContainer extends BaseContainer {
  public static final Logger LOG = LoggerFactory.getLogger(ClickHouseContainer.class);

  // Docker image and network identity used by the CI ClickHouse instance.
  public static final String DEFAULT_IMAGE = "clickhouse:24.8.14";
  public static final String HOST_NAME = "gravitino-ci-clickhouse";
  // 8123 is ClickHouse's HTTP interface port, which the JDBC driver connects to.
  public static final int CLICKHOUSE_PORT = 8123;
  // Default credentials of the stock ClickHouse image.
  public static final String USER_NAME = "default";
  public static final String PASSWORD = "default";
+
  /** @return a fresh {@link Builder} for configuring and constructing a ClickHouseContainer */
  public static Builder builder() {
    return new Builder();
  }
+
  /** Creates the container; all configuration is forwarded unchanged to {@code BaseContainer}. */
  protected ClickHouseContainer(
      String image,
      String hostName,
      Set<Integer> ports,
      Map<String, String> extraHosts,
      Map<String, String> filesToMount,
      Map<String, String> envVars,
      Optional<Network> network) {
    super(image, hostName, ports, extraHosts, filesToMount, envVars, network);
  }
+
  /** Configures the container and pipes its stdout/stderr into the test logs, tagged by name. */
  @Override
  protected void setupContainer() {
    super.setupContainer();
    withLogConsumer(new PrintingContainerLog(format("%-14s| ", "clickHouseContainer")));
  }
+
  /** Starts the container, then fails fast if the health check does not pass. */
  @Override
  public void start() {
    super.start();
    Preconditions.check("clickHouse container startup failed!", checkContainerStatus(5));
  }
+
+ @Override
+ protected boolean checkContainerStatus(int retryLimit) {
+ Awaitility.await()
+ .atMost(java.time.Duration.ofMinutes(2))
+ .pollInterval(Duration.ofSeconds(5))
+ .until(
+ () -> {
+ try (Connection connection =
+ DriverManager.getConnection(getJdbcUrl(), USER_NAME,
getPassword());
+ Statement statement = connection.createStatement()) {
+ // Simple health check query; ClickHouse should respond if it
is ready.
+ statement.execute("SELECT 1");
+ LOG.info("clickHouse container is healthy");
+ return true;
+ } catch (SQLException e) {
+ LOG.warn(
+ "Failed to connect to clickHouse container during
Awaitility health check: {}",
+ e.getMessage(),
+ e);
+ return false;
+ }
+ });
+
+ return true;
+ }
+
+ public void createDatabase(TestDatabaseName testDatabaseName) {
+ String clickHouseJdbcUrl =
+ StringUtils.substring(
+ getJdbcUrl(testDatabaseName), 0,
getJdbcUrl(testDatabaseName).lastIndexOf("/"));
+
+ // Use the default username and password to connect and create the
database;
+ // any non-default password must be configured via Gravitino catalog
properties.
+ try (Connection connection =
+ DriverManager.getConnection(clickHouseJdbcUrl, USER_NAME,
getPassword());
+ Statement statement = connection.createStatement()) {
+
+ String query = String.format("CREATE DATABASE IF NOT EXISTS %s;",
testDatabaseName);
+ // FIXME: String, which is used in SQL, can be unsafe
+ statement.execute(query);
+ LOG.info(
+ String.format("clickHouse container database %s has been created",
testDatabaseName));
+ } catch (Exception e) {
+ LOG.error(e.getMessage(), e);
+ }
Review Comment:
The `catch (Exception e)` in `createDatabase` logs and swallows failures,
which can hide infrastructure/test setup problems and lead to confusing
downstream errors. Catch the expected exception type(s) (e.g., `SQLException`)
and rethrow a runtime exception so the test fails fast.
##########
api/src/main/java/org/apache/gravitino/rel/indexes/Indexes.java:
##########
@@ -27,8 +27,11 @@ public class Indexes {
/** An empty array of indexes. */
public static final Index[] EMPTY_INDEXES = new Index[0];
- /** MySQL does not support setting the name of the primary key, so the
default name is used. */
- public static final String DEFAULT_MYSQL_PRIMARY_KEY_NAME = "PRIMARY";
+ /**
+ * Name of the default primary key. MySQL, ClickHouse, OceanBase and many
other databases supports
+ * setting the name of the primary key and use it as primary key name.
+ */
Review Comment:
The comment for `DEFAULT_PRIMARY_KEY_NAME` is misleading: MySQL does not
allow customizing the primary key constraint name (it’s always `PRIMARY`).
Please adjust the JavaDoc to accurately describe which systems allow custom
naming vs require the default.
##########
catalogs-contrib/catalog-jdbc-clickhouse/src/test/java/org/apache/gravitino/catalog/clickhouse/integration/test/CatalogClickHouseIT.java:
##########
@@ -0,0 +1,1646 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.catalog.clickhouse.integration.test;
+
+import static
org.apache.gravitino.catalog.clickhouse.ClickHouseTablePropertiesMetadata.ENGINE.MERGETREE;
+import static
org.apache.gravitino.catalog.clickhouse.ClickHouseTablePropertiesMetadata.GRAVITINO_ENGINE_KEY;
+import static
org.apache.gravitino.catalog.clickhouse.ClickHouseUtils.getSortOrders;
+import static org.apache.gravitino.rel.Column.DEFAULT_VALUE_NOT_SET;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import java.io.IOException;
+import java.sql.SQLException;
+import java.time.LocalDate;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.stream.Collectors;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.gravitino.Catalog;
+import org.apache.gravitino.CatalogChange;
+import org.apache.gravitino.NameIdentifier;
+import org.apache.gravitino.Namespace;
+import org.apache.gravitino.Schema;
+import org.apache.gravitino.SupportsSchemas;
+import org.apache.gravitino.auth.AuthConstants;
+import
org.apache.gravitino.catalog.clickhouse.integration.test.service.ClickHouseService;
+import org.apache.gravitino.catalog.jdbc.config.JdbcConfig;
+import org.apache.gravitino.client.GravitinoMetalake;
+import org.apache.gravitino.exceptions.NoSuchSchemaException;
+import org.apache.gravitino.exceptions.NotFoundException;
+import org.apache.gravitino.integration.test.container.ClickHouseContainer;
+import org.apache.gravitino.integration.test.container.ContainerSuite;
+import org.apache.gravitino.integration.test.util.BaseIT;
+import org.apache.gravitino.integration.test.util.GravitinoITUtils;
+import org.apache.gravitino.integration.test.util.ITUtils;
+import org.apache.gravitino.integration.test.util.TestDatabaseName;
+import org.apache.gravitino.rel.Column;
+import org.apache.gravitino.rel.Table;
+import org.apache.gravitino.rel.TableCatalog;
+import org.apache.gravitino.rel.TableChange;
+import org.apache.gravitino.rel.expressions.FunctionExpression;
+import org.apache.gravitino.rel.expressions.UnparsedExpression;
+import org.apache.gravitino.rel.expressions.distributions.Distribution;
+import org.apache.gravitino.rel.expressions.distributions.Distributions;
+import org.apache.gravitino.rel.expressions.literals.Literals;
+import org.apache.gravitino.rel.expressions.sorts.SortOrder;
+import org.apache.gravitino.rel.expressions.transforms.Transform;
+import org.apache.gravitino.rel.expressions.transforms.Transforms;
+import org.apache.gravitino.rel.indexes.Index;
+import org.apache.gravitino.rel.indexes.Indexes;
+import org.apache.gravitino.rel.types.Decimal;
+import org.apache.gravitino.rel.types.Types;
+import org.apache.gravitino.utils.RandomNameUtils;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInstance;
+import org.junit.jupiter.api.TestInstance.Lifecycle;
+import org.junit.jupiter.api.condition.EnabledIf;
+
+@Tag("gravitino-docker-test")
+@TestInstance(Lifecycle.PER_CLASS)
+public class CatalogClickHouseIT extends BaseIT {
  private static final ContainerSuite containerSuite = ContainerSuite.getInstance();
  private static final String provider = "jdbc-clickhouse";

  // Randomized per-run names keep concurrent CI executions from colliding.
  public String metalakeName = GravitinoITUtils.genRandomName("clickhouse_it_metalake");
  public String catalogName = GravitinoITUtils.genRandomName("clickhouse_it_catalog");
  public String schemaName = GravitinoITUtils.genRandomName("clickhouse_it_schema");
  public String tableName = GravitinoITUtils.genRandomName("clickhouse_it_table");
  public String alertTableName = "alert_table_name";
  public String table_comment = "table_comment";

  // ClickHouse doesn't support schema comment
  public String schema_comment = null;
  public String CLICKHOUSE_COL_NAME1 = "clickhouse_col_name1";
  public String CLICKHOUSE_COL_NAME2 = "clickhouse_col_name2";
  public String CLICKHOUSE_COL_NAME3 = "clickhouse_col_name3";
  public String CLICKHOUSE_COL_NAME4 = "clickhouse_col_name4";
  public String CLICKHOUSE_COL_NAME5 = "clickhouse_col_name5";

  private GravitinoMetalake metalake;

  protected Catalog catalog;

  // Direct JDBC access to the container, used to cross-check Gravitino against ClickHouse itself.
  private ClickHouseService clickhouseService;

  private ClickHouseContainer CLICKHOUSE_CONTAINER;

  private TestDatabaseName TEST_DB_NAME;

  // NOTE(review): "clickhouse:8.0" does not look like a real ClickHouse image tag (the CI
  // container defaults to clickhouse:24.8.14); this appears copied from the MySQL IT — verify.
  public static final String defaultClickhouseImageName = "clickhouse:8.0";

  protected String clickhouseImageName = defaultClickhouseImageName;
+
  // Gate for @EnabledIf("SupportColumnDefaultValueExpression"); the name is referenced by
  // string in the annotation, so it must not be renamed despite the non-standard casing.
  boolean SupportColumnDefaultValueExpression() {
    return true;
  }
+
  /** Boots the shared ClickHouse container and provisions metalake/catalog/schema fixtures. */
  @BeforeAll
  public void startup() throws IOException, SQLException {
    TEST_DB_NAME = TestDatabaseName.CLICKHOUSE_CATALOG_CLICKHOUSE_IT;

    containerSuite.startClickHouseContainer(TEST_DB_NAME);
    CLICKHOUSE_CONTAINER = containerSuite.getClickHouseContainer();

    // Direct-JDBC helper used to verify Gravitino results against ClickHouse itself.
    clickhouseService = new ClickHouseService(CLICKHOUSE_CONTAINER, TEST_DB_NAME);
    createMetalake();
    createCatalog();
    createSchema();
  }
+
  /**
   * Tears down fixtures in reverse creation order.
   *
   * <p>NOTE(review): a failure in any step skips the remaining cleanup steps — verify that is
   * acceptable for CI runs.
   */
  @AfterAll
  public void stop() {
    clearTableAndSchema();
    metalake.disableCatalog(catalogName);
    metalake.dropCatalog(catalogName);
    client.disableMetalake(metalakeName);
    client.dropMetalake(metalakeName);
    clickhouseService.close();
  }
+
  /** Recreates a clean schema between tests so cases do not observe each other's tables. */
  @AfterEach
  public void resetSchema() {
    clearTableAndSchema();
    createSchema();
  }
+
+ private void clearTableAndSchema() {
+ NameIdentifier[] nameIdentifiers =
+ catalog.asTableCatalog().listTables(Namespace.of(schemaName));
+ for (NameIdentifier nameIdentifier : nameIdentifiers) {
+ catalog.asTableCatalog().dropTable(nameIdentifier);
+ }
+ catalog.asSchemas().dropSchema(schemaName, false);
+ }
+
  /** Creates the test metalake, asserting the server starts with none, then reloads it. */
  private void createMetalake() {
    GravitinoMetalake[] gravitinoMetalakes = client.listMetalakes();
    // The suite assumes a pristine server; any pre-existing metalake would break isolation.
    Assertions.assertEquals(0, gravitinoMetalakes.length);

    client.createMetalake(metalakeName, "comment", Collections.emptyMap());
    GravitinoMetalake loadMetalake = client.loadMetalake(metalakeName);
    Assertions.assertEquals(metalakeName, loadMetalake.name());

    metalake = loadMetalake;
  }
+
  /** Registers a jdbc-clickhouse catalog pointing at the container's JDBC endpoint. */
  private void createCatalog() throws SQLException {
    Map<String, String> catalogProperties = Maps.newHashMap();

    // Strip the trailing "/<database>" so the catalog connects at server level.
    catalogProperties.put(
        JdbcConfig.JDBC_URL.getKey(),
        StringUtils.substring(
            CLICKHOUSE_CONTAINER.getJdbcUrl(TEST_DB_NAME),
            0,
            CLICKHOUSE_CONTAINER.getJdbcUrl(TEST_DB_NAME).lastIndexOf("/")));
    catalogProperties.put(
        JdbcConfig.JDBC_DRIVER.getKey(), CLICKHOUSE_CONTAINER.getDriverClassName(TEST_DB_NAME));
    catalogProperties.put(JdbcConfig.USERNAME.getKey(), CLICKHOUSE_CONTAINER.getUsername());
    catalogProperties.put(JdbcConfig.PASSWORD.getKey(), CLICKHOUSE_CONTAINER.getPassword());

    Catalog createdCatalog =
        metalake.createCatalog(
            catalogName, Catalog.Type.RELATIONAL, provider, "comment", catalogProperties);
    Catalog loadCatalog = metalake.loadCatalog(catalogName);
    Assertions.assertEquals(createdCatalog, loadCatalog);

    catalog = loadCatalog;
  }
+
  /** Creates the shared test schema and verifies it round-trips through loadSchema. */
  private void createSchema() {
    Map<String, String> prop = Maps.newHashMap();
    Schema createdSchema = null;
    try {
      createdSchema = catalog.asSchemas().createSchema(schemaName, schema_comment, prop);
    } catch (Exception ex) {
      throw new RuntimeException("Create schema failed: " + ex.getMessage(), ex);
    }

    Schema loadSchema = catalog.asSchemas().loadSchema(schemaName);
    Assertions.assertEquals(createdSchema.name(), loadSchema.name());
    // Every property sent on creation must be readable back from the loaded schema.
    prop.forEach((key, value) -> Assertions.assertEquals(loadSchema.properties().get(key), value));
  }
+
+ private Column[] createColumns() {
+ Column col1 = Column.of(CLICKHOUSE_COL_NAME1, Types.IntegerType.get(),
"col_1_comment");
+ Column col2 = Column.of(CLICKHOUSE_COL_NAME2, Types.DateType.get(),
"col_2_comment");
+ Column col3 =
+ Column.of(
+ CLICKHOUSE_COL_NAME3,
+ Types.StringType.get(),
+ "col_3_comment",
+ false,
+ false,
+ DEFAULT_VALUE_NOT_SET);
+
+ return new Column[] {col1, col2, col3};
+ }
+
  /** Columns exercising literal, function-expression and NULL default values across types. */
  private Column[] createColumnsWithDefaultValue() {
    return new Column[] {
      Column.of(
          CLICKHOUSE_COL_NAME1,
          Types.FloatType.get(),
          "col_1_comment",
          false,
          false,
          Literals.of("1.23", Types.FloatType.get())),
      Column.of(
          CLICKHOUSE_COL_NAME2,
          Types.TimestampType.withoutTimeZone(),
          "col_2_comment",
          false,
          false,
          FunctionExpression.of("now")),
      Column.of(
          CLICKHOUSE_COL_NAME3,
          Types.VarCharType.of(255),
          "col_3_comment",
          true,
          false,
          Literals.NULL),
      Column.of(
          CLICKHOUSE_COL_NAME4,
          Types.IntegerType.get(),
          "col_4_comment",
          false,
          false,
          Literals.of("1000", Types.IntegerType.get())),
      Column.of(
          CLICKHOUSE_COL_NAME5,
          Types.DecimalType.of(3, 2),
          "col_5_comment",
          true,
          false,
          Literals.of("1.23", Types.DecimalType.of(3, 2)))
    };
  }
+
+ private Map<String, String> createProperties() {
+ Map<String, String> properties = Maps.newHashMap();
+ properties.put(GRAVITINO_ENGINE_KEY, MERGETREE.getValue());
+ return properties;
+ }
+
  /**
   * Exercises the full schema lifecycle (list/create/duplicate-create/drop) through both the
   * Gravitino API and direct ClickHouse queries, plus failure paths against a dropped schema.
   */
  @Test
  void testOperationClickhouseSchema() {
    SupportsSchemas schemas = catalog.asSchemas();
    Namespace namespace = Namespace.of(metalakeName, catalogName);
    // list schema check.
    String[] nameIdentifiers = schemas.listSchemas();
    Set<String> schemaNames = Sets.newHashSet(nameIdentifiers);
    Assertions.assertTrue(schemaNames.contains(schemaName));

    NameIdentifier[] clickhouseNamespaces = clickhouseService.listSchemas(namespace);
    schemaNames =
        Arrays.stream(clickhouseNamespaces).map(NameIdentifier::name).collect(Collectors.toSet());
    Assertions.assertTrue(schemaNames.contains(schemaName));

    // create schema check.
    String testSchemaName = GravitinoITUtils.genRandomName("test_schema_1");
    NameIdentifier schemaIdent = NameIdentifier.of(metalakeName, catalogName, testSchemaName);
    schemas.createSchema(testSchemaName, schema_comment, Collections.emptyMap());
    nameIdentifiers = schemas.listSchemas();
    schemaNames = Sets.newHashSet(nameIdentifiers);
    Assertions.assertTrue(schemaNames.contains(testSchemaName));

    clickhouseNamespaces = clickhouseService.listSchemas(namespace);
    schemaNames =
        Arrays.stream(clickhouseNamespaces).map(NameIdentifier::name).collect(Collectors.toSet());
    Assertions.assertTrue(schemaNames.contains(testSchemaName));

    // creating the same schema a second time must fail.
    Map<String, String> emptyMap = Collections.emptyMap();
    Assertions.assertThrows(
        RuntimeException.class,
        () -> {
          schemas.createSchema(testSchemaName, schema_comment, emptyMap);
        });

    // drop schema check.
    schemas.dropSchema(testSchemaName, false);
    Assertions.assertThrows(NoSuchSchemaException.class, () -> schemas.loadSchema(testSchemaName));
    Assertions.assertThrows(
        NoSuchSchemaException.class, () -> clickhouseService.loadSchema(schemaIdent));

    nameIdentifiers = schemas.listSchemas();
    schemaNames = Sets.newHashSet(nameIdentifiers);
    Assertions.assertFalse(schemaNames.contains(testSchemaName));
    Assertions.assertFalse(schemas.dropSchema("no-exits", false));
    TableCatalog tableCatalog = catalog.asTableCatalog();

    // create failed check: table creation under the dropped schema must fail.
    NameIdentifier table = NameIdentifier.of(testSchemaName, "test_table");
    Assertions.assertThrows(
        NoSuchSchemaException.class,
        () ->
            tableCatalog.createTable(
                table,
                createColumns(),
                table_comment,
                createProperties(),
                null,
                Distributions.NONE,
                getSortOrders(CLICKHOUSE_COL_NAME3)));
    // drop schema failed check: dropping an already-dropped schema returns false.
    Assertions.assertFalse(schemas.dropSchema(schemaIdent.name(), true));
    Assertions.assertFalse(schemas.dropSchema(schemaIdent.name(), false));
    Assertions.assertFalse(() -> tableCatalog.dropTable(table));
    clickhouseNamespaces = clickhouseService.listSchemas(Namespace.empty());
    schemaNames =
        Arrays.stream(clickhouseNamespaces).map(NameIdentifier::name).collect(Collectors.toSet());
    Assertions.assertTrue(schemaNames.contains(schemaName));
  }
+
  /** Creates a table via the Gravitino API, then verifies name/comment/properties/columns round-trip. */
  @Test
  void testCreateAndLoadClickhouseTable() {
    // Create table from Gravitino API
    Column[] columns = createColumns();

    NameIdentifier tableIdentifier = NameIdentifier.of(schemaName, tableName);
    Distribution distribution = Distributions.NONE;

    Transform[] partitioning = Transforms.EMPTY_TRANSFORM;

    Map<String, String> properties = createProperties();
    TableCatalog tableCatalog = catalog.asTableCatalog();
    tableCatalog.createTable(
        tableIdentifier,
        columns,
        table_comment,
        properties,
        partitioning,
        distribution,
        getSortOrders(CLICKHOUSE_COL_NAME3));

    Table loadTable = tableCatalog.loadTable(tableIdentifier);
    Assertions.assertEquals(tableName, loadTable.name());
    Assertions.assertEquals(table_comment, loadTable.comment());
    // Every property sent on creation must be present and unchanged on the loaded table.
    Map<String, String> resultProp = loadTable.properties();
    for (Map.Entry<String, String> entry : properties.entrySet()) {
      Assertions.assertTrue(resultProp.containsKey(entry.getKey()));
      Assertions.assertEquals(entry.getValue(), resultProp.get(entry.getKey()));
    }
    Assertions.assertEquals(loadTable.columns().length, columns.length);
    for (int i = 0; i < columns.length; i++) {
      ITUtils.assertColumn(columns[i], loadTable.columns()[i]);
    }
  }
+
  /** Verifies a table can be created whose column names are all SQL keywords (quoting works). */
  @Test
  void testColumnNameWithKeyWords() {
    // Create table from Gravitino API
    Column[] columns = {
      Column.of("integer", Types.IntegerType.get(), "integer", false, false, DEFAULT_VALUE_NOT_SET),
      Column.of("long", Types.LongType.get(), "long"),
      Column.of("float", Types.FloatType.get(), "float"),
      Column.of("double", Types.DoubleType.get(), "double"),
      Column.of("decimal", Types.DecimalType.of(10, 3), "decimal"),
      Column.of("date", Types.DateType.get(), "date"),
      Column.of("time", Types.TimeType.get(), "time")
    };

    String name = GravitinoITUtils.genRandomName("table") + "_keyword";
    NameIdentifier tableIdentifier = NameIdentifier.of(schemaName, name);
    Distribution distribution = Distributions.NONE;

    Transform[] partitioning = Transforms.EMPTY_TRANSFORM;

    Map<String, String> properties = createProperties();
    TableCatalog tableCatalog = catalog.asTableCatalog();
    Table createdTable =
        tableCatalog.createTable(
            tableIdentifier,
            columns,
            table_comment,
            properties,
            partitioning,
            distribution,
            getSortOrders("integer"));
    Assertions.assertEquals(createdTable.name(), name);
  }
+
+ @Test
+ // ClickHouse supports column default value expressions (DEFAULT expr).
+ // see https://clickhouse.com/docs/en/sql-reference/statements/create/table#default_values
+ @EnabledIf("SupportColumnDefaultValueExpression")
+ void testColumnDefaultValue() {
+ Column col1 =
+ Column.of(
+ CLICKHOUSE_COL_NAME1,
+ Types.IntegerType.get(),
+ "col_1_comment",
+ false,
+ false,
+ FunctionExpression.of("rand"));
+ Column col2 =
+ Column.of(
+ CLICKHOUSE_COL_NAME2,
+ Types.TimestampType.withoutTimeZone(),
+ "col_2_comment",
+ false,
+ false,
+ FunctionExpression.of("now"));
+ Column col3 =
+ Column.of(
+ CLICKHOUSE_COL_NAME3,
+ Types.VarCharType.of(255),
+ "col_3_comment",
+ true,
+ false,
+ Literals.NULL);
+ Column col4 =
+ Column.of(
+ CLICKHOUSE_COL_NAME4, Types.StringType.get(), "col_4_comment",
false, false, null);
+ Column col5 =
+ Column.of(
+ CLICKHOUSE_COL_NAME5,
+ Types.VarCharType.of(255),
+ "col_5_comment",
+ true,
+ false,
+ Literals.stringLiteral("now()"));
+
+ Column[] newColumns = new Column[] {col1, col2, col3, col4, col5};
+
+ NameIdentifier tableIdent =
+ NameIdentifier.of(schemaName,
GravitinoITUtils.genRandomName("clickhouse_it_table"));
+ catalog
+ .asTableCatalog()
+ .createTable(
+ tableIdent,
+ newColumns,
+ null,
+ ImmutableMap.of(),
+ Transforms.EMPTY_TRANSFORM,
+ Distributions.NONE,
+ getSortOrders(CLICKHOUSE_COL_NAME1));
+ Table createdTable = catalog.asTableCatalog().loadTable(tableIdent);
+ Assertions.assertEquals(
+ UnparsedExpression.of("rand()"),
createdTable.columns()[0].defaultValue());
+ Assertions.assertEquals(
+ UnparsedExpression.of("now()"),
createdTable.columns()[1].defaultValue());
+ Assertions.assertEquals(Literals.NULL,
createdTable.columns()[2].defaultValue());
+ Assertions.assertEquals(DEFAULT_VALUE_NOT_SET,
createdTable.columns()[3].defaultValue());
+ Assertions.assertEquals(
+ Literals.stringLiteral("now()"),
createdTable.columns()[4].defaultValue());
+ }
+
+ @Test
+ // ClickHouse supports column default value expressions (DEFAULT expr).
+ // see https://clickhouse.com/docs/en/sql-reference/statements/create/table#default_values
+ @EnabledIf("SupportColumnDefaultValueExpression")
+ void testColumnDefaultValueConverter() {
+ // test convert from ClickHouse to Gravitino
+ String tableName = GravitinoITUtils.genRandomName("test_default_value");
+ String fullTableName = schemaName + "." + tableName;
+ String sql =
+ "CREATE TABLE "
+ + fullTableName
+ + " (\n"
+ + " int_col_1 int default 0x01AF,\n"
+ + " int_col_2 int default (rand()),\n"
+ + " int_col_3 int default 3,\n"
+ + " unsigned_int_col_1 INT UNSIGNED default 1,\n"
+ + " unsigned_bigint_col_1 BIGINT(20) UNSIGNED default 0,\n"
+ + " double_col_1 double default 123.45,\n"
+ + " varchar20_col_1 varchar(20) default '10',\n"
+ + " varchar100_col_1 varchar(100) default 'now()',\n"
+ + " varchar200_col_1 varchar(200) default 'curdate()',\n"
+ + " varchar200_col_2 varchar(200) default (today()),\n"
+ + " varchar200_col_3 varchar(200) default (now()),\n"
+ + " datetime_col_1 datetime default now(),\n"
+ + " datetime_col_2 datetime default now(),\n"
+ + " datetime_col_3 datetime default null,\n"
+ + " datetime_col_4 datetime default 19830905,\n"
+ + " date_col_1 date default (today()),\n"
+ + " date_col_2 date,\n"
+ + " date_col_3 date DEFAULT (today() + INTERVAL 1 YEAR),\n"
+ + " date_col_4 date DEFAULT (today()),\n"
+ + " date_col_5 date DEFAULT '2024-04-01',\n"
+ + " timestamp_col_1 timestamp default '2012-12-31 11:30:45',\n"
+ + " timestamp_col_2 timestamp default 19830905,\n"
+ + " timestamp_col_3 timestamp(6) default now(),\n"
+ + " decimal_6_2_col_1 decimal(6, 2) default 1.2,\n"
+ + " bit_col_1 bit default '1'\n"
+ + ") order by int_col_1;\n";
+
+ clickhouseService.executeQuery(sql);
+ Table loadedTable =
+ catalog.asTableCatalog().loadTable(NameIdentifier.of(schemaName,
tableName));
+
+ for (Column column : loadedTable.columns()) {
+ // try {
+ switch (column.name()) {
+ case "int_col_1":
+ Assertions.assertEquals(Literals.integerLiteral(431),
column.defaultValue());
+ break;
+ case "int_col_2":
+ Assertions.assertEquals(UnparsedExpression.of("rand()"),
column.defaultValue());
+ break;
+ case "int_col_3":
+ Assertions.assertEquals(Literals.integerLiteral(3),
column.defaultValue());
+ break;
+ case "unsigned_int_col_1":
+ Assertions.assertEquals(Literals.unsignedIntegerLiteral(1L),
column.defaultValue());
+ break;
+ case "unsigned_bigint_col_1":
+ Assertions.assertEquals(
+ Literals.unsignedLongLiteral(Decimal.of("0")),
column.defaultValue());
+ break;
+ case "double_col_1":
+ Assertions.assertEquals(Literals.doubleLiteral(123.45),
column.defaultValue());
+ break;
+ case "varchar20_col_1":
+ Assertions.assertEquals(Literals.stringLiteral("10"),
column.defaultValue());
+ break;
+ case "varchar100_col_1":
+ Assertions.assertEquals(Literals.stringLiteral("now()"),
column.defaultValue());
+ break;
+ case "varchar200_col_1":
+ Assertions.assertEquals(Literals.stringLiteral("curdate()"),
column.defaultValue());
+ break;
+ case "varchar200_col_2":
+ Assertions.assertEquals(Literals.stringLiteral("today()"),
column.defaultValue());
+ break;
+ case "varchar200_col_3":
+ Assertions.assertEquals(Literals.stringLiteral("now()"),
column.defaultValue());
+ break;
+ case "datetime_col_1":
+ case "datetime_col_2":
+ Assertions.assertEquals(UnparsedExpression.of("now()"),
column.defaultValue());
+ break;
+ case "datetime_col_3":
+ Assertions.assertEquals(Literals.NULL, column.defaultValue());
+ break;
+ case "datetime_col_4":
+ Assertions.assertEquals(UnparsedExpression.of("19830905"),
column.defaultValue());
+ break;
+ case "date_col_1":
+ Assertions.assertEquals(UnparsedExpression.of("today()"),
column.defaultValue());
+ break;
+ case "date_col_2":
+ Assertions.assertEquals(DEFAULT_VALUE_NOT_SET,
column.defaultValue());
+ break;
+ case "date_col_3":
+ Assertions.assertEquals(
+ UnparsedExpression.of("today() + toIntervalYear(1)"),
column.defaultValue());
+ break;
+ case "date_col_4":
+ Assertions.assertEquals(UnparsedExpression.of("today()"),
column.defaultValue());
+ break;
+ case "date_col_5":
+ Assertions.assertEquals(
+ Literals.dateLiteral(LocalDate.of(2024, 4, 1)),
column.defaultValue());
+ break;
+ case "timestamp_col_1":
+ Assertions.assertEquals(
+ Literals.timestampLiteral("2012-12-31T11:30:45"),
column.defaultValue());
+ break;
+ case "timestamp_col_2":
+ Assertions.assertEquals(UnparsedExpression.of("19830905"),
column.defaultValue());
+ break;
+ case "timestamp_col_3":
+ Assertions.assertEquals(UnparsedExpression.of("now()"),
column.defaultValue());
+ break;
+ case "decimal_6_2_col_1":
+ Assertions.assertEquals(
+ Literals.decimalLiteral(Decimal.of("1.2", 6, 2)),
column.defaultValue());
+ break;
+ case "bit_col_1":
+ Assertions.assertEquals(
+ Literals.unsignedLongLiteral(Decimal.of("1")),
column.defaultValue());
+ break;
+ default:
+ Assertions.fail(
+ "Unexpected column name: "
+ + column.name()
+ + ", default value: "
+ + column.defaultValue());
+ }
+ // } catch (Throwable ex) {
+ // System.err.println(ex);
+ // for (Object obj : ex.getStackTrace()) {
+ // System.err.println(obj);
+ // }
+ // }
+ }
+ }
+
+ @Test
+ void testColumnTypeConverter() {
+ // test convert from ClickHouse to Gravitino
+ String tableName = GravitinoITUtils.genRandomName("test_type_converter");
+ String fullTableName = schemaName + "." + tableName;
+ String sql =
+ "CREATE TABLE "
+ + fullTableName
+ + " (\n"
+ + " tinyint_col tinyint,\n"
+ + " smallint_col smallint,\n"
+ + " int_col int,\n"
+ + " bigint_col bigint,\n"
+ + " float_col float,\n"
+ + " double_col double,\n"
+ + " date_col date,\n"
+ + " time_col time,\n"
+ + " timestamp_col timestamp,\n"
+ + " datetime_col datetime,\n"
+ + " decimal_6_2_col decimal(6, 2),\n"
+ + " varchar20_col varchar(20),\n"
+ + " text_col text,\n"
+ // + " binary_col binary,\n"
+ + " blob_col blob\n"
+ + ") order by tinyint_col;\n";
+
+ clickhouseService.executeQuery(sql);
+ Table loadedTable =
+ catalog.asTableCatalog().loadTable(NameIdentifier.of(schemaName,
tableName));
+
+ for (Column column : loadedTable.columns()) {
+ switch (column.name()) {
+ case "tinyint_col":
+ Assertions.assertEquals(Types.ByteType.get(), column.dataType());
+ break;
+ case "smallint_col":
+ Assertions.assertEquals(Types.ShortType.get(), column.dataType());
+ break;
+ case "int_col":
+ Assertions.assertEquals(Types.IntegerType.get(), column.dataType());
+ break;
+ case "bigint_col":
+ Assertions.assertEquals(Types.LongType.get(), column.dataType());
+ break;
+ case "float_col":
+ Assertions.assertEquals(Types.FloatType.get(), column.dataType());
+ break;
+ case "double_col":
+ Assertions.assertEquals(Types.DoubleType.get(), column.dataType());
+ break;
+ case "date_col":
+ Assertions.assertEquals(Types.DateType.get(), column.dataType());
+ break;
+ case "time_col":
+ Assertions.assertEquals(Types.LongType.get(), column.dataType());
+ break;
+ case "timestamp_col":
+ Assertions.assertEquals(Types.TimestampType.withoutTimeZone(),
column.dataType());
+ break;
+ case "datetime_col":
+ Assertions.assertEquals(Types.TimestampType.withoutTimeZone(),
column.dataType());
+ break;
+ case "decimal_6_2_col":
+ Assertions.assertEquals(Types.DecimalType.of(6, 2),
column.dataType());
+ break;
+ case "varchar20_col":
+ Assertions.assertEquals(Types.StringType.get(), column.dataType());
+ break;
+ case "text_col":
+ Assertions.assertEquals(Types.StringType.get(), column.dataType());
+ break;
+ case "binary_col":
+ Assertions.assertEquals(Types.BinaryType.get(), column.dataType());
+ break;
+ case "blob_col":
+ Assertions.assertEquals(Types.StringType.get(), column.dataType());
+ break;
+ default:
+ Assertions.fail("Unexpected column name: " + column.name());
+ }
+ }
+ }
+
+ @Test
+ void testAlterAndDropClickhouseTable() {
+ Column[] columns = createColumns();
+ catalog
+ .asTableCatalog()
+ .createTable(
+ NameIdentifier.of(schemaName, tableName),
+ columns,
+ table_comment,
+ createProperties(),
+ Transforms.EMPTY_TRANSFORM,
+ Distributions.NONE,
+ getSortOrders(CLICKHOUSE_COL_NAME3));
+ Assertions.assertThrows(
+ IllegalArgumentException.class,
+ () -> {
+ catalog
+ .asTableCatalog()
+ .alterTable(
+ NameIdentifier.of(schemaName, tableName),
+ TableChange.rename(alertTableName),
+ TableChange.updateComment(table_comment + "_new"));
+ });
+
+ catalog
+ .asTableCatalog()
+ .alterTable(NameIdentifier.of(schemaName, tableName),
TableChange.rename(alertTableName));
+
+ catalog
+ .asTableCatalog()
+ .alterTable(
+ NameIdentifier.of(schemaName, alertTableName),
+ TableChange.updateComment(table_comment + "_new"));
+
+ catalog
+ .asTableCatalog()
+ .alterTable(
+ NameIdentifier.of(schemaName, alertTableName),
+ TableChange.addColumn(new String[] {"col_4"},
Types.StringType.get()));
+
+ catalog
+ .asTableCatalog()
+ .alterTable(
+ NameIdentifier.of(schemaName, alertTableName),
+ TableChange.renameColumn(new String[] {CLICKHOUSE_COL_NAME2},
"col_2_new"));
+
+ catalog
+ .asTableCatalog()
+ .alterTable(
+ NameIdentifier.of(schemaName, alertTableName),
+ TableChange.updateColumnType(
+ new String[] {CLICKHOUSE_COL_NAME1}, Types.IntegerType.get()));
+
+ Table table =
catalog.asTableCatalog().loadTable(NameIdentifier.of(schemaName,
alertTableName));
+ Assertions.assertEquals(alertTableName, table.name());
+
+ Assertions.assertEquals(CLICKHOUSE_COL_NAME1, table.columns()[0].name());
+ Assertions.assertEquals(Types.IntegerType.get(),
table.columns()[0].dataType());
+
+ Assertions.assertEquals("col_2_new", table.columns()[1].name());
+ Assertions.assertEquals(Types.DateType.get(),
table.columns()[1].dataType());
+ Assertions.assertEquals("col_2_comment", table.columns()[1].comment());
+
+ Assertions.assertEquals(CLICKHOUSE_COL_NAME3, table.columns()[2].name());
+ Assertions.assertEquals(Types.StringType.get(),
table.columns()[2].dataType());
+ Assertions.assertEquals("col_3_comment", table.columns()[2].comment());
+
+ Assertions.assertEquals("col_4", table.columns()[3].name());
+ Assertions.assertEquals(Types.StringType.get(),
table.columns()[3].dataType());
+ Assertions.assertNull(table.columns()[3].comment());
+ Assertions.assertNotNull(table.auditInfo());
+ Assertions.assertNotNull(table.auditInfo().createTime());
+ Assertions.assertNotNull(table.auditInfo().creator());
+ Assertions.assertNotNull(table.auditInfo().lastModifiedTime());
+ Assertions.assertNotNull(table.auditInfo().lastModifier());
+
+ Column col1 =
+ Column.of("name", Types.StringType.get(), "comment", false, false,
DEFAULT_VALUE_NOT_SET);
+ Column col2 = Column.of("address", Types.StringType.get(), "comment");
+ Column col3 = Column.of("date_of_birth", Types.DateType.get(), "comment");
+
+ Column[] newColumns = new Column[] {col1, col2, col3};
+ NameIdentifier tableIdentifier =
+ NameIdentifier.of(schemaName,
GravitinoITUtils.genRandomName("CatalogJdbcIT_table"));
+ catalog
+ .asTableCatalog()
+ .createTable(
+ tableIdentifier,
+ newColumns,
+ table_comment,
+ ImmutableMap.of(),
+ Transforms.EMPTY_TRANSFORM,
+ Distributions.NONE,
+ getSortOrders("name"));
+
+ TableCatalog tableCatalog = catalog.asTableCatalog();
+ TableChange change =
+ TableChange.updateColumnPosition(
+ new String[] {"no_column"}, TableChange.ColumnPosition.first());
+ NotFoundException notFoundException =
+ assertThrows(
+ NotFoundException.class, () ->
tableCatalog.alterTable(tableIdentifier, change));
+
Assertions.assertTrue(notFoundException.getMessage().contains("no_column"));
+
+ catalog
+ .asTableCatalog()
+ .alterTable(
+ tableIdentifier,
+ TableChange.updateColumnPosition(
+ new String[] {col1.name()},
TableChange.ColumnPosition.after(col2.name())));
+
+ Table updateColumnPositionTable =
catalog.asTableCatalog().loadTable(tableIdentifier);
+
+ Column[] updateCols = updateColumnPositionTable.columns();
+ Assertions.assertEquals(3, updateCols.length);
+ Assertions.assertEquals(col2.name(), updateCols[0].name());
+ Assertions.assertEquals(col1.name(), updateCols[1].name());
+ Assertions.assertEquals(col3.name(), updateCols[2].name());
+
+ Assertions.assertDoesNotThrow(
+ () ->
+ catalog
+ .asTableCatalog()
+ .alterTable(
+ tableIdentifier,
+ TableChange.deleteColumn(new String[] {col3.name()}, true),
+ TableChange.deleteColumn(new String[] {col2.name()},
true)));
+ Table delColTable = catalog.asTableCatalog().loadTable(tableIdentifier);
+ Assertions.assertEquals(1, delColTable.columns().length);
+ Assertions.assertEquals(col1.name(), delColTable.columns()[0].name());
+
+ Assertions.assertDoesNotThrow(
+ () -> {
+ catalog.asTableCatalog().dropTable(tableIdentifier);
+ });
+ }
+
+ @Test
+ void testUpdateColumnDefaultValue() {
+ Column[] columns = createColumnsWithDefaultValue();
+ Table table =
+ catalog
+ .asTableCatalog()
+ .createTable(
+ NameIdentifier.of(schemaName, tableName),
+ columns,
+ null,
+ ImmutableMap.of(),
+ Transforms.EMPTY_TRANSFORM,
+ Distributions.NONE,
+ getSortOrders(CLICKHOUSE_COL_NAME1));
+
+ Assertions.assertEquals(AuthConstants.ANONYMOUS_USER,
table.auditInfo().creator());
+ Assertions.assertNull(table.auditInfo().lastModifier());
+ catalog
+ .asTableCatalog()
+ .alterTable(
+ NameIdentifier.of(schemaName, tableName),
+ TableChange.updateColumnDefaultValue(
+ new String[] {columns[0].name()}, Literals.of("1.2345",
Types.FloatType.get())));
+
+ catalog
+ .asTableCatalog()
+ .alterTable(
+ NameIdentifier.of(schemaName, tableName),
+ TableChange.updateColumnDefaultValue(
+ new String[] {columns[1].name()},
FunctionExpression.of("now")));
+
+ catalog
+ .asTableCatalog()
+ .alterTable(
+ NameIdentifier.of(schemaName, tableName),
+ TableChange.updateColumnDefaultValue(
+ new String[] {columns[2].name()}, Literals.of("hello",
Types.VarCharType.of(255))));
+
+ catalog
+ .asTableCatalog()
+ .alterTable(
+ NameIdentifier.of(schemaName, tableName),
+ TableChange.updateColumnDefaultValue(
+ new String[] {columns[3].name()}, Literals.of("2000",
Types.IntegerType.get())));
+
+ catalog
+ .asTableCatalog()
+ .alterTable(
+ NameIdentifier.of(schemaName, tableName),
+ TableChange.updateColumnDefaultValue(
+ new String[] {columns[4].name()}, Literals.of("2.34",
Types.DecimalType.of(3, 2))));
+
+ catalog
+ .asTableCatalog()
+ .alterTable(
+ NameIdentifier.of(schemaName, tableName),
+ TableChange.updateColumnDefaultValue(
+ new String[] {columns[0].name()}, Literals.of("1.2345",
Types.FloatType.get())),
+ TableChange.updateColumnDefaultValue(
+ new String[] {columns[1].name()},
FunctionExpression.of("now")),
+ TableChange.updateColumnDefaultValue(
+ new String[] {columns[2].name()}, Literals.of("hello",
Types.VarCharType.of(255))),
+ TableChange.updateColumnDefaultValue(
+ new String[] {columns[3].name()}, Literals.of("2000",
Types.IntegerType.get())),
+ TableChange.updateColumnDefaultValue(
+ new String[] {columns[4].name()}, Literals.of("2.34",
Types.DecimalType.of(3, 2))));
+
+ catalog
+ .asTableCatalog()
+ .alterTable(
+ NameIdentifier.of(schemaName, tableName),
+ TableChange.updateColumnDefaultValue(
+ new String[] {columns[0].name()}, Literals.of("1.2345",
Types.FloatType.get())),
+ TableChange.updateColumnDefaultValue(
+ new String[] {columns[1].name()},
FunctionExpression.of("now")),
+ TableChange.updateColumnDefaultValue(
+ new String[] {columns[2].name()}, Literals.of("hello",
Types.VarCharType.of(255))),
+ TableChange.updateColumnDefaultValue(
+ new String[] {columns[3].name()}, Literals.of("2000",
Types.IntegerType.get())),
+ TableChange.updateColumnDefaultValue(
+ new String[] {columns[4].name()}, Literals.of("2.34",
Types.DecimalType.of(3, 2))));
+
+ catalog
+ .asTableCatalog()
+ .alterTable(
+ NameIdentifier.of(schemaName, tableName),
+ TableChange.updateColumnDefaultValue(
+ new String[] {columns[0].name()}, Literals.of("1.2345",
Types.FloatType.get())),
+ TableChange.updateColumnDefaultValue(
+ new String[] {columns[1].name()},
FunctionExpression.of("now")),
+ TableChange.updateColumnDefaultValue(
+ new String[] {columns[2].name()}, Literals.of("hello",
Types.VarCharType.of(255))),
+ TableChange.updateColumnDefaultValue(
+ new String[] {columns[3].name()}, Literals.of("2000",
Types.IntegerType.get())),
+ TableChange.updateColumnDefaultValue(
+ new String[] {columns[4].name()}, Literals.of("2.34",
Types.DecimalType.of(3, 2))));
+
+ table = catalog.asTableCatalog().loadTable(NameIdentifier.of(schemaName,
tableName));
+
+ Assertions.assertEquals(
+ Literals.of("1.2345", Types.FloatType.get()),
table.columns()[0].defaultValue());
+ Assertions.assertEquals(UnparsedExpression.of("now()"),
table.columns()[1].defaultValue());
+ Assertions.assertEquals(
+ Literals.of("hello", Types.StringType.get()),
table.columns()[2].defaultValue());
+ Assertions.assertEquals(
+ Literals.of("2000", Types.IntegerType.get()),
table.columns()[3].defaultValue());
+ Assertions.assertEquals(
+ Literals.of("2.34", Types.DecimalType.of(3, 2)),
table.columns()[4].defaultValue());
+ }
+
+ @Test
+ void testDropClickHouseDatabase() {
+ String schemaName =
GravitinoITUtils.genRandomName("clickhouse_schema").toLowerCase();
+ String tableName =
GravitinoITUtils.genRandomName("clickhouse_table").toLowerCase();
+
+ catalog
+ .asSchemas()
+ .createSchema(schemaName, null, ImmutableMap.<String,
String>builder().build());
+
+ catalog
+ .asTableCatalog()
+ .createTable(
+ NameIdentifier.of(schemaName, tableName),
+ createColumns(),
+ "Created by Gravitino client",
+ ImmutableMap.<String, String>builder().build(),
+ getSortOrders(CLICKHOUSE_COL_NAME3));
+
+ // Try to drop the database with cascade set to false; it should not be
+ // allowed.
+ Throwable excep =
+ Assertions.assertThrows(
+ RuntimeException.class, () ->
catalog.asSchemas().dropSchema(schemaName, false));
+ Assertions.assertTrue(excep.getMessage().contains("the value of cascade
should be true."));
+ // Assertions.assertTrue(
+ // excep.getMessage().contains("has sub-entities, you should remove
sub-entities
+ // first"));
+
+ // Check the database still exists
+ catalog.asSchemas().loadSchema(schemaName);
+
+ // Try to drop a database, and cascade equals to true, it should be
allowed.
+ catalog.asSchemas().dropSchema(schemaName, true);
+ // Check database has been dropped
+ SupportsSchemas schemas = catalog.asSchemas();
+ Assertions.assertThrows(
+ NoSuchSchemaException.class,
+ () -> {
+ schemas.loadSchema(schemaName);
+ });
+ }
+
+ @Test
+ public void testSchemaComment() {
+ final String testSchemaName = "test";
+ RuntimeException exception =
+ Assertions.assertThrowsExactly(
+ UnsupportedOperationException.class,
+ () -> catalog.asSchemas().createSchema(testSchemaName, "comment",
null));
+ Assertions.assertTrue(
+ exception.getMessage().contains("Doesn't support setting schema
comment: comment"));
+
+ // test null comment
+ String testSchemaName2 = "test2";
+ Schema schema = catalog.asSchemas().createSchema(testSchemaName2, "",
null);
+ Assertions.assertTrue(StringUtils.isEmpty(schema.comment()));
+ schema = catalog.asSchemas().loadSchema(testSchemaName2);
+ Assertions.assertTrue(StringUtils.isEmpty(schema.comment()));
+ catalog.asSchemas().dropSchema(testSchemaName2, true);
+ }
+
+ @Test
+ public void testBackQuoteTable() {
+ Column col1 = Column.of("create", Types.LongType.get(), "id", false,
false, null);
+ Column col2 = Column.of("delete", Types.ByteType.get(), "yes", false,
false, null);
+ Column col3 = Column.of("show", Types.DateType.get(), "comment", false,
false, null);
+ Column col4 = Column.of("status", Types.VarCharType.of(255), "code",
false, false, null);
+ Column[] newColumns = new Column[] {col1, col2, col3, col4};
+ TableCatalog tableCatalog = catalog.asTableCatalog();
+ NameIdentifier tableIdentifier = NameIdentifier.of(schemaName, "table");
+ Assertions.assertDoesNotThrow(
+ () ->
+ tableCatalog.createTable(
+ tableIdentifier,
+ newColumns,
+ table_comment,
+ Collections.emptyMap(),
+ Transforms.EMPTY_TRANSFORM,
+ Distributions.NONE,
+ getSortOrders("create"),
+ Indexes.EMPTY_INDEXES));
+
+ Assertions.assertDoesNotThrow(() ->
tableCatalog.loadTable(tableIdentifier));
+
+ Assertions.assertDoesNotThrow(
+ () ->
+ tableCatalog.alterTable(
+ tableIdentifier,
+ TableChange.addColumn(
+ new String[] {"int"},
+ Types.StringType.get(),
+ TableChange.ColumnPosition.after("status"))));
+
+ Table table = tableCatalog.loadTable(tableIdentifier);
+ for (Column column : table.columns()) {
+ System.out.println(column.name());
+ }
Review Comment:
Avoid `System.out.println` in tests; it adds noisy output to CI logs. Use
the existing logger (or remove the print) since the test already has assertions.
```suggestion
tableCatalog.loadTable(tableIdentifier);
```
##########
catalogs-contrib/catalog-jdbc-clickhouse/src/main/java/org/apache/gravitino/catalog/clickhouse/converter/ClickHouseColumnDefaultValueConverter.java:
##########
@@ -1,25 +1,161 @@
/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
*/
package org.apache.gravitino.catalog.clickhouse.converter;
+import static org.apache.gravitino.rel.Column.DEFAULT_VALUE_NOT_SET;
+import static
org.apache.gravitino.rel.Column.DEFAULT_VALUE_OF_CURRENT_TIMESTAMP;
+
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import org.apache.commons.lang3.StringUtils;
import
org.apache.gravitino.catalog.jdbc.converter.JdbcColumnDefaultValueConverter;
+import org.apache.gravitino.catalog.jdbc.converter.JdbcTypeConverter;
+import org.apache.gravitino.rel.expressions.Expression;
+import org.apache.gravitino.rel.expressions.FunctionExpression;
+import org.apache.gravitino.rel.expressions.UnparsedExpression;
+import org.apache.gravitino.rel.expressions.literals.Literal;
+import org.apache.gravitino.rel.expressions.literals.Literals;
+import org.apache.gravitino.rel.types.Decimal;
+import org.apache.gravitino.rel.types.Type;
+import org.apache.gravitino.rel.types.Types;
public class ClickHouseColumnDefaultValueConverter extends
JdbcColumnDefaultValueConverter {
- // Implement ClickHouse specific default value conversions if needed
+
+ protected static final String NOW = "now";
+ protected static final Expression DEFAULT_VALUE_OF_NOW =
FunctionExpression.of(NOW);
+
+ public String fromGravitino(Expression defaultValue) {
+ if (DEFAULT_VALUE_NOT_SET.equals(defaultValue)) {
+ return null;
+ }
+
+ if (defaultValue instanceof FunctionExpression functionExpression) {
+ return String.format("(%s)", functionExpression);
+ }
+
+ if (defaultValue instanceof Literal<?> literal) {
+ Type type = literal.dataType();
+ if (defaultValue.equals(Literals.NULL)) {
+ return NULL;
+ } else if (type instanceof Type.NumericType) {
+ return literal.value().toString();
+ } else {
+ Object value = literal.value();
+ if (value instanceof LocalDateTime) {
+ value = ((LocalDateTime) value).format(DATE_TIME_FORMATTER);
+ }
+ return String.format("'%s'", value);
Review Comment:
`fromGravitino` wraps string-like literal values as `'%s'` without escaping
embedded single quotes, which can generate invalid SQL or enable injection via
default values. Escape single quotes (replace `'` with `''`) before embedding
into SQL.
```suggestion
String escapedValue =
value == null ? null : String.valueOf(value).replace("'", "''");
return String.format("'%s'", escapedValue);
```
##########
catalogs-contrib/catalog-jdbc-clickhouse/src/main/java/org/apache/gravitino/catalog/clickhouse/operations/ClickHouseTableOperations.java:
##########
@@ -38,19 +126,920 @@ protected String generateCreateTableSql(
Distribution distribution,
Index[] indexes) {
throw new UnsupportedOperationException(
- "ClickHouseTableOperations.generateCreateTableSql is not implemented
yet.");
+ "generateCreateTableSql with out sortOrders in clickhouse is not
supported");
+ }
+
  /**
   * Generates a ClickHouse CREATE TABLE statement.
   *
   * <p>Clause order matters for ClickHouse: columns and indexes go inside the parentheses, then
   * ENGINE, ORDER BY, PARTITION BY, COMMENT, and finally SETTINGS.
   *
   * @param tableName table to create
   * @param columns column definitions
   * @param comment table comment; embedded single quotes are escaped
   * @param properties table properties (engine, cluster, "settings.*" entries); may be null/empty
   * @param partitioning identity partition transforms, or empty
   * @param distribution must be {@code Distributions.NONE}; ClickHouse has no distribution concept
   * @param indexes primary-key and data-skipping indexes
   * @param sortOrders ORDER BY keys; required for MergeTree-family engines
   * @return the complete CREATE TABLE SQL text
   */
  @Override
  protected String generateCreateTableSql(
      String tableName,
      JdbcColumn[] columns,
      String comment,
      Map<String, String> properties,
      Transform[] partitioning,
      Distribution distribution,
      Index[] indexes,
      SortOrder[] sortOrders) {

    Preconditions.checkArgument(
        Distributions.NONE.equals(distribution), "ClickHouse does not support distribution");

    StringBuilder sqlBuilder = new StringBuilder();

    // Normalize to an empty map so the helpers below never see null.
    Map<String, String> notNullProperties =
        MapUtils.isNotEmpty(properties) ? properties : Collections.emptyMap();

    // Add Create table clause (may include ON CLUSTER).
    boolean onCluster = appendCreateTableClause(notNullProperties, sqlBuilder, tableName);

    // Add columns
    buildColumnsDefinition(columns, sqlBuilder);

    // Index definition (PRIMARY KEY / data-skipping indexes live inside the parentheses).
    appendIndexesSql(indexes, sqlBuilder);

    sqlBuilder.append("\n)");

    // Extract engine from properties; the engine decides whether ORDER BY / PARTITION BY apply.
    ClickHouseTablePropertiesMetadata.ENGINE engine =
        appendTableEngine(notNullProperties, sqlBuilder, onCluster);

    appendOrderBy(sortOrders, sqlBuilder, engine);

    appendPartitionClause(partitioning, sqlBuilder, engine);

    // Add table comment if specified; single quotes are doubled for SQL safety.
    if (StringUtils.isNotEmpty(comment)) {
      String escapedComment = comment.replace("'", "''");
      sqlBuilder.append(" COMMENT '%s'".formatted(escapedComment));
    }

    // Add setting clause if specified; ClickHouse only accepts predefined settings.
    appendTableProperties(notNullProperties, sqlBuilder);

    // Return the generated SQL statement
    String result = sqlBuilder.toString();

    LOG.info("Generated create table:{} sql: {}", tableName, result);
    return result;
  }
+
+ /**
+ * Append CREATE TABLE clause. If cluster name && on-cluster is specified in
properties, append ON
+ * CLUSTER clause.
+ *
+ * @param properties Table properties
+ * @param sqlBuilder SQL builder
+ * @return true if ON CLUSTER clause is appended, false otherwise
+ */
+ private boolean appendCreateTableClause(
+ Map<String, String> properties, StringBuilder sqlBuilder, String
tableName) {
+ String clusterName = properties.get(CLUSTER_NAME);
+ String onClusterValue = properties.get(ON_CLUSTER);
+
+ boolean onCluster =
+ StringUtils.isNotBlank(clusterName)
+ && StringUtils.isNotBlank(onClusterValue)
+ &&
Boolean.TRUE.equals(BooleanUtils.toBooleanObject(onClusterValue));
+
+ if (onCluster) {
+ sqlBuilder.append(
+ "CREATE TABLE %s ON CLUSTER `%s`
(\n".formatted(quoteIdentifier(tableName), clusterName));
+ } else {
+ sqlBuilder.append("CREATE TABLE %s
(\n".formatted(quoteIdentifier(tableName)));
+ }
+
+ return onCluster;
+ }
+
+ private static void appendTableProperties(
+ Map<String, String> properties, StringBuilder sqlBuilder) {
+ if (MapUtils.isEmpty(properties)) {
+ return;
+ }
+
+ Map<String, String> settingMap =
+ properties.entrySet().stream()
+ .filter(entry -> entry.getKey().startsWith(SETTINGS_PREFIX))
+ .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
+ if (MapUtils.isEmpty(settingMap)) {
+ return;
+ }
+
+ String settings =
+ settingMap.entrySet().stream()
+ .map(
+ entry ->
+ entry.getKey().substring(SETTINGS_PREFIX.length()) + " = "
+ entry.getValue())
+ .collect(Collectors.joining(",\n ", " \n SETTINGS ", ""));
+ sqlBuilder.append(settings);
+ }
+
  /**
   * Appends the ORDER BY clause for the chosen engine.
   *
   * <p>MergeTree-family engines require exactly one sort order; engines that do not support
   * ORDER BY reject any supplied sort orders. Null ordering is accepted but ignored, since
   * ClickHouse has no NULLS FIRST/LAST.
   *
   * @param sortOrders requested sort orders; at most one element is supported
   * @param sqlBuilder builder receiving the clause
   * @param engine engine that decides whether ORDER BY is required
   * @throws UnsupportedOperationException if the engine forbids ORDER BY or more than one is given
   * @throws IllegalArgumentException if the engine requires ORDER BY and none is given
   */
  private static void appendOrderBy(
      SortOrder[] sortOrders,
      StringBuilder sqlBuilder,
      ClickHouseTablePropertiesMetadata.ENGINE engine) {
    // ClickHouse requires ORDER BY clause for some engines, and currently only the MergeTree
    // family requires it.
    boolean requireOrderBy = engine.isRequireOrderBy();
    if (!requireOrderBy) {
      if (ArrayUtils.isNotEmpty(sortOrders)) {
        throw new UnsupportedOperationException(
            "ORDER BY clause is not supported for engine: " + engine.getValue());
      }

      // No need to add order by clause
      return;
    }

    if (ArrayUtils.isEmpty(sortOrders)) {
      throw new IllegalArgumentException(
          "ORDER BY clause is required for engine: " + engine.getValue());
    }

    if (sortOrders.length > 1) {
      throw new UnsupportedOperationException(
          "Currently ClickHouse does not support sortOrders with more than 1 element");
    }

    // NOTE(review): `sortDirection` is read but never used below, so this warning fires for
    // every sort order that carries a null ordering — confirm the intended condition.
    NullOrdering nullOrdering = sortOrders[0].nullOrdering();
    SortDirection sortDirection = sortOrders[0].direction();
    if (nullOrdering != null && sortDirection != null) {
      // ClickHouse does not support NULLS FIRST/LAST now.
      LOG.warn(
          "ClickHouse currently does not support nullOrdering: {}, and will ignore it",
          nullOrdering);
    }

    // NOTE(review): the expression's toString() is wrapped in backticks, which is only correct
    // for a bare column reference; a function expression would be mis-quoted — confirm.
    sqlBuilder.append("\n ORDER BY `%s`\n".formatted(sortOrders[0].expression()));
  }
+
  /**
   * Resolves the table engine from the properties (falling back to the default) and appends the
   * ENGINE clause.
   *
   * <p>For the Distributed engine an ON CLUSTER header is mandatory and the cluster name, remote
   * database, remote table, and sharding key properties must all be present; they become the
   * engine's constructor arguments.
   *
   * @param properties table properties; {@code CLICKHOUSE_ENGINE_KEY} selects the engine
   * @param sqlBuilder builder receiving the clause
   * @param onCluster whether an ON CLUSTER clause was emitted in the CREATE TABLE header
   * @return the resolved engine, so later clauses can consult its capabilities
   */
  private ClickHouseTablePropertiesMetadata.ENGINE appendTableEngine(
      Map<String, String> properties, StringBuilder sqlBuilder, boolean onCluster) {
    ClickHouseTablePropertiesMetadata.ENGINE engine = ENGINE_PROPERTY_ENTRY.getDefaultValue();
    if (MapUtils.isNotEmpty(properties)) {
      String userSetEngine = properties.get(CLICKHOUSE_ENGINE_KEY);
      if (StringUtils.isNotEmpty(userSetEngine)) {
        engine = ClickHouseTablePropertiesMetadata.ENGINE.fromString(userSetEngine);
      }
    }

    if (engine == ENGINE.DISTRIBUTED) {
      if (!onCluster) {
        throw new IllegalArgumentException(
            "ENGINE = DISTRIBUTED requires ON CLUSTER clause to be specified.");
      }

      // Distributed needs the remote database/table and sharding key as engine arguments.
      String clusterName = properties.get(CLUSTER_NAME);
      String remoteDatabase = properties.get(CLUSTER_REMOTE_DATABASE);
      String remoteTable = properties.get(CLUSTER_REMOTE_TABLE);
      String shardingKey = properties.get(CLUSTER_SHARDING_KEY);

      Preconditions.checkArgument(
          StringUtils.isNotBlank(clusterName),
          "Cluster name must be specified when engine is Distributed");
      Preconditions.checkArgument(
          StringUtils.isNotBlank(remoteDatabase),
          "Remote database must be specified for Distributed");
      Preconditions.checkArgument(
          StringUtils.isNotBlank(remoteTable), "Remote table must be specified for Distributed");
      Preconditions.checkArgument(
          StringUtils.isNotBlank(shardingKey), "Sharding key must be specified for Distributed");

      sqlBuilder.append(
          "\n ENGINE = %s(`%s`,`%s`,`%s`,%s)"
              .formatted(
                  ENGINE.DISTRIBUTED.getValue(),
                  clusterName,
                  remoteDatabase,
                  remoteTable,
                  shardingKey));
      return engine;
    }

    // All non-Distributed engines are emitted without constructor arguments.
    sqlBuilder.append("\n ENGINE = %s".formatted(engine.getValue()));
    return engine;
  }
+
+ private void appendPartitionClause(
+ Transform[] partitioning,
+ StringBuilder sqlBuilder,
+ ClickHouseTablePropertiesMetadata.ENGINE engine) {
+ if (ArrayUtils.isEmpty(partitioning)) {
+ return;
+ }
+
+ if (!engine.acceptPartition()) {
+ throw new UnsupportedOperationException(
+ "Partitioning is only supported for MergeTree family engines");
+ }
+
+ List<String> partitionExprs =
+
Arrays.stream(partitioning).map(this::toPartitionExpression).collect(Collectors.toList());
+ String partitionExpr =
+ partitionExprs.size() == 1
+ ? partitionExprs.get(0)
+ : "tuple(" + String.join(", ", partitionExprs) + ")";
+ sqlBuilder.append("\n PARTITION BY ").append(partitionExpr);
+ }
+
+ private String toPartitionExpression(Transform transform) {
+ Preconditions.checkArgument(transform != null, "Partition transform cannot
be null");
+ Preconditions.checkArgument(
+ StringUtils.equalsIgnoreCase(transform.name(),
Transforms.NAME_OF_IDENTITY),
+ "Unsupported partition transform: " + transform.name());
+ Preconditions.checkArgument(
+ transform.arguments().length == 1
+ && transform.arguments()[0] instanceof NamedReference
+ && ((NamedReference) transform.arguments()[0]).fieldName().length
== 1,
+ "ClickHouse only supports single column identity partitioning");
+
+ String fieldName =
+ ((NamedReference) transform.arguments()[0]).fieldName()[0]; // already
validated
+ return quoteIdentifier(fieldName);
+ }
+
+ private void buildColumnsDefinition(JdbcColumn[] columns, StringBuilder
sqlBuilder) {
+ for (int i = 0; i < columns.length; i++) {
+ JdbcColumn column = columns[i];
+ sqlBuilder.append(" %s".formatted(quoteIdentifier(column.name())));
+
+ appendColumnDefinition(column, sqlBuilder);
+ // Add a comma for the next column, unless it's the last one
+ if (i < columns.length - 1) {
+ sqlBuilder.append(",\n");
+ }
+ }
+ }
+
  /**
   * ClickHouse supports primary key and data skipping indexes.
   *
   * <p>This method will not check the validity of the indexes. For ClickHouse, the primary key
   * must be a subset of the order by columns. We will leave the underlying ClickHouse to
   * validate it.
   *
   * @param indexes indexes to render; no-op when empty
   * @param sqlBuilder builder receiving the index clauses (inside the column list parentheses)
   * @throws IllegalArgumentException for index types ClickHouse does not support
   */
  private void appendIndexesSql(Index[] indexes, StringBuilder sqlBuilder) {
    if (ArrayUtils.isEmpty(indexes)) {
      return;
    }

    for (Index index : indexes) {
      String fieldStr = getIndexFieldStr(index.fieldNames());
      // Each index entry continues the column list, hence the leading comma.
      sqlBuilder.append(",\n");
      switch (index.type()) {
        case PRIMARY_KEY:
          // ClickHouse primary keys are unnamed; any user-supplied name is dropped with a warning.
          if (null != index.name()
              && !StringUtils.equalsIgnoreCase(index.name(), Indexes.DEFAULT_PRIMARY_KEY_NAME)) {
            LOG.warn(
                "Primary key name must be PRIMARY in ClickHouse, the name {} will be ignored.",
                index.name());
          }
          // fieldStr already quoted in getIndexFieldStr
          sqlBuilder.append(" PRIMARY KEY (").append(fieldStr).append(")");
          break;
        case DATA_SKIPPING_MINMAX:
          Preconditions.checkArgument(
              StringUtils.isNotBlank(index.name()), "Data skipping index name must not be blank");
          // The GRANULARITY value is always 1 here currently.
          sqlBuilder.append(
              " INDEX %s %s TYPE minmax GRANULARITY 1"
                  .formatted(quoteIdentifier(index.name()), fieldStr));
          break;
        case DATA_SKIPPING_BLOOM_FILTER:
          // The GRANULARITY value is always 3 here currently.
          Preconditions.checkArgument(
              StringUtils.isNotBlank(index.name()), "Data skipping index name must not be blank");
          sqlBuilder.append(
              " INDEX %s %s TYPE bloom_filter GRANULARITY 3"
                  .formatted(quoteIdentifier(index.name()), fieldStr));
          break;
        default:
          throw new IllegalArgumentException(
              "Gravitino Clickhouse doesn't support index : " + index.type());
      }
    }
  }
+
+ @Override
+ protected boolean getAutoIncrementInfo(ResultSet resultSet) throws
SQLException {
+ return "YES".equalsIgnoreCase(resultSet.getString("IS_AUTOINCREMENT"));
+ }
+
+ @Override
+ public void alterTable(String databaseName, String tableName, TableChange...
changes)
+ throws NoSuchTableException {
+ LOG.info("Attempting to alter table {} from database {}", tableName,
databaseName);
+ try (Connection connection = getConnection(databaseName)) {
+ for (TableChange change : changes) {
+ String sql = generateAlterTableSql(databaseName, tableName, change);
+ if (StringUtils.isEmpty(sql)) {
+ LOG.info("No changes to alter table {} from database {}", tableName,
databaseName);
+ return;
+ }
+ JdbcConnectorUtils.executeUpdate(connection, sql);
+ }
+ LOG.info("Alter table {} from database {}", tableName, databaseName);
+ } catch (final SQLException se) {
+ throw this.exceptionMapper.toGravitinoException(se);
+ }
+ }
+
+ @Override
+ protected Map<String, String> getTableProperties(Connection connection,
String tableName)
+ throws SQLException {
+ try (PreparedStatement statement =
+ connection.prepareStatement("select * from system.tables where name =
? ")) {
+ statement.setString(1, tableName);
+ try (ResultSet resultSet = statement.executeQuery()) {
+ while (resultSet.next()) {
+ String name = resultSet.getString("name");
+ if (Objects.equals(name, tableName)) {
+ return Collections.unmodifiableMap(
+ new HashMap<String, String>() {
+ {
+ put(COMMENT, resultSet.getString(COMMENT));
+ put(GRAVITINO_ENGINE_KEY,
resultSet.getString(CLICKHOUSE_ENGINE_KEY));
+ }
+ });
+ }
+ }
+
+ throw new NoSuchTableException(
+ "Table %s does not exist in %s.", tableName,
connection.getCatalog());
+ }
+ }
+ }
+
+ @Override
+ protected Transform[] getTablePartitioning(
+ Connection connection, String databaseName, String tableName) throws
SQLException {
+ try (PreparedStatement statement =
+ connection.prepareStatement(
+ "SELECT partition_key FROM system.tables WHERE database = ? AND
name = ?")) {
+ statement.setString(1, databaseName);
+ statement.setString(2, tableName);
+ try (ResultSet resultSet = statement.executeQuery()) {
+ if (resultSet.next()) {
+ String partitionKey = resultSet.getString("partition_key");
+ try {
+ return parsePartitioning(partitionKey);
+ } catch (IllegalArgumentException e) {
+ LOG.warn(
+ "Skip unsupported partition expression {} for {}.{}",
+ partitionKey,
+ databaseName,
+ tableName);
+ return Transforms.EMPTY_TRANSFORM;
+ }
+ }
+ }
+ }
+
+ return Transforms.EMPTY_TRANSFORM;
+ }
+
+ protected ResultSet getTables(Connection connection) throws SQLException {
+ final DatabaseMetaData metaData = connection.getMetaData();
+ String catalogName = connection.getCatalog();
+ String schemaName = connection.getSchema();
+ // CK tables include : DICTIONARY", "LOG TABLE", "MEMORY TABLE",
+ // "REMOTE TABLE", "TABLE", "VIEW", "SYSTEM TABLE", "TEMPORARY TABLE
+ return metaData.getTables(catalogName, schemaName, null, new String[]
{"TABLE"});
}
@Override
protected String generatePurgeTableSql(String tableName) {
throw new UnsupportedOperationException(
- "ClickHouseTableOperations.generatePurgeTableSql is not implemented
yet.");
+ "ClickHouse does not support purge table in Gravitino, please use drop
table");
}
@Override
protected String generateAlterTableSql(
String databaseName, String tableName, TableChange... changes) {
- throw new UnsupportedOperationException(
- "ClickHouseTableOperations.generateAlterTableSql is not implemented
yet.");
+ // Not all operations require the original table information, so lazy
loading is used here
+ JdbcTable lazyLoadTable = null;
+ TableChange.UpdateComment updateComment = null;
+ List<TableChange.SetProperty> setProperties = new ArrayList<>();
+ List<String> alterSql = new ArrayList<>();
+
+ for (TableChange change : changes) {
+ if (change instanceof TableChange.UpdateComment) {
+ updateComment = (TableChange.UpdateComment) change;
+
+ } else if (change instanceof TableChange.SetProperty setProperty) {
+ // The set attribute needs to be added at the end.
+ setProperties.add(setProperty);
+
+ } else if (change instanceof TableChange.RemoveProperty) {
+ // Clickhouse does not support deleting table attributes, it can be
replaced by Set Property
+ throw new IllegalArgumentException("Remove property is not supported
yet");
+
+ } else if (change instanceof TableChange.AddColumn addColumn) {
+ lazyLoadTable = getOrCreateTable(databaseName, tableName,
lazyLoadTable);
+ alterSql.add(addColumnFieldDefinition(addColumn));
+
+ } else if (change instanceof TableChange.RenameColumn renameColumn) {
+ lazyLoadTable = getOrCreateTable(databaseName, tableName,
lazyLoadTable);
+ alterSql.add(renameColumnFieldDefinition(renameColumn));
+
+ } else if (change instanceof TableChange.UpdateColumnDefaultValue
updateColumnDefaultValue) {
+ lazyLoadTable = getOrCreateTable(databaseName, tableName,
lazyLoadTable);
+ alterSql.add(
+ updateColumnDefaultValueFieldDefinition(updateColumnDefaultValue,
lazyLoadTable));
+
+ } else if (change instanceof TableChange.UpdateColumnType
updateColumnType) {
+ lazyLoadTable = getOrCreateTable(databaseName, tableName,
lazyLoadTable);
+ alterSql.add(updateColumnTypeFieldDefinition(updateColumnType,
lazyLoadTable));
+
+ } else if (change instanceof TableChange.UpdateColumnComment
updateColumnComment) {
+ lazyLoadTable = getOrCreateTable(databaseName, tableName,
lazyLoadTable);
+ alterSql.add(updateColumnCommentFieldDefinition(updateColumnComment,
lazyLoadTable));
+
+ } else if (change instanceof TableChange.UpdateColumnPosition
updateColumnPosition) {
+ lazyLoadTable = getOrCreateTable(databaseName, tableName,
lazyLoadTable);
+ alterSql.add(updateColumnPositionFieldDefinition(updateColumnPosition,
lazyLoadTable));
+
+ } else if (change instanceof TableChange.DeleteColumn deleteColumn) {
+ lazyLoadTable = getOrCreateTable(databaseName, tableName,
lazyLoadTable);
+ String deleteColSql = deleteColumnFieldDefinition(deleteColumn,
lazyLoadTable);
+
+ if (StringUtils.isNotEmpty(deleteColSql)) {
+ alterSql.add(deleteColSql);
+ }
+
+ } else if (change instanceof TableChange.UpdateColumnNullability) {
+ lazyLoadTable = getOrCreateTable(databaseName, tableName,
lazyLoadTable);
+ alterSql.add(
+ updateColumnNullabilityDefinition(
+ (TableChange.UpdateColumnNullability) change, lazyLoadTable));
+
+ } else if (change instanceof TableChange.DeleteIndex) {
+ lazyLoadTable = getOrCreateTable(databaseName, tableName,
lazyLoadTable);
+ alterSql.add(deleteIndexDefinition(lazyLoadTable,
(TableChange.DeleteIndex) change));
+
+ } else if (change instanceof TableChange.UpdateColumnAutoIncrement) {
+ lazyLoadTable = getOrCreateTable(databaseName, tableName,
lazyLoadTable);
+ alterSql.add(
+ updateColumnAutoIncrementDefinition(
+ lazyLoadTable, (TableChange.UpdateColumnAutoIncrement)
change));
+
+ } else {
+ throw new IllegalArgumentException(
+ "Unsupported table change type: " + change.getClass().getName());
+ }
+ }
+
+ if (!setProperties.isEmpty()) {
+ alterSql.add(generateAlterTableProperties(setProperties));
+ }
+
+ // Last modified comment
+ if (null != updateComment) {
+ String newComment = updateComment.getNewComment();
+ if (null == StringIdentifier.fromComment(newComment)) {
+ // Detect and add Gravitino id.
+ JdbcTable jdbcTable = getOrCreateTable(databaseName, tableName,
lazyLoadTable);
+ StringIdentifier identifier =
StringIdentifier.fromComment(jdbcTable.comment());
+ if (null != identifier) {
+ newComment = StringIdentifier.addToComment(identifier, newComment);
+ }
+ }
+ alterSql.add(" MODIFY COMMENT '%s'".formatted(newComment));
+ }
+
+ if (!setProperties.isEmpty()) {
+ alterSql.add(generateAlterTableProperties(setProperties));
+ }
+
+ // Remove all empty SQL statements
+ List<String> nonEmptySQLs =
+
alterSql.stream().filter(StringUtils::isNotEmpty).collect(Collectors.toList());
+ if (CollectionUtils.isEmpty(nonEmptySQLs)) {
+ return "";
+ }
+
+ // Return the generated SQL statement
+ String result =
+ "ALTER TABLE %s \n%s;"
+ .formatted(quoteIdentifier(tableName), String.join(",\n",
nonEmptySQLs));
+ LOG.info("Generated alter table:{} sql: {}", databaseName + "." +
tableName, result);
+ return result;
+ }
+
+ private String updateColumnAutoIncrementDefinition(
+ JdbcTable table, TableChange.UpdateColumnAutoIncrement change) {
+ if (change.fieldName().length > 1) {
+ throw new UnsupportedOperationException("Nested column names are not
supported");
+ }
+
+ String col = change.fieldName()[0];
+ JdbcColumn column = getJdbcColumnFromTable(table, col);
+ if (change.isAutoIncrement()) {
+ Preconditions.checkArgument(
+ Types.allowAutoIncrement(column.dataType()),
+ "Auto increment is not allowed, type: " + column.dataType());
+ }
+
+ JdbcColumn updateColumn =
+ JdbcColumn.builder()
+ .withName(col)
+ .withDefaultValue(column.defaultValue())
+ .withNullable(column.nullable())
+ .withType(column.dataType())
+ .withComment(column.comment())
+ .withAutoIncrement(change.isAutoIncrement())
+ .build();
+
+ return MODIFY_COLUMN
+ + quoteIdentifier(col)
+ + appendColumnDefinition(updateColumn, new StringBuilder());
+ }
+
+ @VisibleForTesting
+ private String deleteIndexDefinition(
+ JdbcTable lazyLoadTable, TableChange.DeleteIndex deleteIndex) {
+ boolean indexExists =
+ Arrays.stream(lazyLoadTable.index())
+ .anyMatch(index -> index.name().equals(deleteIndex.getName()));
+
+ // Index does not exist
+ if (!indexExists) {
+ // If ifExists is true, return empty string to skip the operation
+ if (deleteIndex.isIfExists()) {
+ return "";
+ } else {
+ throw new IllegalArgumentException(
+ "Index '%s' does not exist".formatted(deleteIndex.getName()));
+ }
+ }
+
+ return String.format("DROP INDEX %s
".formatted(quoteIdentifier(deleteIndex.getName())));
+ }
+
+ private String updateColumnNullabilityDefinition(
+ TableChange.UpdateColumnNullability change, JdbcTable table) {
+ validateUpdateColumnNullable(change, table);
+
+ String col = change.fieldName()[0];
+ JdbcColumn column = getJdbcColumnFromTable(table, col);
+ JdbcColumn updateColumn =
+ JdbcColumn.builder()
+ .withName(col)
+ .withDefaultValue(column.defaultValue())
+ .withNullable(change.nullable())
+ .withType(column.dataType())
+ .withComment(column.comment())
+ .withAutoIncrement(column.autoIncrement())
+ .build();
+
+ return MODIFY_COLUMN
+ + quoteIdentifier(col)
+ + appendColumnDefinition(updateColumn, new StringBuilder());
+ }
+
+ private String generateAlterTableProperties(List<TableChange.SetProperty>
setProperties) {
+ if (CollectionUtils.isNotEmpty(setProperties)) {
+ throw new UnsupportedOperationException(
+ "Alter table properties in ClickHouse is not supported");
+ }
+
+ return "";
+ }
+
+ private String updateColumnCommentFieldDefinition(
+ TableChange.UpdateColumnComment updateColumnComment, JdbcTable
jdbcTable) {
+ String newComment = updateColumnComment.getNewComment();
+ if (updateColumnComment.fieldName().length > 1) {
+ throw new
UnsupportedOperationException(CLICKHOUSE_NOT_SUPPORT_NESTED_COLUMN_MSG);
+ }
+
+ String col = updateColumnComment.fieldName()[0];
+ JdbcColumn column = getJdbcColumnFromTable(jdbcTable, col);
+ JdbcColumn updateColumn =
+ JdbcColumn.builder()
+ .withName(col)
+ .withDefaultValue(column.defaultValue())
+ .withNullable(column.nullable())
+ .withType(column.dataType())
+ .withComment(newComment)
+ .withAutoIncrement(column.autoIncrement())
+ .build();
+
+ return MODIFY_COLUMN
+ + quoteIdentifier(col)
+ + appendColumnDefinition(updateColumn, new StringBuilder());
+ }
+
+ private String addColumnFieldDefinition(TableChange.AddColumn addColumn) {
+ String dataType = typeConverter.fromGravitino(addColumn.getDataType());
+ if (addColumn.fieldName().length > 1) {
+ throw new
UnsupportedOperationException(CLICKHOUSE_NOT_SUPPORT_NESTED_COLUMN_MSG);
+ }
+
+ String col = addColumn.fieldName()[0];
+ StringBuilder columnDefinition = new StringBuilder();
+ // [IF NOT EXISTS] name [type] [default_expr] [codec] [AFTER name_after |
FIRST]
+ if (addColumn.isNullable()) {
+ columnDefinition.append(
+ "ADD COLUMN %s Nullable(%s) ".formatted(quoteIdentifier(col),
dataType));
+ } else {
+ columnDefinition.append("ADD COLUMN %s %s
".formatted(quoteIdentifier(col), dataType));
+ }
+
+ if (addColumn.isAutoIncrement()) {
+ throw new UnsupportedOperationException(
+ "ClickHouse does not support adding auto increment column");
+ }
+
+ // Append default value if available
+ if (!Column.DEFAULT_VALUE_NOT_SET.equals(addColumn.getDefaultValue())) {
+ columnDefinition.append(
+ "DEFAULT %s "
+
.formatted(columnDefaultValueConverter.fromGravitino(addColumn.getDefaultValue())));
+ }
+
+ // Append comment if available after default value
+ if (StringUtils.isNotEmpty(addColumn.getComment())) {
+ String escapedComment = StringUtils.replace(addColumn.getComment(), "'",
"''");
+ columnDefinition.append("COMMENT '%s'".formatted(escapedComment));
+ }
+
+ // Append position if available
+ if (addColumn.getPosition() instanceof TableChange.First) {
+ columnDefinition.append("FIRST");
+ } else if (addColumn.getPosition() instanceof TableChange.After
afterPosition) {
+ columnDefinition.append("AFTER %s
".formatted(quoteIdentifier(afterPosition.getColumn())));
+ } else if (addColumn.getPosition() instanceof TableChange.Default) {
+ // Do nothing, follow the default behavior of clickhouse
+ } else {
+ throw new IllegalArgumentException("Invalid column position.");
+ }
+
+ return columnDefinition.toString();
+ }
+
+ private String renameColumnFieldDefinition(TableChange.RenameColumn
renameColumn) {
+ if (renameColumn.fieldName().length > 1) {
+ throw new
UnsupportedOperationException(CLICKHOUSE_NOT_SUPPORT_NESTED_COLUMN_MSG);
+ }
+
+ String oldColumnName = renameColumn.fieldName()[0];
+ String newColumnName = renameColumn.getNewName();
+
+ return "RENAME COLUMN %s TO %s"
+ .formatted(quoteIdentifier(oldColumnName),
quoteIdentifier(newColumnName));
+ }
+
+ private String updateColumnPositionFieldDefinition(
+ TableChange.UpdateColumnPosition updateColumnPosition, JdbcTable
jdbcTable) {
+ if (updateColumnPosition.fieldName().length > 1) {
+ throw new
UnsupportedOperationException(CLICKHOUSE_NOT_SUPPORT_NESTED_COLUMN_MSG);
+ }
+
+ String col = updateColumnPosition.fieldName()[0];
+ JdbcColumn column = getJdbcColumnFromTable(jdbcTable, col);
+
+ StringBuilder columnDefinition = new StringBuilder();
+ columnDefinition.append("%s %s ".formatted(MODIFY_COLUMN,
quoteIdentifier(col)));
+ appendColumnDefinition(column, columnDefinition);
+
+ if (updateColumnPosition.getPosition() instanceof TableChange.First) {
+ columnDefinition.append("FIRST");
+ } else if (updateColumnPosition.getPosition() instanceof TableChange.After
afterPosition) {
+ columnDefinition.append("%s %s".formatted(AFTER,
quoteIdentifier(afterPosition.getColumn())));
+ } else {
+ Arrays.stream(jdbcTable.columns())
+ .reduce((column1, column2) -> column2)
+ .map(Column::name)
+ .ifPresent(s -> columnDefinition.append(AFTER).append(s));
Review Comment:
In the default branch, `AFTER` is appended without a space and without
quoting the column name, producing invalid SQL like `...AFTERcol`. Use `" AFTER
" + quoteIdentifier(lastColumn)` (or omit the clause if the intent is “no
position change”).
```suggestion
.ifPresent(s -> columnDefinition.append(" AFTER
").append(quoteIdentifier(s)));
```
##########
core/src/main/java/org/apache/gravitino/catalog/CatalogManager.java:
##########
@@ -113,6 +114,14 @@ public class CatalogManager implements CatalogDispatcher,
Closeable {
private static final Logger LOG =
LoggerFactory.getLogger(CatalogManager.class);
+ private static final Set<String> CONTRIB_CATALOGS_TYPES =
+ new HashSet<>() {
+ {
+ add("jdbc-oceanbase");
+ add("jdbc-clickhouse");
Review Comment:
`CONTRIB_CATALOGS_TYPES` uses double-brace initialization, which creates an
extra anonymous class and can retain an implicit reference; it’s also
inconsistent with typical style. Prefer `ImmutableSet.of(...)` or `Set.of(...)`
to build an immutable set.
##########
catalogs-contrib/catalog-jdbc-clickhouse/build.gradle.kts:
##########
@@ -50,9 +50,13 @@ dependencies {
testImplementation(project(":server-common"))
testImplementation(libs.awaitility)
+ testImplementation(libs.clickhouse.driver)
testImplementation(libs.junit.jupiter.api)
testImplementation(libs.junit.jupiter.params)
+ testImplementation(libs.lz4.java)
+ testImplementation(libs.mockito.core)
testImplementation(libs.testcontainers)
+ testImplementation(libs.testcontainers.clickhouse)
Review Comment:
New test dependencies were added for ClickHouse (JDBC driver, LZ4,
Testcontainers ClickHouse, Mockito). Please ensure they’re strictly necessary,
scoped correctly (test-only), and that any required license/NOTICE updates are
handled per project policy for new third-party deps.
##########
catalogs/catalog-jdbc-common/src/main/java/org/apache/gravitino/catalog/jdbc/JdbcCatalogOperations.java:
##########
@@ -450,8 +450,12 @@ public Table createTable(
SortOrder[] sortOrders,
Index[] indexes)
throws NoSuchSchemaException, TableAlreadyExistsException {
- Preconditions.checkArgument(
- null == sortOrders || sortOrders.length == 0, "jdbc-catalog does not
support sort orders");
+ // clickhouse support sortOrders
+ if
(!tableOperation.getClass().getSimpleName().equals("ClickHouseTableOperations"))
{
+ Preconditions.checkArgument(
+ null == sortOrders || sortOrders.length == 0,
+ "jdbc-catalog does not support sort orders");
Review Comment:
Checking ClickHouse support via
`tableOperation.getClass().getSimpleName().equals("ClickHouseTableOperations")`
is brittle (breaks with subclasses/shading) and couples behavior to a string.
Prefer `tableOperation instanceof ClickHouseTableOperations` (with a proper
import) or a capability method on `TableOperation`/`JdbcTableOperations` (e.g.,
`supportsSortOrders()`).
##########
catalogs-contrib/catalog-jdbc-clickhouse/src/main/java/org/apache/gravitino/catalog/clickhouse/operations/ClickHouseTableOperations.java:
##########
@@ -38,19 +126,920 @@ protected String generateCreateTableSql(
Distribution distribution,
Index[] indexes) {
throw new UnsupportedOperationException(
- "ClickHouseTableOperations.generateCreateTableSql is not implemented
yet.");
+ "generateCreateTableSql with out sortOrders in clickhouse is not
supported");
+ }
+
+ @Override
+ protected String generateCreateTableSql(
+ String tableName,
+ JdbcColumn[] columns,
+ String comment,
+ Map<String, String> properties,
+ Transform[] partitioning,
+ Distribution distribution,
+ Index[] indexes,
+ SortOrder[] sortOrders) {
+
+ Preconditions.checkArgument(
+ Distributions.NONE.equals(distribution), "ClickHouse does not support
distribution");
+
+ StringBuilder sqlBuilder = new StringBuilder();
+
+ Map<String, String> notNullProperties =
+ MapUtils.isNotEmpty(properties) ? properties : Collections.emptyMap();
+
+ // Add Create table clause
+ boolean onCluster = appendCreateTableClause(notNullProperties, sqlBuilder,
tableName);
+
+ // Add columns
+ buildColumnsDefinition(columns, sqlBuilder);
+
+ // Index definition
+ appendIndexesSql(indexes, sqlBuilder);
+
+ sqlBuilder.append("\n)");
+
+ // Extract engine from properties
+ ClickHouseTablePropertiesMetadata.ENGINE engine =
+ appendTableEngine(notNullProperties, sqlBuilder, onCluster);
+
+ appendOrderBy(sortOrders, sqlBuilder, engine);
+
+ appendPartitionClause(partitioning, sqlBuilder, engine);
+
+ // Add table comment if specified
+ if (StringUtils.isNotEmpty(comment)) {
+ String escapedComment = comment.replace("'", "''");
+ sqlBuilder.append(" COMMENT '%s'".formatted(escapedComment));
+ }
+
+ // Add settings clause if specified; ClickHouse only supports predefined
settings
+ appendTableProperties(notNullProperties, sqlBuilder);
+
+ // Return the generated SQL statement
+ String result = sqlBuilder.toString();
+
+ LOG.info("Generated create table:{} sql: {}", tableName, result);
+ return result;
+ }
+
+ /**
+ * Append the CREATE TABLE clause. If a cluster name and on-cluster are
specified in properties, append the ON
+ * CLUSTER clause.
+ *
+ * @param properties Table properties
+ * @param sqlBuilder SQL builder
+ * @return true if ON CLUSTER clause is appended, false otherwise
+ */
+ private boolean appendCreateTableClause(
+ Map<String, String> properties, StringBuilder sqlBuilder, String
tableName) {
+ String clusterName = properties.get(CLUSTER_NAME);
+ String onClusterValue = properties.get(ON_CLUSTER);
+
+ boolean onCluster =
+ StringUtils.isNotBlank(clusterName)
+ && StringUtils.isNotBlank(onClusterValue)
+ &&
Boolean.TRUE.equals(BooleanUtils.toBooleanObject(onClusterValue));
+
+ if (onCluster) {
+ sqlBuilder.append(
+ "CREATE TABLE %s ON CLUSTER `%s`
(\n".formatted(quoteIdentifier(tableName), clusterName));
+ } else {
+ sqlBuilder.append("CREATE TABLE %s
(\n".formatted(quoteIdentifier(tableName)));
+ }
+
+ return onCluster;
+ }
+
+ private static void appendTableProperties(
+ Map<String, String> properties, StringBuilder sqlBuilder) {
+ if (MapUtils.isEmpty(properties)) {
+ return;
+ }
+
+ Map<String, String> settingMap =
+ properties.entrySet().stream()
+ .filter(entry -> entry.getKey().startsWith(SETTINGS_PREFIX))
+ .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
+ if (MapUtils.isEmpty(settingMap)) {
+ return;
+ }
+
+ String settings =
+ settingMap.entrySet().stream()
+ .map(
+ entry ->
+ entry.getKey().substring(SETTINGS_PREFIX.length()) + " = "
+ entry.getValue())
+ .collect(Collectors.joining(",\n ", " \n SETTINGS ", ""));
+ sqlBuilder.append(settings);
+ }
+
+ private static void appendOrderBy(
+ SortOrder[] sortOrders,
+ StringBuilder sqlBuilder,
+ ClickHouseTablePropertiesMetadata.ENGINE engine) {
+ // ClickHouse requires an ORDER BY clause for some engines; currently
only the MergeTree family
+ // requires it.
+ boolean requireOrderBy = engine.isRequireOrderBy();
+ if (!requireOrderBy) {
+ if (ArrayUtils.isNotEmpty(sortOrders)) {
+ throw new UnsupportedOperationException(
+ "ORDER BY clause is not supported for engine: " +
engine.getValue());
+ }
+
+ // No need to add order by clause
+ return;
+ }
+
+ if (ArrayUtils.isEmpty(sortOrders)) {
+ throw new IllegalArgumentException(
+ "ORDER BY clause is required for engine: " + engine.getValue());
+ }
+
+ if (sortOrders.length > 1) {
+ throw new UnsupportedOperationException(
+ "Currently ClickHouse does not support sortOrders with more than 1
element");
+ }
+
+ NullOrdering nullOrdering = sortOrders[0].nullOrdering();
+ SortDirection sortDirection = sortOrders[0].direction();
+ if (nullOrdering != null && sortDirection != null) {
+ // ClickHouse does not support NULLS FIRST/LAST now.
+ LOG.warn(
+ "ClickHouse currently does not support nullOrdering: {}, and will
ignore it",
+ nullOrdering);
+ }
+
+ sqlBuilder.append("\n ORDER BY
`%s`\n".formatted(sortOrders[0].expression()));
Review Comment:
`ORDER BY` is built from `sortOrders[0].expression()` and then wrapped in
backticks. `SortOrder.expression()` is an `Expression` and may not be a simple
column identifier, which can produce invalid SQL (and defeats quoting).
Validate it’s a single-column `NamedReference` and quote that identifier, or
explicitly support/serialize expressions safely.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]