yihua opened a new pull request, #18003:
URL: https://github.com/apache/hudi/pull/18003
### Describe the issue this Pull Request addresses
Fixes #18002
Hudi's Spark integration uses certain Spark APIs through Hudi's `SparkAdapter`
framework. The Databricks Spark Runtime has APIs that are incompatible with OSS
Spark. In particular, the following Databricks Spark classes cause Hudi's
incremental query to fail in the case that requires a full scan (which invokes
`HoodieBaseRelation#listLatestFileSlices`):
1. `FileStatusWithMetadata#fileStatus` does not work in Databricks Runtime
In OSS Spark:
```
case class FileStatusWithMetadata(fileStatus: FileStatus, metadata: Map[String, Any] = Map.empty) {
  // Wrapper methods to improve source compatibility in code that still expects a [[FileStatus]].
  def getPath: Path = fileStatus.getPath
  def getLen: Long = fileStatus.getLen
  def getModificationTime: Long = fileStatus.getModificationTime
  def isDirectory: Boolean = fileStatus.isDirectory
}
```
In Databricks Spark, the type of `fileStatus` is changed from `FileStatus`
to `SerializableFileStatus`:
```
classOf[FileStatusWithMetadata].getDeclaredFields.foreach(println)
private final org.apache.spark.sql.execution.datasources.SerializableFileStatus org.apache.spark.sql.execution.datasources.FileStatusWithMetadata.fileStatus
private final scala.collection.immutable.Map org.apache.spark.sql.execution.datasources.FileStatusWithMetadata.metadata
```
```
classOf[FileStatus].isAssignableFrom(classOf[SerializableFileStatus])
res12: Boolean = false
```
Databricks Spark provides the following API to obtain the `FileStatus` object
from a `FileStatusWithMetadata`:
```
classOf[FileStatusWithMetadata].getDeclaredMethods
...
public org.apache.hadoop.fs.FileStatus org.apache.spark.sql.execution.datasources.FileStatusWithMetadata.toFileStatus()
```
2. `PartitionedFile(partitionValues, SparkPath.fromUri(filePath.toUri),
start, length)` does not work in Databricks Runtime
In OSS Spark:
```
case class PartitionedFile(
partitionValues: InternalRow,
filePath: SparkPath,
start: Long,
length: Long,
@transient locations: Array[String] = Array.empty,
modificationTime: Long = 0L,
fileSize: Long = 0L,
otherConstantMetadataColumnValues: Map[String, Any] = Map.empty)
```
In Databricks Spark, the type of `locations` has changed from `Array` to
`Seq`:
```
classOf[PartitionedFile].getDeclaredFields.foreach(println)
private final org.apache.spark.sql.catalyst.InternalRow org.apache.spark.sql.execution.datasources.PartitionedFile.partitionValues
private final org.apache.spark.paths.SparkPath org.apache.spark.sql.execution.datasources.PartitionedFile.filePath
private final long org.apache.spark.sql.execution.datasources.PartitionedFile.start
private final long org.apache.spark.sql.execution.datasources.PartitionedFile.length
private final transient scala.collection.Seq org.apache.spark.sql.execution.datasources.PartitionedFile.locations
private final long org.apache.spark.sql.execution.datasources.PartitionedFile.modificationTime
private final long org.apache.spark.sql.execution.datasources.PartitionedFile.fileSize
private final scala.collection.immutable.Map org.apache.spark.sql.execution.datasources.PartitionedFile.otherConstantMetadataColumnValues
private final scala.collection.mutable.Map org.apache.spark.sql.execution.datasources.PartitionedFile.tags
private scala.Option org.apache.spark.sql.execution.datasources.PartitionedFile.org$apache$spark$sql$execution$datasources$PartitionedFile$$basePathKey
private scala.Option org.apache.spark.sql.execution.datasources.PartitionedFile.org$apache$spark$sql$execution$datasources$PartitionedFile$$basePath
private scala.Option org.apache.spark.sql.execution.datasources.PartitionedFile.org$apache$spark$sql$execution$datasources$PartitionedFile$$rowIndexFilter
private boolean org.apache.spark.sql.execution.datasources.PartitionedFile.useSplittableFileScan
```
This causes the following exception:
```
Caused by: java.lang.NoSuchMethodError: 'java.lang.String[] org.apache.spark.sql.execution.datasources.PartitionedFile$.apply$default$5()'
	at org.apache.spark.sql.execution.datasources.HoodieSpark35PartitionedFileUtils$.createPartitionedFile(HoodieSpark35PartitionedFileUtils.scala:45)
	at org.apache.hudi.SparkFileFormatInternalRowReaderContext.getFileRecordIterator(SparkFileFormatInternalRowReaderContext.scala:95)
	at org.apache.hudi.common.table.read.HoodieFileGroupReader.makeBaseFileIterator(HoodieFileGroupReader.java:162)
	at org.apache.hudi.common.table.read.HoodieFileGroupReader.initRecordIterators(HoodieFileGroupReader.java:129)
	at org.apache.hudi.common.table.read.HoodieFileGroupReader.getBufferedRecordIterator(HoodieFileGroupReader.java:291)
	at org.apache.hudi.common.table.read.HoodieFileGroupReader.getClosableIterator(HoodieFileGroupReader.java:300)
	at org.apache.spark.sql.execution.datasources.parquet.HoodieFileGroupReaderBasedFileFormat.$anonfun$buildReaderWithPartitionValues$4(HoodieFileGroupReaderBasedFileFormat.scala:273)
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1$$anon$2.getNext(FileScanRDD.scala:737)
	at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.$anonfun$prepareNextFile$1(FileScanRDD.scala:1084)
```
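The `NoSuchMethodError` arises because the compiled call site hard-codes the OSS default-argument signature (`String[]` for `locations`). One way an adapter can cope with a parameter type that varies across runtimes is to pick the constructor whose parameter types actually accept the supplied arguments, rather than binding to one signature at compile time. Below is a hypothetical, self-contained sketch of that pattern; `PartitionedFileLike` is a local stand-in for `PartitionedFile`, with two overloads mimicking the `Array[String]` vs `Seq` difference.

```java
import java.lang.reflect.Constructor;

// Hypothetical sketch: selecting a constructor whose parameter types differ
// between runtimes. PartitionedFile is replaced by a local stand-in class.
public class ConstructorPicker {
    public static class PartitionedFileLike {
        public final Object locations;
        // Overload mimicking the OSS signature (locations: Array[String]).
        public PartitionedFileLike(String path, long start, long length, String[] locations) {
            this.locations = locations;
        }
        // Overload mimicking the Databricks signature (locations: Seq[String]).
        public PartitionedFileLike(String path, long start, long length, java.util.List<String> locations) {
            this.locations = locations;
        }
    }

    // Find the constructor whose locations parameter accepts the given value,
    // instead of hard-coding String[] as the OSS-only code path does.
    public static PartitionedFileLike create(String path, long start, long length,
                                             Object locations) throws Exception {
        for (Constructor<?> c : PartitionedFileLike.class.getDeclaredConstructors()) {
            Class<?>[] p = c.getParameterTypes();
            if (p[p.length - 1].isInstance(locations)) {
                return (PartitionedFileLike) c.newInstance(path, start, length, locations);
            }
        }
        throw new NoSuchMethodException("no constructor accepts " + locations.getClass());
    }

    public static void main(String[] args) throws Exception {
        // Either representation of locations resolves to a matching constructor.
        System.out.println(create("f", 0L, 10L, new String[0]).locations.getClass());
        System.out.println(create("f", 0L, 10L, java.util.List.of("host1")).locations.getClass());
    }
}
```

In real adapter code the same dispatch would typically live behind the version-specific `SparkAdapter` implementation, so the reflection cost and the runtime detection are confined to one place.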
### Summary and Changelog
This PR adapts the Hudi Spark integration to the Databricks Runtime based on
the above findings, fixing the incremental query in full-scan mode.
### Impact
Fixes the incremental query in full-scan mode on the Databricks Runtime.
### Risk Level
none
### Documentation Update
N/A
### Contributor's checklist
- [ ] Read through [contributor's
guide](https://hudi.apache.org/contribute/how-to-contribute)
- [ ] Enough context is provided in the sections above
- [ ] Adequate tests were added if applicable