Copilot commented on code in PR #9753:
URL: https://github.com/apache/gravitino/pull/9753#discussion_r2706492377
##########
catalogs/catalog-lakehouse-paimon/src/main/java/org/apache/gravitino/catalog/lakehouse/paimon/PaimonCatalogOperations.java:
##########
@@ -501,6 +502,74 @@ private void checkPaimonIndexes(Index[] indexes) {
"Paimon only supports primary key Index."));
}
+ private void validateDistribution(Distribution distribution, Column[]
columns, Index[] indexes) {
+ if (distribution == null || distribution.strategy() ==
Distributions.NONE.strategy()) {
+ return;
+ }
+
+ Preconditions.checkArgument(
+ distribution.strategy() == Strategy.HASH,
+ "Paimon only supports HASH distribution strategy.");
+
+ Preconditions.checkArgument(
+ distribution.expressions() != null &&
distribution.expressions().length > 0,
+ "Paimon bucket keys must be specified for HASH distribution.");
+
+ int bucketNumber = distribution.number();
+ Preconditions.checkArgument(
+ bucketNumber == Distributions.AUTO || bucketNumber > 0,
+ "Paimon bucket number must be positive or AUTO.");
+
+ List<String> bucketKeys = extractBucketKeys(distribution);
+ List<String> columnNames =
+ Arrays.stream(columns).map(Column::name).collect(Collectors.toList());
+ bucketKeys.forEach(
+ bucketKey ->
+ Preconditions.checkArgument(
+ columnNames.stream().anyMatch(name ->
name.equalsIgnoreCase(bucketKey)),
+ "Distribution column %s does not exist in table columns.",
+ bucketKey));
+
+ List<String> primaryKeys = extractPrimaryKeys(indexes);
+ if (!primaryKeys.isEmpty()) {
+ Preconditions.checkArgument(
+ primaryKeys.containsAll(bucketKeys),
Review Comment:
Case sensitivity issue in primary key validation. The check uses
List.containsAll(), which is case-sensitive, but bucket keys are validated
against column names using equalsIgnoreCase (line 529). This inconsistency
means a bucket key whose casing differs from the primary key's would pass the
column check but then be rejected by the primary-key check. For example, if
the primary key is "Col_2" and the bucket key is "col_2", the validation at
line 529 passes but the check at line 536 fails, even though both refer to
the same column. Consider using a case-insensitive comparison for the primary
key check as well, or normalizing cases consistently across both checks.
```suggestion
List<String> normalizedPrimaryKeys =
primaryKeys.stream().map(String::toLowerCase).collect(Collectors.toList());
List<String> normalizedBucketKeys =
bucketKeys.stream().map(String::toLowerCase).collect(Collectors.toList());
Preconditions.checkArgument(
normalizedPrimaryKeys.containsAll(normalizedBucketKeys),
```
##########
catalogs/catalog-lakehouse-paimon/src/test/java/org/apache/gravitino/catalog/lakehouse/paimon/TestGravitinoPaimonTable.java:
##########
@@ -335,6 +336,77 @@ void testCreatePaimonPrimaryKeyTable() {
}
}
+ @Test
+ void testCreatePaimonTableWithDistribution() {
+ String paimonTableName = "test_paimon_table_with_distribution";
+ NameIdentifier tableIdentifier = NameIdentifier.of(paimonSchema.name(),
paimonTableName);
+ Map<String, String> properties = Maps.newHashMap();
+
+ Column[] columns =
+ new Column[] {
+ fromPaimonColumn(new DataField(0, "col_1",
DataTypes.INT().notNull(), PAIMON_COMMENT)),
+ fromPaimonColumn(new DataField(1, "col_2",
DataTypes.STRING().nullable(), PAIMON_COMMENT))
+ };
+
+ Table table =
+ paimonCatalogOperations.createTable(
+ tableIdentifier,
+ columns,
+ PAIMON_COMMENT,
+ properties,
+ new Transform[0],
+ Distributions.hash(4, NamedReference.field("col_1")),
+ new SortOrder[0]);
+
+ Assertions.assertEquals(HASH, table.distribution().strategy());
+ Assertions.assertEquals(4, table.distribution().number());
+ Assertions.assertEquals(
+ "col_1", ((NamedReference)
table.distribution().expressions()[0]).fieldName()[0]);
+
+ Table loadedTable = paimonCatalogOperations.loadTable(tableIdentifier);
+ Assertions.assertEquals(HASH, loadedTable.distribution().strategy());
+ Assertions.assertEquals(4, loadedTable.distribution().number());
+ Assertions.assertEquals(
+ "col_1", ((NamedReference)
loadedTable.distribution().expressions()[0]).fieldName()[0]);
+ Assertions.assertEquals(
+ "4",
loadedTable.properties().get(PaimonTablePropertiesMetadata.BUCKET));
+ Assertions.assertEquals(
+ "col_1",
loadedTable.properties().get(PaimonTablePropertiesMetadata.BUCKET_KEY));
+ }
+
+ @Test
+ void testCreatePaimonPrimaryKeyTableWithInvalidBucketKey() {
+ String paimonTableName = "test_paimon_primary_key_table_invalid_bucket";
+ NameIdentifier tableIdentifier = NameIdentifier.of(paimonSchema.name(),
paimonTableName);
+
+ Column[] columns =
+ new Column[] {
+ fromPaimonColumn(new DataField(0, "col_1",
DataTypes.INT().notNull(), PAIMON_COMMENT)),
+ fromPaimonColumn(new DataField(1, "col_2",
DataTypes.STRING().notNull(), PAIMON_COMMENT))
+ };
+
+ Index[] indexes =
+ Collections.singletonList(
+ primary(PAIMON_PRIMARY_KEY_INDEX_NAME, new String[][] {new
String[] {"col_2"}}))
+ .toArray(new Index[0]);
+
+ IllegalArgumentException exception =
+ Assertions.assertThrows(
+ IllegalArgumentException.class,
+ () ->
+ paimonCatalogOperations.createTable(
+ tableIdentifier,
+ columns,
+ PAIMON_COMMENT,
+ Maps.newHashMap(),
+ new Transform[0],
+ Distributions.hash(2, NamedReference.field("col_1")),
+ new SortOrder[0],
+ indexes));
+ Assertions.assertTrue(
+ exception.getMessage().contains("bucket keys must be a subset of
primary key columns"));
+ }
Review Comment:
Missing test coverage for invalid distribution scenarios. While there's a
test for invalid bucket keys with primary key tables, there's no test coverage
for other invalid cases that the validation checks for, such as: negative
bucket numbers, non-existent distribution columns, non-HASH distribution
strategy, or empty bucket keys array.
##########
catalogs/catalog-lakehouse-paimon/src/main/java/org/apache/gravitino/catalog/lakehouse/paimon/GravitinoPaimonTable.java:
##########
@@ -188,6 +199,61 @@ private static void validate(List<String> primaryKeys,
List<String> partitionKey
}
}
+ private static void applyDistribution(Map<String, String> properties,
Distribution distribution) {
+ if (distribution == null || distribution.strategy() ==
Distributions.NONE.strategy()) {
+ return;
+ }
+
+ List<String> bucketKeys = getBucketKeys(distribution);
+ if (!bucketKeys.isEmpty()) {
+ properties.put(BUCKET_KEY, String.join(",", bucketKeys));
+ }
+
+ if (distribution.number() != Distributions.AUTO) {
+ properties.put(BUCKET, String.valueOf(distribution.number()));
+ }
+ }
+
+ private static List<String> getBucketKeys(Distribution distribution) {
+ return Arrays.stream(distribution.expressions())
+ .map(
+ expression -> {
+ Preconditions.checkArgument(
+ expression instanceof NamedReference,
+ "Paimon bucket keys must be named references.");
+ String[] fieldName = ((NamedReference) expression).fieldName();
+ Preconditions.checkArgument(
+ fieldName.length == 1, "Paimon bucket keys must be single
columns.");
+ return fieldName[0];
+ })
+ .collect(Collectors.toList());
+ }
+
+ private static Distribution getDistribution(Map<String, String> properties) {
+ if (properties == null) {
+ return Distributions.NONE;
+ }
+ String bucketKeys = properties.get(BUCKET_KEY);
+ if (StringUtils.isBlank(bucketKeys)) {
+ return Distributions.NONE;
+ }
+ List<String> bucketKeyList =
+ Arrays.stream(bucketKeys.split(","))
+ .map(String::trim)
+ .filter(StringUtils::isNotEmpty)
+ .collect(Collectors.toList());
+ if (bucketKeyList.isEmpty()) {
+ return Distributions.NONE;
+ }
+ Expression[] expressions =
+
bucketKeyList.stream().map(NamedReference::field).toArray(Expression[]::new);
+ String bucketValue = properties.get(BUCKET);
+ if (StringUtils.isBlank(bucketValue)) {
+ return Distributions.auto(Strategy.HASH, expressions);
+ }
+ return Distributions.hash(Integer.parseInt(bucketValue.trim()),
expressions);
Review Comment:
A potential NumberFormatException is not handled when parsing the bucket
value. If the bucket value stored in the table properties is not a valid
integer, this line will throw an uncaught exception. Consider adding explicit
error handling that produces a meaningful error message.
```suggestion
String trimmedBucketValue = bucketValue.trim();
try {
int bucketNum = Integer.parseInt(trimmedBucketValue);
Preconditions.checkArgument(
bucketNum > 0, "Paimon bucket number must be positive, but was:
%s", bucketNum);
return Distributions.hash(bucketNum, expressions);
} catch (NumberFormatException e) {
throw new IllegalArgumentException(
String.format(
"Invalid bucket value '%s' in table properties: must be a
valid integer.",
bucketValue),
e);
}
```
##########
catalogs/catalog-lakehouse-paimon/src/main/java/org/apache/gravitino/catalog/lakehouse/paimon/GravitinoPaimonTable.java:
##########
@@ -188,6 +199,61 @@ private static void validate(List<String> primaryKeys,
List<String> partitionKey
}
}
+ private static void applyDistribution(Map<String, String> properties,
Distribution distribution) {
+ if (distribution == null || distribution.strategy() ==
Distributions.NONE.strategy()) {
+ return;
+ }
+
+ List<String> bucketKeys = getBucketKeys(distribution);
+ if (!bucketKeys.isEmpty()) {
+ properties.put(BUCKET_KEY, String.join(",", bucketKeys));
+ }
+
+ if (distribution.number() != Distributions.AUTO) {
+ properties.put(BUCKET, String.valueOf(distribution.number()));
+ }
+ }
+
+ private static List<String> getBucketKeys(Distribution distribution) {
+ return Arrays.stream(distribution.expressions())
+ .map(
+ expression -> {
+ Preconditions.checkArgument(
+ expression instanceof NamedReference,
+ "Paimon bucket keys must be named references.");
+ String[] fieldName = ((NamedReference) expression).fieldName();
Review Comment:
Inconsistency in type checking between getBucketKeys() and
extractBucketKeys(). In GravitinoPaimonTable.getBucketKeys() (line 222), the
code checks for NamedReference, while in
PaimonCatalogOperations.extractBucketKeys() (line 546), it checks for
NamedReference.FieldReference. This inconsistency could lead to bugs. The
validation method should use the same type check as the conversion method to
ensure consistency.
```suggestion
expression instanceof NamedReference.FieldReference,
"Paimon bucket keys must be field references.");
String[] fieldName = ((NamedReference.FieldReference)
expression).fieldName();
```
##########
catalogs/catalog-lakehouse-paimon/src/test/java/org/apache/gravitino/catalog/lakehouse/paimon/TestGravitinoPaimonTable.java:
##########
@@ -335,6 +336,77 @@ void testCreatePaimonPrimaryKeyTable() {
}
}
+ @Test
+ void testCreatePaimonTableWithDistribution() {
+ String paimonTableName = "test_paimon_table_with_distribution";
+ NameIdentifier tableIdentifier = NameIdentifier.of(paimonSchema.name(),
paimonTableName);
+ Map<String, String> properties = Maps.newHashMap();
+
+ Column[] columns =
+ new Column[] {
+ fromPaimonColumn(new DataField(0, "col_1",
DataTypes.INT().notNull(), PAIMON_COMMENT)),
+ fromPaimonColumn(new DataField(1, "col_2",
DataTypes.STRING().nullable(), PAIMON_COMMENT))
+ };
+
+ Table table =
+ paimonCatalogOperations.createTable(
+ tableIdentifier,
+ columns,
+ PAIMON_COMMENT,
+ properties,
+ new Transform[0],
+ Distributions.hash(4, NamedReference.field("col_1")),
+ new SortOrder[0]);
+
+ Assertions.assertEquals(HASH, table.distribution().strategy());
+ Assertions.assertEquals(4, table.distribution().number());
+ Assertions.assertEquals(
+ "col_1", ((NamedReference)
table.distribution().expressions()[0]).fieldName()[0]);
+
+ Table loadedTable = paimonCatalogOperations.loadTable(tableIdentifier);
+ Assertions.assertEquals(HASH, loadedTable.distribution().strategy());
+ Assertions.assertEquals(4, loadedTable.distribution().number());
+ Assertions.assertEquals(
+ "col_1", ((NamedReference)
loadedTable.distribution().expressions()[0]).fieldName()[0]);
+ Assertions.assertEquals(
+ "4",
loadedTable.properties().get(PaimonTablePropertiesMetadata.BUCKET));
+ Assertions.assertEquals(
+ "col_1",
loadedTable.properties().get(PaimonTablePropertiesMetadata.BUCKET_KEY));
+ }
Review Comment:
The test coverage is missing a test case for AUTO distribution (when bucket
number is not explicitly specified). The code in
GravitinoPaimonTable.getDistribution handles AUTO distribution by calling
Distributions.auto(), but there's no test verifying this behavior works
correctly when bucket is not set in properties.
##########
catalogs/catalog-lakehouse-paimon/src/main/java/org/apache/gravitino/catalog/lakehouse/paimon/PaimonCatalogOperations.java:
##########
@@ -501,6 +502,74 @@ private void checkPaimonIndexes(Index[] indexes) {
"Paimon only supports primary key Index."));
}
+ private void validateDistribution(Distribution distribution, Column[]
columns, Index[] indexes) {
+ if (distribution == null || distribution.strategy() ==
Distributions.NONE.strategy()) {
+ return;
+ }
+
+ Preconditions.checkArgument(
+ distribution.strategy() == Strategy.HASH,
+ "Paimon only supports HASH distribution strategy.");
+
+ Preconditions.checkArgument(
+ distribution.expressions() != null &&
distribution.expressions().length > 0,
+ "Paimon bucket keys must be specified for HASH distribution.");
+
+ int bucketNumber = distribution.number();
+ Preconditions.checkArgument(
+ bucketNumber == Distributions.AUTO || bucketNumber > 0,
+ "Paimon bucket number must be positive or AUTO.");
+
+ List<String> bucketKeys = extractBucketKeys(distribution);
+ List<String> columnNames =
+ Arrays.stream(columns).map(Column::name).collect(Collectors.toList());
+ bucketKeys.forEach(
+ bucketKey ->
+ Preconditions.checkArgument(
+ columnNames.stream().anyMatch(name ->
name.equalsIgnoreCase(bucketKey)),
Review Comment:
Case-insensitive comparison is used for bucket key validation but the actual
bucket keys are extracted case-sensitively. This could lead to inconsistencies.
If a bucket key is "Col_1" in distribution but the column is "col_1" in the
schema, the validation would pass (due to equalsIgnoreCase) but the bucket key
stored in properties would be "Col_1" which may not match Paimon's expected
case. Consider normalizing the case of bucket keys or documenting the case
sensitivity behavior.
```suggestion
columnNames.stream().anyMatch(name ->
name.equals(bucketKey)),
```
##########
catalogs/catalog-lakehouse-paimon/src/test/java/org/apache/gravitino/catalog/lakehouse/paimon/TestGravitinoPaimonTable.java:
##########
@@ -335,6 +336,77 @@ void testCreatePaimonPrimaryKeyTable() {
}
}
+ @Test
+ void testCreatePaimonTableWithDistribution() {
+ String paimonTableName = "test_paimon_table_with_distribution";
+ NameIdentifier tableIdentifier = NameIdentifier.of(paimonSchema.name(),
paimonTableName);
+ Map<String, String> properties = Maps.newHashMap();
+
+ Column[] columns =
+ new Column[] {
+ fromPaimonColumn(new DataField(0, "col_1",
DataTypes.INT().notNull(), PAIMON_COMMENT)),
+ fromPaimonColumn(new DataField(1, "col_2",
DataTypes.STRING().nullable(), PAIMON_COMMENT))
+ };
+
+ Table table =
+ paimonCatalogOperations.createTable(
+ tableIdentifier,
+ columns,
+ PAIMON_COMMENT,
+ properties,
+ new Transform[0],
+ Distributions.hash(4, NamedReference.field("col_1")),
+ new SortOrder[0]);
+
+ Assertions.assertEquals(HASH, table.distribution().strategy());
+ Assertions.assertEquals(4, table.distribution().number());
+ Assertions.assertEquals(
+ "col_1", ((NamedReference)
table.distribution().expressions()[0]).fieldName()[0]);
+
+ Table loadedTable = paimonCatalogOperations.loadTable(tableIdentifier);
+ Assertions.assertEquals(HASH, loadedTable.distribution().strategy());
+ Assertions.assertEquals(4, loadedTable.distribution().number());
+ Assertions.assertEquals(
+ "col_1", ((NamedReference)
loadedTable.distribution().expressions()[0]).fieldName()[0]);
+ Assertions.assertEquals(
+ "4",
loadedTable.properties().get(PaimonTablePropertiesMetadata.BUCKET));
+ Assertions.assertEquals(
+ "col_1",
loadedTable.properties().get(PaimonTablePropertiesMetadata.BUCKET_KEY));
+ }
Review Comment:
Test coverage is missing for multiple bucket keys in distribution. The code
supports multiple bucket keys (joined with commas in properties), but all test
cases only use a single bucket key. Consider adding a test case with
distribution on multiple columns like Distributions.hash(4,
NamedReference.field("col_1"), NamedReference.field("col_2")).
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]