vinothchandar commented on code in PR #5328:
URL: https://github.com/apache/hudi/pull/5328#discussion_r928149228


##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/GlobalSortPartitioner.java:
##########
@@ -20,34 +20,46 @@
 
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.table.BulkInsertPartitioner;
 
 import org.apache.spark.api.java.JavaRDD;
 
 /**
- * A built-in partitioner that does global sorting for the input records 
across partitions
- * after repartition for bulk insert operation, corresponding to the
- * {@code BulkInsertSortMode.GLOBAL_SORT} mode.
+ * A built-in partitioner that does global sorting of the input records across 
all Spark partitions,
+ * corresponding to the {@link BulkInsertSortMode#GLOBAL_SORT} mode.
  *
- * @param <T> HoodieRecordPayload type
+ * NOTE: Records are sorted by (partitionPath, key) tuple to make sure that 
physical
+ *       partitioning on disk is aligned with logical partitioning of the 
dataset (by Spark)
+ *       as much as possible.
+ *       Consider following scenario: dataset is inserted w/ parallelism of N 
(meaning that Spark
+ *       will partition it into N _logical_ partitions while writing), and has 
M physical partitions

Review Comment:
   "Table partitions" may be a better term instead of physical



##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/BulkInsertInternalPartitionerFactory.java:
##########
@@ -27,16 +28,18 @@
  */
 public abstract class BulkInsertInternalPartitionerFactory {
 
-  public static BulkInsertPartitioner get(BulkInsertSortMode sortMode) {
-    switch (sortMode) {
+  public static BulkInsertPartitioner get(BulkInsertSortMode bulkInsertMode, 
HoodieTableConfig tableConfig) {
+    switch (bulkInsertMode) {
       case NONE:
-        return new NonSortPartitioner();
+        return new NonSortPartitioner<>();
       case GLOBAL_SORT:
-        return new GlobalSortPartitioner();
+        return new GlobalSortPartitioner<>();
       case PARTITION_SORT:
-        return new RDDPartitionSortPartitioner();
+        return new RDDPartitionSortPartitioner<>(tableConfig);

Review Comment:
   at some point, we should rename the RDDxxxx class also consistently 
withothers



##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/GlobalSortPartitioner.java:
##########
@@ -20,34 +20,46 @@
 
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.table.BulkInsertPartitioner;
 
 import org.apache.spark.api.java.JavaRDD;
 
 /**
- * A built-in partitioner that does global sorting for the input records 
across partitions
- * after repartition for bulk insert operation, corresponding to the
- * {@code BulkInsertSortMode.GLOBAL_SORT} mode.
+ * A built-in partitioner that does global sorting of the input records across 
all Spark partitions,
+ * corresponding to the {@link BulkInsertSortMode#GLOBAL_SORT} mode.
  *
- * @param <T> HoodieRecordPayload type
+ * NOTE: Records are sorted by (partitionPath, key) tuple to make sure that 
physical
+ *       partitioning on disk is aligned with logical partitioning of the 
dataset (by Spark)
+ *       as much as possible.
+ *       Consider following scenario: dataset is inserted w/ parallelism of N 
(meaning that Spark
+ *       will partition it into N _logical_ partitions while writing), and has 
M physical partitions
+ *       on disk. Without alignment "physical" and "logical" partitions 
(assuming
+ *       here that records are inserted uniformly across partitions), every 
logical partition,
+ *       which might be handled by separate executor, will be inserting into 
every physical
+ *       partition, creating a new file for the records it's writing, 
entailing that new N x M
+ *       files will be added to the table.
+ *
+ *       Instead, we want no more than N + M files to be created, and 
therefore sort by
+ *       a tuple of (partitionPath, key), which provides for following 
invariants where every
+ *       Spark partition will either
+ *          - Hold _all_ record from particular physical partition, or
+ *          - Hold _only_ records from that particular physical partition
+ *
+ *       In other words a single Spark partition will either be hold full set 
of records for
+ *       a few smaller partitions, or it will hold just the records of the 
larger one. This
+ *       allows us to provide a guarantee that no more N + M files will be 
created.
+ *
+ * @param <T> {@code HoodieRecordPayload} type
  */
 public class GlobalSortPartitioner<T extends HoodieRecordPayload>
     implements BulkInsertPartitioner<JavaRDD<HoodieRecord<T>>> {
 
   @Override
   public JavaRDD<HoodieRecord<T>> repartitionRecords(JavaRDD<HoodieRecord<T>> 
records,
                                                      int 
outputSparkPartitions) {
-    // Now, sort the records and line them up nicely for loading.
-    return records.sortBy(record -> {
-      // Let's use "partitionPath + key" as the sort key. Spark, will ensure
-      // the records split evenly across RDD partitions, such that small 
partitions fit
-      // into 1 RDD partition, while big ones spread evenly across multiple 
RDD partitions
-      return new StringBuilder()
-          .append(record.getPartitionPath())
-          .append("+")
-          .append(record.getRecordKey())
-          .toString();
-    }, true, outputSparkPartitions);
+    return records.sortBy(record ->
+        Pair.of(record.getPartitionPath(), record.getRecordKey()), true, 
outputSparkPartitions);

Review Comment:
   Change looks good. Lets ensure one of the UTs or by some local testing, we 
ensure the sorting based on Pair comparator, results in same behavior



##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDCustomColumnsSortPartitioner.java:
##########
@@ -18,69 +18,120 @@
 
 package org.apache.hudi.execution.bulkinsert;
 
+import org.apache.avro.Schema;
 import org.apache.hudi.avro.HoodieAvroUtils;
 import org.apache.hudi.common.config.SerializableSchema;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.table.HoodieTableConfig;
 import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.config.HoodieWriteConfig;
-import org.apache.hudi.table.BulkInsertPartitioner;
-
-import org.apache.avro.Schema;
 import org.apache.spark.api.java.JavaRDD;
+import scala.Tuple2;

Review Comment:
   can we not depend on this here in java code.There are other places in the 
code, but love to not proliferate if possible



##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDCustomColumnsSortPartitioner.java:
##########
@@ -18,69 +18,120 @@
 
 package org.apache.hudi.execution.bulkinsert;
 
+import org.apache.avro.Schema;
 import org.apache.hudi.avro.HoodieAvroUtils;
 import org.apache.hudi.common.config.SerializableSchema;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.table.HoodieTableConfig;
 import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.config.HoodieWriteConfig;
-import org.apache.hudi.table.BulkInsertPartitioner;
-
-import org.apache.avro.Schema;
 import org.apache.spark.api.java.JavaRDD;
+import scala.Tuple2;
 
+import java.io.Serializable;
 import java.util.Arrays;
+import java.util.Comparator;
+import java.util.function.Function;
+
+import static org.apache.hudi.common.util.ValidationUtils.checkState;
 
 /**
- * A partitioner that does sorting based on specified column values for each 
RDD partition.
+ * A partitioner that does local sorting for each RDD partition based on the 
tuple of
+ * values of the columns configured for ordering.
  *
  * @param <T> HoodieRecordPayload type
  */
 public class RDDCustomColumnsSortPartitioner<T extends HoodieRecordPayload>
-    implements BulkInsertPartitioner<JavaRDD<HoodieRecord<T>>> {
+    extends RepartitioningBulkInsertPartitionerBase<JavaRDD<HoodieRecord<T>>> {
 
-  private final String[] sortColumnNames;
+  private final String[] orderByColumnNames;

Review Comment:
   why the rename



##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/PartitionNoSortPartitioner.java:
##########
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.execution.bulkinsert;
+
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.table.HoodieTableConfig;
+import org.apache.spark.api.java.JavaRDD;
+import scala.Tuple2;
+
+/**
+ * A built-in partitioner that only does re-partitioning to better align 
"logical" partitioning

Review Comment:
   the side effect of this is skew. for e.g if the users trim down the total 
numbers of spark partitions or have only a few partition paths (table 
partitions), then N*M is actually okay. But if one of M table partitions is 
very large, then that spark partition is going to take a long time to finish 
writing. 
   
   We should add this to the comments here and also to the site docs.  Global 
sort handles this too, since sorting will evenly distribute data amongst 
executors.



##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/BulkInsertInternalPartitionerFactory.java:
##########
@@ -27,16 +28,18 @@
  */
 public abstract class BulkInsertInternalPartitionerFactory {
 
-  public static BulkInsertPartitioner get(BulkInsertSortMode sortMode) {
-    switch (sortMode) {
+  public static BulkInsertPartitioner get(BulkInsertSortMode bulkInsertMode, 
HoodieTableConfig tableConfig) {

Review Comment:
   +1 on avoiding cosmetic changes. 



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala:
##########
@@ -78,12 +78,12 @@ object HoodieSparkSqlWriter {
     SparkRDDWriteClient[HoodieRecordPayload[Nothing]], HoodieTableConfig) = {
 
     assert(optParams.get("path").exists(!StringUtils.isNullOrEmpty(_)), 
"'path' must be set")
-    val path = optParams("path")
-    val basePath = new Path(path)
+    val basePathStr = optParams("path")

Review Comment:
   lets avoid these changes, however small



##########
hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/SparkAdapter.scala:
##########
@@ -138,4 +140,16 @@ trait SparkAdapter extends Serializable {
    * TODO move to HoodieCatalystExpressionUtils
    */
   def createInterpretedPredicate(e: Expression): InterpretedPredicate
+
+  /**
+   * Insert all records, updates related task metrics, and return a completion 
iterator
+   * over all the data written to this [[ExternalSorter]], aggregated by our 
aggregator.
+   *
+   * On task completion (success, failure, or cancellation), it releases 
resources by
+   * calling `stop()`.
+   *
+   * NOTE: This method is an [[ExternalSorter#insertAllAndUpdateMetrics]] 
back-ported to Spark 2.4
+   */
+  def insertInto[K, V, C](ctx: TaskContext, records: Iterator[Product2[K, V]], 
sorter: ExternalSorter[K, V, C]): Iterator[Product2[K, C]]

Review Comment:
   Can you explain why this change is needed



##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/PartitionSortPartitionerWithRows.java:
##########
@@ -19,19 +19,39 @@
 package org.apache.hudi.execution.bulkinsert;
 
 import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.table.HoodieTableConfig;
 import org.apache.hudi.table.BulkInsertPartitioner;
 
+import org.apache.spark.sql.Column;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
 
 /**
- * A built-in partitioner that does local sorting for each spark partitions 
after coalesce for bulk insert operation, corresponding to the {@code 
BulkInsertSortMode.PARTITION_SORT} mode.
+ * A built-in partitioner that does local sorting w/in the Spark partition,
+ * corresponding to the {@code BulkInsertSortMode.PARTITION_SORT} mode.
  */
-public class PartitionSortPartitionerWithRows implements 
BulkInsertPartitioner<Dataset<Row>> {
+public class PartitionSortPartitionerWithRows extends 
RepartitioningBulkInsertPartitionerBase<Dataset<Row>> {
+
+  public PartitionSortPartitionerWithRows(HoodieTableConfig tableConfig) {
+    super(tableConfig);
+  }
 
   @Override
-  public Dataset<Row> repartitionRecords(Dataset<Row> rows, int 
outputSparkPartitions) {
-    return 
rows.coalesce(outputSparkPartitions).sortWithinPartitions(HoodieRecord.PARTITION_PATH_METADATA_FIELD,
 HoodieRecord.RECORD_KEY_METADATA_FIELD);
+  public Dataset<Row> repartitionRecords(Dataset<Row> dataset, int 
outputSparkPartitions) {
+    Dataset<Row> repartitionedDataset;
+
+    // NOTE: Datasets being ingested into partitioned tables are additionally 
re-partitioned to better
+    //       align dataset's logical partitioning with expected table's 
physical partitioning to
+    //       provide for appropriate file-sizing and better control of the 
number of files created.
+    //
+    //       Please check out {@code GlobalSortPartitioner} java-doc for more 
details
+    if (isPartitionedTable) {
+      repartitionedDataset = dataset.repartition(outputSparkPartitions, new 
Column(HoodieRecord.PARTITION_PATH_METADATA_FIELD));

Review Comment:
   We are calling `repartition` here, followed by `sortWithinPartitons`.Won't 
this shuffle two times?. Lets get on the same page w.r.t this.



##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RowCustomColumnsSortPartitioner.java:
##########
@@ -19,42 +19,69 @@
 package org.apache.hudi.execution.bulkinsert;
 
 import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.table.HoodieTableConfig;
 import org.apache.hudi.config.HoodieWriteConfig;
-import org.apache.hudi.table.BulkInsertPartitioner;
+import org.apache.spark.sql.Column;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
+import scala.collection.JavaConverters;
 
 import java.util.Arrays;
+import java.util.stream.Collectors;
+
+import static org.apache.hudi.common.util.ValidationUtils.checkState;
+import static 
org.apache.hudi.execution.bulkinsert.RDDCustomColumnsSortPartitioner.getOrderByColumnNames;
 
 /**
- * A partitioner that does sorting based on specified column values for each 
spark partitions.
+ * A partitioner that does local sorting for each RDD partition based on the 
tuple of
+ * values of the columns configured for ordering.
  */
-public class RowCustomColumnsSortPartitioner implements 
BulkInsertPartitioner<Dataset<Row>> {
+public class RowCustomColumnsSortPartitioner extends 
RepartitioningBulkInsertPartitionerBase<Dataset<Row>> {
+
+  private final String[] orderByColumnNames;
 
-  private final String[] sortColumnNames;
+  public RowCustomColumnsSortPartitioner(HoodieWriteConfig config, 
HoodieTableConfig tableConfig) {
+    super(tableConfig);
+    this.orderByColumnNames = getOrderByColumnNames(config);
 
-  public RowCustomColumnsSortPartitioner(HoodieWriteConfig config) {
-    this.sortColumnNames = getSortColumnName(config);
+    checkState(orderByColumnNames.length > 0);
   }
 
-  public RowCustomColumnsSortPartitioner(String[] columnNames) {
-    this.sortColumnNames = columnNames;
+  public RowCustomColumnsSortPartitioner(String[] columnNames, 
HoodieTableConfig tableConfig) {
+    super(tableConfig);
+    this.orderByColumnNames = columnNames;
+
+    checkState(orderByColumnNames.length > 0);
   }
 
   @Override
-  public Dataset<Row> repartitionRecords(Dataset<Row> records, int 
outputSparkPartitions) {
-    final String[] sortColumns = this.sortColumnNames;
-    return records.coalesce(outputSparkPartitions)
-        .sortWithinPartitions(HoodieRecord.PARTITION_PATH_METADATA_FIELD, 
sortColumns);
+  public Dataset<Row> repartitionRecords(Dataset<Row> dataset, int 
outputSparkPartitions) {
+    Dataset<Row> repartitionedDataset;
+
+    // NOTE: In case of partitioned table even "global" ordering (across all 
RDD partitions) could
+    //       not change table's partitioning and therefore there's no point in 
doing global sorting
+    //       across "physical" partitions, and instead we can reduce total 
amount of data being
+    //       shuffled by doing do "local" sorting:
+    //          - First, re-partitioning dataset such that "logical" 
partitions are aligned w/
+    //          "physical" ones
+    //          - Sorting locally w/in RDD ("logical") partitions
+    //
+    //       Non-partitioned tables will be globally sorted.
+    if (isPartitionedTable) {
+      repartitionedDataset = dataset.repartition(outputSparkPartitions, new 
Column(HoodieRecord.PARTITION_PATH_METADATA_FIELD));
+    } else {
+      repartitionedDataset = dataset.coalesce(outputSparkPartitions);
+    }
+
+    return repartitionedDataset.sortWithinPartitions(

Review Comment:
   same q. `sortWithinPartitions` does not shuffle, but just does the external 
spill sorting?



##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/BulkInsertInternalPartitionerFactory.java:
##########
@@ -27,16 +28,18 @@
  */
 public abstract class BulkInsertInternalPartitionerFactory {
 
-  public static BulkInsertPartitioner get(BulkInsertSortMode sortMode) {
-    switch (sortMode) {
+  public static BulkInsertPartitioner get(BulkInsertSortMode bulkInsertMode, 
HoodieTableConfig tableConfig) {

Review Comment:
   I think this PR actually adds the first mode that does not sort. (except 
NONE, which is easy to understand anyway). So if anything we should fix this in 
this PR.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to