alexeykudinkin commented on a change in pull request #4948:
URL: https://github.com/apache/hudi/pull/4948#discussion_r825138951
##########
File path:
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala
##########
@@ -194,77 +192,102 @@ case class HoodieFileIndex(spark: SparkSession,
* @param queryFilters list of original data filters passed down from
querying engine
* @return list of pruned (data-skipped) candidate base-files' names
*/
- private def lookupCandidateFilesInColStatsIndex(queryFilters:
Seq[Expression]): Try[Option[Set[String]]] = Try {
- val indexPath = metaClient.getColumnStatsIndexPath
+ private def lookupCandidateFilesInMetadataTable(queryFilters:
Seq[Expression]): Try[Option[Set[String]]] = Try {
val fs = metaClient.getFs
+ val metadataTablePath =
HoodieTableMetadata.getMetadataTableBasePath(basePath)
Review comment:
Discussed offline: Bespoke implementation of Col Stats Index would be
removed in 0.11
##########
File path:
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala
##########
@@ -194,77 +192,102 @@ case class HoodieFileIndex(spark: SparkSession,
* @param queryFilters list of original data filters passed down from
querying engine
* @return list of pruned (data-skipped) candidate base-files' names
*/
- private def lookupCandidateFilesInColStatsIndex(queryFilters:
Seq[Expression]): Try[Option[Set[String]]] = Try {
- val indexPath = metaClient.getColumnStatsIndexPath
+ private def lookupCandidateFilesInMetadataTable(queryFilters:
Seq[Expression]): Try[Option[Set[String]]] = Try {
val fs = metaClient.getFs
+ val metadataTablePath =
HoodieTableMetadata.getMetadataTableBasePath(basePath)
- if (!enableDataSkipping() || !fs.exists(new Path(indexPath)) ||
queryFilters.isEmpty) {
- // scalastyle:off return
- return Success(Option.empty)
- // scalastyle:on return
- }
-
- val completedCommits =
getActiveTimeline.filterCompletedInstants().getInstants.iterator.asScala.toList.map(_.getTimestamp)
-
- // Collect all index tables present in `.zindex` folder
- val candidateIndexTables =
- fs.listStatus(new Path(indexPath))
- .filter(_.isDirectory)
- .map(_.getPath.getName)
- .filter(completedCommits.contains(_))
- .sortBy(x => x)
-
- if (candidateIndexTables.isEmpty) {
- // scalastyle:off return
- return Success(Option.empty)
- // scalastyle:on return
- }
-
- val dataFrameOpt = try {
- Some(spark.read.load(new Path(indexPath,
candidateIndexTables.last).toString))
- } catch {
- case t: Throwable =>
- logError("Failed to read col-stats index; skipping", t)
- None
+ if (!isDataSkippingEnabled() || !fs.exists(new Path(metadataTablePath)) ||
queryFilters.isEmpty) {
Review comment:
@codope on a second thought -- there still could be case, when MT is
enabled, but it's not bootstrapped yet, so we can't equate MT being enabled in
config, with its presence in FS. Frankly, i don't see a way w/o `fs.exists` in
some shape or form -- if not here it would happen w/in Spark's Data Source.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]