yihua opened a new pull request, #18003:
URL: https://github.com/apache/hudi/pull/18003

   ### Describe the issue this Pull Request addresses
   
   Fixes #18002
   
   Hudi Spark integration uses certain Spark APIs through Hudi's `SparkAdapter` 
framework.  The Databricks Spark Runtime has APIs that are incompatible with OSS 
Spark.  In particular, the following Databricks Spark classes cause Hudi's 
incremental query to fail when it needs a full scan (a particular case that 
invokes `HoodieBaseRelation#listLatestFileSlices`).
   
   1. `FileStatusWithMetadata#fileStatus` does not work in Databricks Runtime
   In OSS Spark:
   ```
   case class FileStatusWithMetadata(fileStatus: FileStatus, metadata: 
Map[String, Any] = Map.empty) {
     // Wrapper methods to improve source compatibility in code that still 
expects a [[FileStatus]].
     def getPath: Path = fileStatus.getPath
     def getLen: Long = fileStatus.getLen
     def getModificationTime: Long = fileStatus.getModificationTime
     def isDirectory: Boolean = fileStatus.isDirectory
   }
   ```
   In Databricks Spark, the type of `fileStatus` has changed from `FileStatus` 
to `SerializableFileStatus`:
   ```
   classOf[FileStatusWithMetadata].getDeclaredFields.foreach(println)
   
   private final 
org.apache.spark.sql.execution.datasources.SerializableFileStatus 
org.apache.spark.sql.execution.datasources.FileStatusWithMetadata.fileStatus
   private final scala.collection.immutable.Map 
org.apache.spark.sql.execution.datasources.FileStatusWithMetadata.metadata
   ```
   ```
   classOf[FileStatus].isAssignableFrom(classOf[SerializableFileStatus])
   res12: Boolean = false
   ```
   Databricks Spark provides the following API to get the `FileStatus` object 
from a `FileStatusWithMetadata`:
   ```
   classOf[FileStatusWithMetadata].getDeclaredMethods
   ...
   public org.apache.hadoop.fs.FileStatus 
org.apache.spark.sql.execution.datasources.FileStatusWithMetadata.toFileStatus()
   ```
   
   2. `PartitionedFile(partitionValues, SparkPath.fromUri(filePath.toUri), 
start, length)` does not work in Databricks Runtime
   In OSS Spark:
   ```
   case class PartitionedFile(
       partitionValues: InternalRow,
       filePath: SparkPath,
       start: Long,
       length: Long,
       @transient locations: Array[String] = Array.empty,
       modificationTime: Long = 0L,
       fileSize: Long = 0L,
       otherConstantMetadataColumnValues: Map[String, Any] = Map.empty)
   ```
   In Databricks Spark, the type of `locations` has changed from `Array` to 
`Seq`:
   ```
   classOf[PartitionedFile].getDeclaredFields.foreach(println)
   
   private final org.apache.spark.sql.catalyst.InternalRow 
org.apache.spark.sql.execution.datasources.PartitionedFile.partitionValues
   private final org.apache.spark.paths.SparkPath 
org.apache.spark.sql.execution.datasources.PartitionedFile.filePath
   private final long 
org.apache.spark.sql.execution.datasources.PartitionedFile.start
   private final long 
org.apache.spark.sql.execution.datasources.PartitionedFile.length
   private final transient scala.collection.Seq 
org.apache.spark.sql.execution.datasources.PartitionedFile.locations
   private final long 
org.apache.spark.sql.execution.datasources.PartitionedFile.modificationTime
   private final long 
org.apache.spark.sql.execution.datasources.PartitionedFile.fileSize
   private final scala.collection.immutable.Map 
org.apache.spark.sql.execution.datasources.PartitionedFile.otherConstantMetadataColumnValues
   private final scala.collection.mutable.Map 
org.apache.spark.sql.execution.datasources.PartitionedFile.tags
   private scala.Option 
org.apache.spark.sql.execution.datasources.PartitionedFile.org$apache$spark$sql$execution$datasources$PartitionedFile$$basePathKey
   private scala.Option 
org.apache.spark.sql.execution.datasources.PartitionedFile.org$apache$spark$sql$execution$datasources$PartitionedFile$$basePath
   private scala.Option 
org.apache.spark.sql.execution.datasources.PartitionedFile.org$apache$spark$sql$execution$datasources$PartitionedFile$$rowIndexFilter
   private boolean 
org.apache.spark.sql.execution.datasources.PartitionedFile.useSplittableFileScan
   ```
   This causes the following exception:
   ```
   Caused by: java.lang.NoSuchMethodError: 'java.lang.String[] 
org.apache.spark.sql.execution.datasources.PartitionedFile$.apply$default$5()'
        at 
org.apache.spark.sql.execution.datasources.HoodieSpark35PartitionedFileUtils$.createPartitionedFile(HoodieSpark35PartitionedFileUtils.scala:45)
        at 
org.apache.hudi.SparkFileFormatInternalRowReaderContext.getFileRecordIterator(SparkFileFormatInternalRowReaderContext.scala:95)
        at 
org.apache.hudi.common.table.read.HoodieFileGroupReader.makeBaseFileIterator(HoodieFileGroupReader.java:162)
        at 
org.apache.hudi.common.table.read.HoodieFileGroupReader.initRecordIterators(HoodieFileGroupReader.java:129)
        at 
org.apache.hudi.common.table.read.HoodieFileGroupReader.getBufferedRecordIterator(HoodieFileGroupReader.java:291)
        at 
org.apache.hudi.common.table.read.HoodieFileGroupReader.getClosableIterator(HoodieFileGroupReader.java:300)
        at 
org.apache.spark.sql.execution.datasources.parquet.HoodieFileGroupReaderBasedFileFormat.$anonfun$buildReaderWithPartitionValues$4(HoodieFileGroupReaderBasedFileFormat.scala:273)
        at 
org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1$$anon$2.getNext(FileScanRDD.scala:737)
        at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
        at 
org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.$anonfun$prepareNextFile$1(FileScanRDD.scala:1084)
   ```
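   The `NoSuchMethodError` arises because the call site was compiled against OSS 
Spark's `apply$default$5()` (which returns `Array[String]`), and that method does 
not exist in Databricks' build with the `Seq[String]` signature. A 
runtime-agnostic construction can avoid compiled-in defaults altogether by 
inspecting the declared parameter type of `locations` and passing a matching 
empty value explicitly. A minimal sketch (the `PartitionedFileCompat` helper is 
hypothetical, not Hudi's actual utility):
   ```scala
   import java.lang.reflect.Method

   // Hypothetical helper (not Hudi's actual code): pick an empty `locations`
   // value whose runtime type matches what the target `apply` method actually
   // declares, so no compiled-in default (apply$default$5) is ever invoked.
   object PartitionedFileCompat {
     def emptyLocations(applyMethod: Method, locationsParamIndex: Int): AnyRef = {
       val paramType = applyMethod.getParameterTypes()(locationsParamIndex)
       if (paramType.isArray) Array.empty[String]  // OSS Spark: Array[String]
       else Seq.empty[String]                      // Databricks: Seq[String]
     }
   }
   ```
   Passing every argument explicitly, with the empty `locations` chosen this 
way, would let a single Hudi build work against both signatures.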
   ### Summary and Changelog
   
   Based on the above findings, this PR adapts the Hudi Spark integration to 
Databricks Runtime, fixing incremental queries in full scan mode.
   
   ### Impact
   
   Fixes incremental queries in full scan mode on Databricks Runtime.
   
   ### Risk Level
   
   None
   
   ### Documentation Update
   
   N/A
   
   ### Contributor's checklist
   
   - [ ] Read through [contributor's 
guide](https://hudi.apache.org/contribute/how-to-contribute)
   - [ ] Enough context is provided in the sections above
   - [ ] Adequate tests were added if applicable
   

