This is an automated email from the ASF dual-hosted git repository.
stream2000 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new eae5d4ae8e6 [HUDI-7291] Pushing Down Partition Pruning Conditions to Column Stats Earlier During Data Skipping (#10493)
eae5d4ae8e6 is described below
commit eae5d4ae8e62014191fac76bbbeae0939f11100b
Author: majian <[email protected]>
AuthorDate: Wed Jan 17 14:17:29 2024 +0800
[HUDI-7291] Pushing Down Partition Pruning Conditions to Column Stats Earlier During Data Skipping (#10493)
* push down partition pruning filters when loading col stats index
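
Below is a minimal sketch of the idea, in plain Scala with hypothetical names (ColStats, candidateFiles, and all values are illustrative, not Hudi APIs): partition pruning runs first, and only the stats of files that survive it are evaluated against the data-skipping predicate, instead of scanning the whole column stats index and intersecting afterwards.

    case class ColStats(fileName: String, minValue: Int, maxValue: Int)

    object PushdownSketch {
      // Two-stage pruning: partition pruning yields prunedFileNames, and only
      // the surviving files' stats are tested against the predicate.
      def candidateFiles(allStats: Seq[ColStats],
                         prunedFileNames: Set[String],
                         predicateMin: Int): Set[String] = {
        val relevant =
          if (prunedFileNames.isEmpty) allStats // no pruning info: scan the full index
          else allStats.filter(s => prunedFileNames.contains(s.fileName))
        // Keep files whose [min, max] range can satisfy "col >= predicateMin".
        relevant.filter(_.maxValue >= predicateMin).map(_.fileName).toSet
      }

      def main(args: Array[String]): Unit = {
        val stats = Seq(ColStats("f1", 0, 10), ColStats("f2", 20, 30), ColStats("f3", 5, 25))
        // f2's partition was pruned away, so its stats are never inspected.
        println(candidateFiles(stats, Set("f1", "f3"), predicateMin = 8)) // Set(f1, f3)
      }
    }
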
---
.../org/apache/hudi/ColumnStatsIndexSupport.scala | 14 ++++++--
.../scala/org/apache/hudi/HoodieFileIndex.scala | 37 ++++++++++++++--------
2 files changed, 36 insertions(+), 15 deletions(-)
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/ColumnStatsIndexSupport.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/ColumnStatsIndexSupport.scala
index 9cdb15092b0..7a75c6c35ca 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/ColumnStatsIndexSupport.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/ColumnStatsIndexSupport.scala
@@ -26,6 +26,7 @@ import org.apache.hudi.avro.model._
import org.apache.hudi.client.common.HoodieSparkEngineContext
import org.apache.hudi.common.config.HoodieMetadataConfig
import org.apache.hudi.common.data.HoodieData
+import org.apache.hudi.common.function.SerializableFunction
import org.apache.hudi.common.model.HoodieRecord
import org.apache.hudi.common.table.HoodieTableMetaClient
import org.apache.hudi.common.util.BinaryUtil.toBytes
@@ -106,14 +107,23 @@ class ColumnStatsIndexSupport(spark: SparkSession,
*
 * Please check out scala-doc of the [[transpose]] method explaining this view in more details
*/
-  def loadTransposed[T](targetColumns: Seq[String], shouldReadInMemory: Boolean)(block: DataFrame => T): T = {
+  def loadTransposed[T](targetColumns: Seq[String], shouldReadInMemory: Boolean, prunedFileNames: Set[String] = Set.empty)(block: DataFrame => T): T = {
cachedColumnStatsIndexViews.get(targetColumns) match {
case Some(cachedDF) =>
block(cachedDF)
case None =>
- val colStatsRecords: HoodieData[HoodieMetadataColumnStats] =
+        val colStatsRecords: HoodieData[HoodieMetadataColumnStats] = if (prunedFileNames.isEmpty) {
+          // NOTE: Some tests invoke this method directly without supplying prunedPartitionsAndFileSlices, so fall back to loading the full column stats index when no pruned file names are provided.
loadColumnStatsIndexRecords(targetColumns, shouldReadInMemory)
+ } else {
+          val filterFunction = new SerializableFunction[HoodieMetadataColumnStats, java.lang.Boolean] {
+            override def apply(r: HoodieMetadataColumnStats): java.lang.Boolean = {
+ prunedFileNames.contains(r.getFileName)
+ }
+ }
+          loadColumnStatsIndexRecords(targetColumns, shouldReadInMemory).filter(filterFunction)
+ }
withPersistedData(colStatsRecords, StorageLevel.MEMORY_ONLY) {
          val (transposedRows, indexSchema) = transpose(colStatsRecords, targetColumns)
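
A note on the filter above: the predicate is built as an anonymous SerializableFunction rather than a plain Scala lambda, apparently because HoodieData#filter hands it to an engine (e.g. Spark) that may ship it to executors, so it must be explicitly serializable. A self-contained sketch of the same pattern, with illustrative names (SerializablePredicate and keepPruned are not Hudi APIs):

    object SerializablePredicateSketch {
      // An explicitly serializable function trait makes the contract visible.
      trait SerializablePredicate[T] extends java.io.Serializable {
        def apply(t: T): Boolean
      }

      // Predicate closing over the pruned file names, mirroring how the
      // commit's filterFunction closes over prunedFileNames.
      def keepPruned(prunedFileNames: Set[String]): SerializablePredicate[String] =
        new SerializablePredicate[String] {
          override def apply(fileName: String): Boolean = prunedFileNames.contains(fileName)
        }

      def main(args: Array[String]): Unit = {
        val pred = keepPruned(Set("f1.parquet"))
        println(pred("f1.parquet")) // true
        println(pred("f2.parquet")) // false
      }
    }
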
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala
index 709dfec183b..db8525be3d1 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala
@@ -234,7 +234,7 @@ case class HoodieFileIndex(spark: SparkSession,
// - Record-level Index is present
// - List of predicates (filters) is present
val candidateFilesNamesOpt: Option[Set[String]] =
- lookupCandidateFilesInMetadataTable(dataFilters) match {
+      lookupCandidateFilesInMetadataTable(dataFilters, prunedPartitionsAndFileSlices) match {
case Success(opt) => opt
case Failure(e) =>
logError("Failed to lookup candidate files in File Index", e)
@@ -316,11 +316,6 @@ case class HoodieFileIndex(spark: SparkSession,
})
}
-  private def lookupFileNamesMissingFromIndex(allIndexedFileNames: Set[String]) = {
- val allFileNames = getAllFiles().map(f => f.getPath.getName).toSet
- allFileNames -- allIndexedFileNames
- }
-
/**
   * Computes pruned list of candidate base-files' names based on provided list of {@link dataFilters}
   * conditions, by leveraging Metadata Table's Record Level Index and Column Statistics index (hereon referred as
@@ -333,7 +328,7 @@ case class HoodieFileIndex(spark: SparkSession,
   * @param queryFilters list of original data filters passed down from querying engine
   * @return list of pruned (data-skipped) candidate base-files and log files' names
*/
-  private def lookupCandidateFilesInMetadataTable(queryFilters: Seq[Expression]): Try[Option[Set[String]]] = Try {
+  private def lookupCandidateFilesInMetadataTable(queryFilters: Seq[Expression], prunedPartitionsAndFileSlices: Seq[(Option[BaseHoodieTableFileIndex.PartitionPath], Seq[FileSlice])]): Try[Option[Set[String]]] = Try {
    // NOTE: For column stats, Data Skipping is only effective when it references columns that are indexed w/in
    //       the Column Stats Index (CSI). Following cases could not be effectively handled by Data Skipping:
    //          - Expressions on top-level column's fields (ie, for ex filters like "struct.field > 0", since
@@ -345,7 +340,6 @@ case class HoodieFileIndex(spark: SparkSession,
// and candidate files are obtained from these file slices.
    lazy val queryReferencedColumns = collectReferencedColumns(spark, queryFilters, schema)
-
    lazy val (_, recordKeys) = recordLevelIndex.filterQueriesWithRecordKey(queryFilters)
if (!isMetadataTableEnabled || !isDataSkippingEnabled) {
validateConfig()
@@ -353,9 +347,10 @@ case class HoodieFileIndex(spark: SparkSession,
} else if (recordKeys.nonEmpty) {
      Option.apply(recordLevelIndex.getCandidateFiles(getAllFiles(), recordKeys))
} else if (functionalIndex.isIndexAvailable && !queryFilters.isEmpty) {
+ val prunedFileNames = getPrunedFileNames(prunedPartitionsAndFileSlices)
      val shouldReadInMemory = functionalIndex.shouldReadInMemory(this, queryReferencedColumns)
      val indexDf = functionalIndex.loadFunctionalIndexDataFrame("", shouldReadInMemory)
- Some(getCandidateFiles(indexDf, queryFilters))
+ Some(getCandidateFiles(indexDf, queryFilters, prunedFileNames))
    } else if (!columnStatsIndex.isIndexAvailable || queryFilters.isEmpty || queryReferencedColumns.isEmpty) {
validateConfig()
Option.empty
@@ -366,14 +361,30 @@ case class HoodieFileIndex(spark: SparkSession,
        //    For that we use a simple-heuristic to determine whether we should read and process CSI in-memory or
        //    on-cluster: total number of rows of the expected projected portion of the index has to be below the
// threshold (of 100k records)
+ val prunedFileNames = getPrunedFileNames(prunedPartitionsAndFileSlices)
        val shouldReadInMemory = columnStatsIndex.shouldReadInMemory(this, queryReferencedColumns)
-        columnStatsIndex.loadTransposed(queryReferencedColumns, shouldReadInMemory) { transposedColStatsDF =>
- Some(getCandidateFiles(transposedColStatsDF, queryFilters))
+        columnStatsIndex.loadTransposed(queryReferencedColumns, shouldReadInMemory, prunedFileNames) { transposedColStatsDF =>
+          Some(getCandidateFiles(transposedColStatsDF, queryFilters, prunedFileNames))
}
}
}
-  private def getCandidateFiles(indexDf: DataFrame, queryFilters: Seq[Expression]): Set[String] = {
+  private def getPrunedFileNames(prunedPartitionsAndFileSlices: Seq[(Option[BaseHoodieTableFileIndex.PartitionPath], Seq[FileSlice])]): Set[String] = {
+ prunedPartitionsAndFileSlices
+ .flatMap {
+ case (_, fileSlices) => fileSlices
+ }
+ .flatMap { fileSlice =>
+ val baseFileOption = Option(fileSlice.getBaseFile.orElse(null))
+ val logFiles = if (includeLogFiles) {
+ fileSlice.getLogFiles.iterator().asScala.map(_.getFileName).toList
+ } else Nil
+ baseFileOption.map(_.getFileName).toList ++ logFiles
+ }
+ .toSet
+ }
+
+  private def getCandidateFiles(indexDf: DataFrame, queryFilters: Seq[Expression], prunedFileNames: Set[String]): Set[String] = {
val indexSchema = indexDf.schema
    val indexFilter = queryFilters.map(translateIntoColumnStatsIndexFilterExpr(_, indexSchema)).reduce(And)
val prunedCandidateFileNames =
@@ -395,7 +406,7 @@ case class HoodieFileIndex(spark: SparkSession,
.collect()
.map(_.getString(0))
.toSet
-    val notIndexedFileNames = lookupFileNamesMissingFromIndex(allIndexedFileNames)
+ val notIndexedFileNames = prunedFileNames -- allIndexedFileNames
prunedCandidateFileNames ++ notIndexedFileNames
}
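
For illustration, a hedged usage sketch of the new call shape: the stub below only mirrors the loadTransposed signature from the diff and is not the real ColumnStatsIndexSupport; the default Set.empty is what keeps existing callers source-compatible.

    object LoadTransposedShape {
      // Stub mirroring the new signature; the real method builds a transposed
      // DataFrame view, here each pruned file contributes one placeholder row.
      def loadTransposed[T](targetColumns: Seq[String],
                            shouldReadInMemory: Boolean,
                            prunedFileNames: Set[String] = Set.empty)(block: Seq[String] => T): T = {
        val rows = prunedFileNames.toSeq.sorted.map(f => s"$f -> stats(${targetColumns.mkString(", ")})")
        block(rows)
      }

      def main(args: Array[String]): Unit = {
        // New callers thread partition-pruning results through; existing callers
        // compile unchanged because prunedFileNames defaults to Set.empty.
        val n = loadTransposed(Seq("a", "b"), true, Set("f1", "f2")) { rows =>
          rows.foreach(println)
          rows.size
        }
        println(s"$n files considered after pruning")
      }
    }
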