codope commented on code in PR #9345:
URL: https://github.com/apache/hudi/pull/9345#discussion_r1284229097
##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala:
##########
@@ -215,8 +217,8 @@ abstract class BaseMergeOnReadSnapshotRelation(sqlContext: SQLContext,
       HoodieFileIndex.convertFilterForTimestampKeyGenerator(metaClient, partitionFilters)
     if (globPaths.isEmpty) {
-      val fileSlices = fileIndex.listFileSlices(convertedPartitionFilters)
-      buildSplits(fileSlices.values.flatten.toSeq)
+      val fileSlices = fileIndex.filterFileSlices(dataFilters, fileIndex.getFileSlicesForPrunedPartitions(convertedPartitionFilters)).flatMap(s => s._2)
+      buildSplits(fileSlices)
     } else {
       val fileSlices = listLatestFileSlices(globPaths, partitionFilters, dataFilters)
Review Comment:
Have you verified that data filters work for globbed paths?
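   As a quick sanity check (just a sketch; the base path, glob pattern and predicate below are placeholders), the counts with and without data skipping should match for a globbed read:
   ```scala
   // Placeholder glob/predicate; compares a data-skipping read against a plain read.
   val skippingRead = spark.read.format("hudi")
     .option(DataSourceReadOptions.ENABLE_DATA_SKIPPING.key, "true")
     .load(s"$basePath/2023/*/*")   // placeholder glob pattern
     .filter("c5 > 70")             // placeholder data filter
   val plainRead = spark.read.format("hudi")
     .option(DataSourceReadOptions.ENABLE_DATA_SKIPPING.key, "false")
     .load(s"$basePath/2023/*/*")
     .filter("c5 > 70")
   assert(skippingRead.count() == plainRead.count())
   ```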
##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala:
##########
@@ -122,81 +125,120 @@ case class HoodieFileIndex(spark: SparkSession,
    * @return list of PartitionDirectory containing partition to base files mapping
    */
   override def listFiles(partitionFilters: Seq[Expression], dataFilters: Seq[Expression]): Seq[PartitionDirectory] = {
+    // Prune the partition path by the partition filters
+    // NOTE: Non-partitioned tables are assumed to consist from a single partition
+    //       encompassing the whole table
+    val partitionsAndFileSlices = getFileSlicesForPrunedPartitions(partitionFilters)
+    val listedPartitions = filterFileSlices(dataFilters, partitionsAndFileSlices).map {
+      case (partition, fileSlices) =>
+        val allCandidateFiles: Seq[FileStatus] = fileSlices.flatMap(fs => {
+          val baseFileStatusOpt = getBaseFileStatus(Option.apply(fs.getBaseFile.orElse(null)))
+          val logFilesStatus = if (includeLogFiles) {
+            fs.getLogFiles.map[FileStatus](JFunction.toJavaFunction[HoodieLogFile, FileStatus](lf => lf.getFileStatus))
+          } else {
+            java.util.stream.Stream.empty()
+          }
+          val files = logFilesStatus.collect(Collectors.toList[FileStatus]).asScala
+          baseFileStatusOpt.foreach(f => files.append(f))
+          files
+        })
+
+        PartitionDirectory(InternalRow.fromSeq(partition.get.values), allCandidateFiles)
+    }
+
+    hasPushedDownPartitionPredicates = true
+
+    if (shouldReadAsPartitionedTable()) {
+      listedPartitions
+    } else {
+      Seq(PartitionDirectory(InternalRow.empty, listedPartitions.flatMap(_.files)))
+    }
+  }
+
+  def filterFileSlices(dataFilters: Seq[Expression], partitionAndFileSlices: Seq[(Option[BaseHoodieTableFileIndex.PartitionPath], Seq[FileSlice])])
+  : Seq[(Option[BaseHoodieTableFileIndex.PartitionPath], Seq[FileSlice])] = {
     // Look up candidate files names in the col-stats index, if all of the following conditions are true
     //    - Data-skipping is enabled
     //    - Col-Stats Index is present
     //    - List of predicates (filters) is present
     val candidateFilesNamesOpt: Option[Set[String]] =
-      lookupCandidateFilesInMetadataTable(dataFilters) match {
-        case Success(opt) => opt
-        case Failure(e) =>
-          logError("Failed to lookup candidate files in File Index", e)
-
-          spark.sqlContext.getConf(DataSkippingFailureMode.configName, DataSkippingFailureMode.Fallback.value) match {
-            case DataSkippingFailureMode.Fallback.value => Option.empty
-            case DataSkippingFailureMode.Strict.value => throw new HoodieException(e);
-          }
-      }
+      lookupCandidateFilesInMetadataTable(dataFilters) match {
Review Comment:
Can we avoid looking up candidate files and return early when `partitionAndFileSlices` is empty, say when no files match the glob pattern?
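   Something like this guard at the top of `filterFileSlices` is what I have in mind (just a sketch):
   ```scala
   // Short-circuit: if no partition has any file slice (e.g. nothing matched the glob
   // pattern), skip the col-stats lookup entirely.
   if (partitionAndFileSlices.forall(_._2.isEmpty)) {
     return partitionAndFileSlices
   }
   ```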
##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala:
##########
@@ -122,81 +125,120 @@ case class HoodieFileIndex(spark: SparkSession,
    * @return list of PartitionDirectory containing partition to base files mapping
    */
   override def listFiles(partitionFilters: Seq[Expression], dataFilters: Seq[Expression]): Seq[PartitionDirectory] = {
+    // Prune the partition path by the partition filters
+    // NOTE: Non-partitioned tables are assumed to consist from a single partition
+    //       encompassing the whole table
+    val partitionsAndFileSlices = getFileSlicesForPrunedPartitions(partitionFilters)
+    val listedPartitions = filterFileSlices(dataFilters, partitionsAndFileSlices).map {
+      case (partition, fileSlices) =>
+        val allCandidateFiles: Seq[FileStatus] = fileSlices.flatMap(fs => {
+          val baseFileStatusOpt = getBaseFileStatus(Option.apply(fs.getBaseFile.orElse(null)))
+          val logFilesStatus = if (includeLogFiles) {
+            fs.getLogFiles.map[FileStatus](JFunction.toJavaFunction[HoodieLogFile, FileStatus](lf => lf.getFileStatus))
+          } else {
+            java.util.stream.Stream.empty()
+          }
+          val files = logFilesStatus.collect(Collectors.toList[FileStatus]).asScala
+          baseFileStatusOpt.foreach(f => files.append(f))
+          files
+        })
+
+        PartitionDirectory(InternalRow.fromSeq(partition.get.values), allCandidateFiles)
+    }
+
+    hasPushedDownPartitionPredicates = true
+
+    if (shouldReadAsPartitionedTable()) {
+      listedPartitions
+    } else {
+      Seq(PartitionDirectory(InternalRow.empty, listedPartitions.flatMap(_.files)))
+    }
+  }
+
+  def filterFileSlices(dataFilters: Seq[Expression], partitionAndFileSlices: Seq[(Option[BaseHoodieTableFileIndex.PartitionPath], Seq[FileSlice])])
+  : Seq[(Option[BaseHoodieTableFileIndex.PartitionPath], Seq[FileSlice])] = {
     // Look up candidate files names in the col-stats index, if all of the following conditions are true
     //    - Data-skipping is enabled
     //    - Col-Stats Index is present
     //    - List of predicates (filters) is present
     val candidateFilesNamesOpt: Option[Set[String]] =
-      lookupCandidateFilesInMetadataTable(dataFilters) match {
-        case Success(opt) => opt
-        case Failure(e) =>
-          logError("Failed to lookup candidate files in File Index", e)
-
-          spark.sqlContext.getConf(DataSkippingFailureMode.configName, DataSkippingFailureMode.Fallback.value) match {
-            case DataSkippingFailureMode.Fallback.value => Option.empty
-            case DataSkippingFailureMode.Strict.value => throw new HoodieException(e);
-          }
-      }
+      lookupCandidateFilesInMetadataTable(dataFilters) match {
+        case Success(opt) => opt
+        case Failure(e) =>
+          logError("Failed to lookup candidate files in File Index", e)
+
+          spark.sqlContext.getConf(DataSkippingFailureMode.configName, DataSkippingFailureMode.Fallback.value) match {
+            case DataSkippingFailureMode.Fallback.value => Option.empty
+            case DataSkippingFailureMode.Strict.value => throw new HoodieException(e);
+          }
+      }
     logDebug(s"Overlapping candidate files from Column Stats Index: ${candidateFilesNamesOpt.getOrElse(Set.empty)}")
-    var totalFileSize = 0
-    var candidateFileSize = 0
-
-    // Prune the partition path by the partition filters
-    // NOTE: Non-partitioned tables are assumed to consist from a single partition
-    //       encompassing the whole table
-    val prunedPartitions = listMatchingPartitionPaths(partitionFilters)
-    val listedPartitions = getInputFileSlices(prunedPartitions: _*).asScala.toSeq.map {
-      case (partition, fileSlices) =>
-        val baseFileStatuses: Seq[FileStatus] = getBaseFileStatus(fileSlices
-          .asScala
-          .map(fs => fs.getBaseFile.orElse(null))
-          .filter(_ != null))
+    var totalFileSliceSize = 0
+    var candidateFileSliceSize = 0
+    val listedPartitions = partitionAndFileSlices.map {
+      case (partitionOpt, fileSlices) =>
         // Filter in candidate files based on the col-stats index lookup
-        val candidateFiles = baseFileStatuses.filter(fs =>
-          // NOTE: This predicate is true when {@code Option} is empty
-          candidateFilesNamesOpt.forall(_.contains(fs.getPath.getName)))
+        val candidateFileSlices: Seq[FileSlice] = {
+          fileSlices.filter(fs => {
+            val baseFileStatusOpt = getBaseFileStatus(Option.apply(fs.getBaseFile.orElse(null)))
+            val logFiles = fs.getLogFiles.collect(Collectors.toSet[HoodieLogFile]).asScala.toSet[HoodieLogFile]
+            // NOTE: This predicate is true when {@code Option} is empty
+            if (candidateFilesNamesOpt.forall(files => baseFileStatusOpt.exists(f => files.contains(f.getPath.getName))
+              || files.intersect(logFiles.map(f => f.getPath.getName)).nonEmpty)) {
+              true
+            } else {
+              false
+            }
+          })
+        }
-        totalFileSize += baseFileStatuses.size
-        candidateFileSize += candidateFiles.size
-        PartitionDirectory(InternalRow.fromSeq(partition.values), candidateFiles)
+        totalFileSliceSize += fileSlices.size
+        candidateFileSliceSize += candidateFileSlices.size
+        (partitionOpt, candidateFileSlices)
     }
     val skippingRatio =
       if (!areAllFileSlicesCached) -1
-      else if (allFiles.nonEmpty && totalFileSize > 0) (totalFileSize - candidateFileSize) / totalFileSize.toDouble
+      else if (allFiles.nonEmpty && totalFileSliceSize > 0) (totalFileSliceSize - candidateFileSliceSize) / totalFileSliceSize.toDouble
Review Comment:
Is this calculation right? Isn't `allFiles` just a sequence of file statuses of the base files?
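   For example, for a MOR partition whose latest slices carry only log files, `allFiles` (base-file statuses) could be empty even though `totalFileSliceSize > 0`, so no ratio would be reported. A sketch of what I'd expect instead (the trailing `else 0` branch is assumed unchanged):
   ```scala
   val skippingRatio =
     if (!areAllFileSlicesCached) -1
     // guard on the slice counters the calculation is actually based on,
     // rather than on allFiles, which only reflects base files
     else if (totalFileSliceSize > 0) (totalFileSliceSize - candidateFileSliceSize) / totalFileSliceSize.toDouble
     else 0 // assumed unchanged fallback
   ```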
##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala:
##########
@@ -122,81 +125,120 @@ case class HoodieFileIndex(spark: SparkSession,
    * @return list of PartitionDirectory containing partition to base files mapping
    */
   override def listFiles(partitionFilters: Seq[Expression], dataFilters: Seq[Expression]): Seq[PartitionDirectory] = {
+    // Prune the partition path by the partition filters
+    // NOTE: Non-partitioned tables are assumed to consist from a single partition
+    //       encompassing the whole table
+    val partitionsAndFileSlices = getFileSlicesForPrunedPartitions(partitionFilters)
+    val listedPartitions = filterFileSlices(dataFilters, partitionsAndFileSlices).map {
+      case (partition, fileSlices) =>
+        val allCandidateFiles: Seq[FileStatus] = fileSlices.flatMap(fs => {
+          val baseFileStatusOpt = getBaseFileStatus(Option.apply(fs.getBaseFile.orElse(null)))
+          val logFilesStatus = if (includeLogFiles) {
+            fs.getLogFiles.map[FileStatus](JFunction.toJavaFunction[HoodieLogFile, FileStatus](lf => lf.getFileStatus))
+          } else {
+            java.util.stream.Stream.empty()
+          }
+          val files = logFilesStatus.collect(Collectors.toList[FileStatus]).asScala
+          baseFileStatusOpt.foreach(f => files.append(f))
+          files
+        })
+
+        PartitionDirectory(InternalRow.fromSeq(partition.get.values), allCandidateFiles)
+    }
+
+    hasPushedDownPartitionPredicates = true
+
+    if (shouldReadAsPartitionedTable()) {
+      listedPartitions
+    } else {
+      Seq(PartitionDirectory(InternalRow.empty, listedPartitions.flatMap(_.files)))
+    }
+  }
+
+  def filterFileSlices(dataFilters: Seq[Expression], partitionAndFileSlices: Seq[(Option[BaseHoodieTableFileIndex.PartitionPath], Seq[FileSlice])])
+  : Seq[(Option[BaseHoodieTableFileIndex.PartitionPath], Seq[FileSlice])] = {
     // Look up candidate files names in the col-stats index, if all of the following conditions are true
     //    - Data-skipping is enabled
     //    - Col-Stats Index is present
     //    - List of predicates (filters) is present
     val candidateFilesNamesOpt: Option[Set[String]] =
-      lookupCandidateFilesInMetadataTable(dataFilters) match {
-        case Success(opt) => opt
-        case Failure(e) =>
-          logError("Failed to lookup candidate files in File Index", e)
-
-          spark.sqlContext.getConf(DataSkippingFailureMode.configName, DataSkippingFailureMode.Fallback.value) match {
-            case DataSkippingFailureMode.Fallback.value => Option.empty
-            case DataSkippingFailureMode.Strict.value => throw new HoodieException(e);
-          }
-      }
+      lookupCandidateFilesInMetadataTable(dataFilters) match {
+        case Success(opt) => opt
+        case Failure(e) =>
+          logError("Failed to lookup candidate files in File Index", e)
+
+          spark.sqlContext.getConf(DataSkippingFailureMode.configName, DataSkippingFailureMode.Fallback.value) match {
+            case DataSkippingFailureMode.Fallback.value => Option.empty
+            case DataSkippingFailureMode.Strict.value => throw new HoodieException(e);
+          }
+      }
     logDebug(s"Overlapping candidate files from Column Stats Index: ${candidateFilesNamesOpt.getOrElse(Set.empty)}")
-    var totalFileSize = 0
-    var candidateFileSize = 0
-
-    // Prune the partition path by the partition filters
-    // NOTE: Non-partitioned tables are assumed to consist from a single partition
-    //       encompassing the whole table
-    val prunedPartitions = listMatchingPartitionPaths(partitionFilters)
-    val listedPartitions = getInputFileSlices(prunedPartitions: _*).asScala.toSeq.map {
-      case (partition, fileSlices) =>
-        val baseFileStatuses: Seq[FileStatus] = getBaseFileStatus(fileSlices
-          .asScala
-          .map(fs => fs.getBaseFile.orElse(null))
-          .filter(_ != null))
+    var totalFileSliceSize = 0
+    var candidateFileSliceSize = 0
+    val listedPartitions = partitionAndFileSlices.map {
+      case (partitionOpt, fileSlices) =>
         // Filter in candidate files based on the col-stats index lookup
-        val candidateFiles = baseFileStatuses.filter(fs =>
-          // NOTE: This predicate is true when {@code Option} is empty
-          candidateFilesNamesOpt.forall(_.contains(fs.getPath.getName)))
+        val candidateFileSlices: Seq[FileSlice] = {
+          fileSlices.filter(fs => {
+            val baseFileStatusOpt = getBaseFileStatus(Option.apply(fs.getBaseFile.orElse(null)))
+            val logFiles = fs.getLogFiles.collect(Collectors.toSet[HoodieLogFile]).asScala.toSet[HoodieLogFile]
+            // NOTE: This predicate is true when {@code Option} is empty
+            if (candidateFilesNamesOpt.forall(files => baseFileStatusOpt.exists(f => files.contains(f.getPath.getName))
+              || files.intersect(logFiles.map(f => f.getPath.getName)).nonEmpty)) {
+              true
+            } else {
+              false
+            }
+          })
+        }
-        totalFileSize += baseFileStatuses.size
-        candidateFileSize += candidateFiles.size
-        PartitionDirectory(InternalRow.fromSeq(partition.values), candidateFiles)
+        totalFileSliceSize += fileSlices.size
+        candidateFileSliceSize += candidateFileSlices.size
+        (partitionOpt, candidateFileSlices)
     }
     val skippingRatio =
      if (!areAllFileSlicesCached) -1
-      else if (allFiles.nonEmpty && totalFileSize > 0) (totalFileSize - candidateFileSize) / totalFileSize.toDouble
+      else if (allFiles.nonEmpty && totalFileSliceSize > 0) (totalFileSliceSize - candidateFileSliceSize) / totalFileSliceSize.toDouble
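   One way to read that (sketch):
   ```scala
   val filteredPartitionsAndFileSlices = filterFileSlices(dataFilters, partitionsAndFileSlices)
   val listedPartitions = filteredPartitionsAndFileSlices.map {
     case (partition, fileSlices) =>
       // ... unchanged mapping to PartitionDirectory ...
   }
   ```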
Review Comment:
Related to that, should we also have something like `allFiles` (a seq of file statuses for all files across all file slices) and `allBaseFiles` (a seq of file statuses for only the base files)? Or you could keep only `allFiles`, but then it would mean a seq of base files whenever `includeLogFiles` is false.
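   Roughly what I mean (a sketch only; the helper name and the slice collection it draws from are illustrative, not existing fields):
   ```scala
   // Illustrative helper: derives both views from whatever file-slice collection the
   // index already tracks (passed in here as `allInputFileSlices`, a placeholder name).
   private def collectFileStatuses(allInputFileSlices: Seq[FileSlice]): (Seq[FileStatus], Seq[FileStatus]) = {
     // statuses of base files only
     val allBaseFiles: Seq[FileStatus] = allInputFileSlices
       .flatMap(fs => Option(fs.getBaseFile.orElse(null)))
       .map(_.getFileStatus)
     // statuses of all files (base + log) when log files are included, else just base files
     val allFiles: Seq[FileStatus] =
       if (includeLogFiles) {
         allBaseFiles ++ allInputFileSlices.flatMap(fs =>
           fs.getLogFiles.collect(Collectors.toList[HoodieLogFile]).asScala.map(_.getFileStatus))
       } else {
         allBaseFiles
       }
     (allBaseFiles, allFiles)
   }
   ```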
##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala:
##########
@@ -122,81 +125,120 @@ case class HoodieFileIndex(spark: SparkSession,
    * @return list of PartitionDirectory containing partition to base files mapping
    */
   override def listFiles(partitionFilters: Seq[Expression], dataFilters: Seq[Expression]): Seq[PartitionDirectory] = {
+    // Prune the partition path by the partition filters
+    // NOTE: Non-partitioned tables are assumed to consist from a single partition
+    //       encompassing the whole table
+    val partitionsAndFileSlices = getFileSlicesForPrunedPartitions(partitionFilters)
+    val listedPartitions = filterFileSlices(dataFilters, partitionsAndFileSlices).map {
+      case (partition, fileSlices) =>
+        val allCandidateFiles: Seq[FileStatus] = fileSlices.flatMap(fs => {
+          val baseFileStatusOpt = getBaseFileStatus(Option.apply(fs.getBaseFile.orElse(null)))
+          val logFilesStatus = if (includeLogFiles) {
+            fs.getLogFiles.map[FileStatus](JFunction.toJavaFunction[HoodieLogFile, FileStatus](lf => lf.getFileStatus))
+          } else {
+            java.util.stream.Stream.empty()
+          }
+          val files = logFilesStatus.collect(Collectors.toList[FileStatus]).asScala
+          baseFileStatusOpt.foreach(f => files.append(f))
+          files
+        })
+
+        PartitionDirectory(InternalRow.fromSeq(partition.get.values), allCandidateFiles)
Review Comment:
Let's ensure `partition` is non-empty. What happens for a non-partitioned table?
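   e.g., instead of `partition.get`, something along these lines (sketch):
   ```scala
   // Non-partitioned tables come through with an empty partition Option; fall back to
   // an empty InternalRow instead of calling .get.
   val partitionRow = partition
     .map(p => InternalRow.fromSeq(p.values))
     .getOrElse(InternalRow.empty)
   PartitionDirectory(partitionRow, allCandidateFiles)
   ```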
##########
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestColumnStatsIndex.scala:
##########
Review Comment:
Let's add more tests for the file index, and e2e tests covering compaction and deletes, as discussed.
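   Roughly the shape I have in mind for the e2e part (sketch only: `inputDF`, `deleteDF`, `basePath` and the read helpers are placeholders; the compaction/delete options are the standard writer configs):
   ```scala
   // Inline compaction on MOR so the data-skipping path is exercised before and after compaction.
   val compactionOpts = commonOpts ++ Map(
     "hoodie.compact.inline" -> "true",
     "hoodie.compact.inline.max.delta.commits" -> "1"
   )

   // upsert that triggers inline compaction (placeholder DataFrame)
   inputDF.write.format("hudi")
     .options(compactionOpts)
     .option(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL)
     .mode(SaveMode.Append)
     .save(basePath)

   // delete a subset of keys, then re-verify (placeholder DataFrame)
   deleteDF.write.format("hudi")
     .options(commonOpts)
     .option(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.DELETE_OPERATION_OPT_VAL)
     .mode(SaveMode.Append)
     .save(basePath)

   // after each step, data-skipping and non-skipping reads should agree (placeholder helpers)
   assertEquals(readWithoutDataSkipping("c5 > 70").count(), readWithDataSkipping("c5 > 70").count())
   ```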
##########
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestColumnStatsIndex.scala:
##########
@@ -125,6 +125,90 @@ class TestColumnStatsIndex extends HoodieSparkClientTestBase {
       saveMode = SaveMode.Append)
   }
+  @ParameterizedTest
+  @MethodSource(Array("testMetadataColumnStatsIndexParams"))
+  def testMetadataColumnStatsIndexWithSQL(testCase: ColumnStatsTestCase): Unit = {
+    val metadataOpts = Map(
+      HoodieMetadataConfig.ENABLE.key -> "true",
+      HoodieMetadataConfig.ENABLE_METADATA_INDEX_COLUMN_STATS.key -> "true"
+    )
+
+    var commonOpts = Map(
+      "hoodie.insert.shuffle.parallelism" -> "4",
+      "hoodie.upsert.shuffle.parallelism" -> "4",
+      HoodieWriteConfig.TBL_NAME.key -> "hoodie_test",
+      DataSourceWriteOptions.TABLE_TYPE.key -> testCase.tableType.toString,
+      RECORDKEY_FIELD.key -> "c1",
+      PRECOMBINE_FIELD.key -> "c1",
+      HoodieTableConfig.POPULATE_META_FIELDS.key -> "true",
+      DataSourceReadOptions.ENABLE_DATA_SKIPPING.key -> "true",
+      DataSourceReadOptions.QUERY_TYPE.key -> DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL
+    ) ++ metadataOpts
+
+    doWriteAndValidateColumnStats(testCase, metadataOpts, commonOpts,
+      dataSourcePath = "index/colstats/input-table-json",
+      expectedColStatsSourcePath = "index/colstats/column-stats-index-table.json",
+      operation = DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL,
+      saveMode = SaveMode.Overwrite)
+
+    doWriteAndValidateColumnStats(testCase, metadataOpts, commonOpts,
+      dataSourcePath = "index/colstats/another-input-table-json",
+      expectedColStatsSourcePath = "index/colstats/updated-column-stats-index-table.json",
+      operation = DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL,
+      saveMode = SaveMode.Append)
+
+    // NOTE: MOR and COW have different fixtures since MOR is bearing delta-log files (holding
+    //       deferred updates), diverging from COW
+    val expectedColStatsSourcePath = if (testCase.tableType == HoodieTableType.COPY_ON_WRITE) {
+      "index/colstats/cow-updated2-column-stats-index-table.json"
+    } else {
+      "index/colstats/mor-updated2-column-stats-index-table.json"
+    }
+
+    createSQLTable(commonOpts, DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL)
+    val c5GT50WithDataSkipping = spark.sql("select * from tbl where c5 > 70").count()
+
+    doWriteAndValidateColumnStats(testCase, metadataOpts, commonOpts,
+      dataSourcePath = "index/colstats/update-input-table-json",
+      expectedColStatsSourcePath = expectedColStatsSourcePath,
+      operation = DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL,
+      saveMode = SaveMode.Append)
+
+    // verify snapshot query
+    verifySQLQueries(c5GT50WithDataSkipping, DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL, commonOpts)
+
+    // verify read_optimized query
+    verifySQLQueries(c5GT50WithDataSkipping, DataSourceReadOptions.QUERY_TYPE_READ_OPTIMIZED_OPT_VAL, commonOpts)
+
+    // verify incremental query
+    verifySQLQueries(c5GT50WithDataSkipping, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL, commonOpts)
+    commonOpts = commonOpts + (DataSourceReadOptions.INCREMENTAL_FALLBACK_TO_FULL_TABLE_SCAN_FOR_NON_EXISTING_FILES.key -> "true")
+    verifySQLQueries(c5GT50WithDataSkipping, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL, commonOpts)
+  }
+
+  private def verifySQLQueries(c5GT50WithDataSkippingAtPrevInstant: Long, queryType: String, opts: Map[String, String]): Unit = {
+    // 2 records are updated with c5 greater than 70 and one record is inserted with c5 value greater than 70
+    var commonOpts:Map[String, String] = opts
+    createSQLTable(commonOpts, queryType)
+    val increment = if (queryType.equals(DataSourceReadOptions.QUERY_TYPE_READ_OPTIMIZED_OPT_VAL) && metaClient.getTableType == HoodieTableType.MERGE_ON_READ) {
+      1 // only one insert
+    } else {
+      3 // one insert and two upserts
+    }
+    assertEquals(spark.sql("select * from tbl where c5 > 70").count(), c5GT50WithDataSkippingAtPrevInstant + increment)
+    val c5GT50WithDataSkipping = spark.sql("select * from tbl where c5 > 70").count()
+
+    if (queryType.equals(DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)) {
+      createIncrementalSQLTable(commonOpts, metaClient.reloadActiveTimeline().getInstants.get(1).getTimestamp)
+      assertEquals(spark.sql("select * from tbl where c5 > 70").count(), 3)
+    }
+
+    commonOpts = opts + (DataSourceReadOptions.ENABLE_DATA_SKIPPING.key -> "false")
+    createSQLTable(commonOpts, queryType)
+    val c5Gt50WithoutDataSkipping = spark.sql("select * from tbl where c5 > 70").count()
Review Comment:
Please give this a more descriptive name.
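   Since the predicate is actually `c5 > 70`, something like:
   ```scala
   val c5GreaterThan70CountWithoutDataSkipping = spark.sql("select * from tbl where c5 > 70").count()
   ```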
##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala:
##########
@@ -122,81 +125,120 @@ case class HoodieFileIndex(spark: SparkSession,
    * @return list of PartitionDirectory containing partition to base files mapping
    */
   override def listFiles(partitionFilters: Seq[Expression], dataFilters: Seq[Expression]): Seq[PartitionDirectory] = {
+    // Prune the partition path by the partition filters
+    // NOTE: Non-partitioned tables are assumed to consist from a single partition
+    //       encompassing the whole table
+    val partitionsAndFileSlices = getFileSlicesForPrunedPartitions(partitionFilters)
+    val listedPartitions = filterFileSlices(dataFilters, partitionsAndFileSlices).map {
+      case (partition, fileSlices) =>
+        val allCandidateFiles: Seq[FileStatus] = fileSlices.flatMap(fs => {
+          val baseFileStatusOpt = getBaseFileStatus(Option.apply(fs.getBaseFile.orElse(null)))
+          val logFilesStatus = if (includeLogFiles) {
+            fs.getLogFiles.map[FileStatus](JFunction.toJavaFunction[HoodieLogFile, FileStatus](lf => lf.getFileStatus))
+          } else {
+            java.util.stream.Stream.empty()
+          }
+          val files = logFilesStatus.collect(Collectors.toList[FileStatus]).asScala
+          baseFileStatusOpt.foreach(f => files.append(f))
+          files
+        })
+
+        PartitionDirectory(InternalRow.fromSeq(partition.get.values), allCandidateFiles)
+    }
+
+    hasPushedDownPartitionPredicates = true
+
+    if (shouldReadAsPartitionedTable()) {
+      listedPartitions
+    } else {
+      Seq(PartitionDirectory(InternalRow.empty, listedPartitions.flatMap(_.files)))
+    }
+  }
+
+  def filterFileSlices(dataFilters: Seq[Expression], partitionAndFileSlices: Seq[(Option[BaseHoodieTableFileIndex.PartitionPath], Seq[FileSlice])])
+  : Seq[(Option[BaseHoodieTableFileIndex.PartitionPath], Seq[FileSlice])] = {
     // Look up candidate files names in the col-stats index, if all of the following conditions are true
     //    - Data-skipping is enabled
     //    - Col-Stats Index is present
     //    - List of predicates (filters) is present
     val candidateFilesNamesOpt: Option[Set[String]] =
-      lookupCandidateFilesInMetadataTable(dataFilters) match {
-        case Success(opt) => opt
-        case Failure(e) =>
-          logError("Failed to lookup candidate files in File Index", e)
-
-          spark.sqlContext.getConf(DataSkippingFailureMode.configName, DataSkippingFailureMode.Fallback.value) match {
-            case DataSkippingFailureMode.Fallback.value => Option.empty
-            case DataSkippingFailureMode.Strict.value => throw new HoodieException(e);
-          }
-      }
+      lookupCandidateFilesInMetadataTable(dataFilters) match {
+        case Success(opt) => opt
+        case Failure(e) =>
+          logError("Failed to lookup candidate files in File Index", e)
+
+          spark.sqlContext.getConf(DataSkippingFailureMode.configName, DataSkippingFailureMode.Fallback.value) match {
+            case DataSkippingFailureMode.Fallback.value => Option.empty
+            case DataSkippingFailureMode.Strict.value => throw new HoodieException(e);
+          }
+      }
     logDebug(s"Overlapping candidate files from Column Stats Index: ${candidateFilesNamesOpt.getOrElse(Set.empty)}")
-    var totalFileSize = 0
-    var candidateFileSize = 0
-
-    // Prune the partition path by the partition filters
-    // NOTE: Non-partitioned tables are assumed to consist from a single partition
-    //       encompassing the whole table
-    val prunedPartitions = listMatchingPartitionPaths(partitionFilters)
-    val listedPartitions = getInputFileSlices(prunedPartitions: _*).asScala.toSeq.map {
-      case (partition, fileSlices) =>
-        val baseFileStatuses: Seq[FileStatus] = getBaseFileStatus(fileSlices
-          .asScala
-          .map(fs => fs.getBaseFile.orElse(null))
-          .filter(_ != null))
+    var totalFileSliceSize = 0
+    var candidateFileSliceSize = 0
+    val listedPartitions = partitionAndFileSlices.map {
+      case (partitionOpt, fileSlices) =>
        // Filter in candidate files based on the col-stats index lookup
-        val candidateFiles = baseFileStatuses.filter(fs =>
-          // NOTE: This predicate is true when {@code Option} is empty
-          candidateFilesNamesOpt.forall(_.contains(fs.getPath.getName)))
+        val candidateFileSlices: Seq[FileSlice] = {
+          fileSlices.filter(fs => {
+            val baseFileStatusOpt = getBaseFileStatus(Option.apply(fs.getBaseFile.orElse(null)))
+            val logFiles = fs.getLogFiles.collect(Collectors.toSet[HoodieLogFile]).asScala.toSet[HoodieLogFile]
+            // NOTE: This predicate is true when {@code Option} is empty
+            if (candidateFilesNamesOpt.forall(files => baseFileStatusOpt.exists(f => files.contains(f.getPath.getName))
+              || files.intersect(logFiles.map(f => f.getPath.getName)).nonEmpty)) {
+              true
+            } else {
+              false
+            }
Review Comment:
Let's make sure this block is covered by a unit test (essentially, a file slice with only log files).
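   A sketch of the kind of test I mean (the `FileSlice`/`HoodieLogFile` construction and the log-file name below are assumptions; please wire it up with the existing test fixtures):
   ```scala
   // Assumed constructors: a slice with no base file and one log file whose name is in the
   // candidate set should survive filterFileSlices; drop it from the set and it should be pruned.
   val logOnlySlice = new FileSlice("2023/08/01", "001", "fileId1")
   logOnlySlice.addLogFile(new HoodieLogFile(new Path(basePath, ".fileId1_001.log.1_1-0-1")))

   val kept = fileIndex.filterFileSlices(dataFilters,
     Seq((Option(partitionPath), Seq(logOnlySlice))))
   assertTrue(kept.flatMap(_._2).contains(logOnlySlice))
   ```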
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]