jonvex commented on code in PR #10957:
URL: https://github.com/apache/hudi/pull/10957#discussion_r1626130305
##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedParquetFileFormat.scala:
##########
@@ -232,211 +248,6 @@ class
HoodieFileGroupReaderBasedParquetFileFormat(tableState: HoodieTableState,
makeCloseableFileGroupMappingRecordIterator(iter, d => unsafeProjection(d))
}
- private def generateRequiredSchemaWithMandatory(requiredSchema: StructType,
- dataSchema: StructType,
- partitionSchema:
StructType): StructType = {
- val metaFields = Seq(
- StructField(HoodieRecord.COMMIT_TIME_METADATA_FIELD, StringType),
- StructField(HoodieRecord.COMMIT_SEQNO_METADATA_FIELD, StringType),
- StructField(HoodieRecord.RECORD_KEY_METADATA_FIELD, StringType),
- StructField(HoodieRecord.PARTITION_PATH_METADATA_FIELD, StringType),
- StructField(HoodieRecord.FILENAME_METADATA_FIELD, StringType))
-
- // Helper method to get the StructField for nested fields
- @tailrec
- def findNestedField(schema: StructType, fieldParts: Array[String]):
Option[StructField] = {
- fieldParts.toList match {
- case head :: Nil => schema.fields.find(_.name == head) // If it's the
last part, find and return the field
- case head :: tail => // If there are more parts, find the field and
its nested fields
- schema.fields.find(_.name == head) match {
- case Some(StructField(_, nested: StructType, _, _)) =>
findNestedField(nested, tail.toArray)
- case _ => None // The path is not valid
- }
- case _ => None // Empty path, should not happen if the input is correct
- }
- }
-
- def findMetaField(name: String): Option[StructField] = {
- metaFields.find(f => f.name == name)
- }
-
- val added: mutable.Buffer[StructField] = mutable.Buffer[StructField]()
- for (field <- mandatoryFields) {
- if (requiredSchema.getFieldIndex(field).isEmpty) {
- // Support for nested fields
- val fieldParts = field.split("\\.")
- val fieldToAdd = findNestedField(dataSchema, fieldParts)
- .orElse(findNestedField(partitionSchema, fieldParts))
- .orElse(findMetaField(field))
- .getOrElse(throw new IllegalArgumentException(s"Field $field does
not exist in the table schema"))
- added.append(fieldToAdd)
- }
- }
- val addedFields = StructType(added.toArray)
- StructType(requiredSchema.toArray ++ addedFields.fields)
- }
-
- protected def buildFileReaders(sparkSession: SparkSession, dataSchema:
StructType, partitionSchema: StructType,
- requiredSchema: StructType, filters:
Seq[Filter], options: Map[String, String],
- hadoopConf: Configuration,
requiredSchemaWithMandatory: StructType,
- requiredWithoutMeta: StructType,
requiredMeta: StructType):
- (PartitionedFile => Iterator[InternalRow],
- PartitionedFile => Iterator[InternalRow],
- mutable.Map[Long, PartitionedFile => Iterator[InternalRow]],
- PartitionedFile => Iterator[InternalRow]) = {
-
- val m = scala.collection.mutable.Map[Long, PartitionedFile =>
Iterator[InternalRow]]()
-
- val recordKeyRelatedFilters = getRecordKeyRelatedFilters(filters,
tableState.recordKeyField)
- val baseFileReader = super.buildReaderWithPartitionValues(sparkSession,
dataSchema, partitionSchema, requiredSchema,
- filters ++ requiredFilters, options, new Configuration(hadoopConf))
- m.put(generateKey(dataSchema, requiredSchema), baseFileReader)
-
- // File reader for reading a Hoodie base file that needs to be merged with
log files
- // Add support for reading files using inline file system.
- val appliedRequiredSchema: StructType = getAppliedRequiredSchema(
- requiredSchemaWithMandatory, shouldUseRecordPosition,
ROW_INDEX_TEMPORARY_COLUMN_NAME)
- val appliedFilters = getAppliedFilters(
- requiredFilters, recordKeyRelatedFilters, shouldUseRecordPosition)
- val preMergeBaseFileReader = super.buildReaderWithPartitionValues(
- sparkSession,
- dataSchema,
- StructType(Nil),
- appliedRequiredSchema,
- appliedFilters,
- options,
- new Configuration(hadoopConf))
- m.put(generateKey(dataSchema, appliedRequiredSchema),
preMergeBaseFileReader)
-
- val cdcFileReader = super.buildReaderWithPartitionValues(
- sparkSession,
- tableSchema.structTypeSchema,
- StructType(Nil),
- tableSchema.structTypeSchema,
- Nil,
- options + (OPTION_RETURNING_BATCH -> super.supportBatch(sparkSession,
tableSchema.structTypeSchema).toString),
- new Configuration(hadoopConf))
-
- //Rules for appending partitions and filtering in the bootstrap readers:
- // 1. if it is mor, we don't want to filter data or append partitions
- // 2. if we need to merge the bootstrap base and skeleton files then we
cannot filter
- // 3. if we need to merge the bootstrap base and skeleton files then we
should never append partitions to the
- // skeleton reader
- val needMetaCols = requiredMeta.nonEmpty
- val needDataCols = requiredWithoutMeta.nonEmpty
-
- //file reader for bootstrap skeleton files
- if (needMetaCols && isBootstrap) {
- val key = generateKey(HoodieSparkUtils.getMetaSchema, requiredMeta)
- if (needDataCols || isMOR) {
- // no filter and no append
- m.put(key, super.buildReaderWithPartitionValues(sparkSession,
HoodieSparkUtils.getMetaSchema, StructType(Seq.empty),
- requiredMeta, Seq.empty, options, new Configuration(hadoopConf)))
- } else {
- // filter
- m.put(key, super.buildReaderWithPartitionValues(sparkSession,
HoodieSparkUtils.getMetaSchema, StructType(Seq.empty),
- requiredMeta, filters ++ requiredFilters, options, new
Configuration(hadoopConf)))
- }
-
- val requestedMeta = StructType(requiredSchema.fields.filter(sf =>
isMetaField(sf.name)))
- m.put(generateKey(HoodieSparkUtils.getMetaSchema, requestedMeta),
- super.buildReaderWithPartitionValues(sparkSession,
HoodieSparkUtils.getMetaSchema, StructType(Seq.empty), requestedMeta,
- Seq.empty, options, new Configuration(hadoopConf)))
- }
-
- //file reader for bootstrap base files
- if (needDataCols && isBootstrap) {
- val dataSchemaWithoutMeta = StructType(dataSchema.fields.filterNot(sf =>
isMetaField(sf.name)))
- val key = generateKey(dataSchemaWithoutMeta, requiredWithoutMeta)
- if (isMOR || needMetaCols) {
- m.put(key, super.buildReaderWithPartitionValues(sparkSession,
dataSchemaWithoutMeta, StructType(Seq.empty), requiredWithoutMeta,
- Seq.empty, options, new Configuration(hadoopConf)))
- // no filter and no append
-
- } else {
- // filter
- m.put(key, super.buildReaderWithPartitionValues(sparkSession,
dataSchemaWithoutMeta, StructType(Seq.empty), requiredWithoutMeta,
- filters ++ requiredFilters, options, new Configuration(hadoopConf)))
- }
-
- val requestedWithoutMeta = StructType(requiredSchema.fields.filterNot(sf
=> isMetaField(sf.name)))
- m.put(generateKey(dataSchemaWithoutMeta, requestedWithoutMeta),
- super.buildReaderWithPartitionValues(sparkSession,
dataSchemaWithoutMeta, StructType(Seq.empty), requestedWithoutMeta,
- Seq.empty, options, new Configuration(hadoopConf)))
- }
-
- (baseFileReader, preMergeBaseFileReader, m, cdcFileReader)
- }
-
- protected def generateKey(dataSchema: StructType, requestedSchema:
StructType): Long = {
- AvroConversionUtils.convertStructTypeToAvroSchema(dataSchema,
sanitizedTableName).hashCode() +
AvroConversionUtils.convertStructTypeToAvroSchema(requestedSchema,
sanitizedTableName).hashCode()
- }
-}
-
-object HoodieFileGroupReaderBasedParquetFileFormat {
- // From "ParquetFileFormat.scala": The names of the field for record
position.
- private val ROW_INDEX = "row_index"
- private val ROW_INDEX_TEMPORARY_COLUMN_NAME = s"_tmp_metadata_$ROW_INDEX"
-
- // From "namedExpressions.scala": Used to construct to record position field
metadata.
- private val FILE_SOURCE_GENERATED_METADATA_COL_ATTR_KEY =
"__file_source_generated_metadata_col"
- private val FILE_SOURCE_METADATA_COL_ATTR_KEY = "__file_source_metadata_col"
- private val METADATA_COL_ATTR_KEY = "__metadata_col"
-
- /**
- * A required option (since Spark 3.3.2) to pass to
buildReaderWithPartitionValues to return columnar batch output or not.
- * For ParquetFileFormat and OrcFileFormat, passing this option is required.
- * This should only be passed as true if it can actually be supported, which
can be checked
- * by calling supportBatch.
- */
- private val OPTION_RETURNING_BATCH = "returning_batch"
Review Comment:
We don't need to pass that.
https://github.com/apache/spark/blob/8b88f5ae10cc676a9778c186b12c691fa913088d/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala#L633
it's supplied by the caller
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]