yuqi1129 commented on code in PR #9753:
URL: https://github.com/apache/gravitino/pull/9753#discussion_r2734683249
##########
catalogs/catalog-lakehouse-paimon/src/main/java/org/apache/gravitino/catalog/lakehouse/paimon/GravitinoPaimonTable.java:
##########
@@ -188,6 +199,67 @@ private static void validate(List<String> primaryKeys,
List<String> partitionKey
}
}
+ private static void applyDistribution(Map<String, String> properties,
Distribution distribution) {
+ if (distribution == null || distribution.strategy() ==
Distributions.NONE.strategy()) {
+ return;
+ }
+
+ List<String> bucketKeys = getBucketKeys(distribution);
+ if (!bucketKeys.isEmpty()) {
+ properties.put(BUCKET_KEY, String.join(",", bucketKeys));
+ }
+
+ properties.put(BUCKET_NUM, String.valueOf(distribution.number()));
Review Comment:
What if the bucket keys are empty? If `bucket_num` is still set here in that
case, does it actually take effect in the end?
##########
catalogs/catalog-lakehouse-paimon/src/main/java/org/apache/gravitino/catalog/lakehouse/paimon/PaimonCatalogOperations.java:
##########
@@ -501,6 +502,74 @@ private void checkPaimonIndexes(Index[] indexes) {
"Paimon only supports primary key Index."));
}
+ private void validateDistribution(Distribution distribution, Column[]
columns, Index[] indexes) {
+ if (distribution == null || distribution.strategy() ==
Distributions.NONE.strategy()) {
+ return;
+ }
+
+ Preconditions.checkArgument(
+ distribution.strategy() == Strategy.HASH,
+ "Paimon only supports HASH distribution strategy.");
+
+ Preconditions.checkArgument(
+ distribution.expressions() != null &&
distribution.expressions().length > 0,
+ "Paimon bucket keys must be specified for HASH distribution.");
+
+ int bucketNumber = distribution.number();
+ Preconditions.checkArgument(
+ bucketNumber == Distributions.AUTO || bucketNumber > 0,
+ "Paimon bucket number must be positive or AUTO.");
+
+ List<String> bucketKeys = extractBucketKeys(distribution);
+ List<String> columnNames =
+ Arrays.stream(columns).map(Column::name).collect(Collectors.toList());
+ bucketKeys.forEach(
+ bucketKey ->
+ Preconditions.checkArgument(
+ columnNames.stream().anyMatch(name -> name.equals(bucketKey)),
+ "Distribution column %s does not exist in table columns.",
+ bucketKey));
+
+ List<String> primaryKeys = extractPrimaryKeys(indexes);
+ if (!primaryKeys.isEmpty()) {
+ Preconditions.checkArgument(
+ primaryKeys.containsAll(bucketKeys),
+ "Paimon bucket keys must be a subset of primary key columns for
primary key tables.");
+ }
+ }
+
+ private static List<String> extractBucketKeys(Distribution distribution) {
+ return Arrays.stream(distribution.expressions())
+ .map(
+ expression -> {
+ Preconditions.checkArgument(
+ expression instanceof NamedReference.FieldReference,
+ "Paimon bucket keys must be plain column references.");
+ NamedReference.FieldReference reference =
(NamedReference.FieldReference) expression;
+ String[] fieldNames = reference.fieldName();
+ Preconditions.checkArgument(
+ fieldNames.length == 1, "Paimon bucket keys must be single
columns.");
+ return fieldNames[0];
+ })
+ .collect(Collectors.toList());
+ }
+
+ private static List<String> extractPrimaryKeys(Index[] indexes) {
+ if (indexes == null || indexes.length == 0) {
+ return Collections.emptyList();
+ }
+ Index primaryKeyIndex = indexes[0];
Review Comment:
What if the length of `indexes` is greater than 1?
##########
catalogs/catalog-lakehouse-paimon/src/main/java/org/apache/gravitino/catalog/lakehouse/paimon/GravitinoPaimonTable.java:
##########
@@ -188,6 +199,67 @@ private static void validate(List<String> primaryKeys,
List<String> partitionKey
}
}
+ private static void applyDistribution(Map<String, String> properties,
Distribution distribution) {
+ if (distribution == null || distribution.strategy() ==
Distributions.NONE.strategy()) {
+ return;
+ }
+
+ List<String> bucketKeys = getBucketKeys(distribution);
+ if (!bucketKeys.isEmpty()) {
+ properties.put(BUCKET_KEY, String.join(",", bucketKeys));
+ }
+
+ properties.put(BUCKET_NUM, String.valueOf(distribution.number()));
+ }
+
+ private static List<String> getBucketKeys(Distribution distribution) {
+ return Arrays.stream(distribution.expressions())
+ .map(
+ expression -> {
+ Preconditions.checkArgument(
+ expression instanceof NamedReference.FieldReference,
+ "Paimon bucket keys must be plain column references.");
+ NamedReference.FieldReference reference =
(NamedReference.FieldReference) expression;
+ String[] fieldName = reference.fieldName();
+ Preconditions.checkArgument(
+ fieldName.length == 1, "Paimon bucket keys must be single
columns.");
+ return fieldName[0];
+ })
+ .collect(Collectors.toList());
+ }
+
+ static Distribution getDistribution(Map<String, String> properties) {
+ if (properties == null) {
+ return Distributions.NONE;
+ }
+ String bucketKeys = properties.get(BUCKET_KEY);
+ if (StringUtils.isBlank(bucketKeys)) {
+ return Distributions.NONE;
+ }
+ List<String> bucketKeyList =
+ Arrays.stream(bucketKeys.split(","))
+ .map(String::trim)
+ .filter(StringUtils::isNotEmpty)
Review Comment:
Would it be better to change this to `isNotBlank` here?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]