umehrot2 commented on a change in pull request #2893:
URL: https://github.com/apache/hudi/pull/2893#discussion_r654728218
##########
File path:
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieFileIndex.scala
##########
@@ -151,13 +184,33 @@ case class HoodieFileIndex(
metaClient.reloadActiveTimeline()
val activeInstants =
metaClient.getActiveTimeline.getCommitsTimeline.filterCompletedInstants
fileSystemView = new HoodieTableFileSystemView(metaClient, activeInstants,
allFiles)
- cachedAllInputFiles =
fileSystemView.getLatestBaseFiles.iterator().asScala.toArray
- cachedAllPartitionPaths = partitionFiles.keys.toSeq
- cachedFileSize = cachedAllInputFiles.map(_.getFileLen).sum
+
+ (tableType, queryType) match {
+ case (MERGE_ON_READ, QUERY_TYPE_SNAPSHOT_OPT_VAL) =>
+ // Fetch and store latest base and log files, and their sizes
+ cachedAllInputFiles = partitionFiles.map(p => {
+ val latestSlices =
fileSystemView.getLatestMergedFileSlicesBeforeOrOn(p._1.partitionPath,
activeInstants.lastInstant().get().getTimestamp)
Review comment:
Added check.
##########
File path:
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieFileIndex.scala
##########
@@ -107,34 +113,61 @@ case class HoodieFileIndex(
}
@transient @volatile private var fileSystemView: HoodieTableFileSystemView =
_
- @transient @volatile private var cachedAllInputFiles: Array[HoodieBaseFile]
= _
+ @transient @volatile private var cachedAllInputFiles: Map[PartitionRowPath,
Map[HoodieBaseFile, Seq[HoodieLogFile]]] = _
@transient @volatile private var cachedFileSize: Long = 0L
- @transient @volatile private var cachedAllPartitionPaths:
Seq[PartitionRowPath] = _
@volatile private var queryAsNonePartitionedTable: Boolean = _
refresh0()
override def rootPaths: Seq[Path] = queryPath :: Nil
+ /**
+ * Invoked by Spark to fetch list of latest base files per partition.
+ *
+ * @param partitionFilters partition column filters
+ * @param dataFilters data columns filters
+ * @return list of PartitionDirectory containing partition to base files
mapping
+ */
override def listFiles(partitionFilters: Seq[Expression],
dataFilters: Seq[Expression]):
Seq[PartitionDirectory] = {
if (queryAsNonePartitionedTable) { // Read as Non-Partitioned table.
Seq(PartitionDirectory(InternalRow.empty, allFiles))
} else {
// Prune the partition path by the partition filters
- val prunedPartitions = prunePartition(cachedAllPartitionPaths,
partitionFilters)
+ val prunedPartitions = prunePartition(cachedAllInputFiles.keys.toSeq,
partitionFilters)
prunedPartitions.map { partition =>
- val fileStatues =
fileSystemView.getLatestBaseFiles(partition.partitionPath).iterator()
- .asScala.toSeq
- .map(_.getFileStatus)
- PartitionDirectory(partition.values, fileStatues)
+ val baseFileStatuses =
cachedAllInputFiles(partition).keys.map(_.getFileStatus).toSeq
+ PartitionDirectory(partition.values, baseFileStatuses)
}
}
}
+ /**
+ * Fetch list of latest base files and log files per partition.
+ *
+ * @param partitionFilters partition column filters
+ * @param dataFilters data column filters
+ * @return mapping from string partition paths to its base/log files
+ */
+ def listBaseAndLogFiles(partitionFilters: Seq[Expression], dataFilters:
Seq[Expression]):
+ Map[String, Map[HoodieBaseFile, Seq[HoodieLogFile]]] = {
+ if (queryAsNonePartitionedTable) {
+ // Read as Non-Partitioned table.
+ cachedAllInputFiles.map(entry => (entry._1.partitionPath, entry._2))
+ } else {
+ // Prune the partition path by the partition filters
+ val prunedPartitions = prunePartition(cachedAllInputFiles.keys.toSeq,
partitionFilters)
+ prunedPartitions.map(partition => {
+ (partition.partitionPath, cachedAllInputFiles(partition))
+ }).toMap
+ }
+ }
+
override def inputFiles: Array[String] = {
- cachedAllInputFiles.map(_.getFileStatus.getPath.toString)
+ cachedAllInputFiles.values.flatten.flatMap(baseLogFilesMapping => {
+ Iterator(baseLogFilesMapping._1.getPath) ++
baseLogFilesMapping._2.map(_.getFileStatus.getPath.toString)
Review comment:
Done.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]