Copilot commented on code in PR #9858:
URL: https://github.com/apache/gravitino/pull/9858#discussion_r2780968853


##########
catalogs-contrib/catalog-jdbc-clickhouse/src/main/java/org/apache/gravitino/catalog/clickhouse/operations/ClickHouseTableOperations.java:
##########
@@ -228,18 +273,93 @@ private static void appendOrderBy(
   }
 
   private ClickHouseTablePropertiesMetadata.ENGINE appendTableEngine(
-      Map<String, String> properties, StringBuilder sqlBuilder) {
+      Map<String, String> properties, StringBuilder sqlBuilder, boolean 
onCluster) {
     ClickHouseTablePropertiesMetadata.ENGINE engine = 
ENGINE_PROPERTY_ENTRY.getDefaultValue();
     if (MapUtils.isNotEmpty(properties)) {
-      String userSetEngine = properties.remove(CLICKHOUSE_ENGINE_KEY);
+      String userSetEngine = properties.get(CLICKHOUSE_ENGINE_KEY);
       if (StringUtils.isNotEmpty(userSetEngine)) {
         engine = 
ClickHouseTablePropertiesMetadata.ENGINE.fromString(userSetEngine);
       }
     }
+
+    if (engine == ENGINE.DISTRIBUTED) {
+      if (!onCluster) {
+        throw new IllegalArgumentException(
+            "ENGINE = DISTRIBUTED requires ON CLUSTER clause to be 
specified.");
+      }
+
+      // Check properties
+      String clusterName = properties.get(ClusterConstants.CLUSTER_NAME);
+      String remoteDatabase = 
properties.get(DistributedTableConstants.REMOTE_DATABASE);
+      String remoteTable = 
properties.get(DistributedTableConstants.REMOTE_TABLE);
+      String shardingKey = 
properties.get(DistributedTableConstants.SHARDING_KEY);
+
+      Preconditions.checkArgument(
+          StringUtils.isNotBlank(clusterName),
+          "Cluster name must be specified when engine is Distributed");
+      Preconditions.checkArgument(
+          StringUtils.isNotBlank(remoteDatabase),
+          "Remote database must be specified for Distributed");
+      Preconditions.checkArgument(
+          StringUtils.isNotBlank(remoteTable), "Remote table must be specified 
for Distributed");
+
+      // User must ensure the sharding key is a trusted value
+      Preconditions.checkArgument(
+          StringUtils.isNotBlank(shardingKey), "Sharding key must be specified 
for Distributed");
+
+      sqlBuilder.append(
+          "\n ENGINE = %s(`%s`,`%s`,`%s`,%s)"
+              .formatted(
+                  ENGINE.DISTRIBUTED.getValue(),
+                  clusterName,
+                  remoteDatabase,
+                  remoteTable,
+                  shardingKey));
+      return engine;
+    }
+
     sqlBuilder.append("\n ENGINE = %s".formatted(engine.getValue()));
     return engine;
   }
 
+  private void appendPartitionClause(
+      Transform[] partitioning,
+      StringBuilder sqlBuilder,
+      ClickHouseTablePropertiesMetadata.ENGINE engine) {
+    if (ArrayUtils.isEmpty(partitioning)) {
+      return;
+    }
+
+    if (!engine.acceptPartition()) {
+      throw new UnsupportedOperationException(
+          "Partitioning is only supported for MergeTree family engines");
+    }
+
+    List<String> partitionExprs =
+        
Arrays.stream(partitioning).map(this::toPartitionExpression).collect(Collectors.toList());
+    String partitionExpr =
+        partitionExprs.size() == 1
+            ? partitionExprs.get(0)
+            : "tuple(" + String.join(", ", partitionExprs) + ")";
+    sqlBuilder.append("\n PARTITION BY ").append(partitionExpr);
+  }
+
+  private String toPartitionExpression(Transform transform) {
+    Preconditions.checkArgument(transform != null, "Partition transform cannot 
be null");
+    Preconditions.checkArgument(
+        StringUtils.equalsIgnoreCase(transform.name(), 
Transforms.NAME_OF_IDENTITY),
+        "Unsupported partition transform: " + transform.name());
+    Preconditions.checkArgument(
+        transform.arguments().length == 1
+            && transform.arguments()[0] instanceof NamedReference
+            && ((NamedReference) transform.arguments()[0]).fieldName().length 
== 1,
+        "ClickHouse only supports single column identity partitioning");
+
+    String fieldName =
+        ((NamedReference) transform.arguments()[0]).fieldName()[0]; // already 
validated
+    return quoteIdentifier(fieldName);

Review Comment:
   `toPartitionExpression()` only supports `identity` transforms, but 
`parsePartitioning()` produces `day/month/year` transforms for 
`toDate/toYYYYMM/toYear`. This makes partitioning non-roundtrippable and 
prevents creating tables with these supported ClickHouse partition expressions. 
Please add handling to convert `day/month/year` transforms back into the 
corresponding ClickHouse functions when generating `PARTITION BY`.
   ```suggestion
   
       String transformName = transform.name();
       Preconditions.checkArgument(
           StringUtils.equalsIgnoreCase(transformName, 
Transforms.NAME_OF_IDENTITY)
               || StringUtils.equalsIgnoreCase(transformName, 
Transforms.NAME_OF_DAY)
               || StringUtils.equalsIgnoreCase(transformName, 
Transforms.NAME_OF_MONTH)
               || StringUtils.equalsIgnoreCase(transformName, 
Transforms.NAME_OF_YEAR),
           "Unsupported partition transform: " + transformName);
   
       Preconditions.checkArgument(
           transform.arguments().length == 1
               && transform.arguments()[0] instanceof NamedReference
               && ((NamedReference) 
transform.arguments()[0]).fieldName().length == 1,
           "ClickHouse only supports single column partitioning");
   
       String fieldName =
           ((NamedReference) transform.arguments()[0]).fieldName()[0]; // 
already validated
       String quotedFieldName = quoteIdentifier(fieldName);
   
       if (StringUtils.equalsIgnoreCase(transformName, 
Transforms.NAME_OF_IDENTITY)) {
         return quotedFieldName;
       } else if (StringUtils.equalsIgnoreCase(transformName, 
Transforms.NAME_OF_DAY)) {
         // Map day transform back to ClickHouse toDate() for partitioning
         return "toDate(" + quotedFieldName + ")";
       } else if (StringUtils.equalsIgnoreCase(transformName, 
Transforms.NAME_OF_MONTH)) {
         // Map month transform back to ClickHouse toYYYYMM() for partitioning
         return "toYYYYMM(" + quotedFieldName + ")";
       } else if (StringUtils.equalsIgnoreCase(transformName, 
Transforms.NAME_OF_YEAR)) {
         // Map year transform back to ClickHouse toYear() for partitioning
         return "toYear(" + quotedFieldName + ")";
       }
   
       // This should be unreachable due to the earlier precondition, but keep 
it defensive.
       throw new IllegalArgumentException("Unsupported partition transform: " + 
transformName);
   ```



##########
catalogs-contrib/catalog-jdbc-clickhouse/src/main/java/org/apache/gravitino/catalog/clickhouse/operations/ClickHouseTableOperations.java:
##########
@@ -683,6 +850,197 @@ private String updateColumnTypeFieldDefinition(
     return appendColumnDefinition(newColumn, sqlBuilder).toString();
   }
 
+  @VisibleForTesting
+  Transform[] parsePartitioning(String partitionKey) {
+    if (StringUtils.isBlank(partitionKey)) {
+      return Transforms.EMPTY_TRANSFORM;
+    }
+
+    String trimmedKey = normalizePartitionKey(partitionKey);
+    if (StringUtils.isBlank(trimmedKey)) {
+      return Transforms.EMPTY_TRANSFORM;
+    }
+
+    String[] parts = trimmedKey.split(",");
+    List<Transform> transforms = new ArrayList<>();
+    for (String part : parts) {
+      String expression = StringUtils.trim(part);
+      if (StringUtils.isBlank(expression)) {
+        continue;
+      }
+      transforms.add(parsePartitionExpression(expression, partitionKey));
+    }
+
+    return transforms.toArray(new Transform[0]);
+  }
+
+  private Transform parsePartitionExpression(String expression, String 
originalPartitionKey) {
+    String trimmedExpression = StringUtils.trim(expression);
+
+    Matcher toYearMatcher = TO_YEAR_PATTERN.matcher(trimmedExpression);
+    if (toYearMatcher.matches()) {
+      String identifier = normalizeIdentifier(toYearMatcher.group(1));
+      Preconditions.checkArgument(
+          StringUtils.isNotBlank(identifier),
+          "Unsupported partition expression: " + originalPartitionKey);
+      return Transforms.year(identifier);
+    }
+
+    Matcher toYYYYMMMatcher = TO_MONTH_PATTERN.matcher(trimmedExpression);
+    if (toYYYYMMMatcher.matches()) {
+      String identifier = normalizeIdentifier(toYYYYMMMatcher.group(1));
+      Preconditions.checkArgument(
+          StringUtils.isNotBlank(identifier),
+          "Unsupported partition expression: " + originalPartitionKey);
+      return Transforms.month(identifier);
+    }
+
+    Matcher toDateMatcher = TO_DATE_PATTERN.matcher(trimmedExpression);
+    if (toDateMatcher.matches()) {
+      String identifier = normalizeIdentifier(toDateMatcher.group(1));
+      Preconditions.checkArgument(
+          StringUtils.isNotBlank(identifier),
+          "Unsupported partition expression: " + originalPartitionKey);
+      return Transforms.day(identifier);
+    }

Review Comment:
   For `toYear(...)` / `toYYYYMM(...)` / `toDate(...)`, the captured inner text 
is only checked for non-blank. This will accept complex expressions like 
`toYear(ts + 1)` and turn them into a transform with an invalid "column" name, 
which can break downstream consumers. Please validate that the extracted 
identifier is a simple column reference (e.g., reuse `isSimpleIdentifier` and 
trim/backtick-normalize) and reject anything else as unsupported.



##########
catalogs-contrib/catalog-jdbc-clickhouse/src/main/java/org/apache/gravitino/catalog/clickhouse/operations/ClickHouseTableOperations.java:
##########
@@ -228,18 +273,93 @@ private static void appendOrderBy(
   }
 
   private ClickHouseTablePropertiesMetadata.ENGINE appendTableEngine(
-      Map<String, String> properties, StringBuilder sqlBuilder) {
+      Map<String, String> properties, StringBuilder sqlBuilder, boolean 
onCluster) {
     ClickHouseTablePropertiesMetadata.ENGINE engine = 
ENGINE_PROPERTY_ENTRY.getDefaultValue();
     if (MapUtils.isNotEmpty(properties)) {
-      String userSetEngine = properties.remove(CLICKHOUSE_ENGINE_KEY);
+      String userSetEngine = properties.get(CLICKHOUSE_ENGINE_KEY);
       if (StringUtils.isNotEmpty(userSetEngine)) {
         engine = 
ClickHouseTablePropertiesMetadata.ENGINE.fromString(userSetEngine);
       }
     }
+
+    if (engine == ENGINE.DISTRIBUTED) {
+      if (!onCluster) {
+        throw new IllegalArgumentException(
+            "ENGINE = DISTRIBUTED requires ON CLUSTER clause to be 
specified.");
+      }
+
+      // Check properties
+      String clusterName = properties.get(ClusterConstants.CLUSTER_NAME);
+      String remoteDatabase = 
properties.get(DistributedTableConstants.REMOTE_DATABASE);
+      String remoteTable = 
properties.get(DistributedTableConstants.REMOTE_TABLE);
+      String shardingKey = 
properties.get(DistributedTableConstants.SHARDING_KEY);
+
+      Preconditions.checkArgument(
+          StringUtils.isNotBlank(clusterName),
+          "Cluster name must be specified when engine is Distributed");
+      Preconditions.checkArgument(
+          StringUtils.isNotBlank(remoteDatabase),
+          "Remote database must be specified for Distributed");
+      Preconditions.checkArgument(
+          StringUtils.isNotBlank(remoteTable), "Remote table must be specified 
for Distributed");
+
+      // User must ensure the sharding key is a trusted value
+      Preconditions.checkArgument(
+          StringUtils.isNotBlank(shardingKey), "Sharding key must be specified 
for Distributed");
+
+      sqlBuilder.append(
+          "\n ENGINE = %s(`%s`,`%s`,`%s`,%s)"
+              .formatted(
+                  ENGINE.DISTRIBUTED.getValue(),
+                  clusterName,
+                  remoteDatabase,
+                  remoteTable,
+                  shardingKey));

Review Comment:
   The Distributed engine clause formats `clusterName`, `remoteDatabase`, and 
`remoteTable` inside backticks without escaping, and inserts `shardingKey` 
unquoted as raw SQL. This can produce invalid SQL for names containing 
backticks and is also a SQL-injection vector via properties. Please apply 
proper identifier quoting/escaping for the name fields, and restrict/validate 
the sharding key (e.g., only allow simple identifiers or a safe expression 
whitelist).



##########
catalogs-contrib/catalog-jdbc-clickhouse/src/main/java/org/apache/gravitino/catalog/clickhouse/operations/ClickHouseTableOperations.java:
##########
@@ -170,6 +187,34 @@ protected String generateCreateTableSql(
     return result;
   }
 
+  /**
+   * Append CREATE TABLE clause. If cluster name && on-cluster is specified in 
properties, append ON
+   * CLUSTER clause.
+   *
+   * @param properties Table properties
+   * @param sqlBuilder SQL builder
+   * @return true if ON CLUSTER clause is appended, false otherwise
+   */
+  private boolean appendCreateTableClause(
+      Map<String, String> properties, StringBuilder sqlBuilder, String 
tableName) {
+    String clusterName = properties.get(ClusterConstants.CLUSTER_NAME);
+    String onClusterValue = properties.get(ClusterConstants.ON_CLUSTER);
+
+    boolean onCluster =
+        StringUtils.isNotBlank(clusterName)
+            && StringUtils.isNotBlank(onClusterValue)
+            && Boolean.TRUE.equals(Boolean.parseBoolean(onClusterValue));
+
+    if (onCluster) {
+      sqlBuilder.append(
+          "CREATE TABLE %s ON CLUSTER `%s` 
(\n".formatted(quoteIdentifier(tableName), clusterName));

Review Comment:
   `clusterName` is interpolated directly into a backticked identifier in the 
`ON CLUSTER` clause. If it contains a backtick (or other unexpected 
characters), the generated SQL becomes invalid and can open SQL-injection risk. 
Please validate/escape the cluster name (or use a shared identifier-quoting 
helper) before appending it to SQL.
   ```suggestion
             "CREATE TABLE %s ON CLUSTER %s (\n"
                 .formatted(quoteIdentifier(tableName), 
quoteIdentifier(clusterName)));
   ```



##########
catalogs-contrib/catalog-jdbc-clickhouse/src/main/java/org/apache/gravitino/catalog/clickhouse/operations/ClickHouseTableOperations.java:
##########
@@ -683,6 +850,197 @@ private String updateColumnTypeFieldDefinition(
     return appendColumnDefinition(newColumn, sqlBuilder).toString();
   }
 
+  @VisibleForTesting
+  Transform[] parsePartitioning(String partitionKey) {
+    if (StringUtils.isBlank(partitionKey)) {
+      return Transforms.EMPTY_TRANSFORM;
+    }
+
+    String trimmedKey = normalizePartitionKey(partitionKey);
+    if (StringUtils.isBlank(trimmedKey)) {
+      return Transforms.EMPTY_TRANSFORM;
+    }
+
+    String[] parts = trimmedKey.split(",");
+    List<Transform> transforms = new ArrayList<>();
+    for (String part : parts) {
+      String expression = StringUtils.trim(part);
+      if (StringUtils.isBlank(expression)) {
+        continue;
+      }
+      transforms.add(parsePartitionExpression(expression, partitionKey));
+    }
+
+    return transforms.toArray(new Transform[0]);
+  }
+
+  private Transform parsePartitionExpression(String expression, String 
originalPartitionKey) {
+    String trimmedExpression = StringUtils.trim(expression);
+
+    Matcher toYearMatcher = TO_YEAR_PATTERN.matcher(trimmedExpression);
+    if (toYearMatcher.matches()) {
+      String identifier = normalizeIdentifier(toYearMatcher.group(1));
+      Preconditions.checkArgument(
+          StringUtils.isNotBlank(identifier),
+          "Unsupported partition expression: " + originalPartitionKey);
+      return Transforms.year(identifier);
+    }
+
+    Matcher toYYYYMMMatcher = TO_MONTH_PATTERN.matcher(trimmedExpression);
+    if (toYYYYMMMatcher.matches()) {
+      String identifier = normalizeIdentifier(toYYYYMMMatcher.group(1));
+      Preconditions.checkArgument(
+          StringUtils.isNotBlank(identifier),
+          "Unsupported partition expression: " + originalPartitionKey);
+      return Transforms.month(identifier);
+    }
+
+    Matcher toDateMatcher = TO_DATE_PATTERN.matcher(trimmedExpression);
+    if (toDateMatcher.matches()) {
+      String identifier = normalizeIdentifier(toDateMatcher.group(1));
+      Preconditions.checkArgument(
+          StringUtils.isNotBlank(identifier),
+          "Unsupported partition expression: " + originalPartitionKey);
+      return Transforms.day(identifier);
+    }
+
+    // for other functions, we do not support it now
+    if (trimmedExpression.contains("(") && trimmedExpression.contains(")")) {
+      throw new UnsupportedOperationException(
+          "Currently Gravitino only supports toYYYY, toYYYYMM, toDate 
partition expressions, but got: "
+              + trimmedExpression);
+    }
+
+    String identifier = normalizeIdentifier(trimmedExpression);
+    Preconditions.checkArgument(
+        StringUtils.isNotBlank(identifier) && 
!StringUtils.containsAny(identifier, "(", ")", " "),
+        "Unsupported partition expression: " + originalPartitionKey);
+    return Transforms.identity(identifier);
+  }
+
+  private String normalizePartitionKey(String partitionKey) {
+    String trimmedKey = partitionKey.trim();
+    if (StringUtils.equalsIgnoreCase(trimmedKey, "tuple()")) {
+      return "";
+    }
+    if (StringUtils.startsWithIgnoreCase(trimmedKey, "tuple(")
+        && StringUtils.endsWith(trimmedKey, ")")) {
+      return trimmedKey.substring("tuple(".length(), trimmedKey.length() - 
1).trim();
+    }
+    if (StringUtils.startsWith(trimmedKey, "(") && 
StringUtils.endsWith(trimmedKey, ")")) {
+      return trimmedKey.substring(1, trimmedKey.length() - 1).trim();
+    }
+    return trimmedKey;
+  }
+
+  private String normalizeIdentifier(String identifier) {
+    String col = StringUtils.trim(identifier);
+    if (StringUtils.startsWith(col, "`") && StringUtils.endsWith(col, "`") && 
col.length() >= 2) {
+      return col.substring(1, col.length() - 1);
+    }
+    return col;
+  }
+
+  @VisibleForTesting
+  String[][] parseIndexFields(String expression) {
+    if (StringUtils.isBlank(expression)) {
+      return new String[0][];
+    }
+
+    String normalized = normalizeIndexExpression(expression);
+    if (StringUtils.isBlank(normalized)) {
+      return new String[0][];
+    }
+
+    String[] parts = normalized.split(",");
+    List<String[]> fields = new ArrayList<>();
+    for (String part : parts) {
+      String col = normalizeIdentifier(part);
+      Preconditions.checkArgument(
+          isSimpleIdentifier(col), "Unsupported index expression: " + 
expression);
+      fields.add(new String[] {col});
+    }
+
+    return fields.toArray(new String[0][]);
+  }
+
+  private String normalizeIndexExpression(String expression) {
+    String trimmed = expression.trim();
+
+    boolean stripped = true;
+    while (stripped) {
+      stripped = false;
+      Matcher matcher = FUNCTION_WRAPPER_PATTERN.matcher(trimmed);
+      if (matcher.matches()) {
+        trimmed = matcher.group(2).trim();
+        stripped = true;
+      }
+    }
+
+    if (StringUtils.startsWithIgnoreCase(trimmed, "tuple(") && 
StringUtils.endsWith(trimmed, ")")) {
+      trimmed = trimmed.substring("tuple(".length(), trimmed.length() - 
1).trim();
+    } else if (StringUtils.equalsIgnoreCase(trimmed, "tuple()")) {
+      trimmed = "";
+    }
+
+    return trimmed;
+  }
+
+  private boolean isSimpleIdentifier(String identifier) {
+    return StringUtils.isNotBlank(identifier)
+        && !StringUtils.containsAny(identifier, "(", ")", " ", "%", "+", "-", 
"*", "/");
+  }
+
+  private List<Index> getSecondaryIndexes(
+      Connection connection, String databaseName, String tableName) throws 
SQLException {
+    List<Index> secondaryIndexes = new ArrayList<>();
+    try (PreparedStatement preparedStatement =
+        connection.prepareStatement(
+            "SELECT name, type, expr FROM system.data_skipping_indices "
+                + "WHERE database = ? AND table = ? ORDER BY name")) {
+      preparedStatement.setString(1, databaseName);
+      preparedStatement.setString(2, tableName);
+      try (ResultSet resultSet = preparedStatement.executeQuery()) {
+        while (resultSet.next()) {
+          String name = resultSet.getString("name");
+          String type = resultSet.getString("type");
+          String expression = resultSet.getString("expr");
+          try {
+            String[][] fields = parseIndexFields(expression);
+            if (ArrayUtils.isEmpty(fields)) {
+              continue;
+            }
+            secondaryIndexes.add(Indexes.of(getClickHouseIndexType(type), 
name, fields));
+          } catch (IllegalArgumentException e) {
+            LOG.warn(
+                "Skip unsupported data skipping index {} for {}.{} with 
expression {}",
+                name,
+                databaseName,
+                tableName,
+                expression);
+          }
+        }
+      }
+    }
+
+    return secondaryIndexes;
+  }
+
+  private Index.IndexType getClickHouseIndexType(String rawType) {
+    if (StringUtils.isBlank(rawType)) {
+      return Index.IndexType.DATA_SKIPPING_MINMAX;
+    }
+
+    switch (rawType) {
+      case "minmax":
+        return Index.IndexType.DATA_SKIPPING_MINMAX;
+      case "bloom_filter":
+        return Index.IndexType.DATA_SKIPPING_BLOOM_FILTER;

Review Comment:
   `IndexType` now includes `DATA_SKIPPING_NGRAMBF_V1` and 
`DATA_SKIPPING_TOKENBF_V1`, but `getClickHouseIndexType()` (and the SQL 
generator switch) only recognizes `minmax` and `bloom_filter`. This will cause 
these index types to be treated as unsupported/skipped even though the API 
exposes them. Please add mappings and SQL generation for the new types (or 
remove the enum values until they’re supported end-to-end).
   ```suggestion
           return Index.IndexType.DATA_SKIPPING_BLOOM_FILTER;
         case "ngrambf_v1":
           return Index.IndexType.DATA_SKIPPING_NGRAMBF_V1;
         case "tokenbf_v1":
           return Index.IndexType.DATA_SKIPPING_TOKENBF_V1;
   ```



##########
catalogs-contrib/catalog-jdbc-clickhouse/src/main/java/org/apache/gravitino/catalog/clickhouse/operations/ClickHouseTableOperations.java:
##########
@@ -683,6 +850,197 @@ private String updateColumnTypeFieldDefinition(
     return appendColumnDefinition(newColumn, sqlBuilder).toString();
   }
 
+  @VisibleForTesting
+  Transform[] parsePartitioning(String partitionKey) {
+    if (StringUtils.isBlank(partitionKey)) {
+      return Transforms.EMPTY_TRANSFORM;
+    }
+
+    String trimmedKey = normalizePartitionKey(partitionKey);
+    if (StringUtils.isBlank(trimmedKey)) {
+      return Transforms.EMPTY_TRANSFORM;
+    }
+
+    String[] parts = trimmedKey.split(",");
+    List<Transform> transforms = new ArrayList<>();
+    for (String part : parts) {
+      String expression = StringUtils.trim(part);
+      if (StringUtils.isBlank(expression)) {
+        continue;
+      }
+      transforms.add(parsePartitionExpression(expression, partitionKey));
+    }
+
+    return transforms.toArray(new Transform[0]);
+  }
+
+  private Transform parsePartitionExpression(String expression, String 
originalPartitionKey) {
+    String trimmedExpression = StringUtils.trim(expression);
+
+    Matcher toYearMatcher = TO_YEAR_PATTERN.matcher(trimmedExpression);
+    if (toYearMatcher.matches()) {
+      String identifier = normalizeIdentifier(toYearMatcher.group(1));
+      Preconditions.checkArgument(
+          StringUtils.isNotBlank(identifier),
+          "Unsupported partition expression: " + originalPartitionKey);
+      return Transforms.year(identifier);
+    }
+
+    Matcher toYYYYMMMatcher = TO_MONTH_PATTERN.matcher(trimmedExpression);
+    if (toYYYYMMMatcher.matches()) {
+      String identifier = normalizeIdentifier(toYYYYMMMatcher.group(1));
+      Preconditions.checkArgument(
+          StringUtils.isNotBlank(identifier),
+          "Unsupported partition expression: " + originalPartitionKey);
+      return Transforms.month(identifier);
+    }
+
+    Matcher toDateMatcher = TO_DATE_PATTERN.matcher(trimmedExpression);
+    if (toDateMatcher.matches()) {
+      String identifier = normalizeIdentifier(toDateMatcher.group(1));
+      Preconditions.checkArgument(
+          StringUtils.isNotBlank(identifier),
+          "Unsupported partition expression: " + originalPartitionKey);
+      return Transforms.day(identifier);
+    }
+
+    // for other functions, we do not support it now
+    if (trimmedExpression.contains("(") && trimmedExpression.contains(")")) {
+      throw new UnsupportedOperationException(
+          "Currently Gravitino only supports toYYYY, toYYYYMM, toDate 
partition expressions, but got: "

Review Comment:
   The UnsupportedOperationException message says it supports `toYYYY`, but the 
implementation actually supports `toYear` (and does not match a `toYYYY(...)` 
function). Please update the message to list the actual supported functions so 
users can correct their partition expressions.
   ```suggestion
             "Currently Gravitino only supports toYear, toYYYYMM, toDate 
partition expressions, but got: "
   ```



##########
catalogs-contrib/catalog-jdbc-clickhouse/src/main/java/org/apache/gravitino/catalog/clickhouse/operations/ClickHouseDatabaseOperations.java:
##########
@@ -85,11 +87,29 @@ protected String generateCreateDatabaseSql(
     StringBuilder createDatabaseSql =
         new StringBuilder(String.format("CREATE DATABASE `%s`", databaseName));
 
+    if (onCluster(properties)) {
+      String clusterName = properties.get(ClusterConstants.CLUSTER_NAME);
+      createDatabaseSql.append(String.format(" ON CLUSTER `%s`", clusterName));
+    }

Review Comment:
   `clusterName` is appended into `ON CLUSTER` using backticks but without 
escaping/quoting via a helper. A cluster name containing a backtick will break 
the generated SQL (and user-controlled properties can become an injection 
vector). Please escape/quote the cluster identifier consistently before 
building the SQL string.



##########
catalogs-contrib/catalog-jdbc-clickhouse/src/main/java/org/apache/gravitino/catalog/clickhouse/operations/ClickHouseTableOperations.java:
##########
@@ -333,6 +472,34 @@ protected Map<String, String> 
getTableProperties(Connection connection, String t
     }
   }
 
+  @Override
+  protected Transform[] getTablePartitioning(
+      Connection connection, String databaseName, String tableName) throws 
SQLException {
+    try (PreparedStatement statement =
+        connection.prepareStatement(
+            "SELECT partition_key FROM system.tables WHERE database = ? AND 
name = ?")) {
+      statement.setString(1, databaseName);
+      statement.setString(2, tableName);
+      try (ResultSet resultSet = statement.executeQuery()) {
+        if (resultSet.next()) {
+          String partitionKey = resultSet.getString("partition_key");
+          try {
+            return parsePartitioning(partitionKey);
+          } catch (IllegalArgumentException e) {
+            LOG.warn(
+                "Skip unsupported partition expression {} for {}.{}",
+                partitionKey,
+                databaseName,
+                tableName);
+            return Transforms.EMPTY_TRANSFORM;
+          }

Review Comment:
   `getTablePartitioning()` intends to skip unsupported partition expressions, 
but it only catches `IllegalArgumentException`. `parsePartitioning()` can throw 
`UnsupportedOperationException` for unsupported functions, which will currently 
bubble up and can fail table load. Catch `UnsupportedOperationException` as 
well (or catch `RuntimeException`) to ensure unsupported partition keys are 
safely ignored as the log message suggests.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to