This is an automated email from the ASF dual-hosted git repository.

pwason pushed a commit to branch release-0.14.0
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit b27b1f688aad236598c546c55062b4f69d973ad0
Author: Jon Vexler <[email protected]>
AuthorDate: Fri Aug 11 02:50:10 2023 -0700

    [HUDI-6663] New Parquet File Format remove broadcast to fix performance 
issue for complex file slices (#9409)
---
 .../src/main/scala/org/apache/hudi/HoodieFileIndex.scala       | 10 +++++-----
 .../org/apache/hudi/NewHoodieParquetFileFormatUtils.scala      |  2 +-
 .../main/scala/org/apache/hudi/PartitionFileSliceMapping.scala |  7 +++----
 .../datasources/parquet/NewHoodieParquetFileFormat.scala       |  8 ++++----
 4 files changed, 13 insertions(+), 14 deletions(-)

diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala
index 1193b75bfdf..8a7c06b1d15 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala
@@ -104,7 +104,7 @@ case class HoodieFileIndex(spark: SparkSession,
 
   override def rootPaths: Seq[Path] = getQueryPaths.asScala
 
-  var shouldBroadcast: Boolean = false
+  var shouldEmbedFileSlices: Boolean = false
 
   /**
    * Returns the FileStatus for all the base files (excluding log files). This 
should be used only for
@@ -148,7 +148,7 @@ case class HoodieFileIndex(spark: SparkSession,
   override def listFiles(partitionFilters: Seq[Expression], dataFilters: 
Seq[Expression]): Seq[PartitionDirectory] = {
     val prunedPartitionsAndFilteredFileSlices = filterFileSlices(dataFilters, 
partitionFilters).map {
       case (partitionOpt, fileSlices) =>
-        if (shouldBroadcast) {
+        if (shouldEmbedFileSlices) {
           val baseFileStatusesAndLogFileOnly: Seq[FileStatus] = 
fileSlices.map(slice => {
             if (slice.getBaseFile.isPresent) {
               slice.getBaseFile.get().getFileStatus
@@ -162,7 +162,7 @@ case class HoodieFileIndex(spark: SparkSession,
             || (f.getBaseFile.isPresent && 
f.getBaseFile.get().getBootstrapBaseFile.isPresent)).
             foldLeft(Map[String, FileSlice]()) { (m, f) => m + (f.getFileId -> 
f) }
           if (c.nonEmpty) {
-            PartitionDirectory(new 
PartitionFileSliceMapping(InternalRow.fromSeq(partitionOpt.get.values), 
spark.sparkContext.broadcast(c)), baseFileStatusesAndLogFileOnly)
+            PartitionDirectory(new 
PartitionFileSliceMapping(InternalRow.fromSeq(partitionOpt.get.values), c), 
baseFileStatusesAndLogFileOnly)
           } else {
             PartitionDirectory(InternalRow.fromSeq(partitionOpt.get.values), 
baseFileStatusesAndLogFileOnly)
           }
@@ -187,7 +187,7 @@ case class HoodieFileIndex(spark: SparkSession,
 
     if (shouldReadAsPartitionedTable()) {
       prunedPartitionsAndFilteredFileSlices
-    } else if (shouldBroadcast) {
+    } else if (shouldEmbedFileSlices) {
       assert(partitionSchema.isEmpty)
       prunedPartitionsAndFilteredFileSlices
     }else {
@@ -274,7 +274,7 @@ case class HoodieFileIndex(spark: SparkSession,
     // Prune the partition path by the partition filters
     // NOTE: Non-partitioned tables are assumed to consist from a single 
partition
     //       encompassing the whole table
-    val prunedPartitions = if (shouldBroadcast) {
+    val prunedPartitions = if (shouldEmbedFileSlices) {
       
listMatchingPartitionPaths(convertFilterForTimestampKeyGenerator(metaClient, 
partitionFilters))
     } else {
       listMatchingPartitionPaths(partitionFilters)
diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/NewHoodieParquetFileFormatUtils.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/NewHoodieParquetFileFormatUtils.scala
index 5dd85c973b6..34214be1bd2 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/NewHoodieParquetFileFormatUtils.scala
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/NewHoodieParquetFileFormatUtils.scala
@@ -198,7 +198,7 @@ class NewHoodieParquetFileFormatUtils(val sqlContext: 
SQLContext,
     } else {
       Seq.empty
     }
-    fileIndex.shouldBroadcast = true
+    fileIndex.shouldEmbedFileSlices = true
     HadoopFsRelation(
       location = fileIndex,
       partitionSchema = fileIndex.partitionSchema,
diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/PartitionFileSliceMapping.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/PartitionFileSliceMapping.scala
index c9468e2d601..1e639f0daab 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/PartitionFileSliceMapping.scala
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/PartitionFileSliceMapping.scala
@@ -20,17 +20,16 @@
 package org.apache.hudi
 
 import org.apache.hudi.common.model.FileSlice
-import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.util.{ArrayData, MapData}
 import org.apache.spark.sql.types.{DataType, Decimal}
 import org.apache.spark.unsafe.types.{CalendarInterval, UTF8String}
 
 class PartitionFileSliceMapping(internalRow: InternalRow,
-                                broadcast: Broadcast[Map[String, FileSlice]]) 
extends InternalRow {
+                                slices: Map[String, FileSlice]) extends 
InternalRow {
 
   def getSlice(fileId: String): Option[FileSlice] = {
-    broadcast.value.get(fileId)
+    slices.get(fileId)
   }
 
   def getInternalRow: InternalRow = internalRow
@@ -41,7 +40,7 @@ class PartitionFileSliceMapping(internalRow: InternalRow,
 
   override def update(i: Int, value: Any): Unit = internalRow.update(i, value)
 
-  override def copy(): InternalRow = new 
PartitionFileSliceMapping(internalRow.copy(), broadcast)
+  override def copy(): InternalRow = new 
PartitionFileSliceMapping(internalRow.copy(), slices)
 
   override def isNullAt(ordinal: Int): Boolean = internalRow.isNullAt(ordinal)
 
diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/NewHoodieParquetFileFormat.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/NewHoodieParquetFileFormat.scala
index 0c1c3c8e5ee..a8ba96b9b71 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/NewHoodieParquetFileFormat.scala
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/NewHoodieParquetFileFormat.scala
@@ -120,22 +120,22 @@ class NewHoodieParquetFileFormat(tableState: 
Broadcast[HoodieTableState],
     val broadcastedHadoopConf = sparkSession.sparkContext.broadcast(new 
SerializableConfiguration(hadoopConf))
     (file: PartitionedFile) => {
       file.partitionValues match {
-        case broadcast: PartitionFileSliceMapping =>
+        case fileSliceMapping: PartitionFileSliceMapping =>
           val filePath = 
sparkAdapter.getSparkPartitionedFileUtils.getPathFromPartitionedFile(file)
           if (FSUtils.isLogFile(filePath)) {
             //no base file
-            val fileSlice = 
broadcast.getSlice(FSUtils.getFileId(filePath.getName).substring(1)).get
+            val fileSlice = 
fileSliceMapping.getSlice(FSUtils.getFileId(filePath.getName).substring(1)).get
             val logFiles = getLogFilesFromSlice(fileSlice)
             val outputAvroSchema = 
HoodieBaseRelation.convertToAvroSchema(outputSchema, tableName)
             new LogFileIterator(logFiles, filePath.getParent, 
tableSchema.value, outputSchema, outputAvroSchema,
               tableState.value, broadcastedHadoopConf.value.value)
           } else {
             //We do not broadcast the slice if it has no log files or 
bootstrap base
-            broadcast.getSlice(FSUtils.getFileId(filePath.getName)) match {
+            fileSliceMapping.getSlice(FSUtils.getFileId(filePath.getName)) 
match {
               case Some(fileSlice) =>
                 val hoodieBaseFile = fileSlice.getBaseFile.get()
                 val bootstrapFileOpt = hoodieBaseFile.getBootstrapBaseFile
-                val partitionValues = broadcast.getInternalRow
+                val partitionValues = fileSliceMapping.getInternalRow
                 val logFiles = getLogFilesFromSlice(fileSlice)
                 if (requiredSchemaWithMandatory.isEmpty) {
                   val baseFile = createPartitionedFile(partitionValues, 
hoodieBaseFile.getHadoopPath, 0, hoodieBaseFile.getFileLen)

Reply via email to