Copilot commented on code in PR #9858:
URL: https://github.com/apache/gravitino/pull/9858#discussion_r2780968853
##########
catalogs-contrib/catalog-jdbc-clickhouse/src/main/java/org/apache/gravitino/catalog/clickhouse/operations/ClickHouseTableOperations.java:
##########
@@ -228,18 +273,93 @@ private static void appendOrderBy(
}
private ClickHouseTablePropertiesMetadata.ENGINE appendTableEngine(
- Map<String, String> properties, StringBuilder sqlBuilder) {
+ Map<String, String> properties, StringBuilder sqlBuilder, boolean
onCluster) {
ClickHouseTablePropertiesMetadata.ENGINE engine =
ENGINE_PROPERTY_ENTRY.getDefaultValue();
if (MapUtils.isNotEmpty(properties)) {
- String userSetEngine = properties.remove(CLICKHOUSE_ENGINE_KEY);
+ String userSetEngine = properties.get(CLICKHOUSE_ENGINE_KEY);
if (StringUtils.isNotEmpty(userSetEngine)) {
engine =
ClickHouseTablePropertiesMetadata.ENGINE.fromString(userSetEngine);
}
}
+
+ if (engine == ENGINE.DISTRIBUTED) {
+ if (!onCluster) {
+ throw new IllegalArgumentException(
+ "ENGINE = DISTRIBUTED requires ON CLUSTER clause to be
specified.");
+ }
+
+ // Check properties
+ String clusterName = properties.get(ClusterConstants.CLUSTER_NAME);
+ String remoteDatabase =
properties.get(DistributedTableConstants.REMOTE_DATABASE);
+ String remoteTable =
properties.get(DistributedTableConstants.REMOTE_TABLE);
+ String shardingKey =
properties.get(DistributedTableConstants.SHARDING_KEY);
+
+ Preconditions.checkArgument(
+ StringUtils.isNotBlank(clusterName),
+ "Cluster name must be specified when engine is Distributed");
+ Preconditions.checkArgument(
+ StringUtils.isNotBlank(remoteDatabase),
+ "Remote database must be specified for Distributed");
+ Preconditions.checkArgument(
+ StringUtils.isNotBlank(remoteTable), "Remote table must be specified
for Distributed");
+
+ // User must ensure the sharding key is a trusted value
+ Preconditions.checkArgument(
+ StringUtils.isNotBlank(shardingKey), "Sharding key must be specified
for Distributed");
+
+ sqlBuilder.append(
+ "\n ENGINE = %s(`%s`,`%s`,`%s`,%s)"
+ .formatted(
+ ENGINE.DISTRIBUTED.getValue(),
+ clusterName,
+ remoteDatabase,
+ remoteTable,
+ shardingKey));
+ return engine;
+ }
+
sqlBuilder.append("\n ENGINE = %s".formatted(engine.getValue()));
return engine;
}
+ private void appendPartitionClause(
+ Transform[] partitioning,
+ StringBuilder sqlBuilder,
+ ClickHouseTablePropertiesMetadata.ENGINE engine) {
+ if (ArrayUtils.isEmpty(partitioning)) {
+ return;
+ }
+
+ if (!engine.acceptPartition()) {
+ throw new UnsupportedOperationException(
+ "Partitioning is only supported for MergeTree family engines");
+ }
+
+ List<String> partitionExprs =
+
Arrays.stream(partitioning).map(this::toPartitionExpression).collect(Collectors.toList());
+ String partitionExpr =
+ partitionExprs.size() == 1
+ ? partitionExprs.get(0)
+ : "tuple(" + String.join(", ", partitionExprs) + ")";
+ sqlBuilder.append("\n PARTITION BY ").append(partitionExpr);
+ }
+
+ private String toPartitionExpression(Transform transform) {
+ Preconditions.checkArgument(transform != null, "Partition transform cannot
be null");
+ Preconditions.checkArgument(
+ StringUtils.equalsIgnoreCase(transform.name(),
Transforms.NAME_OF_IDENTITY),
+ "Unsupported partition transform: " + transform.name());
+ Preconditions.checkArgument(
+ transform.arguments().length == 1
+ && transform.arguments()[0] instanceof NamedReference
+ && ((NamedReference) transform.arguments()[0]).fieldName().length
== 1,
+ "ClickHouse only supports single column identity partitioning");
+
+ String fieldName =
+ ((NamedReference) transform.arguments()[0]).fieldName()[0]; // already
validated
+ return quoteIdentifier(fieldName);
Review Comment:
`toPartitionExpression()` only supports `identity` transforms, but
`parsePartitioning()` produces `day/month/year` transforms for
`toDate/toYYYYMM/toYear`. This makes partitioning non-roundtrippable and
prevents creating tables with these supported ClickHouse partition expressions.
Please add handling to convert `day/month/year` transforms back into the
corresponding ClickHouse functions when generating `PARTITION BY`.
```suggestion
String transformName = transform.name();
Preconditions.checkArgument(
StringUtils.equalsIgnoreCase(transformName,
Transforms.NAME_OF_IDENTITY)
|| StringUtils.equalsIgnoreCase(transformName,
Transforms.NAME_OF_DAY)
|| StringUtils.equalsIgnoreCase(transformName,
Transforms.NAME_OF_MONTH)
|| StringUtils.equalsIgnoreCase(transformName,
Transforms.NAME_OF_YEAR),
"Unsupported partition transform: " + transformName);
Preconditions.checkArgument(
transform.arguments().length == 1
&& transform.arguments()[0] instanceof NamedReference
&& ((NamedReference)
transform.arguments()[0]).fieldName().length == 1,
"ClickHouse only supports single column partitioning");
String fieldName =
((NamedReference) transform.arguments()[0]).fieldName()[0]; //
already validated
String quotedFieldName = quoteIdentifier(fieldName);
if (StringUtils.equalsIgnoreCase(transformName,
Transforms.NAME_OF_IDENTITY)) {
return quotedFieldName;
} else if (StringUtils.equalsIgnoreCase(transformName,
Transforms.NAME_OF_DAY)) {
// Map day transform back to ClickHouse toDate() for partitioning
return "toDate(" + quotedFieldName + ")";
} else if (StringUtils.equalsIgnoreCase(transformName,
Transforms.NAME_OF_MONTH)) {
// Map month transform back to ClickHouse toYYYYMM() for partitioning
return "toYYYYMM(" + quotedFieldName + ")";
} else if (StringUtils.equalsIgnoreCase(transformName,
Transforms.NAME_OF_YEAR)) {
// Map year transform back to ClickHouse toYear() for partitioning
return "toYear(" + quotedFieldName + ")";
}
// This should be unreachable due to the earlier precondition, but keep
it defensive.
throw new IllegalArgumentException("Unsupported partition transform: " +
transformName);
```
##########
catalogs-contrib/catalog-jdbc-clickhouse/src/main/java/org/apache/gravitino/catalog/clickhouse/operations/ClickHouseTableOperations.java:
##########
@@ -683,6 +850,197 @@ private String updateColumnTypeFieldDefinition(
return appendColumnDefinition(newColumn, sqlBuilder).toString();
}
+ @VisibleForTesting
+ Transform[] parsePartitioning(String partitionKey) {
+ if (StringUtils.isBlank(partitionKey)) {
+ return Transforms.EMPTY_TRANSFORM;
+ }
+
+ String trimmedKey = normalizePartitionKey(partitionKey);
+ if (StringUtils.isBlank(trimmedKey)) {
+ return Transforms.EMPTY_TRANSFORM;
+ }
+
+ String[] parts = trimmedKey.split(",");
+ List<Transform> transforms = new ArrayList<>();
+ for (String part : parts) {
+ String expression = StringUtils.trim(part);
+ if (StringUtils.isBlank(expression)) {
+ continue;
+ }
+ transforms.add(parsePartitionExpression(expression, partitionKey));
+ }
+
+ return transforms.toArray(new Transform[0]);
+ }
+
+ private Transform parsePartitionExpression(String expression, String
originalPartitionKey) {
+ String trimmedExpression = StringUtils.trim(expression);
+
+ Matcher toYearMatcher = TO_YEAR_PATTERN.matcher(trimmedExpression);
+ if (toYearMatcher.matches()) {
+ String identifier = normalizeIdentifier(toYearMatcher.group(1));
+ Preconditions.checkArgument(
+ StringUtils.isNotBlank(identifier),
+ "Unsupported partition expression: " + originalPartitionKey);
+ return Transforms.year(identifier);
+ }
+
+ Matcher toYYYYMMMatcher = TO_MONTH_PATTERN.matcher(trimmedExpression);
+ if (toYYYYMMMatcher.matches()) {
+ String identifier = normalizeIdentifier(toYYYYMMMatcher.group(1));
+ Preconditions.checkArgument(
+ StringUtils.isNotBlank(identifier),
+ "Unsupported partition expression: " + originalPartitionKey);
+ return Transforms.month(identifier);
+ }
+
+ Matcher toDateMatcher = TO_DATE_PATTERN.matcher(trimmedExpression);
+ if (toDateMatcher.matches()) {
+ String identifier = normalizeIdentifier(toDateMatcher.group(1));
+ Preconditions.checkArgument(
+ StringUtils.isNotBlank(identifier),
+ "Unsupported partition expression: " + originalPartitionKey);
+ return Transforms.day(identifier);
+ }
Review Comment:
For `toYear(...)` / `toYYYYMM(...)` / `toDate(...)`, the captured inner text
is only validated as being non-blank. This will accept complex expressions such
as `toYear(ts + 1)` and turn them into a transform whose "column" name is not a
real column, which can break downstream consumers. Please validate that the
extracted identifier is a simple column reference (e.g., reuse
`isSimpleIdentifier` and trim/backtick-normalize it) and reject anything else
as unsupported.
##########
catalogs-contrib/catalog-jdbc-clickhouse/src/main/java/org/apache/gravitino/catalog/clickhouse/operations/ClickHouseTableOperations.java:
##########
@@ -228,18 +273,93 @@ private static void appendOrderBy(
}
private ClickHouseTablePropertiesMetadata.ENGINE appendTableEngine(
- Map<String, String> properties, StringBuilder sqlBuilder) {
+ Map<String, String> properties, StringBuilder sqlBuilder, boolean
onCluster) {
ClickHouseTablePropertiesMetadata.ENGINE engine =
ENGINE_PROPERTY_ENTRY.getDefaultValue();
if (MapUtils.isNotEmpty(properties)) {
- String userSetEngine = properties.remove(CLICKHOUSE_ENGINE_KEY);
+ String userSetEngine = properties.get(CLICKHOUSE_ENGINE_KEY);
if (StringUtils.isNotEmpty(userSetEngine)) {
engine =
ClickHouseTablePropertiesMetadata.ENGINE.fromString(userSetEngine);
}
}
+
+ if (engine == ENGINE.DISTRIBUTED) {
+ if (!onCluster) {
+ throw new IllegalArgumentException(
+ "ENGINE = DISTRIBUTED requires ON CLUSTER clause to be
specified.");
+ }
+
+ // Check properties
+ String clusterName = properties.get(ClusterConstants.CLUSTER_NAME);
+ String remoteDatabase =
properties.get(DistributedTableConstants.REMOTE_DATABASE);
+ String remoteTable =
properties.get(DistributedTableConstants.REMOTE_TABLE);
+ String shardingKey =
properties.get(DistributedTableConstants.SHARDING_KEY);
+
+ Preconditions.checkArgument(
+ StringUtils.isNotBlank(clusterName),
+ "Cluster name must be specified when engine is Distributed");
+ Preconditions.checkArgument(
+ StringUtils.isNotBlank(remoteDatabase),
+ "Remote database must be specified for Distributed");
+ Preconditions.checkArgument(
+ StringUtils.isNotBlank(remoteTable), "Remote table must be specified
for Distributed");
+
+ // User must ensure the sharding key is a trusted value
+ Preconditions.checkArgument(
+ StringUtils.isNotBlank(shardingKey), "Sharding key must be specified
for Distributed");
+
+ sqlBuilder.append(
+ "\n ENGINE = %s(`%s`,`%s`,`%s`,%s)"
+ .formatted(
+ ENGINE.DISTRIBUTED.getValue(),
+ clusterName,
+ remoteDatabase,
+ remoteTable,
+ shardingKey));
 Review Comment:
   The Distributed engine clause formats `clusterName`, `remoteDatabase`, and
`remoteTable` inside backticks without escaping, and inserts `shardingKey`
unquoted as raw SQL. This can produce invalid SQL for names containing
backticks and is also a SQL-injection vector via user-supplied properties.
Please apply proper identifier quoting/escaping to the name fields, and
restrict or validate the sharding key (e.g., only allow simple identifiers or
an allowlist of safe expressions).
##########
catalogs-contrib/catalog-jdbc-clickhouse/src/main/java/org/apache/gravitino/catalog/clickhouse/operations/ClickHouseTableOperations.java:
##########
@@ -170,6 +187,34 @@ protected String generateCreateTableSql(
return result;
}
+ /**
+ * Append CREATE TABLE clause. If cluster name && on-cluster is specified in
properties, append ON
+ * CLUSTER clause.
+ *
+ * @param properties Table properties
+ * @param sqlBuilder SQL builder
+ * @return true if ON CLUSTER clause is appended, false otherwise
+ */
+ private boolean appendCreateTableClause(
+ Map<String, String> properties, StringBuilder sqlBuilder, String
tableName) {
+ String clusterName = properties.get(ClusterConstants.CLUSTER_NAME);
+ String onClusterValue = properties.get(ClusterConstants.ON_CLUSTER);
+
+ boolean onCluster =
+ StringUtils.isNotBlank(clusterName)
+ && StringUtils.isNotBlank(onClusterValue)
+ && Boolean.TRUE.equals(Boolean.parseBoolean(onClusterValue));
+
+ if (onCluster) {
+ sqlBuilder.append(
+ "CREATE TABLE %s ON CLUSTER `%s`
(\n".formatted(quoteIdentifier(tableName), clusterName));
Review Comment:
`clusterName` is interpolated directly into a backticked identifier in the
`ON CLUSTER` clause. If it contains a backtick (or other unexpected
characters), the generated SQL becomes invalid and can open SQL-injection risk.
Please validate/escape the cluster name (or use a shared identifier-quoting
helper) before appending it to SQL.
```suggestion
"CREATE TABLE %s ON CLUSTER %s (\n"
.formatted(quoteIdentifier(tableName),
quoteIdentifier(clusterName)));
```
##########
catalogs-contrib/catalog-jdbc-clickhouse/src/main/java/org/apache/gravitino/catalog/clickhouse/operations/ClickHouseTableOperations.java:
##########
@@ -683,6 +850,197 @@ private String updateColumnTypeFieldDefinition(
return appendColumnDefinition(newColumn, sqlBuilder).toString();
}
+ @VisibleForTesting
+ Transform[] parsePartitioning(String partitionKey) {
+ if (StringUtils.isBlank(partitionKey)) {
+ return Transforms.EMPTY_TRANSFORM;
+ }
+
+ String trimmedKey = normalizePartitionKey(partitionKey);
+ if (StringUtils.isBlank(trimmedKey)) {
+ return Transforms.EMPTY_TRANSFORM;
+ }
+
+ String[] parts = trimmedKey.split(",");
+ List<Transform> transforms = new ArrayList<>();
+ for (String part : parts) {
+ String expression = StringUtils.trim(part);
+ if (StringUtils.isBlank(expression)) {
+ continue;
+ }
+ transforms.add(parsePartitionExpression(expression, partitionKey));
+ }
+
+ return transforms.toArray(new Transform[0]);
+ }
+
+ private Transform parsePartitionExpression(String expression, String
originalPartitionKey) {
+ String trimmedExpression = StringUtils.trim(expression);
+
+ Matcher toYearMatcher = TO_YEAR_PATTERN.matcher(trimmedExpression);
+ if (toYearMatcher.matches()) {
+ String identifier = normalizeIdentifier(toYearMatcher.group(1));
+ Preconditions.checkArgument(
+ StringUtils.isNotBlank(identifier),
+ "Unsupported partition expression: " + originalPartitionKey);
+ return Transforms.year(identifier);
+ }
+
+ Matcher toYYYYMMMatcher = TO_MONTH_PATTERN.matcher(trimmedExpression);
+ if (toYYYYMMMatcher.matches()) {
+ String identifier = normalizeIdentifier(toYYYYMMMatcher.group(1));
+ Preconditions.checkArgument(
+ StringUtils.isNotBlank(identifier),
+ "Unsupported partition expression: " + originalPartitionKey);
+ return Transforms.month(identifier);
+ }
+
+ Matcher toDateMatcher = TO_DATE_PATTERN.matcher(trimmedExpression);
+ if (toDateMatcher.matches()) {
+ String identifier = normalizeIdentifier(toDateMatcher.group(1));
+ Preconditions.checkArgument(
+ StringUtils.isNotBlank(identifier),
+ "Unsupported partition expression: " + originalPartitionKey);
+ return Transforms.day(identifier);
+ }
+
+ // for other functions, we do not support it now
+ if (trimmedExpression.contains("(") && trimmedExpression.contains(")")) {
+ throw new UnsupportedOperationException(
+ "Currently Gravitino only supports toYYYY, toYYYYMM, toDate
partition expressions, but got: "
+ + trimmedExpression);
+ }
+
+ String identifier = normalizeIdentifier(trimmedExpression);
+ Preconditions.checkArgument(
+ StringUtils.isNotBlank(identifier) &&
!StringUtils.containsAny(identifier, "(", ")", " "),
+ "Unsupported partition expression: " + originalPartitionKey);
+ return Transforms.identity(identifier);
+ }
+
+ private String normalizePartitionKey(String partitionKey) {
+ String trimmedKey = partitionKey.trim();
+ if (StringUtils.equalsIgnoreCase(trimmedKey, "tuple()")) {
+ return "";
+ }
+ if (StringUtils.startsWithIgnoreCase(trimmedKey, "tuple(")
+ && StringUtils.endsWith(trimmedKey, ")")) {
+ return trimmedKey.substring("tuple(".length(), trimmedKey.length() -
1).trim();
+ }
+ if (StringUtils.startsWith(trimmedKey, "(") &&
StringUtils.endsWith(trimmedKey, ")")) {
+ return trimmedKey.substring(1, trimmedKey.length() - 1).trim();
+ }
+ return trimmedKey;
+ }
+
+ private String normalizeIdentifier(String identifier) {
+ String col = StringUtils.trim(identifier);
+ if (StringUtils.startsWith(col, "`") && StringUtils.endsWith(col, "`") &&
col.length() >= 2) {
+ return col.substring(1, col.length() - 1);
+ }
+ return col;
+ }
+
+ @VisibleForTesting
+ String[][] parseIndexFields(String expression) {
+ if (StringUtils.isBlank(expression)) {
+ return new String[0][];
+ }
+
+ String normalized = normalizeIndexExpression(expression);
+ if (StringUtils.isBlank(normalized)) {
+ return new String[0][];
+ }
+
+ String[] parts = normalized.split(",");
+ List<String[]> fields = new ArrayList<>();
+ for (String part : parts) {
+ String col = normalizeIdentifier(part);
+ Preconditions.checkArgument(
+ isSimpleIdentifier(col), "Unsupported index expression: " +
expression);
+ fields.add(new String[] {col});
+ }
+
+ return fields.toArray(new String[0][]);
+ }
+
+ private String normalizeIndexExpression(String expression) {
+ String trimmed = expression.trim();
+
+ boolean stripped = true;
+ while (stripped) {
+ stripped = false;
+ Matcher matcher = FUNCTION_WRAPPER_PATTERN.matcher(trimmed);
+ if (matcher.matches()) {
+ trimmed = matcher.group(2).trim();
+ stripped = true;
+ }
+ }
+
+ if (StringUtils.startsWithIgnoreCase(trimmed, "tuple(") &&
StringUtils.endsWith(trimmed, ")")) {
+ trimmed = trimmed.substring("tuple(".length(), trimmed.length() -
1).trim();
+ } else if (StringUtils.equalsIgnoreCase(trimmed, "tuple()")) {
+ trimmed = "";
+ }
+
+ return trimmed;
+ }
+
+ private boolean isSimpleIdentifier(String identifier) {
+ return StringUtils.isNotBlank(identifier)
+ && !StringUtils.containsAny(identifier, "(", ")", " ", "%", "+", "-",
"*", "/");
+ }
+
+ private List<Index> getSecondaryIndexes(
+ Connection connection, String databaseName, String tableName) throws
SQLException {
+ List<Index> secondaryIndexes = new ArrayList<>();
+ try (PreparedStatement preparedStatement =
+ connection.prepareStatement(
+ "SELECT name, type, expr FROM system.data_skipping_indices "
+ + "WHERE database = ? AND table = ? ORDER BY name")) {
+ preparedStatement.setString(1, databaseName);
+ preparedStatement.setString(2, tableName);
+ try (ResultSet resultSet = preparedStatement.executeQuery()) {
+ while (resultSet.next()) {
+ String name = resultSet.getString("name");
+ String type = resultSet.getString("type");
+ String expression = resultSet.getString("expr");
+ try {
+ String[][] fields = parseIndexFields(expression);
+ if (ArrayUtils.isEmpty(fields)) {
+ continue;
+ }
+ secondaryIndexes.add(Indexes.of(getClickHouseIndexType(type),
name, fields));
+ } catch (IllegalArgumentException e) {
+ LOG.warn(
+ "Skip unsupported data skipping index {} for {}.{} with
expression {}",
+ name,
+ databaseName,
+ tableName,
+ expression);
+ }
+ }
+ }
+ }
+
+ return secondaryIndexes;
+ }
+
+ private Index.IndexType getClickHouseIndexType(String rawType) {
+ if (StringUtils.isBlank(rawType)) {
+ return Index.IndexType.DATA_SKIPPING_MINMAX;
+ }
+
+ switch (rawType) {
+ case "minmax":
+ return Index.IndexType.DATA_SKIPPING_MINMAX;
+ case "bloom_filter":
+ return Index.IndexType.DATA_SKIPPING_BLOOM_FILTER;
Review Comment:
`IndexType` now includes `DATA_SKIPPING_NGRAMBF_V1` and
`DATA_SKIPPING_TOKENBF_V1`, but `getClickHouseIndexType()` (and the SQL
generator switch) only recognizes `minmax` and `bloom_filter`. This will cause
these index types to be treated as unsupported/skipped even though the API
exposes them. Please add mappings and SQL generation for the new types (or
remove the enum values until they’re supported end-to-end).
```suggestion
return Index.IndexType.DATA_SKIPPING_BLOOM_FILTER;
case "ngrambf_v1":
return Index.IndexType.DATA_SKIPPING_NGRAMBF_V1;
case "tokenbf_v1":
return Index.IndexType.DATA_SKIPPING_TOKENBF_V1;
```
##########
catalogs-contrib/catalog-jdbc-clickhouse/src/main/java/org/apache/gravitino/catalog/clickhouse/operations/ClickHouseTableOperations.java:
##########
@@ -683,6 +850,197 @@ private String updateColumnTypeFieldDefinition(
return appendColumnDefinition(newColumn, sqlBuilder).toString();
}
+ @VisibleForTesting
+ Transform[] parsePartitioning(String partitionKey) {
+ if (StringUtils.isBlank(partitionKey)) {
+ return Transforms.EMPTY_TRANSFORM;
+ }
+
+ String trimmedKey = normalizePartitionKey(partitionKey);
+ if (StringUtils.isBlank(trimmedKey)) {
+ return Transforms.EMPTY_TRANSFORM;
+ }
+
+ String[] parts = trimmedKey.split(",");
+ List<Transform> transforms = new ArrayList<>();
+ for (String part : parts) {
+ String expression = StringUtils.trim(part);
+ if (StringUtils.isBlank(expression)) {
+ continue;
+ }
+ transforms.add(parsePartitionExpression(expression, partitionKey));
+ }
+
+ return transforms.toArray(new Transform[0]);
+ }
+
+ private Transform parsePartitionExpression(String expression, String
originalPartitionKey) {
+ String trimmedExpression = StringUtils.trim(expression);
+
+ Matcher toYearMatcher = TO_YEAR_PATTERN.matcher(trimmedExpression);
+ if (toYearMatcher.matches()) {
+ String identifier = normalizeIdentifier(toYearMatcher.group(1));
+ Preconditions.checkArgument(
+ StringUtils.isNotBlank(identifier),
+ "Unsupported partition expression: " + originalPartitionKey);
+ return Transforms.year(identifier);
+ }
+
+ Matcher toYYYYMMMatcher = TO_MONTH_PATTERN.matcher(trimmedExpression);
+ if (toYYYYMMMatcher.matches()) {
+ String identifier = normalizeIdentifier(toYYYYMMMatcher.group(1));
+ Preconditions.checkArgument(
+ StringUtils.isNotBlank(identifier),
+ "Unsupported partition expression: " + originalPartitionKey);
+ return Transforms.month(identifier);
+ }
+
+ Matcher toDateMatcher = TO_DATE_PATTERN.matcher(trimmedExpression);
+ if (toDateMatcher.matches()) {
+ String identifier = normalizeIdentifier(toDateMatcher.group(1));
+ Preconditions.checkArgument(
+ StringUtils.isNotBlank(identifier),
+ "Unsupported partition expression: " + originalPartitionKey);
+ return Transforms.day(identifier);
+ }
+
+ // for other functions, we do not support it now
+ if (trimmedExpression.contains("(") && trimmedExpression.contains(")")) {
+ throw new UnsupportedOperationException(
+ "Currently Gravitino only supports toYYYY, toYYYYMM, toDate
partition expressions, but got: "
Review Comment:
The UnsupportedOperationException message says it supports `toYYYY`, but the
implementation actually supports `toYear` (and does not match a `toYYYY(...)`
function). Please update the message to list the actual supported functions so
users can correct their partition expressions.
```suggestion
"Currently Gravitino only supports toYear, toYYYYMM, toDate
partition expressions, but got: "
```
##########
catalogs-contrib/catalog-jdbc-clickhouse/src/main/java/org/apache/gravitino/catalog/clickhouse/operations/ClickHouseDatabaseOperations.java:
##########
@@ -85,11 +87,29 @@ protected String generateCreateDatabaseSql(
StringBuilder createDatabaseSql =
new StringBuilder(String.format("CREATE DATABASE `%s`", databaseName));
+ if (onCluster(properties)) {
+ String clusterName = properties.get(ClusterConstants.CLUSTER_NAME);
+ createDatabaseSql.append(String.format(" ON CLUSTER `%s`", clusterName));
+ }
Review Comment:
`clusterName` is appended into `ON CLUSTER` using backticks but without
escaping/quoting via a helper. A cluster name containing a backtick will break
the generated SQL (and user-controlled properties can become an injection
vector). Please escape/quote the cluster identifier consistently before
building the SQL string.
##########
catalogs-contrib/catalog-jdbc-clickhouse/src/main/java/org/apache/gravitino/catalog/clickhouse/operations/ClickHouseTableOperations.java:
##########
@@ -333,6 +472,34 @@ protected Map<String, String>
getTableProperties(Connection connection, String t
}
}
+ @Override
+ protected Transform[] getTablePartitioning(
+ Connection connection, String databaseName, String tableName) throws
SQLException {
+ try (PreparedStatement statement =
+ connection.prepareStatement(
+ "SELECT partition_key FROM system.tables WHERE database = ? AND
name = ?")) {
+ statement.setString(1, databaseName);
+ statement.setString(2, tableName);
+ try (ResultSet resultSet = statement.executeQuery()) {
+ if (resultSet.next()) {
+ String partitionKey = resultSet.getString("partition_key");
+ try {
+ return parsePartitioning(partitionKey);
+ } catch (IllegalArgumentException e) {
+ LOG.warn(
+ "Skip unsupported partition expression {} for {}.{}",
+ partitionKey,
+ databaseName,
+ tableName);
+ return Transforms.EMPTY_TRANSFORM;
+ }
 Review Comment:
   `getTablePartitioning()` intends to skip unsupported partition expressions,
but it only catches `IllegalArgumentException`. `parsePartitioning()` can throw
`UnsupportedOperationException` for unsupported functions, which will currently
propagate and can cause table loading to fail. Please also catch
`UnsupportedOperationException` (or catch `RuntimeException`) so that
unsupported partition keys are safely ignored, as the log message suggests.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]