alexeykudinkin commented on code in PR #7825:
URL: https://github.com/apache/hudi/pull/7825#discussion_r1094809321
##########
hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieDatasetBulkInsertHelper.scala:
##########
@@ -171,36 +172,30 @@ object HoodieDatasetBulkInsertHelper
table.getContext.parallelize(writeStatuses.toList.asJava)
}
- private def dedupeRows(rdd: RDD[InternalRow], schema: StructType,
preCombineFieldRef: String, isGlobalIndex: Boolean): RDD[InternalRow] = {
+ private def dedupRows(rdd: RDD[InternalRow], schema: StructType,
preCombineFieldRef: String, isPartitioned: Boolean): RDD[InternalRow] = {
val recordKeyMetaFieldOrd =
schema.fieldIndex(HoodieRecord.RECORD_KEY_METADATA_FIELD)
val partitionPathMetaFieldOrd =
schema.fieldIndex(HoodieRecord.PARTITION_PATH_METADATA_FIELD)
// NOTE: Pre-combine field could be a nested field
val preCombineFieldPath = composeNestedFieldPath(schema,
preCombineFieldRef)
.getOrElse(throw new HoodieException(s"Pre-combine field
$preCombineFieldRef is missing in $schema"))
rdd.map { row =>
- val rowKey = if (isGlobalIndex) {
- row.getString(recordKeyMetaFieldOrd)
+ val partitionPath = if (isPartitioned)
row.getUTF8String(partitionPathMetaFieldOrd) else UTF8String.EMPTY_UTF8
+ val recordKey = row.getUTF8String(recordKeyMetaFieldOrd)
+
+ ((partitionPath, recordKey), row)
Review Comment:
   This is not needed anymore: the subsequent shuffle will materialize
   (effectively copy) the rows for us, so an explicit copy here is redundant.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]