yihua commented on code in PR #5328:
URL: https://github.com/apache/hudi/pull/5328#discussion_r930430983


##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/PartitionSortPartitionerWithRows.java:
##########
@@ -19,19 +19,42 @@
 package org.apache.hudi.execution.bulkinsert;
 
 import org.apache.hudi.common.model.HoodieRecord;
-import org.apache.hudi.table.BulkInsertPartitioner;
-
+import org.apache.hudi.common.table.HoodieTableConfig;
+import org.apache.spark.sql.Column;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
 
 /**
- * A built-in partitioner that does local sorting for each spark partitions after coalesce for bulk insert operation, corresponding to the {@code BulkInsertSortMode.PARTITION_SORT} mode.
+ * A built-in partitioner that does local sorting w/in the Spark partition,
+ * corresponding to the {@code BulkInsertSortMode.PARTITION_SORT} mode.
  */
-public class PartitionSortPartitionerWithRows implements BulkInsertPartitioner<Dataset<Row>> {
+public class PartitionSortPartitionerWithRows extends RepartitioningBulkInsertPartitionerBase<Dataset<Row>> {
+
+  public PartitionSortPartitionerWithRows(HoodieTableConfig tableConfig) {
+    super(tableConfig);
+  }
 
   @Override
-  public Dataset<Row> repartitionRecords(Dataset<Row> rows, int outputSparkPartitions) {
-    return rows.coalesce(outputSparkPartitions).sortWithinPartitions(HoodieRecord.PARTITION_PATH_METADATA_FIELD, HoodieRecord.RECORD_KEY_METADATA_FIELD);
+  public Dataset<Row> repartitionRecords(Dataset<Row> dataset, int outputSparkPartitions) {
+    Dataset<Row> repartitionedDataset;
+
+    // NOTE: Datasets being ingested into partitioned tables are additionally re-partitioned to better
+    //       align dataset's logical partitioning with expected table's physical partitioning to
+    //       provide for appropriate file-sizing and better control of the number of files created.
+    //
+    //       Please check out {@code GlobalSortPartitioner} java-doc for more details
+    if (isPartitionedTable) {
+      repartitionedDataset = dataset.repartition(outputSparkPartitions, new Column(HoodieRecord.PARTITION_PATH_METADATA_FIELD));

Review Comment:
   This indeed changes the sorting behavior, right?
   The old DAG: `spark partitions -> coalesce (combining existing partitions to avoid a full shuffle) -> sort within each spark partition`
   The new DAG: `spark partitions -> repartition based on partition path (full shuffle) -> spark partitions corresponding to table partitions -> sort within each spark/table partition`
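
The difference between the two DAGs can be illustrated without Spark. The following is a simplified plain-Java model, not Hudi or Spark code: records are represented only by their partition path, `coalesceAndSort` merges neighbouring input partitions whole (real `coalesce` picks which partitions to merge based on locality, which this sketch does not model), and `repartitionAndSort` routes each record by a hash of its path. All class and method names are assumptions made up for this illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

/** Plain-Java model of the two DAGs (no Spark involved); all names are hypothetical. */
final class CoalesceVsRepartitionSketch {

  /** Old DAG model: merge neighbouring input partitions whole, then sort each output locally. */
  static List<List<String>> coalesceAndSort(List<List<String>> input, int n) {
    List<List<String>> out = emptyPartitions(n);
    for (int i = 0; i < input.size(); i++) {
      // Input partition i is appended as a whole; no record is routed by its key
      out.get(i * n / input.size()).addAll(input.get(i));
    }
    out.forEach(Collections::sort);
    return out;
  }

  /** New DAG model: route every record by the hash of its partition path, then sort locally. */
  static List<List<String>> repartitionAndSort(List<List<String>> input, int n) {
    List<List<String>> out = emptyPartitions(n);
    for (List<String> partition : input) {
      for (String partitionPath : partition) {
        out.get(Math.floorMod(partitionPath.hashCode(), n)).add(partitionPath);
      }
    }
    out.forEach(Collections::sort);
    return out;
  }

  /** Number of output partitions holding at least one record of the given table partition. */
  static int spread(List<List<String>> partitions, String partitionPath) {
    int count = 0;
    for (List<String> partition : partitions) {
      if (partition.contains(partitionPath)) {
        count++;
      }
    }
    return count;
  }

  private static List<List<String>> emptyPartitions(int n) {
    List<List<String>> out = new ArrayList<>();
    for (int i = 0; i < n; i++) {
      out.add(new ArrayList<>());
    }
    return out;
  }

  public static void main(String[] args) {
    List<String> mixed = Arrays.asList("2022/01", "2022/02");
    List<List<String>> input = Arrays.asList(mixed, mixed, mixed, mixed);
    System.out.println("coalesce spread:    " + spread(coalesceAndSort(input, 2), "2022/01"));
    System.out.println("repartition spread: " + spread(repartitionAndSort(input, 2), "2022/01"));
  }
}
```

In this model, when every input partition holds records for both table partitions, coalescing leaves each table partition spread over every output partition, whereas hash repartitioning places each table partition in exactly one output partition at the cost of moving every record.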



##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RowCustomColumnsSortPartitioner.java:
##########
@@ -19,42 +19,69 @@
 package org.apache.hudi.execution.bulkinsert;
 
 import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.table.HoodieTableConfig;
 import org.apache.hudi.config.HoodieWriteConfig;
-import org.apache.hudi.table.BulkInsertPartitioner;
+import org.apache.spark.sql.Column;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
+import scala.collection.JavaConverters;
 
 import java.util.Arrays;
+import java.util.stream.Collectors;
+
+import static org.apache.hudi.common.util.ValidationUtils.checkState;
+import static org.apache.hudi.execution.bulkinsert.RDDCustomColumnsSortPartitioner.getOrderByColumnNames;
 
 /**
- * A partitioner that does sorting based on specified column values for each spark partitions.
+ * A partitioner that does local sorting for each RDD partition based on the tuple of
+ * values of the columns configured for ordering.
  */
-public class RowCustomColumnsSortPartitioner implements BulkInsertPartitioner<Dataset<Row>> {
+public class RowCustomColumnsSortPartitioner extends RepartitioningBulkInsertPartitionerBase<Dataset<Row>> {
+
+  private final String[] orderByColumnNames;
 
-  private final String[] sortColumnNames;
+  public RowCustomColumnsSortPartitioner(HoodieWriteConfig config, HoodieTableConfig tableConfig) {
+    super(tableConfig);
+    this.orderByColumnNames = getOrderByColumnNames(config);
 
-  public RowCustomColumnsSortPartitioner(HoodieWriteConfig config) {
-    this.sortColumnNames = getSortColumnName(config);
+    checkState(orderByColumnNames.length > 0);
   }
 
-  public RowCustomColumnsSortPartitioner(String[] columnNames) {
-    this.sortColumnNames = columnNames;
+  public RowCustomColumnsSortPartitioner(String[] columnNames, HoodieTableConfig tableConfig) {
+    super(tableConfig);
+    this.orderByColumnNames = columnNames;
+
+    checkState(orderByColumnNames.length > 0);
   }
 
   @Override
-  public Dataset<Row> repartitionRecords(Dataset<Row> records, int outputSparkPartitions) {
-    final String[] sortColumns = this.sortColumnNames;
-    return records.coalesce(outputSparkPartitions)
-        .sortWithinPartitions(HoodieRecord.PARTITION_PATH_METADATA_FIELD, sortColumns);
+  public Dataset<Row> repartitionRecords(Dataset<Row> dataset, int outputSparkPartitions) {
+    Dataset<Row> repartitionedDataset;
+
+    // NOTE: In case of partitioned table even "global" ordering (across all RDD partitions) could
+    //       not change table's partitioning and therefore there's no point in doing global sorting
+    //       across "physical" partitions, and instead we can reduce total amount of data being
+    //       shuffled by doing do "local" sorting:
+    //          - First, re-partitioning dataset such that "logical" partitions are aligned w/
+    //          "physical" ones
+    //          - Sorting locally w/in RDD ("logical") partitions
+    //
+    //       Non-partitioned tables will be globally sorted.
+    if (isPartitionedTable) {
+      repartitionedDataset = dataset.repartition(outputSparkPartitions, new Column(HoodieRecord.PARTITION_PATH_METADATA_FIELD));

Review Comment:
   Same here on repartition vs coalesce.



##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDPartitionSortPartitioner.java:
##########
@@ -20,46 +20,62 @@
 
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecordPayload;
-import org.apache.hudi.table.BulkInsertPartitioner;
-
+import org.apache.hudi.common.table.HoodieTableConfig;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.spark.api.java.JavaPairRDD;
 import org.apache.spark.api.java.JavaRDD;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-
+import org.apache.spark.sql.HoodieJavaRDDUtils;
 import scala.Tuple2;
 
+import java.util.Comparator;
+
 /**
  * A built-in partitioner that does local sorting for each RDD partition
- * after coalesce for bulk insert operation, corresponding to the
- * {@code BulkInsertSortMode.PARTITION_SORT} mode.
+ * after coalescing it to specified number of partitions.
+ * Corresponds to the {@link BulkInsertSortMode#PARTITION_SORT} mode.
  *
  * @param <T> HoodieRecordPayload type
  */
 public class RDDPartitionSortPartitioner<T extends HoodieRecordPayload>
-    implements BulkInsertPartitioner<JavaRDD<HoodieRecord<T>>> {
+    extends RepartitioningBulkInsertPartitionerBase<JavaRDD<HoodieRecord<T>>> {
 
+  public RDDPartitionSortPartitioner(HoodieTableConfig tableConfig) {
+    super(tableConfig);
+  }
+
+  @SuppressWarnings("unchecked")
   @Override
   public JavaRDD<HoodieRecord<T>> repartitionRecords(JavaRDD<HoodieRecord<T>> records,
                                                      int outputSparkPartitions) {
-    return records.coalesce(outputSparkPartitions)
-        .mapToPair(record ->
-            new Tuple2<>(
-                new StringBuilder()
-                    .append(record.getPartitionPath())
-                    .append("+")
-                    .append(record.getRecordKey())
-                    .toString(), record))
-        .mapPartitions(partition -> {
-          // Sort locally in partition
-          List<Tuple2<String, HoodieRecord<T>>> recordList = new ArrayList<>();
-          for (; partition.hasNext(); ) {
-            recordList.add(partition.next());
-          }
-          Collections.sort(recordList, (o1, o2) -> o1._1.compareTo(o2._1));
-          return recordList.stream().map(e -> e._2).iterator();
-        });
+
+    // NOTE: Datasets being ingested into partitioned tables are additionally re-partitioned to better
+    //       align dataset's logical partitioning with expected table's physical partitioning to
+    //       provide for appropriate file-sizing and better control of the number of files created.
+    //
+    //       Please check out {@code GlobalSortPartitioner} java-doc for more details
+    if (isPartitionedTable) {
+      PartitionPathRDDPartitioner partitioner =
+          new PartitionPathRDDPartitioner((pair) -> ((Pair<String, String>) pair).getKey(), outputSparkPartitions);
+
+      // Both partition-path and record-key are extracted, since
+      //    - Partition-path will be used for re-partitioning (as called out above)
+      //    - Record-key will be used for sorting the records w/in individual partitions
+      return records.mapToPair(record -> new Tuple2<>(Pair.of(record.getPartitionPath(), record.getRecordKey()), record))
+          // NOTE: We're sorting by (partition-path, record-key) pair to make sure that in case
+          //       when there are less Spark partitions (requested) than there are physical partitions
+          //       (in which case multiple physical partitions, will be handled w/in single Spark
+          //       partition) records w/in a single Spark partition are still ordered first by
+          //       partition-path, then record's key
+          .repartitionAndSortWithinPartitions(partitioner, Comparator.naturalOrder())

Review Comment:
   Again, the original sort behavior does not repartition the records based on the table partition path.



##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/BulkInsertInternalPartitionerFactory.java:
##########
@@ -27,16 +28,18 @@
  */
 public abstract class BulkInsertInternalPartitionerFactory {
 
-  public static BulkInsertPartitioner get(BulkInsertSortMode sortMode) {
-    switch (sortMode) {
+  public static BulkInsertPartitioner get(BulkInsertSortMode bulkInsertMode, HoodieTableConfig tableConfig) {
+    switch (bulkInsertMode) {
       case NONE:
-        return new NonSortPartitioner();
+        return new NonSortPartitioner<>();
       case GLOBAL_SORT:
-        return new GlobalSortPartitioner();
+        return new GlobalSortPartitioner<>();
       case PARTITION_SORT:
-        return new RDDPartitionSortPartitioner();
+        return new RDDPartitionSortPartitioner<>(tableConfig);

Review Comment:
   `RDD` was added to the name to clarify that the sorting happens within one RDD partition, not within the table/physical partition.



##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RepartitioningBulkInsertPartitionerBase.java:
##########
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.execution.bulkinsert;
+
+import org.apache.hudi.common.function.SerializableFunctionUnchecked;
+import org.apache.hudi.common.table.HoodieTableConfig;
+import org.apache.hudi.table.BulkInsertPartitioner;
+import org.apache.spark.Partitioner;
+
+import java.io.Serializable;
+import java.util.Objects;
+
+/**
+ * Base class for any {@link BulkInsertPartitioner} implementation that does re-partitioning,
+ * to better align "logical" (query-engine's partitioning of the incoming dataset) w/ the table's
+ * "physical" partitioning
+ */
+public abstract class RepartitioningBulkInsertPartitionerBase<I> implements BulkInsertPartitioner<I> {
+
+  protected final boolean isPartitionedTable;
+
+  public RepartitioningBulkInsertPartitionerBase(HoodieTableConfig tableConfig) {
+    this.isPartitionedTable = tableConfig.getPartitionFields().map(pfs -> pfs.length > 0).orElse(false);
+  }
+
+  protected static class PartitionPathRDDPartitioner extends Partitioner implements Serializable {
+    private final SerializableFunctionUnchecked<Object, String> partitionPathExtractor;
+    private final int numPartitions;
+
+    PartitionPathRDDPartitioner(SerializableFunctionUnchecked<Object, String> partitionPathExtractor, int numPartitions) {
+      this.partitionPathExtractor = partitionPathExtractor;
+      this.numPartitions = numPartitions;
+    }
+
+    @Override
+    public int numPartitions() {
+      return numPartitions;
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public int getPartition(Object o) {
+      return Math.abs(Objects.hash(partitionPathExtractor.apply(o))) % numPartitions;

Review Comment:
   Understood.  This should be covered in the documentation.  Even without sorting, a better bucketing strategy can be developed to avoid skew, e.g., a large table partition can be split across multiple Spark partitions.
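
One way such a bucketing strategy might look is sketched below in plain Java. This is not code from the PR: `SkewAwareBucketingSketch`, `fanoutByPath`, and the idea of supplying a per-path fan-out up front are assumptions for illustration only. A table partition with a fan-out greater than 1 is spread over that many neighbouring Spark partitions by salting with the record key. `Math.floorMod` is used for the bucket index because it never returns a negative value for a positive modulus, whereas `Math.abs(Integer.MIN_VALUE)` is still negative.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical skew-aware bucketing; not part of Hudi or of this PR. */
final class SkewAwareBucketingSketch {

  private final int numPartitions;
  // Assumed input: how many Spark partitions each large table partition may be spread over
  private final Map<String, Integer> fanoutByPath;

  SkewAwareBucketingSketch(int numPartitions, Map<String, Integer> fanoutByPath) {
    if (numPartitions <= 0) {
      throw new IllegalArgumentException("numPartitions must be positive");
    }
    this.numPartitions = numPartitions;
    this.fanoutByPath = new HashMap<>(fanoutByPath);
  }

  /** Returns a Spark partition index in [0, numPartitions). */
  int getPartition(String partitionPath, String recordKey) {
    // Base bucket depends only on the table partition path
    int base = Math.floorMod(partitionPath.hashCode(), numPartitions);
    // Paths without an explicit fan-out stay in a single bucket
    int fanout = Math.max(1, fanoutByPath.getOrDefault(partitionPath, 1));
    // Salt by record key so a large table partition covers `fanout` adjacent buckets
    int salt = Math.floorMod(recordKey.hashCode(), fanout);
    return (base + salt) % numPartitions;
  }

  public static void main(String[] args) {
    Map<String, Integer> fanout = new HashMap<>();
    fanout.put("2022/01", 4); // assume this table partition is known to be large
    SkewAwareBucketingSketch bucketing = new SkewAwareBucketingSketch(8, fanout);
    for (int i = 0; i < 4; i++) {
      System.out.println("2022/01 k" + i + " -> " + bucketing.getPartition("2022/01", "k" + i));
    }
  }
}
```

The trade-off is that records of a salted table partition are no longer co-located in one Spark partition, so each such table partition would be written by several tasks.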



##########
hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/SparkAdapter.scala:
##########
@@ -138,4 +140,16 @@ trait SparkAdapter extends Serializable {
    * TODO move to HoodieCatalystExpressionUtils
    */
   def createInterpretedPredicate(e: Expression): InterpretedPredicate
+
+  /**
+   * Insert all records, updates related task metrics, and return a completion iterator
+   * over all the data written to this [[ExternalSorter]], aggregated by our aggregator.
+   *
+   * On task completion (success, failure, or cancellation), it releases resources by
+   * calling `stop()`.
+   *
+   * NOTE: This method is an [[ExternalSorter#insertAllAndUpdateMetrics]] back-ported to Spark 2.4
+   */
+  def insertInto[K, V, C](ctx: TaskContext, records: Iterator[Product2[K, V]], sorter: ExternalSorter[K, V, C]): Iterator[Product2[K, C]]

Review Comment:
   Why is this called `insertInto`?  It is easy to confuse with the `INSERT INTO` SQL statement.  Should it be renamed to `sortExternally`?



##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDPartitionSortPartitioner.java:
##########
@@ -20,46 +20,62 @@
 
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecordPayload;
-import org.apache.hudi.table.BulkInsertPartitioner;
-
+import org.apache.hudi.common.table.HoodieTableConfig;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.spark.api.java.JavaPairRDD;
 import org.apache.spark.api.java.JavaRDD;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-
+import org.apache.spark.sql.HoodieJavaRDDUtils;
 import scala.Tuple2;
 
+import java.util.Comparator;
+
 /**
  * A built-in partitioner that does local sorting for each RDD partition
- * after coalesce for bulk insert operation, corresponding to the
- * {@code BulkInsertSortMode.PARTITION_SORT} mode.
+ * after coalescing it to specified number of partitions.
+ * Corresponds to the {@link BulkInsertSortMode#PARTITION_SORT} mode.
  *
  * @param <T> HoodieRecordPayload type
  */
 public class RDDPartitionSortPartitioner<T extends HoodieRecordPayload>
-    implements BulkInsertPartitioner<JavaRDD<HoodieRecord<T>>> {
+    extends RepartitioningBulkInsertPartitionerBase<JavaRDD<HoodieRecord<T>>> {
 
+  public RDDPartitionSortPartitioner(HoodieTableConfig tableConfig) {
+    super(tableConfig);
+  }
+
+  @SuppressWarnings("unchecked")
   @Override
   public JavaRDD<HoodieRecord<T>> repartitionRecords(JavaRDD<HoodieRecord<T>> records,
                                                      int outputSparkPartitions) {
-    return records.coalesce(outputSparkPartitions)
-        .mapToPair(record ->
-            new Tuple2<>(
-                new StringBuilder()
-                    .append(record.getPartitionPath())
-                    .append("+")
-                    .append(record.getRecordKey())
-                    .toString(), record))
-        .mapPartitions(partition -> {
-          // Sort locally in partition
-          List<Tuple2<String, HoodieRecord<T>>> recordList = new ArrayList<>();
-          for (; partition.hasNext(); ) {
-            recordList.add(partition.next());
-          }
-          Collections.sort(recordList, (o1, o2) -> o1._1.compareTo(o2._1));
-          return recordList.stream().map(e -> e._2).iterator();
-        });
+
+    // NOTE: Datasets being ingested into partitioned tables are additionally re-partitioned to better
+    //       align dataset's logical partitioning with expected table's physical partitioning to
+    //       provide for appropriate file-sizing and better control of the number of files created.
+    //
+    //       Please check out {@code GlobalSortPartitioner} java-doc for more details
+    if (isPartitionedTable) {
+      PartitionPathRDDPartitioner partitioner =
+          new PartitionPathRDDPartitioner((pair) -> ((Pair<String, String>) pair).getKey(), outputSparkPartitions);
+
+      // Both partition-path and record-key are extracted, since
+      //    - Partition-path will be used for re-partitioning (as called out above)
+      //    - Record-key will be used for sorting the records w/in individual partitions
+      return records.mapToPair(record -> new Tuple2<>(Pair.of(record.getPartitionPath(), record.getRecordKey()), record))
+          // NOTE: We're sorting by (partition-path, record-key) pair to make sure that in case
+          //       when there are less Spark partitions (requested) than there are physical partitions
+          //       (in which case multiple physical partitions, will be handled w/in single Spark
+          //       partition) records w/in a single Spark partition are still ordered first by
+          //       partition-path, then record's key
+          .repartitionAndSortWithinPartitions(partitioner, Comparator.naturalOrder())
+          .values();
+    } else {
+      JavaPairRDD<String, HoodieRecord<T>> kvPairsRDD =
+          records.coalesce(outputSparkPartitions).mapToPair(record -> new Tuple2<>(record.getRecordKey(), record));
+
+      // NOTE: [[JavaRDD]] doesn't expose an API to do the sorting w/o (re-)shuffling, as such
+      //       we're relying on our own sequence to achieve that
+      return HoodieJavaRDDUtils.sortWithinPartitions(kvPairsRDD, Comparator.naturalOrder()).values();

Review Comment:
   Can we do `mapPartitions` and then sort instead?
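
A minimal sketch of the `mapPartitions`-then-sort idea, written in plain Java rather than against the Spark API: it shows the kind of per-partition function that could be passed to `mapPartitions`, assuming each element is a key/value pair. `SortWithinPartitionSketch` and `sortPartition` are hypothetical names. Note that it buffers the whole partition in memory before sorting, so unlike an external sorter it cannot spill to disk.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

/** Hypothetical per-partition sort; buffers everything in memory (no spilling). */
final class SortWithinPartitionSketch {

  /** Drains one partition's iterator, sorts the entries by key, and returns the values in order. */
  static <K extends Comparable<K>, V> Iterator<V> sortPartition(Iterator<Map.Entry<K, V>> partition) {
    List<Map.Entry<K, V>> buffer = new ArrayList<>();
    partition.forEachRemaining(buffer::add);
    // Stable sort by key; entries with equal keys keep their arrival order
    buffer.sort(Map.Entry.comparingByKey());
    return buffer.stream().map(Map.Entry::getValue).iterator();
  }

  public static void main(String[] args) {
    List<Map.Entry<String, String>> partition = Arrays.asList(
        new SimpleEntry<>("key-b", "record-b"),
        new SimpleEntry<>("key-a", "record-a"),
        new SimpleEntry<>("key-c", "record-c"));
    sortPartition(partition.iterator()).forEachRemaining(System.out::println);
  }
}
```

This is essentially what the removed code did with `Collections.sort`, so the memory footprint per task is bounded only by the size of the largest Spark partition.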



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
