boneanxs commented on code in PR #10515:
URL: https://github.com/apache/hudi/pull/10515#discussion_r1457269006


##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/ConsistentBucketIndexBulkInsertPartitionerWithRows.java:
##########
@@ -105,10 +121,55 @@ public int numPartitions() {
       }
     };
 
-    return rows.sparkSession().createDataFrame(rowJavaRDD
-        .mapToPair(row -> new Tuple2<>(getBucketId(row), row))
-        .partitionBy(partitioner)
-        .values(), rows.schema());
+    if (sortColumnNames != null && sortColumnNames.length > 0) {
+      return rows.sparkSession().createDataFrame(rowJavaRDD
+              .mapToPair(row -> new Tuple2<>(row, row))

Review Comment:
   Would `getBucketId(row)` be more suitable as the key? Otherwise we double the record size here.
   
   Besides, we could move this `mapToPair` out of the branches to reduce duplication.
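   For illustration, the point about key size can be sketched like this (a stand-in hash, not Hudi's actual consistent-hashing metadata lookup): keying each record once by a small integer bucket id, instead of pairing the full `Row` with itself, avoids carrying two copies of every record through the shuffle, and the single keyed RDD can be shared by both branches.
   
   ```java
   // Hypothetical sketch, not Hudi's getBucketId implementation: map a record
   // key to a bucket id in [0, numBuckets). Keying the RDD by this small int
   // (rather than by the Row itself) keeps shuffle payloads to one Row each.
   public class BucketKeySketch {
   
     public static int bucketId(String recordKey, int numBuckets) {
       // floorMod keeps the result non-negative even for negative hash codes.
       return Math.floorMod(recordKey.hashCode(), numBuckets);
     }
   
     public static void main(String[] args) {
       String[] keys = {"k1", "k2", "k3"};
       for (String k : keys) {
         int b = bucketId(k, 4);
         // Every record maps to a valid bucket, and the mapping is
         // deterministic, so one keyed RDD can serve both code paths.
         if (b < 0 || b >= 4) {
           throw new AssertionError("bucket out of range: " + b);
         }
         if (b != bucketId(k, 4)) {
           throw new AssertionError("bucket id not deterministic");
         }
       }
       System.out.println("ok");
     }
   }
   ```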



##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/ConsistentBucketIndexBulkInsertPartitionerWithRows.java:
##########
@@ -66,7 +74,7 @@ public class ConsistentBucketIndexBulkInsertPartitionerWithRows
 
   private final RowRecordKeyExtractor extractor;
 
-  public ConsistentBucketIndexBulkInsertPartitionerWithRows(HoodieTable table, boolean populateMetaFields) {
+  public ConsistentBucketIndexBulkInsertPartitionerWithRows(HoodieTable table, Map<String, String> strategyParams, boolean populateMetaFields) {

Review Comment:
   ditto



##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/ConsistentBucketIndexBulkInsertPartitionerWithRows.java:
##########
@@ -105,10 +121,55 @@ public int numPartitions() {
       }
     };
 
-    return rows.sparkSession().createDataFrame(rowJavaRDD
-        .mapToPair(row -> new Tuple2<>(getBucketId(row), row))
-        .partitionBy(partitioner)
-        .values(), rows.schema());
+    if (sortColumnNames != null && sortColumnNames.length > 0) {
+      return rows.sparkSession().createDataFrame(rowJavaRDD
+              .mapToPair(row -> new Tuple2<>(row, row))
+              .repartitionAndSortWithinPartitions(partitioner, new CustomRowColumnsComparator())
+              .values(),
+          rows.schema());
+    } else if (table.requireSortedRecords() || table.getConfig().getBulkInsertSortMode() != BulkInsertSortMode.NONE) {

Review Comment:
   Should we only support `PARTITION_SORT` here?
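   If only `PARTITION_SORT` is meant to be honored here, the guard could be narrowed along these lines (a sketch with a stand-in enum mirroring a subset of `BulkInsertSortMode`'s values, not the actual Hudi config plumbing):
   
   ```java
   // Sketch: enter the partition-sort branch only when the table requires
   // sorted records or PARTITION_SORT is explicitly requested, instead of
   // "any mode other than NONE".
   public class SortModeGuard {
     // Stand-in for a subset of Hudi's BulkInsertSortMode values.
     public enum BulkInsertSortMode { NONE, GLOBAL_SORT, PARTITION_SORT }
   
     public static boolean shouldSortWithinPartitions(boolean requireSortedRecords,
                                                      BulkInsertSortMode mode) {
       return requireSortedRecords || mode == BulkInsertSortMode.PARTITION_SORT;
     }
   
     public static void main(String[] args) {
       // GLOBAL_SORT no longer falls into the partition-sort branch.
       if (shouldSortWithinPartitions(false, BulkInsertSortMode.GLOBAL_SORT)) {
         throw new AssertionError("GLOBAL_SORT should not be handled here");
       }
       if (!shouldSortWithinPartitions(false, BulkInsertSortMode.PARTITION_SORT)) {
         throw new AssertionError("PARTITION_SORT should sort within partitions");
       }
       System.out.println("ok");
     }
   }
   ```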



##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/ConsistentBucketIndexBulkInsertPartitionerWithRows.java:
##########
@@ -142,10 +203,11 @@ public void addHashingChildrenNodes(String partition, List<ConsistentHashingNode
 
   @Override
   public boolean arePartitionRecordsSorted() {
-    return false;
+    return (sortColumnNames != null && sortColumnNames.length > 0)
+        || table.requireSortedRecords() || table.getConfig().getBulkInsertSortMode() != BulkInsertSortMode.NONE;
   }
 
-  private int getBucketId(Row row) {
+  private Integer getBucketId(Row row) {

Review Comment:
   Any reason for the change from `int` to `Integer` here?



##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SparkConsistentBucketClusteringExecutionStrategy.java:
##########
@@ -72,7 +72,7 @@ public HoodieData<WriteStatus> performClusteringWithRecordsAsRow(Dataset<Row> in
 
    HoodieWriteConfig newConfig = HoodieWriteConfig.newBuilder().withProps(props).build();
 
-    ConsistentBucketIndexBulkInsertPartitionerWithRows partitioner = new ConsistentBucketIndexBulkInsertPartitionerWithRows(getHoodieTable(), shouldPreserveHoodieMetadata);
+    ConsistentBucketIndexBulkInsertPartitionerWithRows partitioner = new ConsistentBucketIndexBulkInsertPartitionerWithRows(getHoodieTable(), strategyParams, shouldPreserveHoodieMetadata);

Review Comment:
   nit: Start a new line to enhance readability



##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/ConsistentBucketIndexBulkInsertPartitionerWithRows.java:
##########
@@ -105,10 +121,55 @@ public int numPartitions() {
       }
     };
 
-    return rows.sparkSession().createDataFrame(rowJavaRDD
-        .mapToPair(row -> new Tuple2<>(getBucketId(row), row))
-        .partitionBy(partitioner)
-        .values(), rows.schema());
+    if (sortColumnNames != null && sortColumnNames.length > 0) {
+      return rows.sparkSession().createDataFrame(rowJavaRDD
+              .mapToPair(row -> new Tuple2<>(row, row))
+              .repartitionAndSortWithinPartitions(partitioner, new CustomRowColumnsComparator())

Review Comment:
   Prefer using the Spark native API here, e.g.
   
   ```java
   rows.sparkSession().createDataFrame(rowJavaRDD
                         .mapToPair(row -> new Tuple2<>(getBucketId(row), row))
                         .partitionBy(partitioner)
                         .values(), rows.schema())
                 .sortWithinPartitions(...);
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to