nsivabalan commented on code in PR #7825:
URL: https://github.com/apache/hudi/pull/7825#discussion_r1094947575


##########
hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieDatasetBulkInsertHelper.scala:
##########
@@ -62,23 +64,18 @@ object HoodieDatasetBulkInsertHelper
                            partitioner: BulkInsertPartitioner[Dataset[Row]],
                            shouldDropPartitionColumns: Boolean): Dataset[Row] 
= {
     val populateMetaFields = config.populateMetaFields()
-    val schema = df.schema
-
-    val metaFields = Seq(
-      StructField(HoodieRecord.COMMIT_TIME_METADATA_FIELD, StringType),
-      StructField(HoodieRecord.COMMIT_SEQNO_METADATA_FIELD, StringType),
-      StructField(HoodieRecord.RECORD_KEY_METADATA_FIELD, StringType),
-      StructField(HoodieRecord.PARTITION_PATH_METADATA_FIELD, StringType),
-      StructField(HoodieRecord.FILENAME_METADATA_FIELD, StringType))
 
-    val updatedSchema = StructType(metaFields ++ schema.fields)
+    val schema = df.schema
+    val populatedSchema = addMetaFields(schema)
 
     val updatedDF = if (populateMetaFields) {
       val keyGeneratorClassName = 
config.getStringOrThrow(HoodieWriteConfig.KEYGENERATOR_CLASS_NAME,
         "Key-generator class name is required")
-
-      val prependedRdd: RDD[InternalRow] =
-        df.queryExecution.toRdd.mapPartitions { iter =>
+      val sourceRdd = df.queryExecution.toRdd
+      val populatedRdd: RDD[InternalRow] = if (hasMetaFields(schema)) {

Review Comment:
   Is this for the clustering row-writer code path?



##########
hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/HoodieDataTypeUtils.scala:
##########
@@ -18,10 +18,29 @@
 
 package org.apache.spark.sql
 
+import org.apache.hudi.common.model.HoodieRecord
 import org.apache.spark.sql.types._
 
+import scala.jdk.CollectionConverters.collectionAsScalaIterableConverter
+
 object HoodieDataTypeUtils {
 
+  /**
+   * Checks whether provided schema contains Hudi's meta-fields
+   *
+   * NOTE: This method validates presence of just one field 
[[HoodieRecord.RECORD_KEY_METADATA_FIELD]],
+   * however assuming that meta-fields should either be omitted or specified 
in full
+   */
+  def hasMetaFields(structType: StructType): Boolean =
+    structType.getFieldIndex(HoodieRecord.RECORD_KEY_METADATA_FIELD).isDefined

Review Comment:
   Minor: should we check for the partition-path meta-field as well?



##########
hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieDatasetBulkInsertHelper.scala:
##########
@@ -220,4 +214,41 @@ object HoodieDatasetBulkInsertHelper
     val keyGenerator = ReflectionUtils.loadClass(keyGeneratorClassName, new 
TypedProperties(config.getProps)).asInstanceOf[BuiltinKeyGenerator]
     keyGenerator.getPartitionPathFields.asScala
   }
+
+  /**
+   * We use custom Spark [[Partitioner]] that is aware of the target table's 
partitioning
+   * so that during inevitable shuffling required for de-duplication, we also 
assign records
+   * into individual Spark partitions in a way affine with target table's 
physical partitioning
+   * (ie records from the same table's partition will be co-located in the 
same Spark's partition)
+   *
+   * This would allow us to
+   * <ul>
+   *   <li>Save on additional shuffling subsequently (by 
[[BulkInsertPartitioner]])</li>
+   *   <li>Avoid "small files explosion" entailed by random (hash) 
partitioning stemming
+   *   from the fact that every Spark partition hosts records from many 
table's partitions
+   *   resulting into every Spark task writing into their own files in these 
partitions (in
+   *   case no subsequent re-partitioning is performed)
+   *   </li>
+   * <ul>
+   *
+   * For more details check out HUDI-5685
+   */
+  private case class TablePartitioningAwarePartitioner(override val 
numPartitions: Int) extends Partitioner {
+    override def getPartition(key: Any): Int = {
+      key match {
+        case null => 0
+        case (partitionPath, recordKey) =>

Review Comment:
   Won't this result in data skew? If one of the Hudi partitions holds a lot of
data, the corresponding Spark partition will dominate the total de-dup time, right?



##########
hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieDatasetBulkInsertHelper.scala:
##########
@@ -220,4 +214,41 @@ object HoodieDatasetBulkInsertHelper
     val keyGenerator = ReflectionUtils.loadClass(keyGeneratorClassName, new 
TypedProperties(config.getProps)).asInstanceOf[BuiltinKeyGenerator]
     keyGenerator.getPartitionPathFields.asScala
   }
+
+  /**
+   * We use custom Spark [[Partitioner]] that is aware of the target table's 
partitioning
+   * so that during inevitable shuffling required for de-duplication, we also 
assign records
+   * into individual Spark partitions in a way affine with target table's 
physical partitioning
+   * (ie records from the same table's partition will be co-located in the 
same Spark's partition)
+   *
+   * This would allow us to
+   * <ul>
+   *   <li>Save on additional shuffling subsequently (by 
[[BulkInsertPartitioner]])</li>
+   *   <li>Avoid "small files explosion" entailed by random (hash) 
partitioning stemming
+   *   from the fact that every Spark partition hosts records from many 
table's partitions
+   *   resulting into every Spark task writing into their own files in these 
partitions (in
+   *   case no subsequent re-partitioning is performed)
+   *   </li>
+   * <ul>
+   *
+   * For more details check out HUDI-5685
+   */
+  private case class TablePartitioningAwarePartitioner(override val 
numPartitions: Int) extends Partitioner {
+    override def getPartition(key: Any): Int = {
+      key match {
+        case null => 0
+        case (partitionPath, recordKey) =>

Review Comment:
   This was one of the reasons we did not go with this approach: to avoid data skew.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to