boneanxs commented on code in PR #10515:
URL: https://github.com/apache/hudi/pull/10515#discussion_r1457269006
##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/ConsistentBucketIndexBulkInsertPartitionerWithRows.java:
##########
@@ -105,10 +121,55 @@ public int numPartitions() {
}
};
- return rows.sparkSession().createDataFrame(rowJavaRDD
- .mapToPair(row -> new Tuple2<>(getBucketId(row), row))
- .partitionBy(partitioner)
- .values(), rows.schema());
+ if (sortColumnNames != null && sortColumnNames.length > 0) {
+ return rows.sparkSession().createDataFrame(rowJavaRDD
+ .mapToPair(row -> new Tuple2<>(row, row))
Review Comment:
Would `getBucketId(row)` be more suitable as the key? Otherwise we double the record size here.
Besides, we could move this `mapToPair` out of the branches to reduce duplication.
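The suggestion above can be sketched in plain Java without a Spark dependency. The class and the hash-mod bucketing below are illustrative assumptions, not the actual Hudi implementation; the point is that keying the shuffle by a small `Integer` bucket id avoids shipping each full record twice (once as key, once as value):

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.Map;

// Hypothetical sketch of the reviewer's suggestion: key by the (small) bucket
// id rather than by the full Row. Names here are illustrative only.
public class BucketKeySketch {
  // Simplified stand-in for getBucketId: map a record key to a bucket
  // via a stable hash, kept non-negative with floorMod.
  static int getBucketId(String recordKey, int numBuckets) {
    return Math.floorMod(recordKey.hashCode(), numBuckets);
  }

  // Keying by the int bucket id keeps the shuffle key tiny; keying by the
  // full record would roughly double the shuffled bytes.
  static Map.Entry<Integer, String> keyByBucket(String record, int numBuckets) {
    return new SimpleEntry<>(getBucketId(record, numBuckets), record);
  }

  public static void main(String[] args) {
    Map.Entry<Integer, String> e = keyByBucket("uuid-0001", 8);
    // The bucket id must be deterministic for a given record key.
    if (!e.getKey().equals(getBucketId("uuid-0001", 8))) {
      throw new AssertionError("bucket id must be deterministic");
    }
    System.out.println("bucket=" + e.getKey());
  }
}
```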
##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/ConsistentBucketIndexBulkInsertPartitionerWithRows.java:
##########
@@ -66,7 +74,7 @@ public class ConsistentBucketIndexBulkInsertPartitionerWithRows
   private final RowRecordKeyExtractor extractor;
-  public ConsistentBucketIndexBulkInsertPartitionerWithRows(HoodieTable table, boolean populateMetaFields) {
+  public ConsistentBucketIndexBulkInsertPartitionerWithRows(HoodieTable table, Map<String, String> strategyParams, boolean populateMetaFields) {
Review Comment:
ditto
##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/ConsistentBucketIndexBulkInsertPartitionerWithRows.java:
##########
@@ -105,10 +121,55 @@ public int numPartitions() {
}
};
- return rows.sparkSession().createDataFrame(rowJavaRDD
- .mapToPair(row -> new Tuple2<>(getBucketId(row), row))
- .partitionBy(partitioner)
- .values(), rows.schema());
+ if (sortColumnNames != null && sortColumnNames.length > 0) {
+ return rows.sparkSession().createDataFrame(rowJavaRDD
+ .mapToPair(row -> new Tuple2<>(row, row))
+          .repartitionAndSortWithinPartitions(partitioner, new CustomRowColumnsComparator())
+          .values(), rows.schema());
+    } else if (table.requireSortedRecords() || table.getConfig().getBulkInsertSortMode() != BulkInsertSortMode.NONE) {
Review Comment:
Should we only support `PARTITION_SORT` here?
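A minimal sketch of the guard this comment seems to suggest, assuming (hypothetically) that only `PARTITION_SORT` and `NONE` should be accepted for a consistent-bucket partitioner, since a global sort would reshuffle rows across buckets. The local enum mirrors only a subset of Hudi's `BulkInsertSortMode` for illustration:

```java
// Hypothetical guard: restrict the supported sort modes. Not Hudi code.
public class SortModeGuard {
  enum BulkInsertSortMode { NONE, GLOBAL_SORT, PARTITION_SORT }

  static boolean shouldSortWithinPartitions(BulkInsertSortMode mode, boolean requireSorted) {
    if (mode == BulkInsertSortMode.GLOBAL_SORT) {
      // A global sort would break the bucket layout; reject it explicitly
      // instead of silently treating it like PARTITION_SORT.
      throw new IllegalArgumentException(
          "Only PARTITION_SORT is supported with the consistent bucket index");
    }
    return requireSorted || mode == BulkInsertSortMode.PARTITION_SORT;
  }

  public static void main(String[] args) {
    System.out.println(shouldSortWithinPartitions(BulkInsertSortMode.PARTITION_SORT, false));
  }
}
```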
##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/ConsistentBucketIndexBulkInsertPartitionerWithRows.java:
##########
@@ -142,10 +203,11 @@ public void addHashingChildrenNodes(String partition, List<ConsistentHashingNode
@Override
public boolean arePartitionRecordsSorted() {
- return false;
+    return (sortColumnNames != null && sortColumnNames.length > 0)
+        || table.requireSortedRecords() || table.getConfig().getBulkInsertSortMode() != BulkInsertSortMode.NONE;
}
- private int getBucketId(Row row) {
+ private Integer getBucketId(Row row) {
Review Comment:
Any reason for the change here?
##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SparkConsistentBucketClusteringExecutionStrategy.java:
##########
@@ -72,7 +72,7 @@ public HoodieData<WriteStatus> performClusteringWithRecordsAsRow(Dataset<Row> in
     HoodieWriteConfig newConfig = HoodieWriteConfig.newBuilder().withProps(props).build();
-    ConsistentBucketIndexBulkInsertPartitionerWithRows partitioner = new ConsistentBucketIndexBulkInsertPartitionerWithRows(getHoodieTable(), shouldPreserveHoodieMetadata);
+    ConsistentBucketIndexBulkInsertPartitionerWithRows partitioner = new ConsistentBucketIndexBulkInsertPartitionerWithRows(getHoodieTable(), strategyParams, shouldPreserveHoodieMetadata);
Review Comment:
nit: Start a new line to enhance readability
##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/ConsistentBucketIndexBulkInsertPartitionerWithRows.java:
##########
@@ -105,10 +121,55 @@ public int numPartitions() {
}
};
- return rows.sparkSession().createDataFrame(rowJavaRDD
- .mapToPair(row -> new Tuple2<>(getBucketId(row), row))
- .partitionBy(partitioner)
- .values(), rows.schema());
+ if (sortColumnNames != null && sortColumnNames.length > 0) {
+ return rows.sparkSession().createDataFrame(rowJavaRDD
+ .mapToPair(row -> new Tuple2<>(row, row))
+          .repartitionAndSortWithinPartitions(partitioner, new CustomRowColumnsComparator())
Review Comment:
Prefer to use the Spark native API here, e.g.
```java
rows.sparkSession().createDataFrame(rowJavaRDD
    .mapToPair(row -> new Tuple2<>(getBucketId(row), row))
    .partitionBy(partitioner)
    .values(), rows.schema())
    .sortWithinPartitions(...);
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]