yihua commented on code in PR #13572:
URL: https://github.com/apache/hudi/pull/13572#discussion_r2233592505
##########
hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/SparkFileFormatInternalRowReaderContext.scala:
##########
@@ -266,6 +271,24 @@ class SparkFileFormatInternalRowReaderContext(parquetFileReader: SparkParquetRea
}.asInstanceOf[ClosableIterator[InternalRow]]
}
}
+
+  override def getDataFileSchema(filePath: StoragePath, storage: HoodieStorage): Schema = {
+    val configuration = storageConfiguration.asInstanceOf[StorageConfiguration[Configuration]].unwrap()
+    if (configuration.get(AvroSchemaConverter.ADD_LIST_ELEMENT_RECORDS) == null) {
+      configuration.set(AvroSchemaConverter.ADD_LIST_ELEMENT_RECORDS, "false")
+    }
+    val path = HadoopFSUtils.convertToHadoopPath(filePath)
+    val readOptions = HadoopReadOptions.builder(configuration, path)
+      .withMetadataFilter(ParquetMetadataConverter.SKIP_ROW_GROUPS).build
+    val inputFile = HadoopInputFile.fromPath(path, configuration)
+    try {
+      val fileReader = ParquetFileReader.open(inputFile, readOptions)
Review Comment:
If the schema read is necessary, could we find a way to reuse the file
reader, i.e., avoid opening a file with a new reader for reading schema only?
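
A rough sketch of what reusing the reader could look like (hypothetical; it uses the same parquet-hadoop/parquet-avro APIs as the diff, but how the shared `fileReader` is handed to the record-reading path is an assumption, not code from this PR):

```scala
// Hypothetical sketch: open the ParquetFileReader once and reuse it for both
// the schema lookup and the subsequent row-group reads, instead of opening a
// second reader just to fetch the footer.
val fileReader = ParquetFileReader.open(inputFile, readOptions)
try {
  // Footer is already in memory after open(); no extra FS call here.
  val parquetSchema = fileReader.getFooter.getFileMetaData.getSchema
  val avroSchema = new AvroSchemaConverter(configuration).convert(parquetSchema)
  // ... pass the same fileReader (or its footer) to the record iterator ...
} finally {
  fileReader.close()
}
```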
##########
hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/SparkFileFormatInternalRowReaderContext.scala:
##########
@@ -266,6 +271,24 @@ class SparkFileFormatInternalRowReaderContext(parquetFileReader: SparkParquetRea
}.asInstanceOf[ClosableIterator[InternalRow]]
}
}
+
+  override def getDataFileSchema(filePath: StoragePath, storage: HoodieStorage): Schema = {
+    val configuration = storageConfiguration.asInstanceOf[StorageConfiguration[Configuration]].unwrap()
+    if (configuration.get(AvroSchemaConverter.ADD_LIST_ELEMENT_RECORDS) == null) {
+      configuration.set(AvroSchemaConverter.ADD_LIST_ELEMENT_RECORDS, "false")
+    }
+    val path = HadoopFSUtils.convertToHadoopPath(filePath)
+    val readOptions = HadoopReadOptions.builder(configuration, path)
+      .withMetadataFilter(ParquetMetadataConverter.SKIP_ROW_GROUPS).build
+    val inputFile = HadoopInputFile.fromPath(path, configuration)
+    try {
+      val fileReader = ParquetFileReader.open(inputFile, readOptions)
+      try {
+        val footer = fileReader.getFooter
+        new AvroSchemaConverter(configuration).convert(footer.getFileMetaData.getSchema)
Review Comment:
Reading the schema from the file incurs additional FS calls. Is this how the
current Spark integration handles schema evolution (if so, could you kindly
reference the code)? Could this be done using table metadata instead, e.g.,
`TableSchemaResolver`, to get the table schema as of the instant time that
wrote this file? If no such mechanism for efficiently fetching the schema of a
file exists, we should consider adding one.
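
One possible shape of the metadata-based alternative (a hypothetical sketch; `basePath` and the exact builder/resolver calls are assumptions based on Hudi's public `HoodieTableMetaClient` and `TableSchemaResolver` APIs, not code from this PR):

```scala
// Hypothetical sketch: resolve the schema from table metadata rather than
// from the data file's Parquet footer, avoiding per-file FS calls.
val metaClient = HoodieTableMetaClient.builder()
  .setConf(storageConfiguration)   // assumption: the same storage conf as above
  .setBasePath(basePath)           // assumption: table base path is available here
  .build()
val resolver = new TableSchemaResolver(metaClient)
// false => exclude Hudi metadata fields; selecting the schema as of the
// instant that wrote this particular file would need the commit timeline.
val tableSchema: Schema = resolver.getTableAvroSchema(false)
```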
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]