yihua commented on code in PR #10957:
URL: https://github.com/apache/hudi/pull/10957#discussion_r1569730199
##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedParquetFileFormat.scala:
##########
@@ -71,24 +64,36 @@ class
HoodieFileGroupReaderBasedParquetFileFormat(tableState: HoodieTableState,
isIncremental: Boolean,
shouldUseRecordPosition:
Boolean,
requiredFilters: Seq[Filter]
- ) extends ParquetFileFormat with
SparkAdapterSupport with HoodieFormatTrait {
+ ) extends ParquetFileFormat
with SparkAdapterSupport with HoodieFormatTrait {
def getRequiredFilters: Seq[Filter] = requiredFilters
+ private val sanitizedTableName =
AvroSchemaUtils.getAvroRecordQualifiedName(tableName)
+
/**
* Support batch needs to remain consistent, even if one side of a bootstrap
merge can support
* while the other side can't
*/
- private var supportBatchCalled = false
- private var supportBatchResult = false
+ /*
+private var supportBatchCalled = false
+private var supportBatchResult = false
+
+override def supportBatch(sparkSession: SparkSession, schema: StructType):
Boolean = {
+ if (!supportBatchCalled || supportBatchResult) {
+ supportBatchCalled = true
+ supportBatchResult = tableSchema.internalSchema.isEmpty && !isMOR &&
!isIncremental && !isBootstrap && super.supportBatch(sparkSession, schema)
+ }
+ supportBatchResult
+}
+ */
+ override def supportBatch(sparkSession: SparkSession, schema: StructType):
Boolean = false
Review Comment:
Are these commented-out lines still needed? If not, could you remove them instead of leaving them commented out?
##########
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderOnSpark.scala:
##########
@@ -80,16 +79,17 @@ class TestHoodieFileGroupReaderOnSpark extends
TestHoodieFileGroupReaderBase[Int
tempDir.toAbsolutePath.toUri.toString
}
- override def getHoodieReaderContext(tablePath: String, avroSchema: Schema):
HoodieReaderContext[InternalRow] = {
- val parquetFileFormat = new ParquetFileFormat
- val structTypeSchema =
AvroConversionUtils.convertAvroSchemaToStructType(avroSchema)
-
- val recordReaderIterator =
parquetFileFormat.buildReaderWithPartitionValues(
- spark, structTypeSchema, StructType(Seq.empty), structTypeSchema,
Seq.empty, Map.empty, getHadoopConf)
-
- val m = scala.collection.mutable.Map[Long, PartitionedFile =>
Iterator[InternalRow]]()
- m.put(2*avroSchema.hashCode(), recordReaderIterator)
- new SparkFileFormatInternalRowReaderContext(m)
+ override def getHoodieReaderContext(tablePath: String, avroSchema: Schema,
hadoopConf: Configuration): HoodieReaderContext[InternalRow] = {
+ val reader = sparkAdapter.createParquetFileReader(vectorized = false,
spark.sessionState.conf, Map.empty, hadoopConf)
+ val metaClient =
HoodieTableMetaClient.builder().setConf(getHadoopConf).setBasePath(tablePath).build
+ val recordKeyField = if (metaClient.getTableConfig.populateMetaFields()) {
+ HoodieRecord.RECORD_KEY_METADATA_FIELD
+ } else {
+ val keyFields = metaClient.getTableConfig.getRecordKeyFields.get()
+ checkState(keyFields.length == 1)
+ keyFields.head
+ }
Review Comment:
Could this be wrapped into a util method? I think the virtual key support may
have already added such a method.
##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedParquetFileFormat.scala:
##########
@@ -107,19 +112,23 @@ class
HoodieFileGroupReaderBasedParquetFileFormat(tableState: HoodieTableState,
val dataSchema =
StructType(tableSchema.structTypeSchema.fields.filterNot(f =>
partitionColumns.contains(f.name)))
val outputSchema = StructType(requiredSchema.fields ++
partitionSchema.fields)
spark.conf.set("spark.sql.parquet.enableVectorizedReader",
supportBatchResult)
- val requiredSchemaWithMandatory =
generateRequiredSchemaWithMandatory(requiredSchema, dataSchema, partitionSchema)
- val isCount = requiredSchemaWithMandatory.isEmpty
- val requiredSchemaSplits = requiredSchemaWithMandatory.fields.partition(f
=> HoodieRecord.HOODIE_META_COLUMNS_WITH_OPERATION.contains(f.name))
- val requiredMeta = StructType(requiredSchemaSplits._1)
- val requiredWithoutMeta = StructType(requiredSchemaSplits._2)
+ val isCount = requiredSchema.isEmpty && !isMOR && !isIncremental
val augmentedHadoopConf = FSUtils.buildInlineConf(hadoopConf)
- val (baseFileReader, preMergeBaseFileReader, readerMaps, cdcFileReader) =
buildFileReaders(
- spark, dataSchema, partitionSchema, requiredSchema, filters, options,
augmentedHadoopConf,
- requiredSchemaWithMandatory, requiredWithoutMeta, requiredMeta)
+ setSchemaEvolutionConfigs(augmentedHadoopConf, options)
+ val baseFileReader = super.buildReaderWithPartitionValues(spark,
dataSchema, partitionSchema, requiredSchema,
+ filters ++ requiredFilters, options, new
Configuration(augmentedHadoopConf))
+ val cdcFileReader = super.buildReaderWithPartitionValues(
+ spark,
+ tableSchema.structTypeSchema,
+ StructType(Nil),
+ tableSchema.structTypeSchema,
+ Nil,
+ options,
+ new Configuration(hadoopConf))
val requestedAvroSchema =
AvroConversionUtils.convertStructTypeToAvroSchema(requiredSchema,
sanitizedTableName)
val dataAvroSchema =
AvroConversionUtils.convertStructTypeToAvroSchema(dataSchema,
sanitizedTableName)
-
+ val parquetFileReader =
spark.sparkContext.broadcast(sparkAdapter.createParquetFileReader(supportBatchResult,
spark.sessionState.conf, options, augmentedHadoopConf))
Review Comment:
Should the parquet reader be instantiated on the executor side inside
`(file: PartitionedFile) => { .. }` ?
##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedParquetFileFormat.scala:
##########
@@ -129,20 +138,15 @@ class
HoodieFileGroupReaderBasedParquetFileFormat(tableState: HoodieTableState,
file.partitionValues match {
// Snapshot or incremental queries.
case fileSliceMapping: HoodiePartitionFileSliceMapping =>
- val filePath =
sparkAdapter.getSparkPartitionedFileUtils.getPathFromPartitionedFile(file)
- val filegroupName = if (FSUtils.isLogFile(filePath)) {
- FSUtils.getFileId(filePath.getName).substring(1)
- } else {
- FSUtils.getFileId(filePath.getName)
- }
+ val filegroupName = FSUtils.getFileIdFromFilePath(sparkAdapter
+ .getSparkPartitionedFileUtils.getPathFromPartitionedFile(file))
fileSliceMapping.getSlice(filegroupName) match {
case Some(fileSlice) if !isCount =>
if (requiredSchema.isEmpty &&
!fileSlice.getLogFiles.findAny().isPresent) {
val hoodieBaseFile = fileSlice.getBaseFile.get()
baseFileReader(createPartitionedFile(fileSliceMapping.getPartitionValues,
hoodieBaseFile.getHadoopPath, 0, hoodieBaseFile.getFileLen))
} else {
- val readerContext: HoodieReaderContext[InternalRow] = new
SparkFileFormatInternalRowReaderContext(
- readerMaps)
+ val readerContext = new
SparkFileFormatInternalRowReaderContext(parquetFileReader.value,
tableState.recordKeyField, filters)
Review Comment:
I remember the original intention was to remove all arguments from the reader
context constructor (the reader instance is instantiated inside
`context.getFileRecordIterator`), so that there are no Hudi-related
parameters here. Any parameters needed can be passed by the file group reader
as arguments to the context's method calls.
##########
hudi-spark-datasource/hudi-spark-common/src/test/scala/org/apache/spark/execution/datasources/parquet/TestHoodieFileGroupReaderBasedParquetFileFormat.scala:
##########
@@ -34,63 +32,17 @@ class TestHoodieFileGroupReaderBasedParquetFileFormat
extends SparkClientFunctio
IsNotNull("non_key_column"),
EqualTo("non_key_column", 1)
)
- val filtersWithoutKeyColumn =
HoodieFileGroupReaderBasedParquetFileFormat.getRecordKeyRelatedFilters(
+ val filtersWithoutKeyColumn =
SparkFileFormatInternalRowReaderContext.getRecordKeyRelatedFilters(
filters, "key_column");
assertEquals(0, filtersWithoutKeyColumn.size)
val filtersWithKeys = Seq(
EqualTo("key_column", 1),
GreaterThan("non_key_column", 2)
)
- val filtersWithKeyColumn =
HoodieFileGroupReaderBasedParquetFileFormat.getRecordKeyRelatedFilters(
+ val filtersWithKeyColumn =
SparkFileFormatInternalRowReaderContext.getRecordKeyRelatedFilters(
filtersWithKeys, "key_column")
assertEquals(1, filtersWithKeyColumn.size)
assertEquals("key_column", filtersWithKeyColumn.head.references.head)
}
-
- @Test
- def testGetAppliedRequiredSchema(): Unit = {
- val fields = Array(
- StructField("column_a", LongType, nullable = false),
- StructField("column_b", StringType, nullable = false))
- val requiredSchema = StructType(fields)
-
- val appliedSchema: StructType =
HoodieFileGroupReaderBasedParquetFileFormat.getAppliedRequiredSchema(
- requiredSchema, shouldUseRecordPosition = true, "row_index")
- if (HoodieSparkUtils.gteqSpark3_5) {
- assertEquals(3, appliedSchema.fields.length)
- } else {
- assertEquals(2, appliedSchema.fields.length)
- }
-
- val schemaWithoutRowIndexColumn =
HoodieFileGroupReaderBasedParquetFileFormat.getAppliedRequiredSchema(
- requiredSchema, shouldUseRecordPosition = false, "row_index")
- assertEquals(2, schemaWithoutRowIndexColumn.fields.length)
- }
-
- @Test
- def testGetAppliedFilters(): Unit = {
- val filters = Seq(
Review Comment:
Have these test cases been moved somewhere else? If so, could you point to the new location; if not, should they be kept?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]