stream2000 commented on code in PR #10515:
URL: https://github.com/apache/hudi/pull/10515#discussion_r1458254203
##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SparkConsistentBucketClusteringExecutionStrategy.java:
##########
@@ -72,7 +72,7 @@ public HoodieData<WriteStatus> performClusteringWithRecordsAsRow(Dataset<Row> in
     HoodieWriteConfig newConfig = HoodieWriteConfig.newBuilder().withProps(props).build();
-    ConsistentBucketIndexBulkInsertPartitionerWithRows partitioner = new ConsistentBucketIndexBulkInsertPartitionerWithRows(getHoodieTable(), shouldPreserveHoodieMetadata);
+    ConsistentBucketIndexBulkInsertPartitionerWithRows partitioner = new ConsistentBucketIndexBulkInsertPartitionerWithRows(getHoodieTable(), strategyParams, shouldPreserveHoodieMetadata);
Review Comment:
sure and done.
##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/ConsistentBucketIndexBulkInsertPartitionerWithRows.java:
##########
@@ -142,10 +203,11 @@ public void addHashingChildrenNodes(String partition, List<ConsistentHashingNode
@Override
public boolean arePartitionRecordsSorted() {
- return false;
+    return (sortColumnNames != null && sortColumnNames.length > 0)
+        || table.requireSortedRecords()
+        || table.getConfig().getBulkInsertSortMode() != BulkInsertSortMode.NONE;
}
- private int getBucketId(Row row) {
+ private Integer getBucketId(Row row) {
Review Comment:
reverted it.
##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/ConsistentBucketIndexBulkInsertPartitionerWithRows.java:
##########
@@ -105,10 +121,55 @@ public int numPartitions() {
}
};
-    return rows.sparkSession().createDataFrame(rowJavaRDD
-        .mapToPair(row -> new Tuple2<>(getBucketId(row), row))
-        .partitionBy(partitioner)
-        .values(), rows.schema());
+    if (sortColumnNames != null && sortColumnNames.length > 0) {
+      return rows.sparkSession().createDataFrame(rowJavaRDD
+          .mapToPair(row -> new Tuple2<>(row, row))
+          .repartitionAndSortWithinPartitions(partitioner, new CustomRowColumnsComparator())
+          .values(),
+          rows.schema());
+    } else if (table.requireSortedRecords() || table.getConfig().getBulkInsertSortMode() != BulkInsertSortMode.NONE) {
Review Comment:
Yes, we are actually implementing `PARTITION_SORT`. I'm just wondering: for sort modes other than `PARTITION_SORT`, should we default to a no-sort behavior similar to `BulkInsertSortMode=NONE`, automatically switch to `PARTITION_SORT`, or throw an exception indicating that the sort mode is not supported?
I'd like your opinion; alternatively, we can keep the current behavior of switching to `PARTITION_SORT` automatically.
##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/ConsistentBucketIndexBulkInsertPartitionerWithRows.java:
##########
@@ -105,10 +121,55 @@ public int numPartitions() {
}
};
-    return rows.sparkSession().createDataFrame(rowJavaRDD
-        .mapToPair(row -> new Tuple2<>(getBucketId(row), row))
-        .partitionBy(partitioner)
-        .values(), rows.schema());
+    if (sortColumnNames != null && sortColumnNames.length > 0) {
+      return rows.sparkSession().createDataFrame(rowJavaRDD
+          .mapToPair(row -> new Tuple2<>(row, row))
Review Comment:
We still need the row for comparing and sorting, so keeping this line `.mapToPair(row -> new Tuple2<>(row, row))` is fine.
Also, compared with `partitionBy` + `sortWithinPartitions`, `repartitionAndSortWithinPartitions` is more efficient because it performs the shuffle only once, with both repartitioning and sorting happening in the same step. What do you think?
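To illustrate the "one pass" idea, here is a minimal plain-JDK sketch (no Spark): each row is routed to its target partition and every partition is sorted with the supplied comparator in the same step. The length-based bucket function is a made-up stand-in for the consistent-bucket partitioner, and `partitionAndSort` is a hypothetical helper, not a Spark or Hudi API.

```java
import java.util.*;

// Hedged sketch of what repartitionAndSortWithinPartitions does conceptually:
// route each record to its partition and sort within each partition in a
// single pass, rather than partitioning in one shuffle and sorting afterwards.
public class RepartitionAndSortSketch {
  static Map<Integer, List<String>> partitionAndSort(
      List<String> rows, int numPartitions, Comparator<String> cmp) {
    Map<Integer, List<String>> parts = new TreeMap<>();
    for (String row : rows) {
      int bucket = row.length() % numPartitions;  // stand-in for getBucketId(row)
      parts.computeIfAbsent(bucket, k -> new ArrayList<>()).add(row);
    }
    parts.values().forEach(list -> list.sort(cmp));  // sort within each partition
    return parts;
  }

  public static void main(String[] args) {
    System.out.println(partitionAndSort(
        Arrays.asList("dd", "a", "ccc", "bb"), 2, Comparator.naturalOrder()));
    // → {0=[bb, dd], 1=[a, ccc]}
  }
}
```

In Spark the same fusion happens inside the shuffle machinery: the sort is applied while partition data is being written, so no second pass over the data is needed.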
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]