xushiyan commented on code in PR #7825:
URL: https://github.com/apache/hudi/pull/7825#discussion_r1094187967
##########
hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieDatasetBulkInsertHelper.scala:
##########
@@ -171,36 +172,30 @@ object HoodieDatasetBulkInsertHelper
table.getContext.parallelize(writeStatuses.toList.asJava)
}
- private def dedupeRows(rdd: RDD[InternalRow], schema: StructType,
preCombineFieldRef: String, isGlobalIndex: Boolean): RDD[InternalRow] = {
+ private def dedupRows(rdd: RDD[InternalRow], schema: StructType,
preCombineFieldRef: String, isPartitioned: Boolean): RDD[InternalRow] = {
val recordKeyMetaFieldOrd =
schema.fieldIndex(HoodieRecord.RECORD_KEY_METADATA_FIELD)
val partitionPathMetaFieldOrd =
schema.fieldIndex(HoodieRecord.PARTITION_PATH_METADATA_FIELD)
// NOTE: Pre-combine field could be a nested field
val preCombineFieldPath = composeNestedFieldPath(schema,
preCombineFieldRef)
.getOrElse(throw new HoodieException(s"Pre-combine field
$preCombineFieldRef is missing in $schema"))
rdd.map { row =>
- val rowKey = if (isGlobalIndex) {
- row.getString(recordKeyMetaFieldOrd)
+ val partitionPath = if (isPartitioned)
row.getUTF8String(partitionPathMetaFieldOrd) else UTF8String.EMPTY_UTF8
+ val recordKey = row.getUTF8String(recordKeyMetaFieldOrd)
+
+ ((partitionPath, recordKey), row)
Review Comment:
Shouldn't we be copying the `row` here before buffering it for the shuffle?
##########
hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/HoodieDataTypeUtils.scala:
##########
@@ -18,10 +18,29 @@
package org.apache.spark.sql
+import org.apache.hudi.common.model.HoodieRecord
import org.apache.spark.sql.types._
+import scala.jdk.CollectionConverters.collectionAsScalaIterableConverter
+
object HoodieDataTypeUtils {
+ /**
+ * Checks whether provided schema contains Hudi's meta-fields
+ *
+ * NOTE: This method validates presence of just one field
[[HoodieRecord.RECORD_KEY_METADATA_FIELD]],
+ * however assuming that meta-fields should either be omitted or specified
in full
+ */
+ def hasMetaFields(structType: StructType): Boolean =
+ structType.getFieldIndex(HoodieRecord.RECORD_KEY_METADATA_FIELD).isDefined
+
+ // TODO scala-doc
Review Comment:
Please resolve this TODO before merging (add the missing scaladoc).
##########
hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/HoodieDataTypeUtils.scala:
##########
@@ -18,10 +18,29 @@
package org.apache.spark.sql
+import org.apache.hudi.common.model.HoodieRecord
import org.apache.spark.sql.types._
+import scala.jdk.CollectionConverters.collectionAsScalaIterableConverter
+
object HoodieDataTypeUtils {
+ /**
+ * Checks whether provided schema contains Hudi's meta-fields
+ *
+ * NOTE: This method validates presence of just one field
[[HoodieRecord.RECORD_KEY_METADATA_FIELD]],
+ * however assuming that meta-fields should either be omitted or specified
in full
+ */
+ def hasMetaFields(structType: StructType): Boolean =
+ structType.getFieldIndex(HoodieRecord.RECORD_KEY_METADATA_FIELD).isDefined
+
+ // TODO scala-doc
+ def addMetaFields(schema: StructType): StructType = {
Review Comment:
This is more like ensuring that the meta fields are placed first in the
schema, so the method name could be made more accurate.
##########
hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieDatasetBulkInsertHelper.scala:
##########
@@ -220,4 +215,39 @@ object HoodieDatasetBulkInsertHelper
val keyGenerator = ReflectionUtils.loadClass(keyGeneratorClassName, new
TypedProperties(config.getProps)).asInstanceOf[BuiltinKeyGenerator]
keyGenerator.getPartitionPathFields.asScala
}
+
+ /**
+ * We use custom Spark [[Partitioner]] that is aware of the target table's
partitioning
+ * so that during inevitable shuffling required for de-duplication, we also
assign records
+ * into individual Spark partitions in a way affine with target table's
physical partitioning
+ * (ie records from the same table's partition will be co-located in the
same Spark's partition)
+ *
+ * This would allow us to
+ * <ul>
+ * <li>Save on additional shuffling subsequently (by
[[BulkInsertPartitioner]])</li>
+ * <li>Avoid "small files explosion" entailed by random (hash)
partitioning stemming
+ * from the fact that every Spark partition hosts records from many
table's partitions
+ * resulting into every Spark task writing into their own files in these
partitions (in
+ * case no subsequent re-partitioning is performed)
+ * </li>
+ * <ul>
+ *
+ * For more details check out HUDI-5685
+ */
+ private case class TablePartitioningAwarePartitioner(override val
numPartitions: Int,
+ val isPartitioned:
Boolean) extends Partitioner {
Review Comment:
We don't need an additional flag to tell whether the table is partitioned or
not — can we just check `nonEmpty(partitionPath)` instead?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]