Copilot commented on code in PR #9858:
URL: https://github.com/apache/gravitino/pull/9858#discussion_r2771884376
##########
catalogs-contrib/catalog-jdbc-clickhouse/src/main/java/org/apache/gravitino/catalog/clickhouse/operations/ClickHouseTableOperations.java:
##########
@@ -228,18 +263,93 @@ private static void appendOrderBy(
}
  private ClickHouseTablePropertiesMetadata.ENGINE appendTableEngine(
-      Map<String, String> properties, StringBuilder sqlBuilder) {
+      Map<String, String> properties, StringBuilder sqlBuilder, boolean onCluster) {
    ClickHouseTablePropertiesMetadata.ENGINE engine = ENGINE_PROPERTY_ENTRY.getDefaultValue();
    if (MapUtils.isNotEmpty(properties)) {
-      String userSetEngine = properties.remove(CLICKHOUSE_ENGINE_KEY);
+      String userSetEngine = properties.get(CLICKHOUSE_ENGINE_KEY);
      if (StringUtils.isNotEmpty(userSetEngine)) {
        engine = ClickHouseTablePropertiesMetadata.ENGINE.fromString(userSetEngine);
      }
    }
+
+    if (engine == ENGINE.DISTRIBUTED) {
+      if (!onCluster) {
+        throw new IllegalArgumentException(
+            "ENGINE = DISTRIBUTED requires ON CLUSTER clause to be specified.");
+      }
+
+      // Check properties
+      String clusterName = properties.get(ClusterConstants.CLUSTER_NAME);
+      String remoteDatabase = properties.get(DistributedTableConstants.REMOTE_DATABASE);
+      String remoteTable = properties.get(DistributedTableConstants.REMOTE_TABLE);
+      String shardingKey = properties.get(DistributedTableConstants.SHARDING_KEY);
+
+      Preconditions.checkArgument(
+          StringUtils.isNotBlank(clusterName),
+          "Cluster name must be specified when engine is Distributed");
+      Preconditions.checkArgument(
+          StringUtils.isNotBlank(remoteDatabase),
+          "Remote database must be specified for Distributed");
+      Preconditions.checkArgument(
+          StringUtils.isNotBlank(remoteTable), "Remote table must be specified for Distributed");
+
+      // User must ensure the sharding key is a trusted value
+      Preconditions.checkArgument(
+          StringUtils.isNotBlank(shardingKey), "Sharding key must be specified for Distributed");
Review Comment:
Missing test coverage for DISTRIBUTED engine property validations. The code
validates that cluster name, remote database, remote table, and sharding key
are all present when using DISTRIBUTED engine (lines 287-298), but there are no
corresponding test cases verifying these validations. Consider adding test
cases that attempt to create a DISTRIBUTED table with each of these properties
missing, expecting IllegalArgumentExceptions with appropriate messages.
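
For instance, a minimal sketch of one such case, assuming the
`TestableClickHouseTableOperations` fixture (`ops`) from
TestClickHouseTableOperationsCluster in this PR (test name and assertion are
illustrative):

```java
@Test
void testDistributedEngineRequiresRemoteTable() {
  JdbcColumn[] columns =
      new JdbcColumn[] {
        JdbcColumn.builder()
            .withName("col_1")
            .withType(Types.IntegerType.get())
            .withNullable(false)
            .build()
      };

  Map<String, String> props = new HashMap<>();
  props.put(ClusterConstants.ON_CLUSTER, "true");
  props.put(ClusterConstants.CLUSTER_NAME, "ck_cluster");
  props.put(CLICKHOUSE_ENGINE_KEY, "Distributed");
  props.put(DistributedTableConstants.REMOTE_DATABASE, "remote_db");
  // DistributedTableConstants.REMOTE_TABLE is deliberately omitted.

  IllegalArgumentException e =
      Assertions.assertThrows(
          IllegalArgumentException.class,
          () ->
              ops.buildCreateSql(
                  "tbl", columns, "comment", props, null, Distributions.NONE, new Index[0], null));
  Assertions.assertTrue(e.getMessage().contains("Remote table must be specified"));
}
```

Analogous cases would omit CLUSTER_NAME, REMOTE_DATABASE, and SHARDING_KEY in turn.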
##########
catalogs-contrib/catalog-jdbc-clickhouse/src/main/java/org/apache/gravitino/catalog/clickhouse/operations/ClickHouseTableOperations.java:
##########
@@ -228,18 +263,93 @@ private static void appendOrderBy(
}
  private ClickHouseTablePropertiesMetadata.ENGINE appendTableEngine(
-      Map<String, String> properties, StringBuilder sqlBuilder) {
+      Map<String, String> properties, StringBuilder sqlBuilder, boolean onCluster) {
    ClickHouseTablePropertiesMetadata.ENGINE engine = ENGINE_PROPERTY_ENTRY.getDefaultValue();
    if (MapUtils.isNotEmpty(properties)) {
-      String userSetEngine = properties.remove(CLICKHOUSE_ENGINE_KEY);
+      String userSetEngine = properties.get(CLICKHOUSE_ENGINE_KEY);
      if (StringUtils.isNotEmpty(userSetEngine)) {
        engine = ClickHouseTablePropertiesMetadata.ENGINE.fromString(userSetEngine);
      }
    }
+
+    if (engine == ENGINE.DISTRIBUTED) {
+      if (!onCluster) {
+        throw new IllegalArgumentException(
+            "ENGINE = DISTRIBUTED requires ON CLUSTER clause to be specified.");
+      }
Review Comment:
Missing test coverage for the validation that throws
IllegalArgumentException when DISTRIBUTED engine is specified without ON
CLUSTER clause. Consider adding a test case that attempts to create a
DISTRIBUTED table without setting the ON_CLUSTER property to "true", expecting
an IllegalArgumentException with the message "ENGINE = DISTRIBUTED requires ON
CLUSTER clause to be specified."
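
A sketch of such a test, reusing the `ops` and `columns` fixtures from
TestClickHouseTableOperationsCluster in this PR (test name is illustrative):

```java
@Test
void testDistributedEngineRequiresOnCluster() {
  Map<String, String> props = new HashMap<>();
  props.put(CLICKHOUSE_ENGINE_KEY, "Distributed");
  // ClusterConstants.ON_CLUSTER is deliberately left unset.

  IllegalArgumentException e =
      Assertions.assertThrows(
          IllegalArgumentException.class,
          () ->
              ops.buildCreateSql(
                  "tbl", columns, "comment", props, null, Distributions.NONE, new Index[0], null));
  Assertions.assertEquals(
      "ENGINE = DISTRIBUTED requires ON CLUSTER clause to be specified.", e.getMessage());
}
```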
##########
catalogs-contrib/catalog-jdbc-clickhouse/src/main/java/org/apache/gravitino/catalog/clickhouse/operations/ClickHouseTableOperations.java:
##########
@@ -683,6 +840,139 @@ private String updateColumnTypeFieldDefinition(
return appendColumnDefinition(newColumn, sqlBuilder).toString();
}
+  @VisibleForTesting
+  Transform[] parsePartitioning(String partitionKey) {
+    if (StringUtils.isBlank(partitionKey)) {
+      return Transforms.EMPTY_TRANSFORM;
+    }
+
+    String trimmedKey = partitionKey.trim();
+    if (StringUtils.equalsIgnoreCase(trimmedKey, "tuple()")) {
+      return Transforms.EMPTY_TRANSFORM;
+    }
+
+    if (StringUtils.startsWith(trimmedKey, "tuple(") && StringUtils.endsWith(trimmedKey, ")")) {
+      trimmedKey = trimmedKey.substring("tuple(".length(), trimmedKey.length() - 1);
+    }
+
+    if (StringUtils.isBlank(trimmedKey)) {
+      return Transforms.EMPTY_TRANSFORM;
+    }
+
+    String[] parts = trimmedKey.split(",");
+    List<Transform> transforms = new ArrayList<>();
+    for (String part : parts) {
+      String col = StringUtils.trim(part);
+      if (StringUtils.startsWith(col, "`") && StringUtils.endsWith(col, "`")) {
+        col = col.substring(1, col.length() - 1);
+      }
+      Preconditions.checkArgument(
+          StringUtils.isNotBlank(col) && !StringUtils.containsAny(col, "()", " "),
+          "Unsupported partition expression: " + partitionKey);
+      transforms.add(Transforms.identity(col));
+    }
+
+    return transforms.toArray(new Transform[0]);
+  }
+
+  @VisibleForTesting
+  String[][] parseIndexFields(String expression) {
+    if (StringUtils.isBlank(expression)) {
+      return new String[0][];
+    }
+
+    String trimmed = expression.trim();
+
+    // Strip function wrappers like bloom_filter(...), minmax(...), etc.
+    boolean stripped = true;
+    while (stripped) {
+      stripped = false;
+      int open = trimmed.indexOf('(');
+      int close = trimmed.lastIndexOf(')');
+      if (open > 0 && close > open) {
+        String func = trimmed.substring(0, open).trim();
+        if (func.chars().allMatch(ch -> Character.isLetterOrDigit(ch) || ch == '_')) {
+          trimmed = trimmed.substring(open + 1, close).trim();
+          stripped = true;
+        }
+      }
+    }
+
+    if (StringUtils.startsWith(trimmed, "tuple(") && StringUtils.endsWith(trimmed, ")")) {
+      trimmed = trimmed.substring("tuple(".length(), trimmed.length() - 1);
+    }
+
+    if (StringUtils.isBlank(trimmed)) {
+      return new String[0][];
+    }
+
+    String[] parts = trimmed.split(",");
+    List<String[]> fields = new ArrayList<>();
+    for (String part : parts) {
+      String col = StringUtils.trim(part);
+      if (StringUtils.startsWith(col, "`") && StringUtils.endsWith(col, "`")) {
+        col = col.substring(1, col.length() - 1);
+      }
+
+      Preconditions.checkArgument(
+          StringUtils.isNotBlank(col) && !StringUtils.containsAny(col, "()", " "),
+          "Unsupported index expression: " + expression);
+      fields.add(new String[] {col});
+    }
Review Comment:
The index field parsing uses a simple comma split which may fail for complex
ClickHouse index expressions containing commas within function calls. Similar
to the partition expression parsing issue, this naive string splitting approach
doesn't handle nested parentheses or function arguments containing commas.
Consider implementing a more robust parser that respects function boundaries,
or add upfront validation to reject complex expressions with a clear error
message indicating only simple column references are supported.
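
One possible shape for such a parser, sketched for illustration only
(`splitTopLevel` is not part of the PR; it uses java.util.List/ArrayList):

```java
// Split an expression on top-level commas only, so a fragment such as
// "cityHash64(a, b)" stays intact instead of being torn at the inner comma.
private static List<String> splitTopLevel(String expr) {
  List<String> parts = new ArrayList<>();
  StringBuilder current = new StringBuilder();
  int depth = 0;
  for (char ch : expr.toCharArray()) {
    if (ch == '(') {
      depth++;
    } else if (ch == ')') {
      depth--;
    }
    if (ch == ',' && depth == 0) {
      parts.add(current.toString().trim());
      current.setLength(0);
    } else {
      current.append(ch);
    }
  }
  if (current.length() > 0) {
    parts.add(current.toString().trim());
  }
  return parts;
}
```

The existing per-column checks could then run on each top-level part and reject
anything that still contains parentheses with a single clear error.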
##########
catalogs-contrib/catalog-jdbc-clickhouse/src/main/java/org/apache/gravitino/catalog/clickhouse/operations/ClickHouseTableOperations.java:
##########
@@ -228,18 +263,93 @@ private static void appendOrderBy(
}
  private ClickHouseTablePropertiesMetadata.ENGINE appendTableEngine(
-      Map<String, String> properties, StringBuilder sqlBuilder) {
+      Map<String, String> properties, StringBuilder sqlBuilder, boolean onCluster) {
    ClickHouseTablePropertiesMetadata.ENGINE engine = ENGINE_PROPERTY_ENTRY.getDefaultValue();
    if (MapUtils.isNotEmpty(properties)) {
-      String userSetEngine = properties.remove(CLICKHOUSE_ENGINE_KEY);
+      String userSetEngine = properties.get(CLICKHOUSE_ENGINE_KEY);
      if (StringUtils.isNotEmpty(userSetEngine)) {
        engine = ClickHouseTablePropertiesMetadata.ENGINE.fromString(userSetEngine);
      }
    }
+
+    if (engine == ENGINE.DISTRIBUTED) {
+      if (!onCluster) {
+        throw new IllegalArgumentException(
+            "ENGINE = DISTRIBUTED requires ON CLUSTER clause to be specified.");
+      }
+
+      // Check properties
+      String clusterName = properties.get(ClusterConstants.CLUSTER_NAME);
+      String remoteDatabase = properties.get(DistributedTableConstants.REMOTE_DATABASE);
+      String remoteTable = properties.get(DistributedTableConstants.REMOTE_TABLE);
+      String shardingKey = properties.get(DistributedTableConstants.SHARDING_KEY);
+
+      Preconditions.checkArgument(
+          StringUtils.isNotBlank(clusterName),
+          "Cluster name must be specified when engine is Distributed");
+      Preconditions.checkArgument(
+          StringUtils.isNotBlank(remoteDatabase),
+          "Remote database must be specified for Distributed");
+      Preconditions.checkArgument(
+          StringUtils.isNotBlank(remoteTable), "Remote table must be specified for Distributed");
+
+      // User must ensure the sharding key is a trusted value
+      Preconditions.checkArgument(
+          StringUtils.isNotBlank(shardingKey), "Sharding key must be specified for Distributed");
+
+      sqlBuilder.append(
+          "\n ENGINE = %s(`%s`,`%s`,`%s`,%s)"
+              .formatted(
+                  ENGINE.DISTRIBUTED.getValue(),
+                  clusterName,
+                  remoteDatabase,
+                  remoteTable,
+                  shardingKey));
Review Comment:
The sharding key is directly interpolated into the SQL string without proper
sanitization or quoting. Unlike clusterName, remoteDatabase, and remoteTable
which are backtick-quoted, the shardingKey parameter is inserted raw. This
creates a potential SQL injection vulnerability if the sharding key contains
malicious SQL syntax. While the comment on line 296 states "User must ensure
the sharding key is a trusted value", relying on user-provided values without
validation is a security risk. Consider validating that the sharding key
contains only safe characters (alphanumeric, underscore) or parse and validate
it as a column reference, similar to how partition expressions are validated.
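
A possible guard, sketched for illustration (the helper name, pattern, and
message are not part of the PR):

```java
import java.util.regex.Pattern;

// Accept only a bare column identifier so the sharding key can be
// backtick-quoted like clusterName, remoteDatabase, and remoteTable.
private static final Pattern SAFE_IDENTIFIER = Pattern.compile("[A-Za-z_][A-Za-z0-9_]*");

private static String quotedShardingKey(String shardingKey) {
  Preconditions.checkArgument(
      SAFE_IDENTIFIER.matcher(shardingKey).matches(),
      "Sharding key must be a simple column reference, but was: " + shardingKey);
  return "`" + shardingKey + "`";
}
```

Note this would intentionally reject sharding expressions such as rand() or
cityHash64(user_id); if those must remain supported, a stricter
expression-level validator would be needed instead.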
##########
catalogs-contrib/catalog-jdbc-clickhouse/src/main/java/org/apache/gravitino/catalog/clickhouse/operations/ClickHouseTableOperations.java:
##########
@@ -683,6 +840,139 @@ private String updateColumnTypeFieldDefinition(
return appendColumnDefinition(newColumn, sqlBuilder).toString();
}
+  @VisibleForTesting
+  Transform[] parsePartitioning(String partitionKey) {
+    if (StringUtils.isBlank(partitionKey)) {
+      return Transforms.EMPTY_TRANSFORM;
+    }
+
+    String trimmedKey = partitionKey.trim();
+    if (StringUtils.equalsIgnoreCase(trimmedKey, "tuple()")) {
+      return Transforms.EMPTY_TRANSFORM;
+    }
+
+    if (StringUtils.startsWith(trimmedKey, "tuple(") && StringUtils.endsWith(trimmedKey, ")")) {
+      trimmedKey = trimmedKey.substring("tuple(".length(), trimmedKey.length() - 1);
+    }
+
+    if (StringUtils.isBlank(trimmedKey)) {
+      return Transforms.EMPTY_TRANSFORM;
+    }
+
+    String[] parts = trimmedKey.split(",");
+    List<Transform> transforms = new ArrayList<>();
+    for (String part : parts) {
+      String col = StringUtils.trim(part);
+      if (StringUtils.startsWith(col, "`") && StringUtils.endsWith(col, "`")) {
+        col = col.substring(1, col.length() - 1);
+      }
+      Preconditions.checkArgument(
+          StringUtils.isNotBlank(col) && !StringUtils.containsAny(col, "()", " "),
+          "Unsupported partition expression: " + partitionKey);
+      transforms.add(Transforms.identity(col));
+    }
Review Comment:
The partition expression parsing uses a simple comma split which may fail
for complex ClickHouse partition expressions that contain commas within
function calls (e.g., "toYYYYMM(date, 'UTC')"). While the current
implementation validates that parsed columns don't contain parentheses or
spaces, it doesn't prevent the split from incorrectly breaking apart function
arguments. Consider implementing a more robust parser that respects nested
parentheses and function boundaries, or add validation to reject partition
expressions containing function calls with commas upfront with a clear error
message.
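
To make the failure mode concrete (hypothetical input, not from the PR's tests):

```java
// A naive comma split tears the function call apart before validation runs:
String[] parts = "toYYYYMM(date, 'UTC')".split(",");
// parts = { "toYYYYMM(date", " 'UTC')" }; any error raised afterwards points
// at these fragments rather than at the unsupported expression as a whole.
```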
##########
catalogs-contrib/catalog-jdbc-clickhouse/src/test/java/org/apache/gravitino/catalog/clickhouse/operations/TestClickHouseTableOperationsCluster.java:
##########
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.catalog.clickhouse.operations;
+
+import static org.apache.gravitino.catalog.clickhouse.ClickHouseTablePropertiesMetadata.CLICKHOUSE_ENGINE_KEY;
+
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.gravitino.catalog.clickhouse.ClickHouseConstants.ClusterConstants;
+import org.apache.gravitino.catalog.clickhouse.ClickHouseConstants.DistributedTableConstants;
+import org.apache.gravitino.catalog.clickhouse.converter.ClickHouseColumnDefaultValueConverter;
+import org.apache.gravitino.catalog.clickhouse.converter.ClickHouseExceptionConverter;
+import org.apache.gravitino.catalog.clickhouse.converter.ClickHouseTypeConverter;
+import org.apache.gravitino.catalog.jdbc.JdbcColumn;
+import org.apache.gravitino.rel.expressions.distributions.Distribution;
+import org.apache.gravitino.rel.expressions.distributions.Distributions;
+import org.apache.gravitino.rel.expressions.transforms.Transform;
+import org.apache.gravitino.rel.indexes.Index;
+import org.apache.gravitino.rel.types.Types;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+class TestClickHouseTableOperationsCluster {
+
+  private TestableClickHouseTableOperations ops;
+
+  @BeforeEach
+  void setUp() {
+    ops = new TestableClickHouseTableOperations();
+    ops.initialize(
+        null,
+        new ClickHouseExceptionConverter(),
+        new ClickHouseTypeConverter(),
+        new ClickHouseColumnDefaultValueConverter(),
+        new HashMap<>());
+  }
+
+  @Test
+  void testGenerateCreateTableSqlWithClusterDistributedEngine() {
+    JdbcColumn[] columns =
+        new JdbcColumn[] {
+          JdbcColumn.builder()
+              .withName("col_1")
+              .withType(Types.IntegerType.get())
+              .withNullable(false)
+              .build()
+        };
+
+    Map<String, String> props = new HashMap<>();
+    props.put(ClusterConstants.CLUSTER_NAME, "ck_cluster");
+    props.put(ClusterConstants.ON_CLUSTER, "true");
+    props.put(CLICKHOUSE_ENGINE_KEY, "Distributed");
+    props.put(DistributedTableConstants.REMOTE_DATABASE, "remote_db");
+    props.put(DistributedTableConstants.REMOTE_TABLE, "remote_table");
+    props.put(DistributedTableConstants.SHARDING_KEY, "user_id");
+
+    String sql =
+        ops.buildCreateSql(
+            "tbl", columns, "comment", props, null, Distributions.NONE, new Index[0], null);
+
+    Assertions.assertTrue(sql.contains("CREATE TABLE `tbl` ON CLUSTER `ck_cluster`"));
+    Assertions.assertTrue(
+        sql.contains("ENGINE = Distributed(`ck_cluster`,`remote_db`,`remote_table`,user_id)"));
+  }
+
+  private static class TestableClickHouseTableOperations extends ClickHouseTableOperations {
+    String buildCreateSql(
+        String tableName,
+        JdbcColumn[] columns,
+        String comment,
+        Map<String, String> properties,
+        Transform[] partitioning,
+        Distribution distribution,
+        Index[] indexes,
+        org.apache.gravitino.rel.expressions.sorts.SortOrder[] sortOrders) {
Review Comment:
Using a fully qualified class name (FQN) for the SortOrder parameter type.
According to coding guidelines, ALWAYS use standard imports instead of FQNs
inside methods or class declarations unless absolutely necessary for collision
resolution. Add the import statement and use the simple class name instead.
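
For example (assuming SortOrder causes no naming collision in this file):

```java
// In the import block:
import org.apache.gravitino.rel.expressions.sorts.SortOrder;

// Then the parameter declaration becomes simply:
//   SortOrder[] sortOrders
```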
##########
catalogs-contrib/catalog-jdbc-clickhouse/src/main/java/org/apache/gravitino/catalog/clickhouse/operations/ClickHouseTableOperations.java:
##########
@@ -278,6 +388,25 @@ private void appendIndexesSql(Index[] indexes, StringBuilder sqlBuilder) {
        // fieldStr already quoted in getIndexFieldStr
        sqlBuilder.append(" PRIMARY KEY (").append(fieldStr).append(")");
        break;
+      case DATA_SKIPPING_MINMAX:
+        Preconditions.checkArgument(
+            StringUtils.isNotBlank(index.name()), "Data skipping index name must not be blank");
+        // The GRANULARITY value is always 1 here currently as we can't set it by Index: there is
+        // no field for it.
+        // TODO add a properties field to Index to support user defined GRANULARITY value.
Review Comment:
TODO comment lacks an issue reference. According to coding guidelines, TODOs
should reference a specific issue (e.g., "TODO(#1234): ..."). This helps track
technical debt and ensures the work is properly planned. Please create an issue
for adding Index properties support for GRANULARITY and reference it here.
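
For example, once an issue exists (the number below is a placeholder):

```java
// TODO(#xxxx): add a properties field to Index to support a user-defined GRANULARITY value.
```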
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]