yihua commented on code in PR #12056:
URL: https://github.com/apache/hudi/pull/12056#discussion_r1796208714
##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala:
##########
@@ -319,7 +320,7 @@ case class HoodieFileIndex(spark: SparkSession,
     val prunedPartitionsTuple: (Boolean, Seq[PartitionPath]) =
       if (isPartitionedTable && partitionFilters.nonEmpty) {
         // For partitioned table and partition filters, prune the partitions by the partition filters
-        if (shouldEmbedFileSlices) {
+        if (shouldConvertFilter) {
Review Comment:
What is the rationale behind this change?
##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala:
##########
@@ -183,9 +183,9 @@ case class HoodieFileIndex(spark: SparkSession,
       }).filter(slice => slice != null)
         .map(fileInfo => new FileStatus(fileInfo.getLength, fileInfo.isDirectory, 0, fileInfo.getBlockSize, fileInfo.getModificationTime, new Path(fileInfo.getPath.toUri)))
-      val c = fileSlices.filter(f => (includeLogFiles && f.getLogFiles.findAny().isPresent)
-        || (f.getBaseFile.isPresent && f.getBaseFile.get().getBootstrapBaseFile.isPresent)).
-        foldLeft(Map[String, FileSlice]()) { (m, f) => m + (f.getFileId -> f) }
+      val c = fileSlices.filter(f => f.hasBootstrapBase || (includeLogFiles && f.hasLogFiles))
Review Comment:
The docs are outdated; let's fix that in a follow-up.
##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala:
##########
@@ -183,9 +184,9 @@ case class HoodieFileIndex(spark: SparkSession,
       }).filter(slice => slice != null)
         .map(fileInfo => new FileStatus(fileInfo.getLength, fileInfo.isDirectory, 0, fileInfo.getBlockSize, fileInfo.getModificationTime, new Path(fileInfo.getPath.toUri)))
-      val c = fileSlices.filter(f => (includeLogFiles && f.getLogFiles.findAny().isPresent)
-        || (f.getBaseFile.isPresent && f.getBaseFile.get().getBootstrapBaseFile.isPresent)).
-        foldLeft(Map[String, FileSlice]()) { (m, f) => m + (f.getFileId -> f) }
+      val c = fileSlices.filter(f => f.hasBootstrapBase || (includeLogFiles && f.hasLogFiles))
Review Comment:
Also, the caller instantiating the file index needs to make sure the correct `includeLogFiles` argument is passed in.
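To illustrate why the flag matters, here is a minimal sketch of the new filter condition from the diff, using hypothetical simplified types rather than Hudi's actual `FileSlice` API: a slice is kept if it has a bootstrap base file, or if it has log files and the caller opted in via `includeLogFiles`.

```scala
// Hypothetical simplified stand-in for Hudi's FileSlice, for illustration only.
case class SliceInfo(fileId: String, hasBootstrapBase: Boolean, hasLogFiles: Boolean)

// Mirrors the predicate in the diff: f.hasBootstrapBase || (includeLogFiles && f.hasLogFiles)
def keepSlice(includeLogFiles: Boolean)(f: SliceInfo): Boolean =
  f.hasBootstrapBase || (includeLogFiles && f.hasLogFiles)

val slices = Seq(
  SliceInfo("a", hasBootstrapBase = true, hasLogFiles = false),
  SliceInfo("b", hasBootstrapBase = false, hasLogFiles = true),
  SliceInfo("c", hasBootstrapBase = false, hasLogFiles = false)
)

// With includeLogFiles = false, only the bootstrap slice survives.
val withoutLogs = slices.filter(keepSlice(includeLogFiles = false))
// With includeLogFiles = true, the log-only slice is also kept.
val withLogs = slices.filter(keepSlice(includeLogFiles = true))
```

If a caller constructs the index with the wrong `includeLogFiles` value, slice "b" would silently drop out of (or into) the result, which is exactly the kind of mismatch this comment warns about.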
##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala:
##########
@@ -183,9 +184,9 @@ case class HoodieFileIndex(spark: SparkSession,
       }).filter(slice => slice != null)
         .map(fileInfo => new FileStatus(fileInfo.getLength, fileInfo.isDirectory, 0, fileInfo.getBlockSize, fileInfo.getModificationTime, new Path(fileInfo.getPath.toUri)))
-      val c = fileSlices.filter(f => (includeLogFiles && f.getLogFiles.findAny().isPresent)
-        || (f.getBaseFile.isPresent && f.getBaseFile.get().getBootstrapBaseFile.isPresent)).
-        foldLeft(Map[String, FileSlice]()) { (m, f) => m + (f.getFileId -> f) }
+      val c = fileSlices.filter(f => f.hasBootstrapBase || (includeLogFiles && f.hasLogFiles))
Review Comment:
After rethinking the changes, I think we should change the implementation of `filterFileSlices` instead of transforming the returned file slices afterwards. The logic there should include the transformations needed to adapt to the Spark implementation.
The reason is that data skipping happens in `filterFileSlices` and prunes the file slices returned by `prunePartitionsAndGetFileSlices`, which can contain log files. For read-optimized queries, data skipping should operate on file slices without log files; otherwise the pruning can be wrong, since data skipping would consider log files while the file listing here does not.
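A minimal sketch of the mismatch described above, using hypothetical simplified types (not Hudi's actual `FileSlice` API): a read-optimized query only reads base files, so any pruning decision has to be made against the base-file view of each slice, with log files stripped out.

```scala
// Hypothetical simplified stand-ins for Hudi's file-slice model, for illustration only.
case class BaseFile(path: String)
case class LogFile(path: String)
case class FileSliceInfo(fileId: String, baseFile: Option[BaseFile], logFiles: Seq[LogFile])

// Read-optimized queries see only base files: drop log-only slices and
// strip log files from the rest before any data-skipping decision is made.
def readOptimizedView(slices: Seq[FileSliceInfo]): Seq[FileSliceInfo] =
  slices.filter(_.baseFile.isDefined).map(_.copy(logFiles = Seq.empty))

val slices = Seq(
  FileSliceInfo("f1", Some(BaseFile("f1.parquet")), Seq(LogFile("f1.log"))),
  // A log-only slice: visible to snapshot queries, invisible to read-optimized ones.
  FileSliceInfo("f2", None, Seq(LogFile("f2.log")))
)

val pruned = readOptimizedView(slices)
```

If data skipping instead ran on the unfiltered `slices`, it could keep or prune `f2` based on log-file contents that the read-optimized file listing never exposes, which is the inconsistency the comment points out.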
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]