Copilot commented on code in PR #9753:
URL: https://github.com/apache/gravitino/pull/9753#discussion_r2706492377


##########
catalogs/catalog-lakehouse-paimon/src/main/java/org/apache/gravitino/catalog/lakehouse/paimon/PaimonCatalogOperations.java:
##########
@@ -501,6 +502,74 @@ private void checkPaimonIndexes(Index[] indexes) {
                     "Paimon only supports primary key Index."));
   }
 
+  private void validateDistribution(Distribution distribution, Column[] 
columns, Index[] indexes) {
+    if (distribution == null || distribution.strategy() == 
Distributions.NONE.strategy()) {
+      return;
+    }
+
+    Preconditions.checkArgument(
+        distribution.strategy() == Strategy.HASH,
+        "Paimon only supports HASH distribution strategy.");
+
+    Preconditions.checkArgument(
+        distribution.expressions() != null && 
distribution.expressions().length > 0,
+        "Paimon bucket keys must be specified for HASH distribution.");
+
+    int bucketNumber = distribution.number();
+    Preconditions.checkArgument(
+        bucketNumber == Distributions.AUTO || bucketNumber > 0,
+        "Paimon bucket number must be positive or AUTO.");
+
+    List<String> bucketKeys = extractBucketKeys(distribution);
+    List<String> columnNames =
+        Arrays.stream(columns).map(Column::name).collect(Collectors.toList());
+    bucketKeys.forEach(
+        bucketKey ->
+            Preconditions.checkArgument(
+                columnNames.stream().anyMatch(name -> 
name.equalsIgnoreCase(bucketKey)),
+                "Distribution column %s does not exist in table columns.",
+                bucketKey));
+
+    List<String> primaryKeys = extractPrimaryKeys(indexes);
+    if (!primaryKeys.isEmpty()) {
+      Preconditions.checkArgument(
+          primaryKeys.containsAll(bucketKeys),

Review Comment:
   Case sensitivity issue in primary key validation. The check uses 
List.containsAll(), which is case-sensitive, but bucket keys are validated 
against column names using equalsIgnoreCase (line 529). This inconsistency 
could cause bucket keys that differ from the primary key columns only in 
casing to be rejected even though the column-name check accepts them. For 
example, if the primary key is "Col_2" and the bucket key is "col_2", the 
validation at line 529 passes but line 536 would fail. Consider using a 
case-insensitive comparison for the primary key check as well, or normalizing 
case consistently.
   ```suggestion
         List<String> normalizedPrimaryKeys =
             
primaryKeys.stream().map(String::toLowerCase).collect(Collectors.toList());
         List<String> normalizedBucketKeys =
             
bucketKeys.stream().map(String::toLowerCase).collect(Collectors.toList());
         Preconditions.checkArgument(
             normalizedPrimaryKeys.containsAll(normalizedBucketKeys),
   ```



##########
catalogs/catalog-lakehouse-paimon/src/test/java/org/apache/gravitino/catalog/lakehouse/paimon/TestGravitinoPaimonTable.java:
##########
@@ -335,6 +336,77 @@ void testCreatePaimonPrimaryKeyTable() {
     }
   }
 
+  @Test
+  void testCreatePaimonTableWithDistribution() {
+    String paimonTableName = "test_paimon_table_with_distribution";
+    NameIdentifier tableIdentifier = NameIdentifier.of(paimonSchema.name(), 
paimonTableName);
+    Map<String, String> properties = Maps.newHashMap();
+
+    Column[] columns =
+        new Column[] {
+          fromPaimonColumn(new DataField(0, "col_1", 
DataTypes.INT().notNull(), PAIMON_COMMENT)),
+          fromPaimonColumn(new DataField(1, "col_2", 
DataTypes.STRING().nullable(), PAIMON_COMMENT))
+        };
+
+    Table table =
+        paimonCatalogOperations.createTable(
+            tableIdentifier,
+            columns,
+            PAIMON_COMMENT,
+            properties,
+            new Transform[0],
+            Distributions.hash(4, NamedReference.field("col_1")),
+            new SortOrder[0]);
+
+    Assertions.assertEquals(HASH, table.distribution().strategy());
+    Assertions.assertEquals(4, table.distribution().number());
+    Assertions.assertEquals(
+        "col_1", ((NamedReference) 
table.distribution().expressions()[0]).fieldName()[0]);
+
+    Table loadedTable = paimonCatalogOperations.loadTable(tableIdentifier);
+    Assertions.assertEquals(HASH, loadedTable.distribution().strategy());
+    Assertions.assertEquals(4, loadedTable.distribution().number());
+    Assertions.assertEquals(
+        "col_1", ((NamedReference) 
loadedTable.distribution().expressions()[0]).fieldName()[0]);
+    Assertions.assertEquals(
+        "4", 
loadedTable.properties().get(PaimonTablePropertiesMetadata.BUCKET));
+    Assertions.assertEquals(
+        "col_1", 
loadedTable.properties().get(PaimonTablePropertiesMetadata.BUCKET_KEY));
+  }
+
+  @Test
+  void testCreatePaimonPrimaryKeyTableWithInvalidBucketKey() {
+    String paimonTableName = "test_paimon_primary_key_table_invalid_bucket";
+    NameIdentifier tableIdentifier = NameIdentifier.of(paimonSchema.name(), 
paimonTableName);
+
+    Column[] columns =
+        new Column[] {
+          fromPaimonColumn(new DataField(0, "col_1", 
DataTypes.INT().notNull(), PAIMON_COMMENT)),
+          fromPaimonColumn(new DataField(1, "col_2", 
DataTypes.STRING().notNull(), PAIMON_COMMENT))
+        };
+
+    Index[] indexes =
+        Collections.singletonList(
+                primary(PAIMON_PRIMARY_KEY_INDEX_NAME, new String[][] {new 
String[] {"col_2"}}))
+            .toArray(new Index[0]);
+
+    IllegalArgumentException exception =
+        Assertions.assertThrows(
+            IllegalArgumentException.class,
+            () ->
+                paimonCatalogOperations.createTable(
+                    tableIdentifier,
+                    columns,
+                    PAIMON_COMMENT,
+                    Maps.newHashMap(),
+                    new Transform[0],
+                    Distributions.hash(2, NamedReference.field("col_1")),
+                    new SortOrder[0],
+                    indexes));
+    Assertions.assertTrue(
+        exception.getMessage().contains("bucket keys must be a subset of 
primary key columns"));
+  }

Review Comment:
   Missing test coverage for invalid distribution scenarios. While there's a 
test for invalid bucket keys with primary key tables, there's no test coverage 
for other invalid cases that the validation checks for, such as: negative 
bucket numbers, non-existent distribution columns, non-HASH distribution 
strategy, or empty bucket keys array.



##########
catalogs/catalog-lakehouse-paimon/src/main/java/org/apache/gravitino/catalog/lakehouse/paimon/GravitinoPaimonTable.java:
##########
@@ -188,6 +199,61 @@ private static void validate(List<String> primaryKeys, 
List<String> partitionKey
     }
   }
 
+  private static void applyDistribution(Map<String, String> properties, 
Distribution distribution) {
+    if (distribution == null || distribution.strategy() == 
Distributions.NONE.strategy()) {
+      return;
+    }
+
+    List<String> bucketKeys = getBucketKeys(distribution);
+    if (!bucketKeys.isEmpty()) {
+      properties.put(BUCKET_KEY, String.join(",", bucketKeys));
+    }
+
+    if (distribution.number() != Distributions.AUTO) {
+      properties.put(BUCKET, String.valueOf(distribution.number()));
+    }
+  }
+
+  private static List<String> getBucketKeys(Distribution distribution) {
+    return Arrays.stream(distribution.expressions())
+        .map(
+            expression -> {
+              Preconditions.checkArgument(
+                  expression instanceof NamedReference,
+                  "Paimon bucket keys must be named references.");
+              String[] fieldName = ((NamedReference) expression).fieldName();
+              Preconditions.checkArgument(
+                  fieldName.length == 1, "Paimon bucket keys must be single 
columns.");
+              return fieldName[0];
+            })
+        .collect(Collectors.toList());
+  }
+
+  private static Distribution getDistribution(Map<String, String> properties) {
+    if (properties == null) {
+      return Distributions.NONE;
+    }
+    String bucketKeys = properties.get(BUCKET_KEY);
+    if (StringUtils.isBlank(bucketKeys)) {
+      return Distributions.NONE;
+    }
+    List<String> bucketKeyList =
+        Arrays.stream(bucketKeys.split(","))
+            .map(String::trim)
+            .filter(StringUtils::isNotEmpty)
+            .collect(Collectors.toList());
+    if (bucketKeyList.isEmpty()) {
+      return Distributions.NONE;
+    }
+    Expression[] expressions =
+        
bucketKeyList.stream().map(NamedReference::field).toArray(Expression[]::new);
+    String bucketValue = properties.get(BUCKET);
+    if (StringUtils.isBlank(bucketValue)) {
+      return Distributions.auto(Strategy.HASH, expressions);
+    }
+    return Distributions.hash(Integer.parseInt(bucketValue.trim()), 
expressions);

Review Comment:
   A potential NumberFormatException is not handled when parsing the bucket 
value. If the bucket value in the properties is not a valid integer, this 
will throw an uncaught exception. Consider adding proper error handling with 
a meaningful error message.
   ```suggestion
       String trimmedBucketValue = bucketValue.trim();
       try {
         int bucketNum = Integer.parseInt(trimmedBucketValue);
         Preconditions.checkArgument(
             bucketNum > 0, "Paimon bucket number must be positive, but was: 
%s", bucketNum);
         return Distributions.hash(bucketNum, expressions);
       } catch (NumberFormatException e) {
         throw new IllegalArgumentException(
             String.format(
                 "Invalid bucket value '%s' in table properties: must be a 
valid integer.",
                 bucketValue),
             e);
       }
   ```



##########
catalogs/catalog-lakehouse-paimon/src/main/java/org/apache/gravitino/catalog/lakehouse/paimon/GravitinoPaimonTable.java:
##########
@@ -188,6 +199,61 @@ private static void validate(List<String> primaryKeys, 
List<String> partitionKey
     }
   }
 
+  private static void applyDistribution(Map<String, String> properties, 
Distribution distribution) {
+    if (distribution == null || distribution.strategy() == 
Distributions.NONE.strategy()) {
+      return;
+    }
+
+    List<String> bucketKeys = getBucketKeys(distribution);
+    if (!bucketKeys.isEmpty()) {
+      properties.put(BUCKET_KEY, String.join(",", bucketKeys));
+    }
+
+    if (distribution.number() != Distributions.AUTO) {
+      properties.put(BUCKET, String.valueOf(distribution.number()));
+    }
+  }
+
+  private static List<String> getBucketKeys(Distribution distribution) {
+    return Arrays.stream(distribution.expressions())
+        .map(
+            expression -> {
+              Preconditions.checkArgument(
+                  expression instanceof NamedReference,
+                  "Paimon bucket keys must be named references.");
+              String[] fieldName = ((NamedReference) expression).fieldName();

Review Comment:
   Inconsistency in type checking between getBucketKeys() and 
extractBucketKeys(). In GravitinoPaimonTable.getBucketKeys() (line 222), the 
code checks for NamedReference, while in 
PaimonCatalogOperations.extractBucketKeys() (line 546), it checks for 
NamedReference.FieldReference. This inconsistency could lead to bugs. The 
validation method should use the same type check as the conversion method to 
ensure consistency.
   ```suggestion
                     expression instanceof NamedReference.FieldReference,
                     "Paimon bucket keys must be field references.");
                 String[] fieldName = ((NamedReference.FieldReference) 
expression).fieldName();
   ```



##########
catalogs/catalog-lakehouse-paimon/src/test/java/org/apache/gravitino/catalog/lakehouse/paimon/TestGravitinoPaimonTable.java:
##########
@@ -335,6 +336,77 @@ void testCreatePaimonPrimaryKeyTable() {
     }
   }
 
+  @Test
+  void testCreatePaimonTableWithDistribution() {
+    String paimonTableName = "test_paimon_table_with_distribution";
+    NameIdentifier tableIdentifier = NameIdentifier.of(paimonSchema.name(), 
paimonTableName);
+    Map<String, String> properties = Maps.newHashMap();
+
+    Column[] columns =
+        new Column[] {
+          fromPaimonColumn(new DataField(0, "col_1", 
DataTypes.INT().notNull(), PAIMON_COMMENT)),
+          fromPaimonColumn(new DataField(1, "col_2", 
DataTypes.STRING().nullable(), PAIMON_COMMENT))
+        };
+
+    Table table =
+        paimonCatalogOperations.createTable(
+            tableIdentifier,
+            columns,
+            PAIMON_COMMENT,
+            properties,
+            new Transform[0],
+            Distributions.hash(4, NamedReference.field("col_1")),
+            new SortOrder[0]);
+
+    Assertions.assertEquals(HASH, table.distribution().strategy());
+    Assertions.assertEquals(4, table.distribution().number());
+    Assertions.assertEquals(
+        "col_1", ((NamedReference) 
table.distribution().expressions()[0]).fieldName()[0]);
+
+    Table loadedTable = paimonCatalogOperations.loadTable(tableIdentifier);
+    Assertions.assertEquals(HASH, loadedTable.distribution().strategy());
+    Assertions.assertEquals(4, loadedTable.distribution().number());
+    Assertions.assertEquals(
+        "col_1", ((NamedReference) 
loadedTable.distribution().expressions()[0]).fieldName()[0]);
+    Assertions.assertEquals(
+        "4", 
loadedTable.properties().get(PaimonTablePropertiesMetadata.BUCKET));
+    Assertions.assertEquals(
+        "col_1", 
loadedTable.properties().get(PaimonTablePropertiesMetadata.BUCKET_KEY));
+  }

Review Comment:
   The test coverage is missing a test case for AUTO distribution (when bucket 
number is not explicitly specified). The code in 
GravitinoPaimonTable.getDistribution handles AUTO distribution by calling 
Distributions.auto(), but there's no test verifying this behavior works 
correctly when bucket is not set in properties.



##########
catalogs/catalog-lakehouse-paimon/src/main/java/org/apache/gravitino/catalog/lakehouse/paimon/PaimonCatalogOperations.java:
##########
@@ -501,6 +502,74 @@ private void checkPaimonIndexes(Index[] indexes) {
                     "Paimon only supports primary key Index."));
   }
 
+  private void validateDistribution(Distribution distribution, Column[] 
columns, Index[] indexes) {
+    if (distribution == null || distribution.strategy() == 
Distributions.NONE.strategy()) {
+      return;
+    }
+
+    Preconditions.checkArgument(
+        distribution.strategy() == Strategy.HASH,
+        "Paimon only supports HASH distribution strategy.");
+
+    Preconditions.checkArgument(
+        distribution.expressions() != null && 
distribution.expressions().length > 0,
+        "Paimon bucket keys must be specified for HASH distribution.");
+
+    int bucketNumber = distribution.number();
+    Preconditions.checkArgument(
+        bucketNumber == Distributions.AUTO || bucketNumber > 0,
+        "Paimon bucket number must be positive or AUTO.");
+
+    List<String> bucketKeys = extractBucketKeys(distribution);
+    List<String> columnNames =
+        Arrays.stream(columns).map(Column::name).collect(Collectors.toList());
+    bucketKeys.forEach(
+        bucketKey ->
+            Preconditions.checkArgument(
+                columnNames.stream().anyMatch(name -> 
name.equalsIgnoreCase(bucketKey)),

Review Comment:
   Case-insensitive comparison is used for bucket key validation but the actual 
bucket keys are extracted case-sensitively. This could lead to inconsistencies. 
If a bucket key is "Col_1" in distribution but the column is "col_1" in the 
schema, the validation would pass (due to equalsIgnoreCase) but the bucket key 
stored in properties would be "Col_1" which may not match Paimon's expected 
case. Consider normalizing the case of bucket keys or documenting the case 
sensitivity behavior.
   ```suggestion
                   columnNames.stream().anyMatch(name -> 
name.equals(bucketKey)),
   ```



##########
catalogs/catalog-lakehouse-paimon/src/test/java/org/apache/gravitino/catalog/lakehouse/paimon/TestGravitinoPaimonTable.java:
##########
@@ -335,6 +336,77 @@ void testCreatePaimonPrimaryKeyTable() {
     }
   }
 
+  @Test
+  void testCreatePaimonTableWithDistribution() {
+    String paimonTableName = "test_paimon_table_with_distribution";
+    NameIdentifier tableIdentifier = NameIdentifier.of(paimonSchema.name(), 
paimonTableName);
+    Map<String, String> properties = Maps.newHashMap();
+
+    Column[] columns =
+        new Column[] {
+          fromPaimonColumn(new DataField(0, "col_1", 
DataTypes.INT().notNull(), PAIMON_COMMENT)),
+          fromPaimonColumn(new DataField(1, "col_2", 
DataTypes.STRING().nullable(), PAIMON_COMMENT))
+        };
+
+    Table table =
+        paimonCatalogOperations.createTable(
+            tableIdentifier,
+            columns,
+            PAIMON_COMMENT,
+            properties,
+            new Transform[0],
+            Distributions.hash(4, NamedReference.field("col_1")),
+            new SortOrder[0]);
+
+    Assertions.assertEquals(HASH, table.distribution().strategy());
+    Assertions.assertEquals(4, table.distribution().number());
+    Assertions.assertEquals(
+        "col_1", ((NamedReference) 
table.distribution().expressions()[0]).fieldName()[0]);
+
+    Table loadedTable = paimonCatalogOperations.loadTable(tableIdentifier);
+    Assertions.assertEquals(HASH, loadedTable.distribution().strategy());
+    Assertions.assertEquals(4, loadedTable.distribution().number());
+    Assertions.assertEquals(
+        "col_1", ((NamedReference) 
loadedTable.distribution().expressions()[0]).fieldName()[0]);
+    Assertions.assertEquals(
+        "4", 
loadedTable.properties().get(PaimonTablePropertiesMetadata.BUCKET));
+    Assertions.assertEquals(
+        "col_1", 
loadedTable.properties().get(PaimonTablePropertiesMetadata.BUCKET_KEY));
+  }

Review Comment:
   Test coverage is missing for multiple bucket keys in distribution. The code 
supports multiple bucket keys (joined with commas in properties), but all test 
cases only use a single bucket key. Consider adding a test case with 
distribution on multiple columns like Distributions.hash(4, 
NamedReference.field("col_1"), NamedReference.field("col_2")).



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to