This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new d9822b8dd03 [HUDI-9302] Enable vectorized reading for file slice without log file (#13127)
d9822b8dd03 is described below

commit d9822b8dd03d173a9cae5f0cdebfd36bb732748f
Author: TheR1sing3un <[email protected]>
AuthorDate: Sat May 24 17:09:45 2025 +0800

    [HUDI-9302] Enable vectorized reading for file slice without log file (#13127)
    
    * enable vectorized reading when only base file is in the file group.
    
    ---------
    
    Signed-off-by: TheR1sing3un <[email protected]>
    Co-authored-by: danny0405 <[email protected]>
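
    In essence (a simplified sketch using the flag names from the patch
    below; illustrative only, not part of the commit):

        // Base parquet files may be read vectorized for everything except
        // incremental and bootstrap queries.
        supportVectorizedRead = !isIncremental && !isBootstrap &&
          super.supportBatch(sparkSession, schema)
        // The scan may only return columnar batches when no row-based
        // merging can occur, which rules out MOR.
        supportReturningBatch = !isMOR && supportVectorizedRead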
---
 ...odieFileGroupReaderBasedParquetFileFormat.scala | 47 ++++++++++++++++------
 .../hudi/TestAvroSchemaResolutionSupport.scala     |  9 +----
 .../datasources/parquet/Spark33ParquetReader.scala | 13 +++++-
 3 files changed, 48 insertions(+), 21 deletions(-)

diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedParquetFileFormat.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedParquetFileFormat.scala
index afab0f391bc..47c5dafa872 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedParquetFileFormat.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedParquetFileFormat.scala
@@ -82,18 +82,33 @@ class HoodieFileGroupReaderBasedParquetFileFormat(tablePath: String,
   private val sanitizedTableName = AvroSchemaUtils.getAvroRecordQualifiedName(tableName)
 
   /**
-   * Support batch needs to remain consistent, even if one side of a bootstrap merge can support
-   * while the other side can't
+   * Flag indicating whether vectorized reading is supported.
    */
-  private var supportBatchCalled = false
-  private var supportBatchResult = false
+  private var supportVectorizedRead = false
 
+  /**
+   * Flag indicating whether batch output is supported.
+   */
+  private var supportReturningBatch = false
+
+  /**
+   * Checks if the file format supports vectorized reading; please refer to SPARK-40918.
+   *
+   * NOTE: for MOR reads, a file slice that contains only a base file can still be read
+   * with the vectorized parquet reader, but the result of the whole data-source-scan
+   * phase cannot be returned as a batch: a file slice that contains log files must be
+   * read by the file group reader, and since merging is currently performed row by row,
+   * the merged result must also be row-based. Because we cannot assume that every file
+   * slice contains only a base file, the batch result is set back to false.
+   */
   override def supportBatch(sparkSession: SparkSession, schema: StructType): Boolean = {
-    if (!supportBatchCalled || supportBatchResult) {
-      supportBatchCalled = true
-      supportBatchResult = !isMOR && !isIncremental && !isBootstrap && super.supportBatch(sparkSession, schema)
-    }
-    supportBatchResult
+    val superSupportBatch = super.supportBatch(sparkSession, schema)
+    supportVectorizedRead = !isIncremental && !isBootstrap && superSupportBatch
+    supportReturningBatch = !isMOR && supportVectorizedRead
+    logInfo(s"supportReturningBatch: $supportReturningBatch, 
supportVectorizedRead: $supportVectorizedRead, isIncremental: $isIncremental, " 
+
+      s"isBootstrap: $isBootstrap, superSupportBatch: $superSupportBatch")
+    supportReturningBatch
   }
 
  //for partition columns that we read from the file, we don't want them to be constant column vectors so we
@@ -155,8 +170,16 @@ class HoodieFileGroupReaderBasedParquetFileFormat(tablePath: String,
     val requestedSchema = StructType(requiredSchema.fields ++ partitionSchema.fields.filter(f => mandatoryFields.contains(f.name)))
     val requestedAvroSchema = AvroConversionUtils.convertStructTypeToAvroSchema(requestedSchema, sanitizedTableName)
     val dataAvroSchema = AvroConversionUtils.convertStructTypeToAvroSchema(dataSchema, sanitizedTableName)
-    val parquetFileReader = spark.sparkContext.broadcast(sparkAdapter.createParquetFileReader(supportBatchResult,
+    val parquetFileReader = spark.sparkContext.broadcast(sparkAdapter.createParquetFileReader(supportVectorizedRead,
       spark.sessionState.conf, options, augmentedStorageConf.unwrap()))
+    val fileGroupParquetFileReader = if (isMOR && supportVectorizedRead) {
+      // The file group reader must always read records without the vectorized reader, because merging is performed at row level.
+      // TODO: consider supporting the vectorized reader in the file group reader.
+      spark.sparkContext.broadcast(sparkAdapter.createParquetFileReader(vectorized = false,
+        spark.sessionState.conf, options, augmentedStorageConf.unwrap()))
+    } else {
+      parquetFileReader
+    }
     val broadcastedStorageConf = spark.sparkContext.broadcast(new SerializableConfiguration(augmentedStorageConf.unwrap()))
     val fileIndexProps: TypedProperties = HoodieFileIndex.getConfigProperties(spark, options, null)
 
@@ -174,7 +197,7 @@ class HoodieFileGroupReaderBasedParquetFileFormat(tablePath: String,
            case Some(fileSlice) if !isCount && (requiredSchema.nonEmpty || fileSlice.getLogFiles.findAny().isPresent) =>
               val metaClient: HoodieTableMetaClient = HoodieTableMetaClient
                 .builder().setConf(storageConf).setBasePath(tablePath).build
-              val readerContext = new SparkFileFormatInternalRowReaderContext(parquetFileReader.value, filters, requiredFilters, storageConf, metaClient.getTableConfig)
+              val readerContext = new SparkFileFormatInternalRowReaderContext(fileGroupParquetFileReader.value, filters, requiredFilters, storageConf, metaClient.getTableConfig)
               val props = metaClient.getTableConfig.getProps
               options.foreach(kv => props.setProperty(kv._1, kv._2))
               props.put(HoodieMemoryConfig.MAX_MEMORY_FOR_MERGE.key(), String.valueOf(maxMemoryPerCompaction))
@@ -211,7 +234,7 @@ class HoodieFileGroupReaderBasedParquetFileFormat(tablePath: String,
           }
         // CDC queries.
        case hoodiePartitionCDCFileGroupSliceMapping: HoodiePartitionCDCFileGroupMapping =>
-          buildCDCRecordIterator(hoodiePartitionCDCFileGroupSliceMapping, parquetFileReader.value, storageConf, fileIndexProps, requiredSchema)
+          buildCDCRecordIterator(hoodiePartitionCDCFileGroupSliceMapping, fileGroupParquetFileReader.value, storageConf, fileIndexProps, requiredSchema)
 
         case _ =>
           readBaseFile(file, parquetFileReader.value, requestedSchema, remainingPartitionSchema, fixedPartitionIndexes,
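
With the change above, user-facing code is unchanged. For illustration, a
hedged spark-shell sketch (the table path is an assumption): a MOR snapshot
query now decodes base-file-only slices with the vectorized parquet reader,
while slices holding log files still go through the row-based file group
reader.

    // Snapshot-read a MOR table; reader selection happens per file slice
    // inside HoodieFileGroupReaderBasedParquetFileFormat.
    val df = spark.read.format("hudi").load("/tmp/hudi_mor_table")
    df.show()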
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestAvroSchemaResolutionSupport.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestAvroSchemaResolutionSupport.scala
index 8eb4c0aee85..2929534ffa7 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestAvroSchemaResolutionSupport.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestAvroSchemaResolutionSupport.scala
@@ -872,14 +872,7 @@ class TestAvroSchemaResolutionSupport extends HoodieClientTestBase with ScalaAss
     upsertData(df2, tempRecordPath, isCow)
 
     // after implicit type change, read the table with vectorized read enabled
-    //fg reader with mor does not support vectorized currently and will auto read by row
-    if (isCow || !useFileGroupReader) {
-      assertThrows(classOf[SparkException]){
-        withSQLConf("spark.sql.parquet.enableNestedColumnVectorizedReader" -> "true") {
-          readTable(tempRecordPath, useFileGroupReader)
-        }
-      }
-    } else {
+    assertThrows(classOf[SparkException]){
       withSQLConf("spark.sql.parquet.enableNestedColumnVectorizedReader" -> 
"true") {
         readTable(tempRecordPath, useFileGroupReader)
       }
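
Reconstructed from the hunk above (an approximation of the resulting test
body, shown whole for readability), the test now unconditionally expects the
read to fail when nested-column vectorized reading is enabled:

    assertThrows(classOf[SparkException]) {
      withSQLConf("spark.sql.parquet.enableNestedColumnVectorizedReader" -> "true") {
        readTable(tempRecordPath, useFileGroupReader)
      }
    }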
diff --git a/hudi-spark-datasource/hudi-spark3.3.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark33ParquetReader.scala b/hudi-spark-datasource/hudi-spark3.3.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark33ParquetReader.scala
index a4d5adda191..1f49da7352f 100644
--- a/hudi-spark-datasource/hudi-spark3.3.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark33ParquetReader.scala
+++ b/hudi-spark-datasource/hudi-spark3.3.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark33ParquetReader.scala
@@ -252,6 +252,17 @@ object Spark33ParquetReader extends SparkParquetReaderBuilder {
       sqlConf.getConfString("spark.sql.legacy.parquet.nanosAsLong", "false").toBoolean
     )
 
+    // Should always be set by FileSourceScanExec while creating this.
+    // Check conf before checking the option, to allow working around an issue by changing conf.
+    val returningBatch = sqlConf.parquetVectorizedReaderEnabled &&
+      options.get(FileFormat.OPTION_RETURNING_BATCH)
+        .getOrElse {
+          throw new IllegalArgumentException(
+            "OPTION_RETURNING_BATCH should always be set for 
ParquetFileFormat. " +
+              "To workaround this issue, set 
spark.sql.parquet.enableVectorizedReader=false.")
+        }
+        .equals("true")
+
     val parquetOptions = new ParquetOptions(options, sqlConf)
     new Spark33ParquetReader(enableVectorizedReader = vectorized,
       datetimeRebaseModeInRead = parquetOptions.datetimeRebaseModeInRead,
@@ -266,7 +277,7 @@ object Spark33ParquetReader extends SparkParquetReaderBuilder {
       timestampConversion = sqlConf.isParquetINT96TimestampConversion,
       enableOffHeapColumnVector = sqlConf.offHeapColumnVectorEnabled,
       capacity = sqlConf.parquetVectorizedReaderBatchSize,
-      returningBatch = sqlConf.parquetVectorizedReaderEnabled,
+      returningBatch = returningBatch,
       enableRecordFilter = sqlConf.parquetRecordFilterEnabled,
       timeZoneId = Some(sqlConf.sessionLocalTimeZone))
   }
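
If OPTION_RETURNING_BATCH is ever missing, the error message above suggests
disabling vectorized reading session-wide as a workaround. A minimal sketch,
assuming an active SparkSession named spark:

    // With the vectorized reader disabled, the OPTION_RETURNING_BATCH
    // option is no longer consulted.
    spark.conf.set("spark.sql.parquet.enableVectorizedReader", "false")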
