pengzhiwei2018 commented on a change in pull request #2893:
URL: https://github.com/apache/hudi/pull/2893#discussion_r640377103
##########
File path:
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieFileIndex.scala
##########
@@ -151,13 +184,33 @@ case class HoodieFileIndex(
metaClient.reloadActiveTimeline()
val activeInstants =
metaClient.getActiveTimeline.getCommitsTimeline.filterCompletedInstants
fileSystemView = new HoodieTableFileSystemView(metaClient, activeInstants,
allFiles)
- cachedAllInputFiles =
fileSystemView.getLatestBaseFiles.iterator().asScala.toArray
- cachedAllPartitionPaths = partitionFiles.keys.toSeq
- cachedFileSize = cachedAllInputFiles.map(_.getFileLen).sum
+
+ (tableType, queryType) match {
+ case (MERGE_ON_READ, QUERY_TYPE_SNAPSHOT_OPT_VAL) =>
+ // Fetch and store latest base and log files, and their sizes
+ cachedAllInputFiles = partitionFiles.map(p => {
+ val latestSlices =
fileSystemView.getLatestMergedFileSlicesBeforeOrOn(p._1.partitionPath,
activeInstants.lastInstant().get().getTimestamp)
Review comment:
If there is no commit success yet, `activeInstants.lastInstant().get()`
may lead to query crash. So we'd better to return empty file list.
##########
File path:
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieFileIndex.scala
##########
@@ -107,34 +113,61 @@ case class HoodieFileIndex(
}
@transient @volatile private var fileSystemView: HoodieTableFileSystemView =
_
- @transient @volatile private var cachedAllInputFiles: Array[HoodieBaseFile]
= _
+ @transient @volatile private var cachedAllInputFiles: Map[PartitionRowPath,
Map[HoodieBaseFile, Seq[HoodieLogFile]]] = _
@transient @volatile private var cachedFileSize: Long = 0L
- @transient @volatile private var cachedAllPartitionPaths:
Seq[PartitionRowPath] = _
@volatile private var queryAsNonePartitionedTable: Boolean = _
refresh0()
override def rootPaths: Seq[Path] = queryPath :: Nil
+ /**
+ * Invoked by Spark to fetch list of latest base files per partition.
+ *
+ * @param partitionFilters partition column filters
+ * @param dataFilters data columns filters
+ * @return list of PartitionDirectory containing partition to base files
mapping
+ */
override def listFiles(partitionFilters: Seq[Expression],
dataFilters: Seq[Expression]):
Seq[PartitionDirectory] = {
if (queryAsNonePartitionedTable) { // Read as Non-Partitioned table.
Seq(PartitionDirectory(InternalRow.empty, allFiles))
} else {
// Prune the partition path by the partition filters
- val prunedPartitions = prunePartition(cachedAllPartitionPaths,
partitionFilters)
+ val prunedPartitions = prunePartition(cachedAllInputFiles.keys.toSeq,
partitionFilters)
prunedPartitions.map { partition =>
- val fileStatues =
fileSystemView.getLatestBaseFiles(partition.partitionPath).iterator()
Review comment:
No! We will cache the `files` in the `fileSystemView`. So each call of
`listFiles` will reuse the cache values.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]