This is an automated email from the ASF dual-hosted git repository.
codope pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
     new f4832d9903f [HUDI-6663] New Parquet File Format remove broadcast to fix performance issue for complex file slices (#9409)
f4832d9903f is described below
commit f4832d9903fb7c59891ae501265496c961a3ec41
Author: Jon Vexler <[email protected]>
AuthorDate: Fri Aug 11 02:50:10 2023 -0700
    [HUDI-6663] New Parquet File Format remove broadcast to fix performance issue for complex file slices (#9409)
---
.../src/main/scala/org/apache/hudi/HoodieFileIndex.scala | 10 +++++-----
.../org/apache/hudi/NewHoodieParquetFileFormatUtils.scala | 2 +-
.../main/scala/org/apache/hudi/PartitionFileSliceMapping.scala | 7 +++----
.../datasources/parquet/NewHoodieParquetFileFormat.scala | 8 ++++----
4 files changed, 13 insertions(+), 14 deletions(-)
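
In short: the commit renames HoodieFileIndex.shouldBroadcast to shouldEmbedFileSlices and stops wrapping each partition's fileId-to-FileSlice map in a Spark broadcast variable; the map now travels inside PartitionFileSliceMapping itself. A minimal sketch of the resulting pattern, using simplified stand-in types (FileSliceInfo and PartitionMapping below are hypothetical, not Hudi's real FileSlice and PartitionFileSliceMapping):

    // Stand-in for Hudi's FileSlice: a base file plus its log files.
    final case class FileSliceInfo(fileId: String, baseFile: String, logFiles: Seq[String])

    // Stand-in for PartitionFileSliceMapping: carries the partition values
    // and, after this commit, the slice map itself rather than a
    // Broadcast[Map[String, FileSlice]] handle.
    final class PartitionMapping(val partitionValues: Seq[Any],
                                 slices: Map[String, FileSliceInfo]) {
      // None means the reader falls back to a plain base-file read.
      def getSlice(fileId: String): Option[FileSliceInfo] = slices.get(fileId)
    }

    object EmbedExample extends App {
      val slice   = FileSliceInfo("f1", "f1_0.parquet", Seq("f1_0.log"))
      val mapping = new PartitionMapping(Seq("2023-08-11"), Map("f1" -> slice))
      println(mapping.getSlice("f1")) // Some(FileSliceInfo(f1,f1_0.parquet,List(f1_0.log)))
      println(mapping.getSlice("f2")) // None
    }

Since each map only describes the file slices of its own partition, serializing it with the tasks that read that partition is cheap, and the broadcast machinery, which pays off for data shared across many tasks, was pure overhead here.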
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala
index 1193b75bfdf..8a7c06b1d15 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala
@@ -104,7 +104,7 @@ case class HoodieFileIndex(spark: SparkSession,

   override def rootPaths: Seq[Path] = getQueryPaths.asScala

-  var shouldBroadcast: Boolean = false
+  var shouldEmbedFileSlices: Boolean = false

   /**
    * Returns the FileStatus for all the base files (excluding log files). This should be used only for
@@ -148,7 +148,7 @@ case class HoodieFileIndex(spark: SparkSession,
   override def listFiles(partitionFilters: Seq[Expression], dataFilters: Seq[Expression]): Seq[PartitionDirectory] = {
     val prunedPartitionsAndFilteredFileSlices = filterFileSlices(dataFilters, partitionFilters).map {
       case (partitionOpt, fileSlices) =>
-        if (shouldBroadcast) {
+        if (shouldEmbedFileSlices) {
           val baseFileStatusesAndLogFileOnly: Seq[FileStatus] = fileSlices.map(slice => {
             if (slice.getBaseFile.isPresent) {
               slice.getBaseFile.get().getFileStatus
@@ -162,7 +162,7 @@ case class HoodieFileIndex(spark: SparkSession,
             || (f.getBaseFile.isPresent && f.getBaseFile.get().getBootstrapBaseFile.isPresent)).
             foldLeft(Map[String, FileSlice]()) { (m, f) => m + (f.getFileId -> f) }
           if (c.nonEmpty) {
-            PartitionDirectory(new PartitionFileSliceMapping(InternalRow.fromSeq(partitionOpt.get.values), spark.sparkContext.broadcast(c)), baseFileStatusesAndLogFileOnly)
+            PartitionDirectory(new PartitionFileSliceMapping(InternalRow.fromSeq(partitionOpt.get.values), c), baseFileStatusesAndLogFileOnly)
           } else {
             PartitionDirectory(InternalRow.fromSeq(partitionOpt.get.values), baseFileStatusesAndLogFileOnly)
           }
@@ -187,7 +187,7 @@ case class HoodieFileIndex(spark: SparkSession,

     if (shouldReadAsPartitionedTable()) {
       prunedPartitionsAndFilteredFileSlices
-    } else if (shouldBroadcast) {
+    } else if (shouldEmbedFileSlices) {
       assert(partitionSchema.isEmpty)
       prunedPartitionsAndFilteredFileSlices
     }else {
@@ -274,7 +274,7 @@ case class HoodieFileIndex(spark: SparkSession,
     // Prune the partition path by the partition filters
     // NOTE: Non-partitioned tables are assumed to consist from a single partition
     //       encompassing the whole table
-    val prunedPartitions = if (shouldBroadcast) {
+    val prunedPartitions = if (shouldEmbedFileSlices) {
       listMatchingPartitionPaths(convertFilterForTimestampKeyGenerator(metaClient, partitionFilters))
     } else {
       listMatchingPartitionPaths(partitionFilters)
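
The listFiles hunk above is where the overhead was incurred: spark.sparkContext.broadcast(c) ran once per partition directory, so a query over a table with many partitions containing log files or bootstrap base files registered one broadcast variable per partition with the driver. A hypothetical sketch of that shape, not Hudi code, assuming a local SparkSession:

    import org.apache.spark.sql.SparkSession

    object PerPartitionBroadcastSketch extends App {
      val spark = SparkSession.builder().master("local[*]").appName("sketch").getOrCreate()

      // One small fileId -> slice map per partition, as listFiles builds them.
      val perPartition: Seq[Map[String, String]] =
        (1 to 1000).map(i => Map(s"file-$i" -> s"slice-$i"))

      // Old shape: one broadcast per partition. Each call registers a
      // broadcast with the driver and ships it through the block manager,
      // even though only the tasks for that single partition ever read it.
      val broadcasts = perPartition.map(m => spark.sparkContext.broadcast(m))
      broadcasts.foreach(_.destroy())

      // New shape: keep the plain map; it is serialized with the partition
      // values for exactly the tasks that need it, and nothing else.
      val embedded = perPartition

      spark.stop()
    }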
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/NewHoodieParquetFileFormatUtils.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/NewHoodieParquetFileFormatUtils.scala
index 5dd85c973b6..34214be1bd2 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/NewHoodieParquetFileFormatUtils.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/NewHoodieParquetFileFormatUtils.scala
@@ -198,7 +198,7 @@ class NewHoodieParquetFileFormatUtils(val sqlContext: SQLContext,
       } else {
         Seq.empty
       }
-    fileIndex.shouldBroadcast = true
+    fileIndex.shouldEmbedFileSlices = true
     HadoopFsRelation(
       location = fileIndex,
       partitionSchema = fileIndex.partitionSchema,
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/PartitionFileSliceMapping.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/PartitionFileSliceMapping.scala
index c9468e2d601..1e639f0daab 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/PartitionFileSliceMapping.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/PartitionFileSliceMapping.scala
@@ -20,17 +20,16 @@
 package org.apache.hudi

 import org.apache.hudi.common.model.FileSlice
-import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.util.{ArrayData, MapData}
 import org.apache.spark.sql.types.{DataType, Decimal}
 import org.apache.spark.unsafe.types.{CalendarInterval, UTF8String}

 class PartitionFileSliceMapping(internalRow: InternalRow,
-                                broadcast: Broadcast[Map[String, FileSlice]]) extends InternalRow {
+                                slices: Map[String, FileSlice]) extends InternalRow {

   def getSlice(fileId: String): Option[FileSlice] = {
-    broadcast.value.get(fileId)
+    slices.get(fileId)
   }

   def getInternalRow: InternalRow = internalRow
@@ -41,7 +40,7 @@ class PartitionFileSliceMapping(internalRow: InternalRow,

   override def update(i: Int, value: Any): Unit = internalRow.update(i, value)

-  override def copy(): InternalRow = new PartitionFileSliceMapping(internalRow.copy(), broadcast)
+  override def copy(): InternalRow = new PartitionFileSliceMapping(internalRow.copy(), slices)

   override def isNullAt(ordinal: Int): Boolean = internalRow.isNullAt(ordinal)
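
The class works by extending InternalRow and forwarding every row method to the wrapped row, so the extra payload rides through Spark's PartitionDirectory plumbing unnoticed until the read path pattern-matches it back out (next diff). A stripped-down sketch of that wrapper pattern; RowLike is a hypothetical stand-in, since the real InternalRow has many more methods to forward:

    // Mimics a tiny corner of Spark's InternalRow interface.
    trait RowLike {
      def numFields: Int
      def isNullAt(ordinal: Int): Boolean
    }

    // Wraps a row and carries an arbitrary payload alongside it. Code that
    // only sees a RowLike treats the wrapper exactly like the original row;
    // code that knows the concrete type can recover the payload.
    final class RowWithPayload[P](row: RowLike, val payload: P) extends RowLike {
      override def numFields: Int = row.numFields
      override def isNullAt(ordinal: Int): Boolean = row.isNullAt(ordinal)
      // Like the copy() override above: re-wrap so the payload survives.
      def copyWith(newRow: RowLike): RowWithPayload[P] = new RowWithPayload(newRow, payload)
    }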
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/NewHoodieParquetFileFormat.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/NewHoodieParquetFileFormat.scala
index 0c1c3c8e5ee..a8ba96b9b71 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/NewHoodieParquetFileFormat.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/NewHoodieParquetFileFormat.scala
@@ -120,22 +120,22 @@ class NewHoodieParquetFileFormat(tableState: Broadcast[HoodieTableState],
     val broadcastedHadoopConf = sparkSession.sparkContext.broadcast(new SerializableConfiguration(hadoopConf))
     (file: PartitionedFile) => {
       file.partitionValues match {
-        case broadcast: PartitionFileSliceMapping =>
+        case fileSliceMapping: PartitionFileSliceMapping =>
           val filePath = sparkAdapter.getSparkPartitionedFileUtils.getPathFromPartitionedFile(file)
           if (FSUtils.isLogFile(filePath)) {
             //no base file
-            val fileSlice = broadcast.getSlice(FSUtils.getFileId(filePath.getName).substring(1)).get
+            val fileSlice = fileSliceMapping.getSlice(FSUtils.getFileId(filePath.getName).substring(1)).get
             val logFiles = getLogFilesFromSlice(fileSlice)
             val outputAvroSchema = HoodieBaseRelation.convertToAvroSchema(outputSchema, tableName)
             new LogFileIterator(logFiles, filePath.getParent, tableSchema.value, outputSchema, outputAvroSchema,
               tableState.value, broadcastedHadoopConf.value.value)
           } else {
             //We do not broadcast the slice if it has no log files or bootstrap base
-            broadcast.getSlice(FSUtils.getFileId(filePath.getName)) match {
+            fileSliceMapping.getSlice(FSUtils.getFileId(filePath.getName)) match {
               case Some(fileSlice) =>
                 val hoodieBaseFile = fileSlice.getBaseFile.get()
                 val bootstrapFileOpt = hoodieBaseFile.getBootstrapBaseFile
-                val partitionValues = broadcast.getInternalRow
+                val partitionValues = fileSliceMapping.getInternalRow
                 val logFiles = getLogFilesFromSlice(fileSlice)
                 if (requiredSchemaWithMandatory.isEmpty) {
                   val baseFile = createPartitionedFile(partitionValues, hoodieBaseFile.getHadoopPath, 0, hoodieBaseFile.getFileLen)
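
On the executor side, the reader recovers the embedded mapping simply by pattern matching on file.partitionValues; the rename from broadcast to fileSliceMapping reflects that there is no longer a broadcast to dereference. A self-contained sketch of that dispatch, with PlainValues and SliceMapping as hypothetical stand-ins for InternalRow and PartitionFileSliceMapping:

    sealed trait PartitionValues
    final case class PlainValues(values: Seq[Any]) extends PartitionValues
    final case class SliceMapping(values: Seq[Any],
                                  slices: Map[String, String]) extends PartitionValues

    // A file whose partition values carry a slice map may need a merge read;
    // anything else, or a file id absent from the map, is a plain read.
    def chooseRead(pv: PartitionValues, fileId: String): String = pv match {
      case SliceMapping(_, slices) =>
        slices.get(fileId) match {
          case Some(slice) => s"merge read of $slice"
          case None        => s"plain read of $fileId"
        }
      case PlainValues(_) => s"plain read of $fileId"
    }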