yihua commented on code in PR #10957:
URL: https://github.com/apache/hudi/pull/10957#discussion_r1569730199


##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedParquetFileFormat.scala:
##########
@@ -71,24 +64,36 @@ class 
HoodieFileGroupReaderBasedParquetFileFormat(tableState: HoodieTableState,
                                                   isIncremental: Boolean,
                                                   shouldUseRecordPosition: 
Boolean,
                                                   requiredFilters: Seq[Filter]
-                                           ) extends ParquetFileFormat with 
SparkAdapterSupport with HoodieFormatTrait {
+                                                 ) extends ParquetFileFormat 
with SparkAdapterSupport with HoodieFormatTrait {
 
   def getRequiredFilters: Seq[Filter] = requiredFilters
 
+  private val sanitizedTableName = 
AvroSchemaUtils.getAvroRecordQualifiedName(tableName)
+
   /**
    * Support batch needs to remain consistent, even if one side of a bootstrap 
merge can support
    * while the other side can't
    */
-  private var supportBatchCalled = false
-  private var supportBatchResult = false
+  /*
+private var supportBatchCalled = false
+private var supportBatchResult = false
+
+override def supportBatch(sparkSession: SparkSession, schema: StructType): 
Boolean = {
+  if (!supportBatchCalled || supportBatchResult) {
+    supportBatchCalled = true
+    supportBatchResult = tableSchema.internalSchema.isEmpty && !isMOR && 
!isIncremental && !isBootstrap && super.supportBatch(sparkSession, schema)
+  }
+  supportBatchResult
+}
+ */
+  override def supportBatch(sparkSession: SparkSession, schema: StructType): 
Boolean = false

Review Comment:
   Are these still needed? If not, remove the lines?



##########
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderOnSpark.scala:
##########
@@ -80,16 +79,17 @@ class TestHoodieFileGroupReaderOnSpark extends 
TestHoodieFileGroupReaderBase[Int
     tempDir.toAbsolutePath.toUri.toString
   }
 
-  override def getHoodieReaderContext(tablePath: String, avroSchema: Schema): 
HoodieReaderContext[InternalRow] = {
-    val parquetFileFormat = new ParquetFileFormat
-    val structTypeSchema = 
AvroConversionUtils.convertAvroSchemaToStructType(avroSchema)
-
-    val recordReaderIterator = 
parquetFileFormat.buildReaderWithPartitionValues(
-      spark, structTypeSchema, StructType(Seq.empty), structTypeSchema, 
Seq.empty, Map.empty, getHadoopConf)
-
-    val m = scala.collection.mutable.Map[Long, PartitionedFile => 
Iterator[InternalRow]]()
-    m.put(2*avroSchema.hashCode(), recordReaderIterator)
-    new SparkFileFormatInternalRowReaderContext(m)
+  override def getHoodieReaderContext(tablePath: String, avroSchema: Schema, 
hadoopConf: Configuration): HoodieReaderContext[InternalRow] = {
+    val reader = sparkAdapter.createParquetFileReader(vectorized = false, 
spark.sessionState.conf, Map.empty, hadoopConf)
+    val metaClient = 
HoodieTableMetaClient.builder().setConf(getHadoopConf).setBasePath(tablePath).build
+    val recordKeyField = if (metaClient.getTableConfig.populateMetaFields()) {
+      HoodieRecord.RECORD_KEY_METADATA_FIELD
+    } else {
+      val keyFields = metaClient.getTableConfig.getRecordKeyFields.get()
+      checkState(keyFields.length == 1)
+      keyFields.head
+    }

Review Comment:
   Could this be wrapped into a util method?  I think virtual key support may 
have already added such method.



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedParquetFileFormat.scala:
##########
@@ -107,19 +112,23 @@ class 
HoodieFileGroupReaderBasedParquetFileFormat(tableState: HoodieTableState,
     val dataSchema = 
StructType(tableSchema.structTypeSchema.fields.filterNot(f => 
partitionColumns.contains(f.name)))
     val outputSchema = StructType(requiredSchema.fields ++ 
partitionSchema.fields)
     spark.conf.set("spark.sql.parquet.enableVectorizedReader", 
supportBatchResult)
-    val requiredSchemaWithMandatory = 
generateRequiredSchemaWithMandatory(requiredSchema, dataSchema, partitionSchema)
-    val isCount = requiredSchemaWithMandatory.isEmpty
-    val requiredSchemaSplits = requiredSchemaWithMandatory.fields.partition(f 
=> HoodieRecord.HOODIE_META_COLUMNS_WITH_OPERATION.contains(f.name))
-    val requiredMeta = StructType(requiredSchemaSplits._1)
-    val requiredWithoutMeta = StructType(requiredSchemaSplits._2)
+    val isCount = requiredSchema.isEmpty && !isMOR && !isIncremental
     val augmentedHadoopConf = FSUtils.buildInlineConf(hadoopConf)
-    val (baseFileReader, preMergeBaseFileReader, readerMaps, cdcFileReader) = 
buildFileReaders(
-      spark, dataSchema, partitionSchema, requiredSchema, filters, options, 
augmentedHadoopConf,
-      requiredSchemaWithMandatory, requiredWithoutMeta, requiredMeta)
+    setSchemaEvolutionConfigs(augmentedHadoopConf, options)
+    val baseFileReader = super.buildReaderWithPartitionValues(spark, 
dataSchema, partitionSchema, requiredSchema,
+      filters ++ requiredFilters, options, new 
Configuration(augmentedHadoopConf))
+    val cdcFileReader = super.buildReaderWithPartitionValues(
+      spark,
+      tableSchema.structTypeSchema,
+      StructType(Nil),
+      tableSchema.structTypeSchema,
+      Nil,
+      options,
+      new Configuration(hadoopConf))
 
     val requestedAvroSchema = 
AvroConversionUtils.convertStructTypeToAvroSchema(requiredSchema, 
sanitizedTableName)
     val dataAvroSchema = 
AvroConversionUtils.convertStructTypeToAvroSchema(dataSchema, 
sanitizedTableName)
-
+    val parquetFileReader = 
spark.sparkContext.broadcast(sparkAdapter.createParquetFileReader(supportBatchResult,
 spark.sessionState.conf, options, augmentedHadoopConf))

Review Comment:
   Should the parquet reader be instantiated on the executor side inside 
`(file: PartitionedFile) => { .. }` ?



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedParquetFileFormat.scala:
##########
@@ -129,20 +138,15 @@ class 
HoodieFileGroupReaderBasedParquetFileFormat(tableState: HoodieTableState,
       file.partitionValues match {
         // Snapshot or incremental queries.
         case fileSliceMapping: HoodiePartitionFileSliceMapping =>
-          val filePath = 
sparkAdapter.getSparkPartitionedFileUtils.getPathFromPartitionedFile(file)
-          val filegroupName = if (FSUtils.isLogFile(filePath)) {
-            FSUtils.getFileId(filePath.getName).substring(1)
-          } else {
-            FSUtils.getFileId(filePath.getName)
-          }
+          val filegroupName = FSUtils.getFileIdFromFilePath(sparkAdapter
+            .getSparkPartitionedFileUtils.getPathFromPartitionedFile(file))
           fileSliceMapping.getSlice(filegroupName) match {
             case Some(fileSlice) if !isCount =>
               if (requiredSchema.isEmpty && 
!fileSlice.getLogFiles.findAny().isPresent) {
                 val hoodieBaseFile = fileSlice.getBaseFile.get()
                 
baseFileReader(createPartitionedFile(fileSliceMapping.getPartitionValues, 
hoodieBaseFile.getHadoopPath, 0, hoodieBaseFile.getFileLen))
               } else {
-                val readerContext: HoodieReaderContext[InternalRow] = new 
SparkFileFormatInternalRowReaderContext(
-                  readerMaps)
+                val readerContext = new 
SparkFileFormatInternalRowReaderContext(parquetFileReader.value, 
tableState.recordKeyField, filters)

Review Comment:
   I remember the original intention is to remove any arguments of the reader 
context (the reader instance is instantiated inside the 
`context.getFileRecordIterator`) and there should be no Hudi related 
parameters.  Any parameters can be passed by the file group reader as an 
argument to the context function call.



##########
hudi-spark-datasource/hudi-spark-common/src/test/scala/org/apache/spark/execution/datasources/parquet/TestHoodieFileGroupReaderBasedParquetFileFormat.scala:
##########
@@ -34,63 +32,17 @@ class TestHoodieFileGroupReaderBasedParquetFileFormat 
extends SparkClientFunctio
       IsNotNull("non_key_column"),
       EqualTo("non_key_column", 1)
     )
-    val filtersWithoutKeyColumn = 
HoodieFileGroupReaderBasedParquetFileFormat.getRecordKeyRelatedFilters(
+    val filtersWithoutKeyColumn = 
SparkFileFormatInternalRowReaderContext.getRecordKeyRelatedFilters(
       filters, "key_column");
     assertEquals(0, filtersWithoutKeyColumn.size)
 
     val filtersWithKeys = Seq(
       EqualTo("key_column", 1),
       GreaterThan("non_key_column", 2)
     )
-    val filtersWithKeyColumn = 
HoodieFileGroupReaderBasedParquetFileFormat.getRecordKeyRelatedFilters(
+    val filtersWithKeyColumn = 
SparkFileFormatInternalRowReaderContext.getRecordKeyRelatedFilters(
       filtersWithKeys, "key_column")
     assertEquals(1, filtersWithKeyColumn.size)
     assertEquals("key_column", filtersWithKeyColumn.head.references.head)
   }
-
-  @Test
-  def testGetAppliedRequiredSchema(): Unit = {
-    val fields = Array(
-      StructField("column_a", LongType, nullable = false),
-      StructField("column_b", StringType, nullable = false))
-    val requiredSchema = StructType(fields)
-
-    val appliedSchema: StructType = 
HoodieFileGroupReaderBasedParquetFileFormat.getAppliedRequiredSchema(
-      requiredSchema, shouldUseRecordPosition = true, "row_index")
-    if (HoodieSparkUtils.gteqSpark3_5) {
-      assertEquals(3, appliedSchema.fields.length)
-    } else {
-      assertEquals(2, appliedSchema.fields.length)
-    }
-
-    val schemaWithoutRowIndexColumn = 
HoodieFileGroupReaderBasedParquetFileFormat.getAppliedRequiredSchema(
-      requiredSchema, shouldUseRecordPosition = false, "row_index")
-    assertEquals(2, schemaWithoutRowIndexColumn.fields.length)
-  }
-
-  @Test
-  def testGetAppliedFilters(): Unit = {
-    val filters = Seq(

Review Comment:
   Are these test cases move to somewhere else?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to