This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new d9822b8dd03 [HUDI-9302] Enable vectorized reading for file slice
without log file (#13127)
d9822b8dd03 is described below
commit d9822b8dd03d173a9cae5f0cdebfd36bb732748f
Author: TheR1sing3un <[email protected]>
AuthorDate: Sat May 24 17:09:45 2025 +0800
[HUDI-9302] Enable vectorized reading for file slice without log file
(#13127)
* Enable vectorized reading when only the base file is present in the file group.
---------
Signed-off-by: TheR1sing3un <[email protected]>
Co-authored-by: danny0405 <[email protected]>
---
...odieFileGroupReaderBasedParquetFileFormat.scala | 47 ++++++++++++++++------
.../hudi/TestAvroSchemaResolutionSupport.scala | 9 +----
.../datasources/parquet/Spark33ParquetReader.scala | 13 +++++-
3 files changed, 48 insertions(+), 21 deletions(-)
diff --git
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedParquetFileFormat.scala
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedParquetFileFormat.scala
index afab0f391bc..47c5dafa872 100644
---
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedParquetFileFormat.scala
+++
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedParquetFileFormat.scala
@@ -82,18 +82,33 @@ class
HoodieFileGroupReaderBasedParquetFileFormat(tablePath: String,
private val sanitizedTableName =
AvroSchemaUtils.getAvroRecordQualifiedName(tableName)
/**
- * Support batch needs to remain consistent, even if one side of a bootstrap
merge can support
- * while the other side can't
+ * Flag saying whether vectorized reading is supported.
*/
- private var supportBatchCalled = false
- private var supportBatchResult = false
+ private var supportVectorizedRead = false
+ /**
+ * Flag saying whether batch output is supported.
+ */
+ private var supportReturningBatch = false
+
+ /**
+ * Checks if the file format supports vectorized reading, please refer to
SPARK-40918.
+ *
+   * NOTE: for MOR read, even for a file slice with only a base file, we can read
the parquet file with the vectorized reader,
+   * but the returned result of the whole data-source-scan phase cannot be a batch,
+   * because when there is any log file in a file slice, the slice needs to be read
by the file group reader.
+   * Since we currently perform merges at the row level, the result
returned by merging must be row-based,
+   * so we cannot assume that all file slices have only base files.
+   * Hence we need to set the batch result back to false.
+ *
+ */
override def supportBatch(sparkSession: SparkSession, schema: StructType):
Boolean = {
- if (!supportBatchCalled || supportBatchResult) {
- supportBatchCalled = true
- supportBatchResult = !isMOR && !isIncremental && !isBootstrap &&
super.supportBatch(sparkSession, schema)
- }
- supportBatchResult
+ val superSupportBatch = super.supportBatch(sparkSession, schema)
+ supportVectorizedRead = !isIncremental && !isBootstrap && superSupportBatch
+ supportReturningBatch = !isMOR && supportVectorizedRead
+ logInfo(s"supportReturningBatch: $supportReturningBatch,
supportVectorizedRead: $supportVectorizedRead, isIncremental: $isIncremental, "
+
+ s"isBootstrap: $isBootstrap, superSupportBatch: $superSupportBatch")
+ supportReturningBatch
}
//for partition columns that we read from the file, we don't want them to be
constant column vectors so we
@@ -155,8 +170,16 @@ class
HoodieFileGroupReaderBasedParquetFileFormat(tablePath: String,
val requestedSchema = StructType(requiredSchema.fields ++
partitionSchema.fields.filter(f => mandatoryFields.contains(f.name)))
val requestedAvroSchema =
AvroConversionUtils.convertStructTypeToAvroSchema(requestedSchema,
sanitizedTableName)
val dataAvroSchema =
AvroConversionUtils.convertStructTypeToAvroSchema(dataSchema,
sanitizedTableName)
- val parquetFileReader =
spark.sparkContext.broadcast(sparkAdapter.createParquetFileReader(supportBatchResult,
+ val parquetFileReader =
spark.sparkContext.broadcast(sparkAdapter.createParquetFileReader(supportVectorizedRead,
spark.sessionState.conf, options, augmentedStorageConf.unwrap()))
+ val fileGroupParquetFileReader = if (isMOR && supportVectorizedRead) {
+      // For the file group reader to perform reads, we always need to read the
records without the vectorized reader, because our merging is done at the row level.
+      // TODO: consider supporting the vectorized reader in the file group
reader.
+
spark.sparkContext.broadcast(sparkAdapter.createParquetFileReader(vectorized =
false,
+ spark.sessionState.conf, options, augmentedStorageConf.unwrap()))
+ } else {
+ parquetFileReader
+ }
val broadcastedStorageConf = spark.sparkContext.broadcast(new
SerializableConfiguration(augmentedStorageConf.unwrap()))
val fileIndexProps: TypedProperties =
HoodieFileIndex.getConfigProperties(spark, options, null)
@@ -174,7 +197,7 @@ class
HoodieFileGroupReaderBasedParquetFileFormat(tablePath: String,
case Some(fileSlice) if !isCount && (requiredSchema.nonEmpty ||
fileSlice.getLogFiles.findAny().isPresent) =>
val metaClient: HoodieTableMetaClient = HoodieTableMetaClient
.builder().setConf(storageConf).setBasePath(tablePath).build
- val readerContext = new
SparkFileFormatInternalRowReaderContext(parquetFileReader.value, filters,
requiredFilters, storageConf, metaClient.getTableConfig)
+ val readerContext = new
SparkFileFormatInternalRowReaderContext(fileGroupParquetFileReader.value,
filters, requiredFilters, storageConf, metaClient.getTableConfig)
val props = metaClient.getTableConfig.getProps
options.foreach(kv => props.setProperty(kv._1, kv._2))
props.put(HoodieMemoryConfig.MAX_MEMORY_FOR_MERGE.key(),
String.valueOf(maxMemoryPerCompaction))
@@ -211,7 +234,7 @@ class
HoodieFileGroupReaderBasedParquetFileFormat(tablePath: String,
}
// CDC queries.
case hoodiePartitionCDCFileGroupSliceMapping:
HoodiePartitionCDCFileGroupMapping =>
- buildCDCRecordIterator(hoodiePartitionCDCFileGroupSliceMapping,
parquetFileReader.value, storageConf, fileIndexProps, requiredSchema)
+ buildCDCRecordIterator(hoodiePartitionCDCFileGroupSliceMapping,
fileGroupParquetFileReader.value, storageConf, fileIndexProps, requiredSchema)
case _ =>
readBaseFile(file, parquetFileReader.value, requestedSchema,
remainingPartitionSchema, fixedPartitionIndexes,
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestAvroSchemaResolutionSupport.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestAvroSchemaResolutionSupport.scala
index 8eb4c0aee85..2929534ffa7 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestAvroSchemaResolutionSupport.scala
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestAvroSchemaResolutionSupport.scala
@@ -872,14 +872,7 @@ class TestAvroSchemaResolutionSupport extends
HoodieClientTestBase with ScalaAss
upsertData(df2, tempRecordPath, isCow)
// after implicit type change, read the table with vectorized read enabled
- //fg reader with mor does not support vectorized currently and will auto
read by row
- if (isCow || !useFileGroupReader) {
- assertThrows(classOf[SparkException]){
- withSQLConf("spark.sql.parquet.enableNestedColumnVectorizedReader" ->
"true") {
- readTable(tempRecordPath, useFileGroupReader)
- }
- }
- } else {
+ assertThrows(classOf[SparkException]){
withSQLConf("spark.sql.parquet.enableNestedColumnVectorizedReader" ->
"true") {
readTable(tempRecordPath, useFileGroupReader)
}
diff --git
a/hudi-spark-datasource/hudi-spark3.3.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark33ParquetReader.scala
b/hudi-spark-datasource/hudi-spark3.3.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark33ParquetReader.scala
index a4d5adda191..1f49da7352f 100644
---
a/hudi-spark-datasource/hudi-spark3.3.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark33ParquetReader.scala
+++
b/hudi-spark-datasource/hudi-spark3.3.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark33ParquetReader.scala
@@ -252,6 +252,17 @@ object Spark33ParquetReader extends
SparkParquetReaderBuilder {
sqlConf.getConfString("spark.sql.legacy.parquet.nanosAsLong",
"false").toBoolean
)
+ // Should always be set by FileSourceScanExec while creating this.
+ // Check conf before checking the option, to allow working around an issue
by changing conf.
+ val returningBatch = sqlConf.parquetVectorizedReaderEnabled &&
+ options.get(FileFormat.OPTION_RETURNING_BATCH)
+ .getOrElse {
+ throw new IllegalArgumentException(
+ "OPTION_RETURNING_BATCH should always be set for
ParquetFileFormat. " +
+ "To workaround this issue, set
spark.sql.parquet.enableVectorizedReader=false.")
+ }
+ .equals("true")
+
val parquetOptions = new ParquetOptions(options, sqlConf)
new Spark33ParquetReader(enableVectorizedReader = vectorized,
datetimeRebaseModeInRead = parquetOptions.datetimeRebaseModeInRead,
@@ -266,7 +277,7 @@ object Spark33ParquetReader extends
SparkParquetReaderBuilder {
timestampConversion = sqlConf.isParquetINT96TimestampConversion,
enableOffHeapColumnVector = sqlConf.offHeapColumnVectorEnabled,
capacity = sqlConf.parquetVectorizedReaderBatchSize,
- returningBatch = sqlConf.parquetVectorizedReaderEnabled,
+ returningBatch = returningBatch,
enableRecordFilter = sqlConf.parquetRecordFilterEnabled,
timeZoneId = Some(sqlConf.sessionLocalTimeZone))
}