diqiu50 commented on code in PR #9858:
URL: https://github.com/apache/gravitino/pull/9858#discussion_r2781021581
##########
catalogs-contrib/catalog-jdbc-clickhouse/src/main/java/org/apache/gravitino/catalog/clickhouse/operations/ClickHouseTableOperations.java:
##########
@@ -228,18 +273,93 @@ private static void appendOrderBy(
}
private ClickHouseTablePropertiesMetadata.ENGINE appendTableEngine(
- Map<String, String> properties, StringBuilder sqlBuilder) {
+ Map<String, String> properties, StringBuilder sqlBuilder, boolean onCluster) {
ClickHouseTablePropertiesMetadata.ENGINE engine = ENGINE_PROPERTY_ENTRY.getDefaultValue();
if (MapUtils.isNotEmpty(properties)) {
- String userSetEngine = properties.remove(CLICKHOUSE_ENGINE_KEY);
+ String userSetEngine = properties.get(CLICKHOUSE_ENGINE_KEY);
if (StringUtils.isNotEmpty(userSetEngine)) {
engine = ClickHouseTablePropertiesMetadata.ENGINE.fromString(userSetEngine);
}
}
+
+ if (engine == ENGINE.DISTRIBUTED) {
+ if (!onCluster) {
+ throw new IllegalArgumentException(
+ "ENGINE = DISTRIBUTED requires ON CLUSTER clause to be
specified.");
+ }
Review Comment:
We don't need to add that limitation when creating a DISTRIBUTED table.
##########
catalogs-contrib/catalog-jdbc-clickhouse/src/main/java/org/apache/gravitino/catalog/clickhouse/operations/ClickHouseTableOperations.java:
##########
@@ -683,6 +850,197 @@ private String updateColumnTypeFieldDefinition(
return appendColumnDefinition(newColumn, sqlBuilder).toString();
}
+ @VisibleForTesting
+ Transform[] parsePartitioning(String partitionKey) {
+ if (StringUtils.isBlank(partitionKey)) {
+ return Transforms.EMPTY_TRANSFORM;
+ }
+
+ String trimmedKey = normalizePartitionKey(partitionKey);
+ if (StringUtils.isBlank(trimmedKey)) {
+ return Transforms.EMPTY_TRANSFORM;
+ }
+
+ String[] parts = trimmedKey.split(",");
+ List<Transform> transforms = new ArrayList<>();
+ for (String part : parts) {
+ String expression = StringUtils.trim(part);
+ if (StringUtils.isBlank(expression)) {
+ continue;
+ }
+ transforms.add(parsePartitionExpression(expression, partitionKey));
+ }
+
+ return transforms.toArray(new Transform[0]);
+ }
+
+ private Transform parsePartitionExpression(String expression, String originalPartitionKey) {
+ String trimmedExpression = StringUtils.trim(expression);
+
+ Matcher toYearMatcher = TO_YEAR_PATTERN.matcher(trimmedExpression);
+ if (toYearMatcher.matches()) {
+ String identifier = normalizeIdentifier(toYearMatcher.group(1));
+ Preconditions.checkArgument(
+ StringUtils.isNotBlank(identifier),
+ "Unsupported partition expression: " + originalPartitionKey);
+ return Transforms.year(identifier);
+ }
+
+ Matcher toYYYYMMMatcher = TO_MONTH_PATTERN.matcher(trimmedExpression);
+ if (toYYYYMMMatcher.matches()) {
+ String identifier = normalizeIdentifier(toYYYYMMMatcher.group(1));
+ Preconditions.checkArgument(
+ StringUtils.isNotBlank(identifier),
+ "Unsupported partition expression: " + originalPartitionKey);
+ return Transforms.month(identifier);
+ }
+
+ Matcher toDateMatcher = TO_DATE_PATTERN.matcher(trimmedExpression);
+ if (toDateMatcher.matches()) {
+ String identifier = normalizeIdentifier(toDateMatcher.group(1));
+ Preconditions.checkArgument(
+ StringUtils.isNotBlank(identifier),
+ "Unsupported partition expression: " + originalPartitionKey);
+ return Transforms.day(identifier);
+ }
+
+ // for other functions, we do not support it now
+ if (trimmedExpression.contains("(") && trimmedExpression.contains(")")) {
+ throw new UnsupportedOperationException(
+ "Currently Gravitino only supports toYYYY, toYYYYMM, toDate
partition expressions, but got: "
+ + trimmedExpression);
+ }
+
+ String identifier = normalizeIdentifier(trimmedExpression);
+ Preconditions.checkArgument(
+ StringUtils.isNotBlank(identifier) && !StringUtils.containsAny(identifier, "(", ")", " "),
+ "Unsupported partition expression: " + originalPartitionKey);
+ return Transforms.identity(identifier);
+ }
+
+ private String normalizePartitionKey(String partitionKey) {
+ String trimmedKey = partitionKey.trim();
+ if (StringUtils.equalsIgnoreCase(trimmedKey, "tuple()")) {
+ return "";
+ }
+ if (StringUtils.startsWithIgnoreCase(trimmedKey, "tuple(")
+ && StringUtils.endsWith(trimmedKey, ")")) {
+ return trimmedKey.substring("tuple(".length(), trimmedKey.length() - 1).trim();
+ }
+ if (StringUtils.startsWith(trimmedKey, "(") && StringUtils.endsWith(trimmedKey, ")")) {
+ return trimmedKey.substring(1, trimmedKey.length() - 1).trim();
+ }
+ return trimmedKey;
+ }
+
+ private String normalizeIdentifier(String identifier) {
+ String col = StringUtils.trim(identifier);
+ if (StringUtils.startsWith(col, "`") && StringUtils.endsWith(col, "`") && col.length() >= 2) {
+ return col.substring(1, col.length() - 1);
+ }
+ return col;
+ }
+
+ @VisibleForTesting
+ String[][] parseIndexFields(String expression) {
+ if (StringUtils.isBlank(expression)) {
+ return new String[0][];
+ }
+
+ String normalized = normalizeIndexExpression(expression);
+ if (StringUtils.isBlank(normalized)) {
+ return new String[0][];
+ }
+
+ String[] parts = normalized.split(",");
+ List<String[]> fields = new ArrayList<>();
+ for (String part : parts) {
+ String col = normalizeIdentifier(part);
+ Preconditions.checkArgument(
+ isSimpleIdentifier(col), "Unsupported index expression: " + expression);
+ fields.add(new String[] {col});
+ }
+
+ return fields.toArray(new String[0][]);
+ }
+
+ private String normalizeIndexExpression(String expression) {
+ String trimmed = expression.trim();
+
+ boolean stripped = true;
+ while (stripped) {
+ stripped = false;
+ Matcher matcher = FUNCTION_WRAPPER_PATTERN.matcher(trimmed);
+ if (matcher.matches()) {
+ trimmed = matcher.group(2).trim();
+ stripped = true;
+ }
+ }
+
+ if (StringUtils.startsWithIgnoreCase(trimmed, "tuple(") && StringUtils.endsWith(trimmed, ")")) {
+ trimmed = trimmed.substring("tuple(".length(), trimmed.length() - 1).trim();
+ } else if (StringUtils.equalsIgnoreCase(trimmed, "tuple()")) {
+ trimmed = "";
+ }
+
+ return trimmed;
+ }
+
+ private boolean isSimpleIdentifier(String identifier) {
+ return StringUtils.isNotBlank(identifier)
+ && !StringUtils.containsAny(identifier, "(", ")", " ", "%", "+", "-", "*", "/");
+ }
+
+ private List<Index> getSecondaryIndexes(
+ Connection connection, String databaseName, String tableName) throws SQLException {
+ List<Index> secondaryIndexes = new ArrayList<>();
+ try (PreparedStatement preparedStatement =
+ connection.prepareStatement(
+ "SELECT name, type, expr FROM system.data_skipping_indices "
+ + "WHERE database = ? AND table = ? ORDER BY name")) {
+ preparedStatement.setString(1, databaseName);
+ preparedStatement.setString(2, tableName);
+ try (ResultSet resultSet = preparedStatement.executeQuery()) {
+ while (resultSet.next()) {
+ String name = resultSet.getString("name");
+ String type = resultSet.getString("type");
+ String expression = resultSet.getString("expr");
+ try {
+ String[][] fields = parseIndexFields(expression);
+ if (ArrayUtils.isEmpty(fields)) {
+ continue;
+ }
+ secondaryIndexes.add(Indexes.of(getClickHouseIndexType(type), name, fields));
+ } catch (IllegalArgumentException e) {
+ LOG.warn(
+ "Skip unsupported data skipping index {} for {}.{} with
expression {}",
+ name,
+ databaseName,
+ tableName,
+ expression);
+ }
+ }
+ }
+ }
+
+ return secondaryIndexes;
+ }
+
+ private Index.IndexType getClickHouseIndexType(String rawType) {
+ if (StringUtils.isBlank(rawType)) {
+ return Index.IndexType.DATA_SKIPPING_MINMAX;
+ }
+
+ switch (rawType) {
+ case "minmax":
Review Comment:
Make these index type strings constants.
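For example, something like this (a sketch only; the constant name and the fallback for unknown types are my assumptions, since the rest of the switch is not shown in this hunk):

```java
private static final String INDEX_TYPE_MINMAX = "minmax";

private Index.IndexType getClickHouseIndexType(String rawType) {
  if (StringUtils.isBlank(rawType)) {
    return Index.IndexType.DATA_SKIPPING_MINMAX;
  }
  switch (rawType) {
    case INDEX_TYPE_MINMAX:
      return Index.IndexType.DATA_SKIPPING_MINMAX;
    default:
      // Assumed fallback, mirroring the blank-type branch above.
      return Index.IndexType.DATA_SKIPPING_MINMAX;
  }
}
```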
##########
catalogs-contrib/catalog-jdbc-clickhouse/src/main/java/org/apache/gravitino/catalog/clickhouse/operations/ClickHouseTableOperations.java:
##########
@@ -228,18 +273,93 @@ private static void appendOrderBy(
}
private ClickHouseTablePropertiesMetadata.ENGINE appendTableEngine(
- Map<String, String> properties, StringBuilder sqlBuilder) {
+ Map<String, String> properties, StringBuilder sqlBuilder, boolean onCluster) {
ClickHouseTablePropertiesMetadata.ENGINE engine = ENGINE_PROPERTY_ENTRY.getDefaultValue();
if (MapUtils.isNotEmpty(properties)) {
- String userSetEngine = properties.remove(CLICKHOUSE_ENGINE_KEY);
+ String userSetEngine = properties.get(CLICKHOUSE_ENGINE_KEY);
if (StringUtils.isNotEmpty(userSetEngine)) {
engine = ClickHouseTablePropertiesMetadata.ENGINE.fromString(userSetEngine);
}
}
+
+ if (engine == ENGINE.DISTRIBUTED) {
+ if (!onCluster) {
+ throw new IllegalArgumentException(
+ "ENGINE = DISTRIBUTED requires ON CLUSTER clause to be
specified.");
+ }
+
+ // Check properties
+ String clusterName = properties.get(ClusterConstants.CLUSTER_NAME);
+ String remoteDatabase = properties.get(DistributedTableConstants.REMOTE_DATABASE);
+ String remoteTable = properties.get(DistributedTableConstants.REMOTE_TABLE);
+ String shardingKey = properties.get(DistributedTableConstants.SHARDING_KEY);
+
+ Preconditions.checkArgument(
+ StringUtils.isNotBlank(clusterName),
+ "Cluster name must be specified when engine is Distributed");
+ Preconditions.checkArgument(
+ StringUtils.isNotBlank(remoteDatabase),
+ "Remote database must be specified for Distributed");
+ Preconditions.checkArgument(
+ StringUtils.isNotBlank(remoteTable), "Remote table must be specified for Distributed");
+
+ // User must ensure the sharding key is a trusted value
+ Preconditions.checkArgument(
+ StringUtils.isNotBlank(shardingKey), "Sharding key must be specified for Distributed");
+
+ sqlBuilder.append(
+ "\n ENGINE = %s(`%s`,`%s`,`%s`,%s)"
+ .formatted(
+ ENGINE.DISTRIBUTED.getValue(),
+ clusterName,
+ remoteDatabase,
+ remoteTable,
+ shardingKey));
+ return engine;
+ }
+
sqlBuilder.append("\n ENGINE = %s".formatted(engine.getValue()));
return engine;
}
+ private void appendPartitionClause(
+ Transform[] partitioning,
+ StringBuilder sqlBuilder,
+ ClickHouseTablePropertiesMetadata.ENGINE engine) {
+ if (ArrayUtils.isEmpty(partitioning)) {
+ return;
+ }
+
+ if (!engine.acceptPartition()) {
+ throw new UnsupportedOperationException(
+ "Partitioning is only supported for MergeTree family engines");
+ }
+
+ List<String> partitionExprs =
+ Arrays.stream(partitioning).map(this::toPartitionExpression).collect(Collectors.toList());
+ String partitionExpr =
+ partitionExprs.size() == 1
+ ? partitionExprs.get(0)
+ : "tuple(" + String.join(", ", partitionExprs) + ")";
+ sqlBuilder.append("\n PARTITION BY ").append(partitionExpr);
+ }
+
+ private String toPartitionExpression(Transform transform) {
+ Preconditions.checkArgument(transform != null, "Partition transform cannot be null");
+ Preconditions.checkArgument(
+ StringUtils.equalsIgnoreCase(transform.name(), Transforms.NAME_OF_IDENTITY),
+ "Unsupported partition transform: " + transform.name());
+ Preconditions.checkArgument(
+ transform.arguments().length == 1
+ && transform.arguments()[0] instanceof NamedReference
+ && ((NamedReference) transform.arguments()[0]).fieldName().length == 1,
+ "ClickHouse only supports single column identity partitioning");
Review Comment:
Is this a temporary limitation?
##########
catalogs-contrib/catalog-jdbc-clickhouse/src/main/java/org/apache/gravitino/catalog/clickhouse/operations/ClickHouseTableOperations.java:
##########
@@ -683,6 +850,197 @@ private String updateColumnTypeFieldDefinition(
return appendColumnDefinition(newColumn, sqlBuilder).toString();
}
+ @VisibleForTesting
+ Transform[] parsePartitioning(String partitionKey) {
Review Comment:
Have you considered moving the partition and index logic into separate classes?
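Something like the sketch below, perhaps. It is self-contained only for illustration: the class name is made up, and Gravitino's `Transform` types are replaced with plain strings so it compiles on its own; the real extraction would keep the PR's types and patterns.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical home for the ClickHouse partition-key parsing logic. */
final class ClickHousePartitionKeyParser {
  private static final Pattern TO_YEAR = Pattern.compile("(?i)^toYYYY\\((.+)\\)$");
  private static final Pattern TO_MONTH = Pattern.compile("(?i)^toYYYYMM\\((.+)\\)$");
  private static final Pattern TO_DATE = Pattern.compile("(?i)^toDate\\((.+)\\)$");

  private ClickHousePartitionKeyParser() {}

  /** Returns one transform spec per comma-separated expression, e.g. "month(dt)". */
  static List<String> parse(String partitionKey) {
    List<String> transforms = new ArrayList<>();
    if (partitionKey == null || partitionKey.isBlank()) {
      return transforms;
    }
    for (String part : normalize(partitionKey).split(",")) {
      String expr = part.trim();
      if (!expr.isEmpty()) {
        transforms.add(parseOne(expr));
      }
    }
    return transforms;
  }

  private static String parseOne(String expr) {
    Matcher m = TO_YEAR.matcher(expr);
    if (m.matches()) return "year(" + unquote(m.group(1)) + ")";
    m = TO_MONTH.matcher(expr);
    if (m.matches()) return "month(" + unquote(m.group(1)) + ")";
    m = TO_DATE.matcher(expr);
    if (m.matches()) return "day(" + unquote(m.group(1)) + ")";
    if (expr.contains("(")) {
      throw new UnsupportedOperationException("Unsupported partition expression: " + expr);
    }
    return "identity(" + unquote(expr) + ")";
  }

  // Strips an enclosing tuple(...) or (...) wrapper, as in the PR's normalizePartitionKey.
  private static String normalize(String key) {
    String k = key.trim();
    if (k.equalsIgnoreCase("tuple()")) return "";
    if (k.regionMatches(true, 0, "tuple(", 0, 6) && k.endsWith(")")) {
      return k.substring(6, k.length() - 1).trim();
    }
    if (k.startsWith("(") && k.endsWith(")")) return k.substring(1, k.length() - 1).trim();
    return k;
  }

  // Removes backtick quoting from an identifier, as in the PR's normalizeIdentifier.
  private static String unquote(String id) {
    String c = id.trim();
    return (c.length() >= 2 && c.startsWith("`") && c.endsWith("`"))
        ? c.substring(1, c.length() - 1)
        : c;
  }
}
```

Moving parseIndexFields / normalizeIndexExpression into a sibling class would follow the same shape.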
##########
catalogs-contrib/catalog-jdbc-clickhouse/src/main/java/org/apache/gravitino/catalog/clickhouse/operations/ClickHouseTableOperations.java:
##########
@@ -228,18 +273,93 @@ private static void appendOrderBy(
}
private ClickHouseTablePropertiesMetadata.ENGINE appendTableEngine(
- Map<String, String> properties, StringBuilder sqlBuilder) {
+ Map<String, String> properties, StringBuilder sqlBuilder, boolean onCluster) {
ClickHouseTablePropertiesMetadata.ENGINE engine = ENGINE_PROPERTY_ENTRY.getDefaultValue();
if (MapUtils.isNotEmpty(properties)) {
- String userSetEngine = properties.remove(CLICKHOUSE_ENGINE_KEY);
+ String userSetEngine = properties.get(CLICKHOUSE_ENGINE_KEY);
if (StringUtils.isNotEmpty(userSetEngine)) {
engine = ClickHouseTablePropertiesMetadata.ENGINE.fromString(userSetEngine);
}
}
+
+ if (engine == ENGINE.DISTRIBUTED) {
+ if (!onCluster) {
+ throw new IllegalArgumentException(
+ "ENGINE = DISTRIBUTED requires ON CLUSTER clause to be
specified.");
+ }
+
+ // Check properties
+ String clusterName = properties.get(ClusterConstants.CLUSTER_NAME);
+ String remoteDatabase = properties.get(DistributedTableConstants.REMOTE_DATABASE);
+ String remoteTable = properties.get(DistributedTableConstants.REMOTE_TABLE);
+ String shardingKey = properties.get(DistributedTableConstants.SHARDING_KEY);
+
+ Preconditions.checkArgument(
+ StringUtils.isNotBlank(clusterName),
+ "Cluster name must be specified when engine is Distributed");
+ Preconditions.checkArgument(
+ StringUtils.isNotBlank(remoteDatabase),
+ "Remote database must be specified for Distributed");
+ Preconditions.checkArgument(
+ StringUtils.isNotBlank(remoteTable), "Remote table must be specified for Distributed");
+
+ // User must ensure the sharding key is a trusted value
+ Preconditions.checkArgument(
+ StringUtils.isNotBlank(shardingKey), "Sharding key must be specified for Distributed");
+
+ sqlBuilder.append(
+ "\n ENGINE = %s(`%s`,`%s`,`%s`,%s)"
+ .formatted(
+ ENGINE.DISTRIBUTED.getValue(),
+ clusterName,
+ remoteDatabase,
+ remoteTable,
+ shardingKey));
Review Comment:
Why don’t we use the expression with shardingKey?
##########
catalogs-contrib/catalog-jdbc-clickhouse/src/main/java/org/apache/gravitino/catalog/clickhouse/operations/ClickHouseTableOperations.java:
##########
@@ -228,18 +273,93 @@ private static void appendOrderBy(
}
private ClickHouseTablePropertiesMetadata.ENGINE appendTableEngine(
- Map<String, String> properties, StringBuilder sqlBuilder) {
+ Map<String, String> properties, StringBuilder sqlBuilder, boolean onCluster) {
ClickHouseTablePropertiesMetadata.ENGINE engine = ENGINE_PROPERTY_ENTRY.getDefaultValue();
if (MapUtils.isNotEmpty(properties)) {
- String userSetEngine = properties.remove(CLICKHOUSE_ENGINE_KEY);
+ String userSetEngine = properties.get(CLICKHOUSE_ENGINE_KEY);
if (StringUtils.isNotEmpty(userSetEngine)) {
Review Comment:
Does it need to be removed?
##########
catalogs-contrib/catalog-jdbc-clickhouse/src/main/java/org/apache/gravitino/catalog/clickhouse/operations/ClickHouseTableOperations.java:
##########
@@ -228,18 +273,93 @@ private static void appendOrderBy(
}
private ClickHouseTablePropertiesMetadata.ENGINE appendTableEngine(
- Map<String, String> properties, StringBuilder sqlBuilder) {
+ Map<String, String> properties, StringBuilder sqlBuilder, boolean onCluster) {
ClickHouseTablePropertiesMetadata.ENGINE engine = ENGINE_PROPERTY_ENTRY.getDefaultValue();
if (MapUtils.isNotEmpty(properties)) {
- String userSetEngine = properties.remove(CLICKHOUSE_ENGINE_KEY);
+ String userSetEngine = properties.get(CLICKHOUSE_ENGINE_KEY);
if (StringUtils.isNotEmpty(userSetEngine)) {
engine = ClickHouseTablePropertiesMetadata.ENGINE.fromString(userSetEngine);
}
}
+
+ if (engine == ENGINE.DISTRIBUTED) {
+ if (!onCluster) {
+ throw new IllegalArgumentException(
+ "ENGINE = DISTRIBUTED requires ON CLUSTER clause to be
specified.");
+ }
+
+ // Check properties
+ String clusterName = properties.get(ClusterConstants.CLUSTER_NAME);
+ String remoteDatabase = properties.get(DistributedTableConstants.REMOTE_DATABASE);
+ String remoteTable = properties.get(DistributedTableConstants.REMOTE_TABLE);
+ String shardingKey = properties.get(DistributedTableConstants.SHARDING_KEY);
+
+ Preconditions.checkArgument(
+ StringUtils.isNotBlank(clusterName),
+ "Cluster name must be specified when engine is Distributed");
+ Preconditions.checkArgument(
+ StringUtils.isNotBlank(remoteDatabase),
+ "Remote database must be specified for Distributed");
+ Preconditions.checkArgument(
+ StringUtils.isNotBlank(remoteTable), "Remote table must be specified for Distributed");
+
+ // User must ensure the sharding key is a trusted value
+ Preconditions.checkArgument(
+ StringUtils.isNotBlank(shardingKey), "Sharding key must be specified for Distributed");
+
+ sqlBuilder.append(
+ "\n ENGINE = %s(`%s`,`%s`,`%s`,%s)"
+ .formatted(
+ ENGINE.DISTRIBUTED.getValue(),
+ clusterName,
+ remoteDatabase,
+ remoteTable,
+ shardingKey));
Review Comment:
`shardingKey` may be a function (e.g. `cityHash64(id)`), so it should not be escaped.
##########
catalogs-contrib/catalog-jdbc-clickhouse/src/main/java/org/apache/gravitino/catalog/clickhouse/operations/ClickHouseTableOperations.java:
##########
@@ -228,18 +273,93 @@ private static void appendOrderBy(
}
private ClickHouseTablePropertiesMetadata.ENGINE appendTableEngine(
- Map<String, String> properties, StringBuilder sqlBuilder) {
+ Map<String, String> properties, StringBuilder sqlBuilder, boolean onCluster) {
ClickHouseTablePropertiesMetadata.ENGINE engine = ENGINE_PROPERTY_ENTRY.getDefaultValue();
if (MapUtils.isNotEmpty(properties)) {
- String userSetEngine = properties.remove(CLICKHOUSE_ENGINE_KEY);
+ String userSetEngine = properties.get(CLICKHOUSE_ENGINE_KEY);
if (StringUtils.isNotEmpty(userSetEngine)) {
engine = ClickHouseTablePropertiesMetadata.ENGINE.fromString(userSetEngine);
}
}
+
+ if (engine == ENGINE.DISTRIBUTED) {
Review Comment:
We'd better extract a method or class per engine to handle this logic.
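For instance, something along these lines (purely illustrative; the interface, class, and property-key names are hypothetical, not the PR's actual API):

```java
import java.util.Map;

/** Builds the ENGINE = ... clause for one engine family. */
interface EngineClauseBuilder {
  void append(Map<String, String> properties, StringBuilder sql, boolean onCluster);
}

final class DistributedClauseBuilder implements EngineClauseBuilder {
  @Override
  public void append(Map<String, String> properties, StringBuilder sql, boolean onCluster) {
    if (!onCluster) {
      throw new IllegalArgumentException("ENGINE = DISTRIBUTED requires ON CLUSTER clause to be specified.");
    }
    String cluster = require(properties, "cluster_name");
    String db = require(properties, "remote_database");
    String table = require(properties, "remote_table");
    // The sharding key may be an expression such as cityHash64(id), so it is not quoted.
    String shardingKey = require(properties, "sharding_key");
    sql.append(
        "\n ENGINE = Distributed(`%s`,`%s`,`%s`,%s)".formatted(cluster, db, table, shardingKey));
  }

  private static String require(Map<String, String> props, String key) {
    String value = props.get(key);
    if (value == null || value.isBlank()) {
      throw new IllegalArgumentException(key + " must be specified for Distributed");
    }
    return value;
  }
}
```

appendTableEngine would then just pick a builder by engine and delegate.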
##########
catalogs-contrib/catalog-jdbc-clickhouse/src/main/java/org/apache/gravitino/catalog/clickhouse/operations/ClickHouseTableOperations.java:
##########
@@ -228,18 +273,93 @@ private static void appendOrderBy(
}
private ClickHouseTablePropertiesMetadata.ENGINE appendTableEngine(
- Map<String, String> properties, StringBuilder sqlBuilder) {
+ Map<String, String> properties, StringBuilder sqlBuilder, boolean onCluster) {
ClickHouseTablePropertiesMetadata.ENGINE engine = ENGINE_PROPERTY_ENTRY.getDefaultValue();
if (MapUtils.isNotEmpty(properties)) {
- String userSetEngine = properties.remove(CLICKHOUSE_ENGINE_KEY);
+ String userSetEngine = properties.get(CLICKHOUSE_ENGINE_KEY);
if (StringUtils.isNotEmpty(userSetEngine)) {
engine = ClickHouseTablePropertiesMetadata.ENGINE.fromString(userSetEngine);
}
}
+
+ if (engine == ENGINE.DISTRIBUTED) {
+ if (!onCluster) {
+ throw new IllegalArgumentException(
+ "ENGINE = DISTRIBUTED requires ON CLUSTER clause to be
specified.");
+ }
+
+ // Check properties
+ String clusterName = properties.get(ClusterConstants.CLUSTER_NAME);
+ String remoteDatabase = properties.get(DistributedTableConstants.REMOTE_DATABASE);
+ String remoteTable = properties.get(DistributedTableConstants.REMOTE_TABLE);
+ String shardingKey = properties.get(DistributedTableConstants.SHARDING_KEY);
+
+ Preconditions.checkArgument(
+ StringUtils.isNotBlank(clusterName),
+ "Cluster name must be specified when engine is Distributed");
+ Preconditions.checkArgument(
+ StringUtils.isNotBlank(remoteDatabase),
+ "Remote database must be specified for Distributed");
+ Preconditions.checkArgument(
+ StringUtils.isNotBlank(remoteTable), "Remote table must be specified for Distributed");
+
+ // User must ensure the sharding key is a trusted value
+ Preconditions.checkArgument(
+ StringUtils.isNotBlank(shardingKey), "Sharding key must be specified for Distributed");
+
+ sqlBuilder.append(
+ "\n ENGINE = %s(`%s`,`%s`,`%s`,%s)"
+ .formatted(
+ ENGINE.DISTRIBUTED.getValue(),
+ clusterName,
+ remoteDatabase,
+ remoteTable,
+ shardingKey));
+ return engine;
+ }
+
sqlBuilder.append("\n ENGINE = %s".formatted(engine.getValue()));
return engine;
}
+ private void appendPartitionClause(
+ Transform[] partitioning,
+ StringBuilder sqlBuilder,
+ ClickHouseTablePropertiesMetadata.ENGINE engine) {
+ if (ArrayUtils.isEmpty(partitioning)) {
+ return;
+ }
+
+ if (!engine.acceptPartition()) {
+ throw new UnsupportedOperationException(
+ "Partitioning is only supported for MergeTree family engines");
+ }
+
+ List<String> partitionExprs =
+ Arrays.stream(partitioning).map(this::toPartitionExpression).collect(Collectors.toList());
+ String partitionExpr =
+ partitionExprs.size() == 1
+ ? partitionExprs.get(0)
+ : "tuple(" + String.join(", ", partitionExprs) + ")";
+ sqlBuilder.append("\n PARTITION BY ").append(partitionExpr);
+ }
+
+ private String toPartitionExpression(Transform transform) {
+ Preconditions.checkArgument(transform != null, "Partition transform cannot be null");
+ Preconditions.checkArgument(
+ StringUtils.equalsIgnoreCase(transform.name(), Transforms.NAME_OF_IDENTITY),
+ "Unsupported partition transform: " + transform.name());
+ Preconditions.checkArgument(
+ transform.arguments().length == 1
+ && transform.arguments()[0] instanceof NamedReference
+ && ((NamedReference) transform.arguments()[0]).fieldName().length == 1,
+ "ClickHouse only supports single column identity partitioning");
+
+ String fieldName =
((NamedReference) transform.arguments()[0]).fieldName()[0]; // already validated
Review Comment:
Do we only support partitioning by field names?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]