alexeykudinkin commented on code in PR #7872:
URL: https://github.com/apache/hudi/pull/7872#discussion_r1106129776
##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/NonSortPartitioner.java:
##########
@@ -31,38 +29,15 @@
* <p>
Review Comment:
Good catch!
##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RowCustomColumnsSortPartitioner.java:
##########
@@ -19,43 +19,70 @@
package org.apache.hudi.execution.bulkinsert;
import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.config.HoodieWriteConfig;
-import org.apache.hudi.table.BulkInsertPartitioner;
-
+import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
+import scala.collection.JavaConverters;
import java.util.Arrays;
+import java.util.stream.Collectors;
+
+import static org.apache.hudi.common.util.ValidationUtils.checkState;
+import static
org.apache.hudi.execution.bulkinsert.RDDCustomColumnsSortPartitioner.getOrderByColumnNames;
/**
- * A partitioner that does sorting based on specified column values for each
spark partitions.
+ * A partitioner that does local sorting for each RDD partition based on the
tuple of
+ * values of the columns configured for ordering.
*/
-public class RowCustomColumnsSortPartitioner implements
BulkInsertPartitioner<Dataset<Row>> {
+public class RowCustomColumnsSortPartitioner extends
RepartitioningBulkInsertPartitionerBase<Dataset<Row>> {
- private final String[] sortColumnNames;
+ private final String[] orderByColumnNames;
- public RowCustomColumnsSortPartitioner(HoodieWriteConfig config) {
- this.sortColumnNames = getSortColumnName(config);
+ public RowCustomColumnsSortPartitioner(HoodieWriteConfig config,
HoodieTableConfig tableConfig) {
+ super(tableConfig);
+ this.orderByColumnNames = getOrderByColumnNames(config);
+
+ checkState(orderByColumnNames.length > 0);
}
- public RowCustomColumnsSortPartitioner(String[] columnNames) {
- this.sortColumnNames = columnNames;
+ public RowCustomColumnsSortPartitioner(String[] columnNames,
HoodieTableConfig tableConfig) {
+ super(tableConfig);
+ this.orderByColumnNames = columnNames;
+
+ checkState(orderByColumnNames.length > 0);
}
@Override
- public Dataset<Row> repartitionRecords(Dataset<Row> records, int
outputSparkPartitions) {
- final String[] sortColumns = this.sortColumnNames;
- return records.sort(HoodieRecord.PARTITION_PATH_METADATA_FIELD,
sortColumns)
- .coalesce(outputSparkPartitions);
+ public Dataset<Row> repartitionRecords(Dataset<Row> dataset, int
targetPartitionNumHint) {
+ Dataset<Row> repartitionedDataset;
+
+ // NOTE: In case of partitioned table even "global" ordering (across all
RDD partitions) could
+ // not change table's partitioning and therefore there's no point in
doing global sorting
+ // across "physical" partitions, and instead we can reduce total
amount of data being
+ // shuffled by doing "local" sorting:
Review Comment:
This is also used for custom ordering by the users
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]