vinothchandar commented on a change in pull request #2893:
URL: https://github.com/apache/hudi/pull/2893#discussion_r638391032



##########
File path: 
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieFileIndex.scala
##########
@@ -107,34 +113,61 @@ case class HoodieFileIndex(
   }
 
   @transient @volatile private var fileSystemView: HoodieTableFileSystemView = 
_
-  @transient @volatile private var cachedAllInputFiles: Array[HoodieBaseFile] 
= _
+  @transient @volatile private var cachedAllInputFiles: Map[PartitionRowPath, 
Map[HoodieBaseFile, Seq[HoodieLogFile]]] = _

Review comment:
       Could we just reuse a FileSlice object here, instead of the map of base 
file to log files?

##########
File path: 
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieFileIndex.scala
##########
@@ -107,34 +113,61 @@ case class HoodieFileIndex(
   }
 
   @transient @volatile private var fileSystemView: HoodieTableFileSystemView = 
_
-  @transient @volatile private var cachedAllInputFiles: Array[HoodieBaseFile] 
= _
+  @transient @volatile private var cachedAllInputFiles: Map[PartitionRowPath, 
Map[HoodieBaseFile, Seq[HoodieLogFile]]] = _
   @transient @volatile private var cachedFileSize: Long = 0L
-  @transient @volatile private var cachedAllPartitionPaths: 
Seq[PartitionRowPath] = _
 
   @volatile private var queryAsNonePartitionedTable: Boolean = _
 
   refresh0()
 
   override def rootPaths: Seq[Path] = queryPath :: Nil
 
+  /**
+   * Invoked by Spark to fetch list of latest base files per partition.
+   *
+   * @param partitionFilters partition column filters
+   * @param dataFilters data columns filters
+   * @return list of PartitionDirectory containing partition to base files 
mapping
+   */
   override def listFiles(partitionFilters: Seq[Expression],
                          dataFilters: Seq[Expression]): 
