This is an automated email from the ASF dual-hosted git repository.
yuqi4733 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/gravitino.git
The following commit(s) were added to refs/heads/main by this push:
new 8919ef357d [#9820] feat(catalogs): Support cluster mode, distributed
and partitioning table for ClickHouse table (#9858)
8919ef357d is described below
commit 8919ef357d987f215a86d4faed4a696f03bc3d19
Author: Qi Yu <[email protected]>
AuthorDate: Wed Feb 11 09:33:40 2026 +0800
[#9820] feat(catalogs): Support cluster mode, distributed and partitioning
table for ClickHouse table (#9858)
### What changes were proposed in this pull request?
This pull request introduces several enhancements and fixes to the
ClickHouse catalog integration, focusing on improved support for cluster
operations, table partitioning, and index types. The changes add support
for ON CLUSTER clauses, enable partitioning for MergeTree engines,
introduce data skipping index types, and refine engine and property
handling for distributed tables.
### Why are the changes needed?
To support the ClickHouse catalog fully.
Fix: #9820
### Does this PR introduce _any_ user-facing change?
N/A
### How was this patch tested?
UTs
---------
Co-authored-by: flaming-archer <[email protected]>
---
.../org/apache/gravitino/rel/indexes/Index.java | 10 +-
.../catalog/clickhouse/ClickHouseConfig.java | 2 +-
.../catalog/clickhouse/ClickHouseConstants.java | 12 +-
.../ClickHouseTablePropertiesMetadata.java | 27 +-
.../operations/ClickHouseDatabaseOperations.java | 20 ++
.../operations/ClickHouseTableOperations.java | 276 +++++++++++++++++++--
.../operations/ClickHouseTableSqlUtils.java | 239 ++++++++++++++++++
.../TestClickHouseDatabaseOperations.java | 11 +
.../operations/TestClickHouseTableOperations.java | 108 +++++++-
.../TestClickHouseTableOperationsCluster.java | 211 ++++++++++++++++
.../TestClickHouseTableOperationsIndexParsing.java | 56 +++++
.../TestClickHouseTableOperationsPartitioning.java | 66 +++++
12 files changed, 1000 insertions(+), 38 deletions(-)
diff --git a/api/src/main/java/org/apache/gravitino/rel/indexes/Index.java
b/api/src/main/java/org/apache/gravitino/rel/indexes/Index.java
index fd3beb0e67..925ac4cb72 100644
--- a/api/src/main/java/org/apache/gravitino/rel/indexes/Index.java
+++ b/api/src/main/java/org/apache/gravitino/rel/indexes/Index.java
@@ -111,6 +111,14 @@ public interface Index {
/** IVF_HNSW_FLAT */
IVF_HNSW_SQ,
/** IVF_HNSW_PQ */
- IVF_HNSW_PQ;
+ IVF_HNSW_PQ,
+
+ // The following index types are data skipping indexes in ClickHouse,
ngrambf_v1 and tokenbf_v1
+ // Will be supported later.
+ /** minmax data skipping index */
+ DATA_SKIPPING_MINMAX,
+
+ /** Bloom filter data skipping index */
+ DATA_SKIPPING_BLOOM_FILTER,
}
}
diff --git
a/catalogs-contrib/catalog-jdbc-clickhouse/src/main/java/org/apache/gravitino/catalog/clickhouse/ClickHouseConfig.java
b/catalogs-contrib/catalog-jdbc-clickhouse/src/main/java/org/apache/gravitino/catalog/clickhouse/ClickHouseConfig.java
index 4f456775b7..7ef9d58d37 100644
---
a/catalogs-contrib/catalog-jdbc-clickhouse/src/main/java/org/apache/gravitino/catalog/clickhouse/ClickHouseConfig.java
+++
b/catalogs-contrib/catalog-jdbc-clickhouse/src/main/java/org/apache/gravitino/catalog/clickhouse/ClickHouseConfig.java
@@ -32,7 +32,7 @@ public class ClickHouseConfig {
// Constants part
public static final ConfigEntry<String> CK_CLUSTER_NAME =
- new ConfigBuilder(ClusterConstants.NAME)
+ new ConfigBuilder(ClusterConstants.CLUSTER_NAME)
.doc("Cluster name for ClickHouse distributed tables")
.version(ConfigConstants.VERSION_1_2_0)
.stringConf()
diff --git
a/catalogs-contrib/catalog-jdbc-clickhouse/src/main/java/org/apache/gravitino/catalog/clickhouse/ClickHouseConstants.java
b/catalogs-contrib/catalog-jdbc-clickhouse/src/main/java/org/apache/gravitino/catalog/clickhouse/ClickHouseConstants.java
index 19b85a0dd0..38428e82aa 100644
---
a/catalogs-contrib/catalog-jdbc-clickhouse/src/main/java/org/apache/gravitino/catalog/clickhouse/ClickHouseConstants.java
+++
b/catalogs-contrib/catalog-jdbc-clickhouse/src/main/java/org/apache/gravitino/catalog/clickhouse/ClickHouseConstants.java
@@ -35,7 +35,7 @@ public class ClickHouseConstants {
private ClusterConstants() {}
// Name of the clickhouse cluster
- public static final String NAME = "cluster-name";
+ public static final String CLUSTER_NAME = "cluster-name";
// Whether to use 'ON CLUSTER' clause when creating tables
public static final String ON_CLUSTER = "on-cluster";
}
@@ -48,4 +48,14 @@ public class ClickHouseConstants {
public static final String ENGINE_UPPER = "ENGINE";
public static final String SETTINGS_PREFIX = "settings.";
}
+
+ public static final class IndexConstants {
+ private IndexConstants() {}
+
+ // The name of the data skipping index type for minmax index in clickhouse.
+ public static final String DATA_SKIPPING_MINMAX_VALUE = "minmax";
+
+ // The name of the data skipping index type for bloom filter index in
clickhouse.
+ public static final String DATA_SKIPPING_BLOOM_FILTER = "bloom_filter";
+ }
}
diff --git
a/catalogs-contrib/catalog-jdbc-clickhouse/src/main/java/org/apache/gravitino/catalog/clickhouse/ClickHouseTablePropertiesMetadata.java
b/catalogs-contrib/catalog-jdbc-clickhouse/src/main/java/org/apache/gravitino/catalog/clickhouse/ClickHouseTablePropertiesMetadata.java
index f7330c6952..41f96a0884 100644
---
a/catalogs-contrib/catalog-jdbc-clickhouse/src/main/java/org/apache/gravitino/catalog/clickhouse/ClickHouseTablePropertiesMetadata.java
+++
b/catalogs-contrib/catalog-jdbc-clickhouse/src/main/java/org/apache/gravitino/catalog/clickhouse/ClickHouseTablePropertiesMetadata.java
@@ -54,7 +54,7 @@ public class ClickHouseTablePropertiesMetadata extends
JdbcTablePropertiesMetada
// The following two are for cluster tables
public static final PropertyEntry<String> CLUSTER_NAME_PROPERTY_ENTRY =
stringOptionalPropertyEntry(
- ClusterConstants.NAME,
+ ClusterConstants.CLUSTER_NAME,
"The cluster name for DDL operations, for example, creating on
cluster tables",
false,
"",
@@ -121,12 +121,12 @@ public class ClickHouseTablePropertiesMetadata extends
JdbcTablePropertiesMetada
/** refer https://clickhouse.com/docs/en/engines/table-engines */
public enum ENGINE {
// MergeTree
- MERGETREE("MergeTree", true),
- REPLACINGMERGETREE("ReplacingMergeTree", true),
- SUMMINGMERGETREE("SummingMergeTree", true),
- AGGREGATINGMERGETREE("AggregatingMergeTree", true),
- COLLAPSINGMERGETREE("CollapsingMergeTree", true),
- VERSIONEDCOLLAPSINGMERGETREE("VersionedCollapsingMergeTree", true),
+ MERGETREE("MergeTree", true, true),
+ REPLACINGMERGETREE("ReplacingMergeTree", true, true),
+ SUMMINGMERGETREE("SummingMergeTree", true, true),
+ AGGREGATINGMERGETREE("AggregatingMergeTree", true, true),
+ COLLAPSINGMERGETREE("CollapsingMergeTree", true, true),
+ VERSIONEDCOLLAPSINGMERGETREE("VersionedCollapsingMergeTree", true, true),
GRAPHITEMERGETREE("GraphiteMergeTree"),
// Log
@@ -165,15 +165,24 @@ public class ClickHouseTablePropertiesMetadata extends
JdbcTablePropertiesMetada
private final String value;
private final boolean requireOrderBy;
+ private final boolean acceptPartition;
ENGINE(String value) {
this.value = value;
this.requireOrderBy = false;
+ this.acceptPartition = false;
}
ENGINE(String value, boolean requireOrderBy) {
this.value = value;
this.requireOrderBy = requireOrderBy;
+ this.acceptPartition = false;
+ }
+
+ ENGINE(String value, boolean requireOrderBy, boolean acceptPartition) {
+ this.value = value;
+ this.requireOrderBy = requireOrderBy;
+ this.acceptPartition = acceptPartition;
}
public String getValue() {
@@ -184,6 +193,10 @@ public class ClickHouseTablePropertiesMetadata extends
JdbcTablePropertiesMetada
return requireOrderBy;
}
+ public boolean acceptPartition() {
+ return acceptPartition;
+ }
+
public static ENGINE fromString(String engineText) {
for (ENGINE engine : ENGINE.values()) {
if (engine.value.equalsIgnoreCase(engineText)) {
diff --git
a/catalogs-contrib/catalog-jdbc-clickhouse/src/main/java/org/apache/gravitino/catalog/clickhouse/operations/ClickHouseDatabaseOperations.java
b/catalogs-contrib/catalog-jdbc-clickhouse/src/main/java/org/apache/gravitino/catalog/clickhouse/operations/ClickHouseDatabaseOperations.java
index a3902ba1f6..9cdcdcd58c 100644
---
a/catalogs-contrib/catalog-jdbc-clickhouse/src/main/java/org/apache/gravitino/catalog/clickhouse/operations/ClickHouseDatabaseOperations.java
+++
b/catalogs-contrib/catalog-jdbc-clickhouse/src/main/java/org/apache/gravitino/catalog/clickhouse/operations/ClickHouseDatabaseOperations.java
@@ -27,8 +27,10 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import org.apache.commons.collections4.MapUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.gravitino.StringIdentifier;
+import
org.apache.gravitino.catalog.clickhouse.ClickHouseConstants.ClusterConstants;
import org.apache.gravitino.catalog.jdbc.operation.JdbcDatabaseOperations;
public class ClickHouseDatabaseOperations extends JdbcDatabaseOperations {
@@ -85,6 +87,11 @@ public class ClickHouseDatabaseOperations extends
JdbcDatabaseOperations {
StringBuilder createDatabaseSql =
new StringBuilder(String.format("CREATE DATABASE `%s`", databaseName));
+ if (onCluster(properties)) {
+ String clusterName = properties.get(ClusterConstants.CLUSTER_NAME);
+ createDatabaseSql.append(String.format(" ON CLUSTER `%s`", clusterName));
+ }
+
if (StringUtils.isNotEmpty(originComment)) {
createDatabaseSql.append(String.format(" COMMENT '%s'", originComment));
}
@@ -92,4 +99,17 @@ public class ClickHouseDatabaseOperations extends
JdbcDatabaseOperations {
LOG.info("Generated create database:{} sql: {}", databaseName,
createDatabaseSql);
return createDatabaseSql.toString();
}
+
+ private boolean onCluster(Map<String, String> dbProperties) {
+ if (MapUtils.isEmpty(dbProperties)) {
+ return false;
+ }
+
+ String clusterName = dbProperties.get(ClusterConstants.CLUSTER_NAME);
+ if (StringUtils.isBlank(clusterName)) {
+ return false;
+ }
+
+ return
Boolean.parseBoolean(dbProperties.getOrDefault(ClusterConstants.ON_CLUSTER,
"false"));
+ }
}
diff --git
a/catalogs-contrib/catalog-jdbc-clickhouse/src/main/java/org/apache/gravitino/catalog/clickhouse/operations/ClickHouseTableOperations.java
b/catalogs-contrib/catalog-jdbc-clickhouse/src/main/java/org/apache/gravitino/catalog/clickhouse/operations/ClickHouseTableOperations.java
index 0fc69d8279..e364c89505 100644
---
a/catalogs-contrib/catalog-jdbc-clickhouse/src/main/java/org/apache/gravitino/catalog/clickhouse/operations/ClickHouseTableOperations.java
+++
b/catalogs-contrib/catalog-jdbc-clickhouse/src/main/java/org/apache/gravitino/catalog/clickhouse/operations/ClickHouseTableOperations.java
@@ -18,6 +18,8 @@
*/
package org.apache.gravitino.catalog.clickhouse.operations;
+import static
org.apache.gravitino.catalog.clickhouse.ClickHouseConstants.IndexConstants.DATA_SKIPPING_BLOOM_FILTER;
+import static
org.apache.gravitino.catalog.clickhouse.ClickHouseConstants.IndexConstants.DATA_SKIPPING_MINMAX_VALUE;
import static
org.apache.gravitino.catalog.clickhouse.ClickHouseTablePropertiesMetadata.CLICKHOUSE_ENGINE_KEY;
import static
org.apache.gravitino.catalog.clickhouse.ClickHouseTablePropertiesMetadata.ENGINE_PROPERTY_ENTRY;
import static org.apache.gravitino.rel.Column.DEFAULT_VALUE_NOT_SET;
@@ -43,8 +45,11 @@ import org.apache.commons.lang3.ArrayUtils;
import org.apache.commons.lang3.BooleanUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.gravitino.StringIdentifier;
+import
org.apache.gravitino.catalog.clickhouse.ClickHouseConstants.ClusterConstants;
+import
org.apache.gravitino.catalog.clickhouse.ClickHouseConstants.DistributedTableConstants;
import
org.apache.gravitino.catalog.clickhouse.ClickHouseConstants.TableConstants;
import
org.apache.gravitino.catalog.clickhouse.ClickHouseTablePropertiesMetadata;
+import
org.apache.gravitino.catalog.clickhouse.ClickHouseTablePropertiesMetadata.ENGINE;
import org.apache.gravitino.catalog.jdbc.JdbcColumn;
import org.apache.gravitino.catalog.jdbc.JdbcTable;
import org.apache.gravitino.catalog.jdbc.operation.JdbcTableOperations;
@@ -52,12 +57,14 @@ import
org.apache.gravitino.catalog.jdbc.utils.JdbcConnectorUtils;
import org.apache.gravitino.exceptions.NoSuchTableException;
import org.apache.gravitino.rel.Column;
import org.apache.gravitino.rel.TableChange;
+import org.apache.gravitino.rel.expressions.NamedReference;
import org.apache.gravitino.rel.expressions.distributions.Distribution;
import org.apache.gravitino.rel.expressions.distributions.Distributions;
import org.apache.gravitino.rel.expressions.sorts.NullOrdering;
import org.apache.gravitino.rel.expressions.sorts.SortDirection;
import org.apache.gravitino.rel.expressions.sorts.SortOrder;
import org.apache.gravitino.rel.expressions.transforms.Transform;
+import org.apache.gravitino.rel.expressions.transforms.Transforms;
import org.apache.gravitino.rel.indexes.Index;
import org.apache.gravitino.rel.indexes.Indexes;
@@ -98,6 +105,7 @@ public class ClickHouseTableOperations extends
JdbcTableOperations {
indexes.add(
Indexes.of(Index.IndexType.PRIMARY_KEY, indexName, new String[][]
{{columnName}}));
}
+ indexes.addAll(getSecondaryIndexes(connection, databaseName, tableName));
return indexes;
} catch (SQLException e) {
throw exceptionMapper.toGravitinoException(e);
@@ -128,32 +136,33 @@ public class ClickHouseTableOperations extends
JdbcTableOperations {
Index[] indexes,
SortOrder[] sortOrders) {
- // These two are not yet supported in Gravitino now and will be supported
in the future.
- if (ArrayUtils.isNotEmpty(partitioning)) {
- throw new UnsupportedOperationException(
- "Currently we do not support Partitioning in clickhouse");
- }
Preconditions.checkArgument(
Distributions.NONE.equals(distribution), "ClickHouse does not support
distribution");
- // First build the CREATE TABLE statement
StringBuilder sqlBuilder = new StringBuilder();
- sqlBuilder.append("CREATE TABLE %s
(\n".formatted(quoteIdentifier(tableName)));
+
+ Map<String, String> notNullProperties =
+ MapUtils.isNotEmpty(properties) ? properties : Collections.emptyMap();
+
+ // Add Create table clause
+ appendCreateTableClause(notNullProperties, sqlBuilder, tableName);
// Add columns
buildColumnsDefinition(columns, sqlBuilder);
- // Index definition, we only support primary index now, secondary index
will be supported in
- // the future
+ // Index definition
appendIndexesSql(indexes, sqlBuilder);
sqlBuilder.append("\n)");
// Extract engine from properties
- ClickHouseTablePropertiesMetadata.ENGINE engine =
appendTableEngine(properties, sqlBuilder);
+ ClickHouseTablePropertiesMetadata.ENGINE engine =
+ appendTableEngine(notNullProperties, sqlBuilder, columns);
appendOrderBy(sortOrders, sqlBuilder, engine);
+ appendPartitionClause(partitioning, sqlBuilder, engine);
+
// Add table comment if specified
if (StringUtils.isNotEmpty(comment)) {
String escapedComment = comment.replace("'", "''");
@@ -161,7 +170,7 @@ public class ClickHouseTableOperations extends
JdbcTableOperations {
}
// Add setting clause if specified, clickhouse only supports predefine
settings
- appendTableProperties(properties, sqlBuilder);
+ appendTableProperties(notNullProperties, sqlBuilder);
// Return the generated SQL statement
String result = sqlBuilder.toString();
@@ -170,6 +179,35 @@ public class ClickHouseTableOperations extends
JdbcTableOperations {
return result;
}
+ /**
+ * Append CREATE TABLE clause. If cluster name && on-cluster is specified in
properties, append ON
+ * CLUSTER clause.
+ *
+ * @param properties Table properties
+ * @param sqlBuilder SQL builder
+ * @return true if ON CLUSTER clause is appended, false otherwise
+ */
+ private boolean appendCreateTableClause(
+ Map<String, String> properties, StringBuilder sqlBuilder, String
tableName) {
+ String clusterName = properties.get(ClusterConstants.CLUSTER_NAME);
+ String onClusterValue = properties.get(ClusterConstants.ON_CLUSTER);
+
+ boolean onCluster =
+ StringUtils.isNotBlank(clusterName)
+ && StringUtils.isNotBlank(onClusterValue)
+ && Boolean.TRUE.equals(Boolean.parseBoolean(onClusterValue));
+
+ if (onCluster) {
+ sqlBuilder.append(
+ "CREATE TABLE %s ON CLUSTER %s (\n"
+ .formatted(quoteIdentifier(tableName),
quoteIdentifier(clusterName)));
+ } else {
+ sqlBuilder.append("CREATE TABLE %s
(\n".formatted(quoteIdentifier(tableName)));
+ }
+
+ return onCluster;
+ }
+
private static void appendTableProperties(
Map<String, String> properties, StringBuilder sqlBuilder) {
if (MapUtils.isEmpty(properties)) {
@@ -228,18 +266,119 @@ public class ClickHouseTableOperations extends
JdbcTableOperations {
}
private ClickHouseTablePropertiesMetadata.ENGINE appendTableEngine(
- Map<String, String> properties, StringBuilder sqlBuilder) {
+ Map<String, String> properties, StringBuilder sqlBuilder, JdbcColumn[]
columns) {
ClickHouseTablePropertiesMetadata.ENGINE engine =
ENGINE_PROPERTY_ENTRY.getDefaultValue();
if (MapUtils.isNotEmpty(properties)) {
- String userSetEngine = properties.remove(CLICKHOUSE_ENGINE_KEY);
+ String userSetEngine = properties.get(CLICKHOUSE_ENGINE_KEY);
if (StringUtils.isNotEmpty(userSetEngine)) {
engine =
ClickHouseTablePropertiesMetadata.ENGINE.fromString(userSetEngine);
}
}
+
+ if (engine == ENGINE.DISTRIBUTED) {
+ handleDistributeTable(properties, sqlBuilder, columns);
+ return engine;
+ }
+
sqlBuilder.append("\n ENGINE = %s".formatted(engine.getValue()));
return engine;
}
+ private void handleDistributeTable(
+ Map<String, String> properties, StringBuilder sqlBuilder, JdbcColumn[]
columns) {
+
+ // Check properties
+ String clusterName = properties.get(ClusterConstants.CLUSTER_NAME);
+ String remoteDatabase =
properties.get(DistributedTableConstants.REMOTE_DATABASE);
+ String remoteTable =
properties.get(DistributedTableConstants.REMOTE_TABLE);
+ String shardingKey =
properties.get(DistributedTableConstants.SHARDING_KEY);
+
+ Preconditions.checkArgument(
+ StringUtils.isNotBlank(clusterName),
+ "Cluster name must be specified when engine is Distributed");
+ Preconditions.checkArgument(
+ StringUtils.isNotBlank(remoteDatabase),
+ "Remote database must be specified for Distributed");
+ Preconditions.checkArgument(
+ StringUtils.isNotBlank(remoteTable), "Remote table must be specified
for Distributed");
+
+ Preconditions.checkArgument(
+ StringUtils.isNotBlank(shardingKey), "Sharding key must be specified
for Distributed");
+
+ List<String> shardingColumns =
ClickHouseTableSqlUtils.extractShardingKeyColumns(shardingKey);
+ if (CollectionUtils.isNotEmpty(shardingColumns)) {
+ for (String columnName : shardingColumns) {
+ JdbcColumn shardingColumn = findColumn(columns, columnName);
+ Preconditions.checkArgument(
+ shardingColumn != null,
+ "Sharding key column %s must be defined in the table",
+ columnName);
+ }
+ }
+
+ String sanitizedShardingKey =
ClickHouseTableSqlUtils.formatShardingKey(shardingKey);
+
+ sqlBuilder.append(
+ "\n ENGINE = %s(`%s`,`%s`,`%s`,%s)"
+ .formatted(
+ ENGINE.DISTRIBUTED.getValue(),
+ clusterName,
+ remoteDatabase,
+ remoteTable,
+ sanitizedShardingKey));
+ }
+
+ private void appendPartitionClause(
+ Transform[] partitioning,
+ StringBuilder sqlBuilder,
+ ClickHouseTablePropertiesMetadata.ENGINE engine) {
+ if (ArrayUtils.isEmpty(partitioning)) {
+ return;
+ }
+
+ if (!engine.acceptPartition()) {
+ throw new UnsupportedOperationException(
+ "Partitioning is only supported for MergeTree family engines");
+ }
+
+ List<String> partitionExprs =
+ Arrays.stream(partitioning)
+ .map(ClickHouseTableSqlUtils::toPartitionExpression)
+ .collect(Collectors.toList());
+ String partitionExpr =
+ partitionExprs.size() == 1
+ ? partitionExprs.get(0)
+ : "tuple(" + String.join(", ", partitionExprs) + ")";
+ sqlBuilder.append("\n PARTITION BY ").append(partitionExpr);
+ }
+
+ private JdbcColumn findColumn(JdbcColumn[] columns, String columnName) {
+ if (ArrayUtils.isEmpty(columns)) {
+ return null;
+ }
+
+ return Arrays.stream(columns)
+ .filter(column -> StringUtils.equals(column.name(), columnName))
+ .findFirst()
+ .orElse(null);
+ }
+
+ private String toPartitionExpression(Transform transform) {
+ Preconditions.checkArgument(transform != null, "Partition transform cannot
be null");
+ Preconditions.checkArgument(
+ StringUtils.equalsIgnoreCase(transform.name(),
Transforms.NAME_OF_IDENTITY),
+ "Unsupported partition transform: " + transform.name());
+ Preconditions.checkArgument(
+ transform.arguments().length == 1
+ && transform.arguments()[0] instanceof NamedReference
+ && ((NamedReference) transform.arguments()[0]).fieldName().length
== 1,
+ "ClickHouse only supports single column identity partitioning");
+
+ String fieldName =
+ ((NamedReference) transform.arguments()[0]).fieldName()[0]; // already
validated
+ return quoteIdentifier(fieldName);
+ }
+
private void buildColumnsDefinition(JdbcColumn[] columns, StringBuilder
sqlBuilder) {
for (int i = 0; i < columns.length; i++) {
JdbcColumn column = columns[i];
@@ -254,9 +393,9 @@ public class ClickHouseTableOperations extends
JdbcTableOperations {
}
/**
- * ClickHouse only supports primary key now, and some secondary index will
be supported in future
+ * ClickHouse supports primary key and data skipping indexes.
*
- * <p>This method will not check the validity of the indexes. For
clickhouse, the primary key must
+ * <p>This method will not check the validity of the indexes. For
ClickHouse, the primary key must
* be a subset of the order by columns. We will leave the underlying
clickhouse to validate it.
*/
private void appendIndexesSql(Index[] indexes, StringBuilder sqlBuilder) {
@@ -278,6 +417,25 @@ public class ClickHouseTableOperations extends
JdbcTableOperations {
// fieldStr already quoted in getIndexFieldStr
sqlBuilder.append(" PRIMARY KEY (").append(fieldStr).append(")");
break;
+ case DATA_SKIPPING_MINMAX:
+ Preconditions.checkArgument(
+ StringUtils.isNotBlank(index.name()), "Data skipping index name
must not be blank");
+ // The GRANULARITY value is always 1 here currently as we can't set
it by Index: there is
+ // no field for it.
+ // TODO(yuqi) add a properties field to Index to support user
defined GRANULARITY value.
+ sqlBuilder.append(
+ " INDEX %s %s TYPE minmax GRANULARITY 1"
+ .formatted(quoteIdentifier(index.name()), fieldStr));
+ break;
+ case DATA_SKIPPING_BLOOM_FILTER:
+ // The GRANULARITY value is always 3 here currently.
+ // TODO(yuqi) add a properties field to Index to support user
defined GRANULARITY value.
+ Preconditions.checkArgument(
+ StringUtils.isNotBlank(index.name()), "Data skipping index name
must not be blank");
+ sqlBuilder.append(
+ " INDEX %s %s TYPE bloom_filter GRANULARITY 3"
+ .formatted(quoteIdentifier(index.name()), fieldStr));
+ break;
default:
throw new IllegalArgumentException(
"Gravitino Clickhouse doesn't support index : " + index.type());
@@ -333,6 +491,34 @@ public class ClickHouseTableOperations extends
JdbcTableOperations {
}
}
+ @Override
+ protected Transform[] getTablePartitioning(
+ Connection connection, String databaseName, String tableName) throws
SQLException {
+ try (PreparedStatement statement =
+ connection.prepareStatement(
+ "SELECT partition_key FROM system.tables WHERE database = ? AND
name = ?")) {
+ statement.setString(1, databaseName);
+ statement.setString(2, tableName);
+ try (ResultSet resultSet = statement.executeQuery()) {
+ if (resultSet.next()) {
+ String partitionKey = resultSet.getString("partition_key");
+ try {
+ return parsePartitioning(partitionKey);
+ } catch (IllegalArgumentException | UnsupportedOperationException e)
{
+ LOG.warn(
+ "Skip unsupported partition expression {} for {}.{}",
+ partitionKey,
+ databaseName,
+ tableName);
+ return Transforms.EMPTY_TRANSFORM;
+ }
+ }
+ }
+ }
+
+ return Transforms.EMPTY_TRANSFORM;
+ }
+
protected ResultSet getTables(Connection connection) throws SQLException {
final DatabaseMetaData metaData = connection.getMetaData();
String catalogName = connection.getCatalog();
@@ -683,6 +869,66 @@ public class ClickHouseTableOperations extends
JdbcTableOperations {
return appendColumnDefinition(newColumn, sqlBuilder).toString();
}
+ @VisibleForTesting
+ Transform[] parsePartitioning(String partitionKey) {
+ return ClickHouseTableSqlUtils.parsePartitioning(partitionKey);
+ }
+
+ @VisibleForTesting
+ String[][] parseIndexFields(String expression) {
+ return ClickHouseTableSqlUtils.parseIndexFields(expression);
+ }
+
+ private List<Index> getSecondaryIndexes(
+ Connection connection, String databaseName, String tableName) throws
SQLException {
+ List<Index> secondaryIndexes = new ArrayList<>();
+ try (PreparedStatement preparedStatement =
+ connection.prepareStatement(
+ "SELECT name, type, expr FROM system.data_skipping_indices "
+ + "WHERE database = ? AND table = ? ORDER BY name")) {
+ preparedStatement.setString(1, databaseName);
+ preparedStatement.setString(2, tableName);
+ try (ResultSet resultSet = preparedStatement.executeQuery()) {
+ while (resultSet.next()) {
+ String name = resultSet.getString("name");
+ String type = resultSet.getString("type");
+ String expression = resultSet.getString("expr");
+ try {
+ String[][] fields = parseIndexFields(expression);
+ if (ArrayUtils.isEmpty(fields)) {
+ continue;
+ }
+ secondaryIndexes.add(Indexes.of(getClickHouseIndexType(type),
name, fields));
+ } catch (IllegalArgumentException e) {
+ LOG.warn(
+ "Skip unsupported data skipping index {} for {}.{} with
expression {}",
+ name,
+ databaseName,
+ tableName,
+ expression);
+ }
+ }
+ }
+ }
+
+ return secondaryIndexes;
+ }
+
+ private Index.IndexType getClickHouseIndexType(String rawType) {
+ if (StringUtils.isBlank(rawType)) {
+ return Index.IndexType.DATA_SKIPPING_MINMAX;
+ }
+
+ switch (rawType) {
+ case DATA_SKIPPING_MINMAX_VALUE:
+ return Index.IndexType.DATA_SKIPPING_MINMAX;
+ case DATA_SKIPPING_BLOOM_FILTER:
+ return Index.IndexType.DATA_SKIPPING_BLOOM_FILTER;
+ default:
+ throw new IllegalArgumentException("Unsupported data skipping index
type: " + rawType);
+ }
+ }
+
private StringBuilder appendColumnDefinition(JdbcColumn column,
StringBuilder sqlBuilder) {
// Add Nullable data type
String dataType = typeConverter.fromGravitino(column.dataType());
diff --git
a/catalogs-contrib/catalog-jdbc-clickhouse/src/main/java/org/apache/gravitino/catalog/clickhouse/operations/ClickHouseTableSqlUtils.java
b/catalogs-contrib/catalog-jdbc-clickhouse/src/main/java/org/apache/gravitino/catalog/clickhouse/operations/ClickHouseTableSqlUtils.java
new file mode 100644
index 0000000000..4f21d69cd3
--- /dev/null
+++
b/catalogs-contrib/catalog-jdbc-clickhouse/src/main/java/org/apache/gravitino/catalog/clickhouse/operations/ClickHouseTableSqlUtils.java
@@ -0,0 +1,239 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.catalog.clickhouse.operations;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.gravitino.rel.expressions.NamedReference;
+import org.apache.gravitino.rel.expressions.transforms.Transform;
+import org.apache.gravitino.rel.expressions.transforms.Transforms;
+
+final class ClickHouseTableSqlUtils {
+
+ private static final Pattern TO_DATE_PATTERN =
+ Pattern.compile("toDate\\((.+)\\)", Pattern.CASE_INSENSITIVE);
+ private static final Pattern TO_YEAR_PATTERN =
+ Pattern.compile("toYear\\((.+)\\)", Pattern.CASE_INSENSITIVE);
+ private static final Pattern TO_MONTH_PATTERN =
+ Pattern.compile("toYYYYMM\\((.+)\\)", Pattern.CASE_INSENSITIVE);
+ private static final Pattern FUNCTION_WRAPPER_PATTERN =
+ Pattern.compile("^\\s*([A-Za-z0-9_]+)\\((.*)\\)\\s*$");
+
+ private ClickHouseTableSqlUtils() {}
+
+ static Transform[] parsePartitioning(String partitionKey) {
+ if (StringUtils.isBlank(partitionKey)) {
+ return Transforms.EMPTY_TRANSFORM;
+ }
+
+ String trimmedKey = normalizePartitionKey(partitionKey);
+ if (StringUtils.isBlank(trimmedKey)) {
+ return Transforms.EMPTY_TRANSFORM;
+ }
+
+ String[] parts = trimmedKey.split(",");
+ List<Transform> transforms = new ArrayList<>();
+ for (String part : parts) {
+ String expression = StringUtils.trim(part);
+ if (StringUtils.isBlank(expression)) {
+ continue;
+ }
+ transforms.add(parsePartitionExpression(expression, partitionKey));
+ }
+
+ return transforms.toArray(new Transform[0]);
+ }
+
+ static String toPartitionExpression(Transform transform) {
+ Preconditions.checkArgument(transform != null, "Partition transform cannot
be null");
+ Preconditions.checkArgument(
+ StringUtils.equalsIgnoreCase(transform.name(),
Transforms.NAME_OF_IDENTITY),
+ "Unsupported partition transform: " + transform.name());
+ Preconditions.checkArgument(
+ transform.arguments().length == 1
+ && transform.arguments()[0] instanceof NamedReference
+ && ((NamedReference) transform.arguments()[0]).fieldName().length
== 1,
+ "ClickHouse only supports single column identity partitioning");
+
+ String fieldName = ((NamedReference)
transform.arguments()[0]).fieldName()[0];
+ return quoteIdentifier(fieldName);
+ }
+
+ static List<String> extractShardingKeyColumns(String shardingKey) {
+ String normalized = normalizeIndexExpression(shardingKey);
+ if (StringUtils.isBlank(normalized)) {
+ return Collections.emptyList();
+ }
+
+ String[] parts = normalized.split(",");
+ List<String> columns = new ArrayList<>();
+ for (String part : parts) {
+ String column = normalizeIdentifier(part);
+ Preconditions.checkArgument(
+ isSimpleIdentifier(column), "Sharding key contains unsupported
expression: %s", part);
+ columns.add(column);
+ }
+ return ImmutableList.copyOf(columns);
+ }
+
+ static String formatShardingKey(String shardingKey) {
+ String trimmed = StringUtils.trim(shardingKey);
+ if (StringUtils.isBlank(trimmed)) {
+ return trimmed;
+ }
+
+ String normalized = normalizeIdentifier(trimmed);
+ if (isSimpleIdentifier(normalized)) {
+ return quoteIdentifier(normalized);
+ }
+ return trimmed;
+ }
+
+ static String[][] parseIndexFields(String expression) {
+ if (StringUtils.isBlank(expression)) {
+ return new String[0][];
+ }
+
+ String normalized = normalizeIndexExpression(expression);
+ if (StringUtils.isBlank(normalized)) {
+ return new String[0][];
+ }
+
+ String[] parts = normalized.split(",");
+ List<String[]> fields = new ArrayList<>();
+ for (String part : parts) {
+ String col = normalizeIdentifier(part);
+ Preconditions.checkArgument(
+ isSimpleIdentifier(col), "Unsupported index expression: " +
expression);
+ fields.add(new String[] {col});
+ }
+
+ return fields.toArray(new String[0][]);
+ }
+
+ static String normalizeIndexExpression(String expression) {
+ String trimmed = expression.trim();
+
+ boolean stripped = true;
+ String current = trimmed;
+ while (stripped) {
+ stripped = false;
+ Matcher matcher = FUNCTION_WRAPPER_PATTERN.matcher(current);
+ if (matcher.matches()) {
+ current = matcher.group(2).trim();
+ stripped = true;
+ }
+ }
+
+ if (StringUtils.startsWithIgnoreCase(current, "tuple(") &&
StringUtils.endsWith(current, ")")) {
+ current = current.substring("tuple(".length(), current.length() -
1).trim();
+ } else if (StringUtils.equalsIgnoreCase(current, "tuple()")) {
+ current = "";
+ }
+
+ return current;
+ }
+
+ static String normalizeIdentifier(String identifier) {
+ String col = StringUtils.trim(identifier);
+ if (StringUtils.startsWith(col, "`") && StringUtils.endsWith(col, "`") &&
col.length() >= 2) {
+ return col.substring(1, col.length() - 1);
+ }
+ return col;
+ }
+
+ static boolean isSimpleIdentifier(String identifier) {
+ return StringUtils.isNotBlank(identifier)
+ && !StringUtils.containsAny(identifier, "(", ")", " ", "%", "+", "-",
"*", "/");
+ }
+
+ private static boolean isStrictIdentifier(String identifier) {
+ return StringUtils.isNotBlank(identifier) &&
identifier.matches("^[a-zA-Z_][a-zA-Z0-9_]*$");
+ }
+
+ private static Transform parsePartitionExpression(
+ String expression, String originalPartitionKey) {
+ String trimmedExpression = StringUtils.trim(expression);
+
+ Matcher toYearMatcher = TO_YEAR_PATTERN.matcher(trimmedExpression);
+ if (toYearMatcher.matches()) {
+ String identifier = normalizeIdentifier(toYearMatcher.group(1));
+ Preconditions.checkArgument(
+ StringUtils.isNotBlank(identifier),
+ "Unsupported partition expression: " + originalPartitionKey);
+ return Transforms.year(identifier);
+ }
+
+ Matcher toYYYYMMMatcher = TO_MONTH_PATTERN.matcher(trimmedExpression);
+ if (toYYYYMMMatcher.matches()) {
+ String identifier = normalizeIdentifier(toYYYYMMMatcher.group(1));
+ Preconditions.checkArgument(
+ StringUtils.isNotBlank(identifier),
+ "Unsupported partition expression: " + originalPartitionKey);
+ return Transforms.month(identifier);
+ }
+
+ Matcher toDateMatcher = TO_DATE_PATTERN.matcher(trimmedExpression);
+ if (toDateMatcher.matches()) {
+ String identifier = normalizeIdentifier(toDateMatcher.group(1));
+ Preconditions.checkArgument(
+ StringUtils.isNotBlank(identifier),
+ "Unsupported partition expression: " + originalPartitionKey);
+ return Transforms.day(identifier);
+ }
+
+ if (trimmedExpression.contains("(") && trimmedExpression.contains(")")) {
+ throw new UnsupportedOperationException(
+ "Currently Gravitino only supports toYear, toYYYYMM, toDate
partition expressions, but got: "
+ + trimmedExpression);
+ }
+
+ String identifier = normalizeIdentifier(trimmedExpression);
+ Preconditions.checkArgument(
+ isStrictIdentifier(identifier),
+ "Only simple identifier is supported for partition expression, but
got: "
+ + originalPartitionKey);
+ return Transforms.identity(identifier);
+ }
+
+ private static String normalizePartitionKey(String partitionKey) {
+ String trimmedKey = partitionKey.trim();
+ if (StringUtils.equalsIgnoreCase(trimmedKey, "tuple()")) {
+ return "";
+ }
+ if (StringUtils.startsWithIgnoreCase(trimmedKey, "tuple(")
+ && StringUtils.endsWith(trimmedKey, ")")) {
+ return trimmedKey.substring("tuple(".length(), trimmedKey.length() -
1).trim();
+ }
+ if (StringUtils.startsWith(trimmedKey, "(") &&
StringUtils.endsWith(trimmedKey, ")")) {
+ return trimmedKey.substring(1, trimmedKey.length() - 1).trim();
+ }
+ return trimmedKey;
+ }
+
+ private static String quoteIdentifier(String identifier) {
+ return String.format("`%s`", identifier);
+ }
+}
diff --git
a/catalogs-contrib/catalog-jdbc-clickhouse/src/test/java/org/apache/gravitino/catalog/clickhouse/operations/TestClickHouseDatabaseOperations.java
b/catalogs-contrib/catalog-jdbc-clickhouse/src/test/java/org/apache/gravitino/catalog/clickhouse/operations/TestClickHouseDatabaseOperations.java
index b4ee51d703..44e7b7ff61 100644
---
a/catalogs-contrib/catalog-jdbc-clickhouse/src/test/java/org/apache/gravitino/catalog/clickhouse/operations/TestClickHouseDatabaseOperations.java
+++
b/catalogs-contrib/catalog-jdbc-clickhouse/src/test/java/org/apache/gravitino/catalog/clickhouse/operations/TestClickHouseDatabaseOperations.java
@@ -22,6 +22,7 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import org.apache.gravitino.catalog.clickhouse.ClickHouseConfig;
+import
org.apache.gravitino.catalog.clickhouse.ClickHouseConstants.ClusterConstants;
import org.apache.gravitino.catalog.jdbc.converter.JdbcExceptionConverter;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
@@ -60,6 +61,16 @@ public class TestClickHouseDatabaseOperations {
Assertions.assertEquals("CREATE DATABASE `db_name` COMMENT 'comment'",
sql);
}
+ @Test
+ void testGenerateCreateDatabaseSqlWithClusterEnabled() {
+ Map<String, String> properties = new HashMap<>();
+ properties.put(ClusterConstants.CLUSTER_NAME, "ck_cluster");
+ properties.put(ClusterConstants.ON_CLUSTER, "true");
+
+ String sql = newOps(new HashMap<>()).buildCreateSql("db_name", null,
properties);
+ Assertions.assertEquals("CREATE DATABASE `db_name` ON CLUSTER `ck_cluster`", sql);
+ }
+
@Test
void testGenerateDropDatabaseSqlWithoutCluster() {
Map<String, String> conf = new HashMap<>();
diff --git
a/catalogs-contrib/catalog-jdbc-clickhouse/src/test/java/org/apache/gravitino/catalog/clickhouse/operations/TestClickHouseTableOperations.java
b/catalogs-contrib/catalog-jdbc-clickhouse/src/test/java/org/apache/gravitino/catalog/clickhouse/operations/TestClickHouseTableOperations.java
index 1bc181fabf..5ebe7dc4b3 100644
---
a/catalogs-contrib/catalog-jdbc-clickhouse/src/test/java/org/apache/gravitino/catalog/clickhouse/operations/TestClickHouseTableOperations.java
+++
b/catalogs-contrib/catalog-jdbc-clickhouse/src/test/java/org/apache/gravitino/catalog/clickhouse/operations/TestClickHouseTableOperations.java
@@ -50,6 +50,7 @@ import org.apache.gravitino.rel.expressions.sorts.SortOrder;
import org.apache.gravitino.rel.expressions.transforms.Transform;
import org.apache.gravitino.rel.expressions.transforms.Transforms;
import org.apache.gravitino.rel.indexes.Index;
+import org.apache.gravitino.rel.indexes.Index.IndexType;
import org.apache.gravitino.rel.indexes.Indexes;
import org.apache.gravitino.rel.types.Decimal;
import org.apache.gravitino.rel.types.Type;
@@ -899,19 +900,18 @@ public class TestClickHouseTableOperations extends
TestClickHouse {
new String[][] {{"c1"}})
};
- // partitioning not supported
- Assertions.assertThrows(
- UnsupportedOperationException.class,
- () ->
- ops.buildCreateSql(
- "t1",
- new JdbcColumn[] {col},
- null,
- new HashMap<>(),
- new Transform[] {Transforms.identity("p")},
- Distributions.NONE,
- indexes,
- ClickHouseUtils.getSortOrders("c1")));
+ Map<String, String> propsWithPartition = new HashMap<>();
+ String partitionSql =
+ ops.buildCreateSql(
+ "t1",
+ new JdbcColumn[] {col},
+ null,
+ propsWithPartition,
+ new Transform[] {Transforms.identity("c1")},
+ Distributions.NONE,
+ indexes,
+ ClickHouseUtils.getSortOrders("c1"));
+ Assertions.assertTrue(partitionSql.contains("PARTITION BY `c1`"));
// distribution not NONE
Assertions.assertThrows(
@@ -961,6 +961,18 @@ public class TestClickHouseTableOperations extends
TestClickHouse {
Map<String, String> logEngineProps = new HashMap<>();
logEngineProps.put(
TableConstants.ENGINE_UPPER,
ClickHouseTablePropertiesMetadata.ENGINE.LOG.getValue());
+ Assertions.assertThrows(
+ UnsupportedOperationException.class,
+ () ->
+ ops.buildCreateSql(
+ "t1",
+ new JdbcColumn[] {col},
+ null,
+ logEngineProps,
+ new Transform[] {Transforms.identity("c1")},
+ Distributions.NONE,
+ indexes,
+ null));
Assertions.assertThrows(
UnsupportedOperationException.class,
() ->
@@ -994,6 +1006,76 @@ public class TestClickHouseTableOperations extends
TestClickHouse {
Assertions.assertTrue(sql.contains("COMMENT 'co''mment'"));
}
+ @Test
+ void testGenerateCreateTableSqlWithSecondaryIndexAndPartition() {
+ TestableClickHouseTableOperations ops = new
TestableClickHouseTableOperations();
+ ops.initialize(
+ null,
+ new ClickHouseExceptionConverter(),
+ new ClickHouseTypeConverter(),
+ new ClickHouseColumnDefaultValueConverter(),
+ new HashMap<>());
+
+ JdbcColumn[] cols =
+ new JdbcColumn[] {
+ JdbcColumn.builder()
+ .withName("c1")
+ .withType(Types.IntegerType.get())
+ .withNullable(false)
+ .build(),
+ JdbcColumn.builder()
+ .withName("c2")
+ .withType(Types.StringType.get())
+ .withNullable(true)
+ .build(),
+ JdbcColumn.builder()
+ .withName("c3")
+ .withType(Types.LongType.get())
+ .withNullable(true)
+ .build(),
+ };
+
+ Index[] indexes =
+ new Index[] {
+ Indexes.primary(Indexes.DEFAULT_PRIMARY_KEY_NAME, new String[][]
{{"c1"}}),
+ Indexes.of(IndexType.DATA_SKIPPING_MINMAX, "idx_c2", new String[][]
{{"c2"}}),
+ Indexes.of(IndexType.DATA_SKIPPING_BLOOM_FILTER, "idx_c3", new
String[][] {{"c3"}}),
+ };
+
+ String sql =
+ ops.buildCreateSql(
+ "t_idx",
+ cols,
+ "comment",
+ new HashMap<>(),
+ new Transform[] {Transforms.identity("c1")},
+ Distributions.NONE,
+ indexes,
+ ClickHouseUtils.getSortOrders("c1"));
+
+ Assertions.assertTrue(sql.contains("PARTITION BY `c1`"));
+ Assertions.assertTrue(sql.contains("INDEX `idx_c2` `c2` TYPE minmax GRANULARITY 1"));
+ Assertions.assertTrue(sql.contains("INDEX `idx_c3` `c3` TYPE bloom_filter GRANULARITY 3"));
+ }
+
+ @Test
+ void testParsePartitioningAndIndexExpressions() {
+ TestableClickHouseTableOperations ops = new
TestableClickHouseTableOperations();
+
+ Transform[] transforms = ops.parsePartitioning("tuple(`p1`, `p2`)");
+ Assertions.assertEquals(2, transforms.length);
+ Assertions.assertArrayEquals(
+ new String[] {"p1"}, ((NamedReference)
transforms[0].arguments()[0]).fieldName());
+ Assertions.assertArrayEquals(
+ new String[] {"p2"}, ((NamedReference)
transforms[1].arguments()[0]).fieldName());
+
+ String[][] fields = ops.parseIndexFields("tuple(`c2`, `c3`)");
+ Assertions.assertArrayEquals(new String[][] {{"c2"}, {"c3"}}, fields);
+
+ String[][] bloomFields = ops.parseIndexFields("bloom_filter(`c4`)");
+ Assertions.assertArrayEquals(new String[][] {{"c4"}}, bloomFields);
+ }
+
private static final class TestableClickHouseTableOperations extends
ClickHouseTableOperations {
String buildCreateSql(
String tableName,
diff --git
a/catalogs-contrib/catalog-jdbc-clickhouse/src/test/java/org/apache/gravitino/catalog/clickhouse/operations/TestClickHouseTableOperationsCluster.java
b/catalogs-contrib/catalog-jdbc-clickhouse/src/test/java/org/apache/gravitino/catalog/clickhouse/operations/TestClickHouseTableOperationsCluster.java
new file mode 100644
index 0000000000..b5088ad951
--- /dev/null
+++
b/catalogs-contrib/catalog-jdbc-clickhouse/src/test/java/org/apache/gravitino/catalog/clickhouse/operations/TestClickHouseTableOperationsCluster.java
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.catalog.clickhouse.operations;
+
+import static
org.apache.gravitino.catalog.clickhouse.ClickHouseTablePropertiesMetadata.CLICKHOUSE_ENGINE_KEY;
+
+import java.util.HashMap;
+import java.util.Map;
+import
org.apache.gravitino.catalog.clickhouse.ClickHouseConstants.ClusterConstants;
+import
org.apache.gravitino.catalog.clickhouse.ClickHouseConstants.DistributedTableConstants;
+import
org.apache.gravitino.catalog.clickhouse.converter.ClickHouseColumnDefaultValueConverter;
+import
org.apache.gravitino.catalog.clickhouse.converter.ClickHouseExceptionConverter;
+import
org.apache.gravitino.catalog.clickhouse.converter.ClickHouseTypeConverter;
+import org.apache.gravitino.catalog.jdbc.JdbcColumn;
+import org.apache.gravitino.rel.expressions.distributions.Distribution;
+import org.apache.gravitino.rel.expressions.distributions.Distributions;
+import org.apache.gravitino.rel.expressions.sorts.SortOrder;
+import org.apache.gravitino.rel.expressions.transforms.Transform;
+import org.apache.gravitino.rel.indexes.Index;
+import org.apache.gravitino.rel.types.Types;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+class TestClickHouseTableOperationsCluster {
+
+ private TestableClickHouseTableOperations ops;
+
+ @BeforeEach
+ void setUp() {
+ ops = new TestableClickHouseTableOperations();
+ ops.initialize(
+ null,
+ new ClickHouseExceptionConverter(),
+ new ClickHouseTypeConverter(),
+ new ClickHouseColumnDefaultValueConverter(),
+ new HashMap<>());
+ }
+
+ @Test
+ void testGenerateCreateTableSqlWithClusterDistributedEngine() {
+ JdbcColumn[] columns =
+ new JdbcColumn[] {
+ JdbcColumn.builder()
+ .withName("user_id")
+ .withType(Types.IntegerType.get())
+ .withNullable(false)
+ .build()
+ };
+
+ Map<String, String> props = new HashMap<>();
+ props.put(ClusterConstants.CLUSTER_NAME, "ck_cluster");
+ props.put(ClusterConstants.ON_CLUSTER, "true");
+ props.put(CLICKHOUSE_ENGINE_KEY, "Distributed");
+ props.put(DistributedTableConstants.REMOTE_DATABASE, "remote_db");
+ props.put(DistributedTableConstants.REMOTE_TABLE, "remote_table");
+ props.put(DistributedTableConstants.SHARDING_KEY, "`user_id`");
+
+ String sql =
+ ops.buildCreateSql(
+ "tbl", columns, "comment", props, null, Distributions.NONE, new
Index[0], null);
+
+ Assertions.assertTrue(sql.contains("CREATE TABLE `tbl` ON CLUSTER `ck_cluster`"));
+ Assertions.assertTrue(
+ sql.contains("ENGINE = Distributed(`ck_cluster`,`remote_db`,`remote_table`,`user_id`)"));
+
+ props.remove(DistributedTableConstants.REMOTE_DATABASE);
+ Assertions.assertThrows(
+ IllegalArgumentException.class,
+ () ->
+ ops.buildCreateSql(
+ "tbl", columns, "comment", props, null, Distributions.NONE,
new Index[0], null));
+
+ props.put(DistributedTableConstants.REMOTE_DATABASE, "remote_db");
+ props.remove(DistributedTableConstants.REMOTE_TABLE);
+ Assertions.assertThrows(
+ IllegalArgumentException.class,
+ () ->
+ ops.buildCreateSql(
+ "tbl", columns, "comment", props, null, Distributions.NONE,
new Index[0], null));
+
+ props.put(DistributedTableConstants.REMOTE_TABLE, "remote_table");
+ props.remove(DistributedTableConstants.SHARDING_KEY);
+ Assertions.assertThrows(
+ IllegalArgumentException.class,
+ () ->
+ ops.buildCreateSql(
+ "tbl", columns, "comment", props, null, Distributions.NONE,
new Index[0], null));
+
+ props.put(DistributedTableConstants.SHARDING_KEY, "cityHash64(`user_id`)");
+ String functionSql =
+ ops.buildCreateSql(
+ "tbl", columns, "comment", props, null, Distributions.NONE, new
Index[0], null);
+ Assertions.assertTrue(
+ functionSql.contains(
+ "ENGINE = Distributed(`ck_cluster`,`remote_db`,`remote_table`,cityHash64(`user_id`)"));
+ }
+
+ @Test
+ void testShardingKeyValidation() {
+ Map<String, String> props = new HashMap<>();
+ props.put(ClusterConstants.CLUSTER_NAME, "ck_cluster");
+ props.put(ClusterConstants.ON_CLUSTER, "true");
+ props.put(CLICKHOUSE_ENGINE_KEY, "Distributed");
+ props.put(DistributedTableConstants.REMOTE_DATABASE, "remote_db");
+ props.put(DistributedTableConstants.REMOTE_TABLE, "remote_table");
+ props.put(DistributedTableConstants.SHARDING_KEY, "user");
+
+ JdbcColumn[] nullableColumn =
+ new JdbcColumn[] {
+ JdbcColumn.builder()
+ .withName("user_id")
+ .withType(Types.IntegerType.get())
+ .withNullable(true)
+ .build()
+ };
+
+ Assertions.assertThrows(
+ IllegalArgumentException.class,
+ () ->
+ ops.buildCreateSql(
+ "tbl",
+ nullableColumn,
+ "comment",
+ props,
+ null,
+ Distributions.NONE,
+ new Index[0],
+ null));
+ }
+
+ @Test
+ void testFunctionOnlyShardingKeyAllowed() {
+ Map<String, String> props = new HashMap<>();
+ props.put(ClusterConstants.CLUSTER_NAME, "ck_cluster");
+ props.put(ClusterConstants.ON_CLUSTER, "true");
+ props.put(CLICKHOUSE_ENGINE_KEY, "Distributed");
+ props.put(DistributedTableConstants.REMOTE_DATABASE, "remote_db");
+ props.put(DistributedTableConstants.REMOTE_TABLE, "remote_table");
+ props.put(DistributedTableConstants.SHARDING_KEY, "rand()");
+
+ JdbcColumn[] columns =
+ new JdbcColumn[] {
+ JdbcColumn.builder()
+ .withName("user_id")
+ .withType(Types.IntegerType.get())
+ .withNullable(false)
+ .build()
+ };
+
+ String sql =
+ ops.buildCreateSql(
+ "tbl", columns, "comment", props, null, Distributions.NONE, new
Index[0], null);
+ Assertions.assertTrue(
+ sql.contains("ENGINE = Distributed(`ck_cluster`,`remote_db`,`remote_table`,rand())"));
+ }
+
+ @Test
+ void testGenerateCreateTableSqlWithDistributedEngineWithoutOnCluster() {
+ JdbcColumn[] columns =
+ new JdbcColumn[] {
+
JdbcColumn.builder().withName("user_id").withType(Types.IntegerType.get()).build()
+ };
+
+ Map<String, String> props = new HashMap<>();
+ props.put(ClusterConstants.CLUSTER_NAME, "ck_cluster");
+ props.put(CLICKHOUSE_ENGINE_KEY, "Distributed");
+ props.put(DistributedTableConstants.REMOTE_DATABASE, "remote_db");
+ props.put(DistributedTableConstants.REMOTE_TABLE, "remote_table");
+ props.put(DistributedTableConstants.SHARDING_KEY, "user_id");
+
+ String sql =
+ ops.buildCreateSql(
+ "tbl", columns, "comment", props, null, Distributions.NONE, new
Index[0], null);
+
+ Assertions.assertTrue(sql.startsWith("CREATE TABLE `tbl` ("));
+ Assertions.assertTrue(
+ sql.contains("ENGINE = Distributed(`ck_cluster`,`remote_db`,`remote_table`,`user_id`)"));
+ }
+
+ private static class TestableClickHouseTableOperations extends
ClickHouseTableOperations {
+ String buildCreateSql(
+ String tableName,
+ JdbcColumn[] columns,
+ String comment,
+ Map<String, String> properties,
+ Transform[] partitioning,
+ Distribution distribution,
+ Index[] indexes,
+ SortOrder[] sortOrders) {
+ return generateCreateTableSql(
+ tableName, columns, comment, properties, partitioning, distribution,
indexes, sortOrders);
+ }
+ }
+}
diff --git
a/catalogs-contrib/catalog-jdbc-clickhouse/src/test/java/org/apache/gravitino/catalog/clickhouse/operations/TestClickHouseTableOperationsIndexParsing.java
b/catalogs-contrib/catalog-jdbc-clickhouse/src/test/java/org/apache/gravitino/catalog/clickhouse/operations/TestClickHouseTableOperationsIndexParsing.java
new file mode 100644
index 0000000000..83ad206a7f
--- /dev/null
+++
b/catalogs-contrib/catalog-jdbc-clickhouse/src/test/java/org/apache/gravitino/catalog/clickhouse/operations/TestClickHouseTableOperationsIndexParsing.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gravitino.catalog.clickhouse.operations;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+public class TestClickHouseTableOperationsIndexParsing {
+
+ private final ClickHouseTableOperations operations = new
ClickHouseTableOperations();
+
+ @Test
+ public void testParseSimpleAndTupleIndexExpressions() {
+ String[][] single = operations.parseIndexFields("col_1");
+ Assertions.assertArrayEquals(new String[][] {{"col_1"}}, single);
+
+ String[][] tuple = operations.parseIndexFields("tuple(`a`, b)");
+ Assertions.assertArrayEquals(new String[][] {{"a"}, {"b"}}, tuple);
+ }
+
+ @Test
+ public void testParseFunctionWrappedExpression() {
+ String[][] bloom =
operations.parseIndexFields("bloom_filter(cityHash64(user_id))");
+ Assertions.assertArrayEquals(new String[][] {{"user_id"}}, bloom);
+
+ String[][] nested =
operations.parseIndexFields("minmax(lower(`tenant_id`))");
+ Assertions.assertArrayEquals(new String[][] {{"tenant_id"}}, nested);
+ }
+
+ @Test
+ public void testParseEmptyIndexExpression() {
+ Assertions.assertEquals(0, operations.parseIndexFields(" ").length);
+ Assertions.assertEquals(0, operations.parseIndexFields("tuple()").length);
+ }
+
+ @Test
+ public void testUnsupportedExpression() {
+ Assertions.assertThrows(
+ IllegalArgumentException.class, () ->
operations.parseIndexFields("cityHash64(id) % 16"));
+ }
+}
diff --git
a/catalogs-contrib/catalog-jdbc-clickhouse/src/test/java/org/apache/gravitino/catalog/clickhouse/operations/TestClickHouseTableOperationsPartitioning.java
b/catalogs-contrib/catalog-jdbc-clickhouse/src/test/java/org/apache/gravitino/catalog/clickhouse/operations/TestClickHouseTableOperationsPartitioning.java
new file mode 100644
index 0000000000..24cf0c574a
--- /dev/null
+++
b/catalogs-contrib/catalog-jdbc-clickhouse/src/test/java/org/apache/gravitino/catalog/clickhouse/operations/TestClickHouseTableOperationsPartitioning.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gravitino.catalog.clickhouse.operations;
+
+import org.apache.gravitino.rel.expressions.transforms.Transform;
+import org.apache.gravitino.rel.expressions.transforms.Transforms;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+public class TestClickHouseTableOperationsPartitioning {
+
+ private final ClickHouseTableOperations operations = new
ClickHouseTableOperations();
+
+ @Test
+ public void testParseSupportedPartitionExpressions() {
+ Transform[] dayPartitions =
operations.parsePartitioning("toDate(event_time)");
+ Assertions.assertEquals(1, dayPartitions.length);
+ assertSingleFieldTransform(dayPartitions[0], Transforms.NAME_OF_DAY,
"event_time");
+
+ Transform[] monthPartitions =
operations.parsePartitioning("toYYYYMM(event_time)");
+ Assertions.assertEquals(1, monthPartitions.length);
+ assertSingleFieldTransform(monthPartitions[0], Transforms.NAME_OF_MONTH,
"event_time");
+
+ Transform[] yearPartitions =
operations.parsePartitioning("toYear(event_time)");
+ Assertions.assertEquals(1, yearPartitions.length);
+ assertSingleFieldTransform(yearPartitions[0], Transforms.NAME_OF_YEAR,
"event_time");
+
+ Assertions.assertThrows(
+ UnsupportedOperationException.class,
+ () -> operations.parsePartitioning("cityHash64(user_id) % 16"));
+
+ Transform[] identityPartitions =
operations.parsePartitioning("metric_type");
+ Assertions.assertEquals(1, identityPartitions.length);
+ assertSingleFieldTransform(identityPartitions[0],
Transforms.NAME_OF_IDENTITY, "metric_type");
+
+ Transform[] tuplePartitions = operations.parsePartitioning("(toYYYYMM(ts),
tenant_id)");
+ Assertions.assertEquals(2, tuplePartitions.length);
+ assertSingleFieldTransform(tuplePartitions[0], Transforms.NAME_OF_MONTH,
"ts");
+ assertSingleFieldTransform(tuplePartitions[1],
Transforms.NAME_OF_IDENTITY, "tenant_id");
+
+ Assertions.assertEquals(0, operations.parsePartitioning("tuple()").length);
+ Assertions.assertEquals(0, operations.parsePartitioning(" ").length);
+ }
+
+ private void assertSingleFieldTransform(
+ Transform transform, String expectedName, String expectedColumn) {
+ Assertions.assertEquals(expectedName, transform.name());
+ Assertions.assertArrayEquals(
+ new String[] {expectedColumn}, ((Transform.SingleFieldTransform)
transform).fieldName());
+ }
+}