This is an automated email from the ASF dual-hosted git repository.

pwason pushed a commit to branch release-0.14.0
in repository https://gitbox.apache.org/repos/asf/hudi.git
commit b27b1f688aad236598c546c55062b4f69d973ad0
Author: Jon Vexler <[email protected]>
AuthorDate: Fri Aug 11 02:50:10 2023 -0700

    [HUDI-6663] New Parquet File Format remove broadcast to fix performance issue for complex file slices (#9409)
---
 .../src/main/scala/org/apache/hudi/HoodieFileIndex.scala       | 10 +++++-----
 .../org/apache/hudi/NewHoodieParquetFileFormatUtils.scala      |  2 +-
 .../main/scala/org/apache/hudi/PartitionFileSliceMapping.scala |  7 +++----
 .../datasources/parquet/NewHoodieParquetFileFormat.scala       |  8 ++++----
 4 files changed, 13 insertions(+), 14 deletions(-)

diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala
index 1193b75bfdf..8a7c06b1d15 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala
@@ -104,7 +104,7 @@ case class HoodieFileIndex(spark: SparkSession,
 
   override def rootPaths: Seq[Path] = getQueryPaths.asScala
 
-  var shouldBroadcast: Boolean = false
+  var shouldEmbedFileSlices: Boolean = false
 
   /**
    * Returns the FileStatus for all the base files (excluding log files). This should be used only for
@@ -148,7 +148,7 @@ case class HoodieFileIndex(spark: SparkSession,
   override def listFiles(partitionFilters: Seq[Expression], dataFilters: Seq[Expression]): Seq[PartitionDirectory] = {
     val prunedPartitionsAndFilteredFileSlices = filterFileSlices(dataFilters, partitionFilters).map {
       case (partitionOpt, fileSlices) =>
-        if (shouldBroadcast) {
+        if (shouldEmbedFileSlices) {
           val baseFileStatusesAndLogFileOnly: Seq[FileStatus] = fileSlices.map(slice => {
             if (slice.getBaseFile.isPresent) {
               slice.getBaseFile.get().getFileStatus
@@ -162,7 +162,7 @@ case class HoodieFileIndex(spark: SparkSession,
             || (f.getBaseFile.isPresent && f.getBaseFile.get().getBootstrapBaseFile.isPresent)).
             foldLeft(Map[String, FileSlice]()) { (m, f) => m + (f.getFileId -> f) }
           if (c.nonEmpty) {
-            PartitionDirectory(new PartitionFileSliceMapping(InternalRow.fromSeq(partitionOpt.get.values), spark.sparkContext.broadcast(c)), baseFileStatusesAndLogFileOnly)
+            PartitionDirectory(new PartitionFileSliceMapping(InternalRow.fromSeq(partitionOpt.get.values), c), baseFileStatusesAndLogFileOnly)
           } else {
             PartitionDirectory(InternalRow.fromSeq(partitionOpt.get.values), baseFileStatusesAndLogFileOnly)
           }
@@ -187,7 +187,7 @@ case class HoodieFileIndex(spark: SparkSession,
 
     if (shouldReadAsPartitionedTable()) {
       prunedPartitionsAndFilteredFileSlices
-    } else if (shouldBroadcast) {
+    } else if (shouldEmbedFileSlices) {
       assert(partitionSchema.isEmpty)
       prunedPartitionsAndFilteredFileSlices
     }else {
@@ -274,7 +274,7 @@ case class HoodieFileIndex(spark: SparkSession,
     // Prune the partition path by the partition filters
     // NOTE: Non-partitioned tables are assumed to consist from a single partition
     //       encompassing the whole table
-    val prunedPartitions = if (shouldBroadcast) {
+    val prunedPartitions = if (shouldEmbedFileSlices) {
      listMatchingPartitionPaths(convertFilterForTimestampKeyGenerator(metaClient, partitionFilters))
    } else {
      listMatchingPartitionPaths(partitionFilters)
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/NewHoodieParquetFileFormatUtils.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/NewHoodieParquetFileFormatUtils.scala
index 5dd85c973b6..34214be1bd2 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/NewHoodieParquetFileFormatUtils.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/NewHoodieParquetFileFormatUtils.scala
@@ -198,7 +198,7 @@
     } else {
       Seq.empty
     }
-    fileIndex.shouldBroadcast = true
+    fileIndex.shouldEmbedFileSlices = true
     HadoopFsRelation(
       location = fileIndex,
       partitionSchema = fileIndex.partitionSchema,
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/PartitionFileSliceMapping.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/PartitionFileSliceMapping.scala
index c9468e2d601..1e639f0daab 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/PartitionFileSliceMapping.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/PartitionFileSliceMapping.scala
@@ -20,17 +20,16 @@
 package org.apache.hudi
 
 import org.apache.hudi.common.model.FileSlice
-import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.util.{ArrayData, MapData}
 import org.apache.spark.sql.types.{DataType, Decimal}
 import org.apache.spark.unsafe.types.{CalendarInterval, UTF8String}
 
 class PartitionFileSliceMapping(internalRow: InternalRow,
-                                broadcast: Broadcast[Map[String, FileSlice]]) extends InternalRow {
+                                slices: Map[String, FileSlice]) extends InternalRow {
 
   def getSlice(fileId: String): Option[FileSlice] = {
-    broadcast.value.get(fileId)
+    slices.get(fileId)
   }
 
   def getInternalRow: InternalRow = internalRow
@@ -41,7 +40,7 @@ class PartitionFileSliceMapping(internalRow: InternalRow,
 
   override def update(i: Int, value: Any): Unit = internalRow.update(i, value)
 
-  override def copy(): InternalRow = new PartitionFileSliceMapping(internalRow.copy(), broadcast)
+  override def copy(): InternalRow = new PartitionFileSliceMapping(internalRow.copy(), slices)
 
   override def isNullAt(ordinal: Int): Boolean = internalRow.isNullAt(ordinal)
 
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/NewHoodieParquetFileFormat.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/NewHoodieParquetFileFormat.scala
index 0c1c3c8e5ee..a8ba96b9b71 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/NewHoodieParquetFileFormat.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/NewHoodieParquetFileFormat.scala
@@ -120,22 +120,22 @@ class NewHoodieParquetFileFormat(tableState: Broadcast[HoodieTableState],
     val broadcastedHadoopConf = sparkSession.sparkContext.broadcast(new SerializableConfiguration(hadoopConf))
     (file: PartitionedFile) => {
       file.partitionValues match {
-        case broadcast: PartitionFileSliceMapping =>
+        case fileSliceMapping: PartitionFileSliceMapping =>
           val filePath = sparkAdapter.getSparkPartitionedFileUtils.getPathFromPartitionedFile(file)
           if (FSUtils.isLogFile(filePath)) {
             //no base file
-            val fileSlice = broadcast.getSlice(FSUtils.getFileId(filePath.getName).substring(1)).get
+            val fileSlice = fileSliceMapping.getSlice(FSUtils.getFileId(filePath.getName).substring(1)).get
             val logFiles = getLogFilesFromSlice(fileSlice)
             val outputAvroSchema = HoodieBaseRelation.convertToAvroSchema(outputSchema, tableName)
             new LogFileIterator(logFiles, filePath.getParent, tableSchema.value, outputSchema, outputAvroSchema,
               tableState.value, broadcastedHadoopConf.value.value)
           } else {
             //We do not broadcast the slice if it has no log files or bootstrap base
-            broadcast.getSlice(FSUtils.getFileId(filePath.getName)) match {
+            fileSliceMapping.getSlice(FSUtils.getFileId(filePath.getName)) match {
               case Some(fileSlice) =>
                 val hoodieBaseFile = fileSlice.getBaseFile.get()
                 val bootstrapFileOpt = hoodieBaseFile.getBootstrapBaseFile
-                val partitionValues = broadcast.getInternalRow
+                val partitionValues = fileSliceMapping.getInternalRow
                 val logFiles = getLogFilesFromSlice(fileSlice)
                 if (requiredSchemaWithMandatory.isEmpty) {
                   val baseFile = createPartitionedFile(partitionValues, hoodieBaseFile.getHadoopPath, 0, hoodieBaseFile.getFileLen)
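Context for the change: the old code called spark.sparkContext.broadcast(c) once per partition inside HoodieFileIndex.listFiles, wrapping each fileId -> FileSlice map in a Broadcast. Broadcast creation is a synchronous driver-side operation that goes through the block manager, so on tables with many partitions or complex file slices, planning serialized on the driver. The commit instead embeds the plain map in PartitionFileSliceMapping and lets Spark ship it with the task. A minimal sketch contrasting the two patterns, using stand-in String data and hypothetical names rather than the actual Hudi types:

    import org.apache.spark.sql.SparkSession

    // Sketch only: per-partition broadcasts vs. maps embedded in the closure.
    object BroadcastVsEmbedSketch {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder()
          .master("local[*]")
          .appName("broadcast-vs-embed")
          .getOrCreate()
        val sc = spark.sparkContext

        // Stand-in for one fileId -> file-slice mapping per partition.
        val sliceMaps: Seq[Map[String, String]] =
          (1 to 1000).map(i => Map(s"file-$i" -> s"slice-$i"))

        // Pattern the commit removes: one broadcast per partition. Each
        // sc.broadcast call blocks on the driver, so planning time grows
        // with the number of partitions.
        val perPartitionBroadcasts = sliceMaps.map(m => sc.broadcast(m))

        // Pattern the commit adopts: keep the map itself (in Hudi, inside the
        // InternalRow wrapper carrying partition values) and let Spark
        // serialize it into the task closure with no driver round trip.
        val embedded: Seq[Map[String, String]] = sliceMaps

        perPartitionBroadcasts.foreach(_.destroy())
        spark.stop()
      }
    }

Each per-partition map is typically small (one entry per file group), so task-closure serialization is cheap; broadcast only pays off for a single large value shared by many tasks, which is not the access pattern here.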