Seq[PartitionDirectory] = {
     if (queryAsNonePartitionedTable) { // Read as Non-Partitioned table.
       Seq(PartitionDirectory(InternalRow.empty, allFiles))
     } else {
       // Prune the partition path by the partition filters
-      val prunedPartitions = prunePartition(cachedAllPartitionPaths, 
partitionFilters)
+      val prunedPartitions = prunePartition(cachedAllInputFiles.keys.toSeq, 
partitionFilters)
       prunedPartitions.map { partition =>
-        val fileStatues = 
fileSystemView.getLatestBaseFiles(partition.partitionPath).iterator()

Review comment:
       So previously, we were actually performing the listing for each 
`listFiles()` call, without actually using the cached values?

##########
File path: 
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieFileIndex.scala
##########
@@ -151,13 +184,33 @@ case class HoodieFileIndex(
     metaClient.reloadActiveTimeline()
     val activeInstants = 
metaClient.getActiveTimeline.getCommitsTimeline.filterCompletedInstants
     fileSystemView = new HoodieTableFileSystemView(metaClient, activeInstants, 
allFiles)
-    cachedAllInputFiles = 
fileSystemView.getLatestBaseFiles.iterator().asScala.toArray
-    cachedAllPartitionPaths = partitionFiles.keys.toSeq
-    cachedFileSize = cachedAllInputFiles.map(_.getFileLen).sum
+
+    (tableType, queryType) match {
+      case (MERGE_ON_READ, QUERY_TYPE_SNAPSHOT_OPT_VAL) =>
+        // Fetch and store latest base and log files, and their sizes
+        cachedAllInputFiles = partitionFiles.map(p => {
+          val latestSlices = 
fileSystemView.getLatestMergedFileSlicesBeforeOrOn(p._1.partitionPath, 
activeInstants.lastInstant().get().getTimestamp)
+          val baseAndLogFilesMapping = 
latestSlices.iterator().asScala.map(slice => {
+            (slice.getBaseFile.get(), 
slice.getLogFiles.sorted(HoodieLogFile.getLogFileComparator).iterator().asScala.toSeq)
+          }).toMap
+          (p._1, baseAndLogFilesMapping)
+        })
+        cachedFileSize = 
cachedAllInputFiles.values.flatten.map(baseLogFilesMap => {
+          baseLogFilesMap._1.getFileLen + 
baseLogFilesMap._2.map(_.getFileSize).sum
+        }).sum
+      case (_, _) =>

Review comment:
       How does all this work with incremental queries?

##########
File path: 
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieFileIndex.scala
##########
@@ -151,13 +184,33 @@ case class HoodieFileIndex(
     metaClient.reloadActiveTimeline()
     val activeInstants = 
metaClient.getActiveTimeline.getCommitsTimeline.filterCompletedInstants
     fileSystemView = new HoodieTableFileSystemView(metaClient, activeInstants, 
allFiles)
-    cachedAllInputFiles = 
fileSystemView.getLatestBaseFiles.iterator().asScala.toArray
-    cachedAllPartitionPaths = partitionFiles.keys.toSeq
-    cachedFileSize = cachedAllInputFiles.map(_.getFileLen).sum
+
+    (tableType, queryType) match {
+      case (MERGE_ON_READ, QUERY_TYPE_SNAPSHOT_OPT_VAL) =>
+        // Fetch and store latest base and log files, and their sizes
+        cachedAllInputFiles = partitionFiles.map(p => {
+          val latestSlices = 
fileSystemView.getLatestMergedFileSlicesBeforeOrOn(p._1.partitionPath, 
activeInstants.lastInstant().get().getTimestamp)
+          val baseAndLogFilesMapping = 
latestSlices.iterator().asScala.map(slice => {
+            (slice.getBaseFile.get(), 
slice.getLogFiles.sorted(HoodieLogFile.getLogFileComparator).iterator().asScala.toSeq)

Review comment:
       Worth double-checking that the comparator is actually sorting in the 
desired order. Just a random word of caution.

##########
File path: 
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieFileIndex.scala
##########
@@ -314,18 +379,15 @@ case class HoodieFileIndex(
         case None => pathToFetch.append(partitionRowPath)
       }
     }
-    // Fetch the rest from the file system.
-    val fetchedPartition2Files =
-      spark.sparkContext.parallelize(pathToFetch, Math.min(pathToFetch.size, 
maxListParallelism))
-        .map { partitionRowPath =>
-          // Here we use a LocalEngineContext to get the files in the 
partition.
-          // We can do this because the TableMetadata.getAllFilesInPartition 
only rely on the
-          // hadoopConf of the EngineContext.
-          val engineContext = new 
HoodieLocalEngineContext(serializableConf.get())
-          val filesInPartition =  FSUtils.getFilesInPartition(engineContext, 
metadataConfig,
-              basePath, partitionRowPath.fullPartitionPath(basePath))
-          (partitionRowPath, filesInPartition)
-        }.collect().map(f => f._1 -> f._2).toMap
+
+    var fetchedPartition2Files: Map[PartitionRowPath, Array[FileStatus]] = 
Map()

Review comment:
       Rename to `fetchedPartitionToFiles`?

##########
File path: 
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieFileIndex.scala
##########
@@ -107,34 +113,61 @@ case class HoodieFileIndex(
   }
 
   @transient @volatile private var fileSystemView: HoodieTableFileSystemView = 
_
-  @transient @volatile private var cachedAllInputFiles: Array[HoodieBaseFile] 
= _
+  @transient @volatile private var cachedAllInputFiles: Map[PartitionRowPath, 
Map[HoodieBaseFile, Seq[HoodieLogFile]]] = _
   @transient @volatile private var cachedFileSize: Long = 0L
-  @transient @volatile private var cachedAllPartitionPaths: 
Seq[PartitionRowPath] = _
 
   @volatile private var queryAsNonePartitionedTable: Boolean = _
 
   refresh0()
 
   override def rootPaths: Seq[Path] = queryPath :: Nil
 
+  /**
+   * Invoked by Spark to fetch list of latest base files per partition.
+   *
+   * @param partitionFilters partition column filters
+   * @param dataFilters data columns filters
+   * @return list of PartitionDirectory containing partition to base files 
mapping
+   */
   override def listFiles(partitionFilters: Seq[Expression],
                          dataFilters: Seq[Expression]): 
Seq[PartitionDirectory] = {
     if (queryAsNonePartitionedTable) { // Read as Non-Partitioned table.
       Seq(PartitionDirectory(InternalRow.empty, allFiles))
     } else {
       // Prune the partition path by the partition filters
-      val prunedPartitions = prunePartition(cachedAllPartitionPaths, 
partitionFilters)
+      val prunedPartitions = prunePartition(cachedAllInputFiles.keys.toSeq, 
partitionFilters)
       prunedPartitions.map { partition =>
-        val fileStatues = 
fileSystemView.getLatestBaseFiles(partition.partitionPath).iterator()
-          .asScala.toSeq
-          .map(_.getFileStatus)
-        PartitionDirectory(partition.values, fileStatues)
+        val baseFileStatuses = 
cachedAllInputFiles(partition).keys.map(_.getFileStatus).toSeq
+        PartitionDirectory(partition.values, baseFileStatuses)
       }
     }
   }
 
+  /**
+   * Fetch list of latest base files and log files per partition.
+   *
+   * @param partitionFilters partition column filters
+   * @param dataFilters data column filters
+   * @return mapping from string partition paths to its base/log files
+   */
+  def listBaseAndLogFiles(partitionFilters: Seq[Expression], dataFilters: 
Seq[Expression]):
+      Map[String, Map[HoodieBaseFile, Seq[HoodieLogFile]]] = {
+    if (queryAsNonePartitionedTable) {
+      // Read as Non-Partitioned table.
+      cachedAllInputFiles.map(entry => (entry._1.partitionPath, entry._2))
+    } else {
+      // Prune the partition path by the partition filters
+      val prunedPartitions = prunePartition(cachedAllInputFiles.keys.toSeq, 
partitionFilters)
+      prunedPartitions.map(partition => {
+        (partition.partitionPath, cachedAllInputFiles(partition))
+      }).toMap
+    }
+  }
+
   override def inputFiles: Array[String] = {
-    cachedAllInputFiles.map(_.getFileStatus.getPath.toString)
+    cachedAllInputFiles.values.flatten.flatMap(baseLogFilesMapping => {
+      Iterator(baseLogFilesMapping._1.getPath) ++ 
baseLogFilesMapping._2.map(_.getFileStatus.getPath.toString)

Review comment:
       Any chance we can reuse code across this method and `allFiles` below?

##########
File path: 
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieFileIndex.scala
##########
@@ -151,13 +184,33 @@ case class HoodieFileIndex(
     metaClient.reloadActiveTimeline()
     val activeInstants = 
metaClient.getActiveTimeline.getCommitsTimeline.filterCompletedInstants
     fileSystemView = new HoodieTableFileSystemView(metaClient, activeInstants, 
allFiles)
-    cachedAllInputFiles = 
fileSystemView.getLatestBaseFiles.iterator().asScala.toArray
-    cachedAllPartitionPaths = partitionFiles.keys.toSeq
-    cachedFileSize = cachedAllInputFiles.map(_.getFileLen).sum
+
+    (tableType, queryType) match {
+      case (MERGE_ON_READ, QUERY_TYPE_SNAPSHOT_OPT_VAL) =>
+        // Fetch and store latest base and log files, and their sizes
+        cachedAllInputFiles = partitionFiles.map(p => {
+          val latestSlices = 
fileSystemView.getLatestMergedFileSlicesBeforeOrOn(p._1.partitionPath, 
activeInstants.lastInstant().get().getTimestamp)

Review comment:
       Is any error handling needed for the case where the timeline is empty?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Reply via email to