This is an automated email from the ASF dual-hosted git repository.

codope pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
     new cb6eb6785fd  [HUDI-7653] Refactor HoodieFileIndex for more flexibility (#11074)
cb6eb6785fd is described below

commit cb6eb6785fdeb88e66016a2b8c0c6e6fa184b309
Author: Vova Kolmakov <wombatu...@gmail.com>
AuthorDate: Tue Apr 23 23:09:08 2024 +0700

    [HUDI-7653] Refactor HoodieFileIndex for more flexibility (#11074)

    Created a new abstract class `SparkBaseIndexSupport` with the abstract methods
    `getIndexName`, `isIndexAvailable`, `computeCandidateFileNames` and
    `invalidateCaches` (to be overridden by descendants) and the concrete methods
    `getPrunedFileNames`, `getCandidateFiles` and `shouldReadInMemory` (moved from
    HoodieFileIndex and the XXXIndexSupport classes so descendants can reuse them).

    ---------

    Co-authored-by: Sagar Sumit <sagarsumi...@gmail.com>
---
 .../org/apache/hudi/ColumnStatsIndexSupport.scala  |  68 ++++++-----
 .../org/apache/hudi/FunctionalIndexSupport.scala   | 121 +++++--------------
 .../scala/org/apache/hudi/HoodieFileIndex.scala    | 128 +++++----------------
 .../org/apache/hudi/RecordLevelIndexSupport.scala  |  48 +++++---
 .../org/apache/hudi/SparkBaseIndexSupport.scala    | 108 +++++++++++++++++
 5 files changed, 243 insertions(+), 230 deletions(-)

diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/ColumnStatsIndexSupport.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/ColumnStatsIndexSupport.scala
index dc15a3e8c8c..238962b964c 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/ColumnStatsIndexSupport.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/ColumnStatsIndexSupport.scala
@@ -23,11 +23,10 @@ import org.apache.hudi.ColumnStatsIndexSupport._
 import org.apache.hudi.HoodieCatalystUtils.{withPersistedData, withPersistedDataset}
 import org.apache.hudi.HoodieConversionUtils.toScalaOption
 import org.apache.hudi.avro.model._
-import org.apache.hudi.client.common.HoodieSparkEngineContext
 import org.apache.hudi.common.config.HoodieMetadataConfig
 import org.apache.hudi.common.data.HoodieData
 import org.apache.hudi.common.function.SerializableFunction
-import org.apache.hudi.common.model.HoodieRecord
+import org.apache.hudi.common.model.{FileSlice, HoodieRecord}
 import org.apache.hudi.common.table.HoodieTableMetaClient
 import org.apache.hudi.common.util.BinaryUtil.toBytes
 import org.apache.hudi.common.util.ValidationUtils.checkState
@@ -36,7 +35,6 @@ import org.apache.hudi.common.util.hash.ColumnIndexID
 import org.apache.hudi.data.HoodieJavaRDD
 import org.apache.hudi.metadata.{HoodieMetadataPayload, HoodieTableMetadata, HoodieTableMetadataUtil, MetadataPartitionType}
 import org.apache.hudi.util.JFunction
-import org.apache.spark.api.java.JavaSparkContext
 import org.apache.spark.sql.HoodieUnsafeUtils.{createDataFrameFromInternalRows, createDataFrameFromRDD, createDataFrameFromRows}
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.util.DateTimeUtils
@@ -44,8 +42,10 @@ import org.apache.spark.sql.functions.col
 import org.apache.spark.sql.types._
 import org.apache.spark.sql.{DataFrame, Row, SparkSession}
 import org.apache.spark.storage.StorageLevel
-
 import java.nio.ByteBuffer
+
+import org.apache.spark.sql.catalyst.expressions.Expression
+
 import scala.collection.JavaConverters._
 import scala.collection.immutable.TreeSet
 import scala.collection.mutable.ListBuffer
@@ -55,11 +55,8 @@ class ColumnStatsIndexSupport(spark: SparkSession,
                               tableSchema: StructType,
                              @transient metadataConfig: HoodieMetadataConfig,
                              @transient metaClient: HoodieTableMetaClient,
-                             allowCaching: Boolean = false) {
-
-  @transient private lazy val engineCtx = new HoodieSparkEngineContext(new JavaSparkContext(spark.sparkContext))
-  @transient private lazy val metadataTable: HoodieTableMetadata =
-    HoodieTableMetadata.create(engineCtx, metadataConfig, metaClient.getBasePathV2.toString)
+                             allowCaching: Boolean = false)
+  extends SparkBaseIndexSupport(spark, metadataConfig, metaClient) {

   @transient private lazy val cachedColumnStatsIndexViews: ParHashMap[Seq[String], DataFrame] = ParHashMap()

@@ -79,6 +76,40 @@ class ColumnStatsIndexSupport(spark: SparkSession,
     }
   }

+  override def getIndexName: String = ColumnStatsIndexSupport.INDEX_NAME
+
+  override def computeCandidateFileNames(fileIndex: HoodieFileIndex,
+                                         queryFilters: Seq[Expression],
+                                         queryReferencedColumns: Seq[String],
+                                         prunedPartitionsAndFileSlices: Seq[(Option[BaseHoodieTableFileIndex.PartitionPath], Seq[FileSlice])],
+                                         shouldPushDownFilesFilter: Boolean
+                                        ): Option[Set[String]] = {
+    if (isIndexAvailable && queryFilters.nonEmpty && queryReferencedColumns.nonEmpty) {
+      // NOTE: Since executing on-cluster via Spark API has its own non-trivial amount of overhead,
+      //       it's most often preferential to fetch Column Stats Index w/in the same process (usually driver),
+      //       w/o resorting to on-cluster execution.
+      //       For that we use a simple-heuristic to determine whether we should read and process CSI in-memory or
+      //       on-cluster: total number of rows of the expected projected portion of the index has to be below the
+      //       threshold (of 100k records)
+      val readInMemory = shouldReadInMemory(fileIndex, queryReferencedColumns, inMemoryProjectionThreshold)
+      val prunedFileNames = getPrunedFileNames(prunedPartitionsAndFileSlices)
+      // NOTE: If partition pruning doesn't prune any files, then there's no need to apply file filters
+      //       when loading the Column Statistics Index
+      val prunedFileNamesOpt = if (shouldPushDownFilesFilter) Some(prunedFileNames) else None
+
+      loadTransposed(queryReferencedColumns, readInMemory, prunedFileNamesOpt) { transposedColStatsDF =>
+        Some(getCandidateFiles(transposedColStatsDF, queryFilters, prunedFileNames))
+      }
+    } else {
+      Option.empty
+    }
+  }
+
+  override def invalidateCaches(): Unit = {
+    cachedColumnStatsIndexViews.foreach { case (_, df) => df.unpersist() }
+    cachedColumnStatsIndexViews.clear()
+  }
+
   /**
    * Returns true in cases when Column Stats Index is built and available as standalone partition
    * w/in the Metadata Table
@@ -88,19 +119,6 @@ class ColumnStatsIndexSupport(spark: SparkSession,
     metaClient.getTableConfig.getMetadataPartitions.contains(HoodieTableMetadataUtil.PARTITION_NAME_COLUMN_STATS)
   }

-  /**
-   * Determines whether it would be more optimal to read Column Stats Index a) in-memory of the invoking process,
-   * or b) executing it on-cluster via Spark [[Dataset]] and [[RDD]] APIs
-   */
-  def shouldReadInMemory(fileIndex: HoodieFileIndex, queryReferencedColumns: Seq[String]): Boolean = {
-    Option(metadataConfig.getColumnStatsIndexProcessingModeOverride) match {
-      case Some(mode) =>
-        mode == HoodieMetadataConfig.COLUMN_STATS_INDEX_PROCESSING_MODE_IN_MEMORY
-      case None =>
-        fileIndex.getFileSlicesCount * queryReferencedColumns.length < inMemoryProjectionThreshold
-    }
-  }
-
   /**
    * Loads view of the Column Stats Index in a transposed format where single row coalesces every columns'
    * statistics for a single file, returning it as [[DataFrame]]
@@ -172,11 +190,6 @@ class ColumnStatsIndexSupport(spark: SparkSession,
   }
 }

-  def invalidateCaches(): Unit = {
-    cachedColumnStatsIndexViews.foreach { case (_, df) => df.unpersist() }
-    cachedColumnStatsIndexViews.clear()
-  }
-
   /**
    * Transposes and converts the raw table format of the Column Stats Index representation,
    * where each row/record corresponds to individual (column, file) pair, into the table format
@@ -366,6 +379,7 @@ class ColumnStatsIndexSupport(spark: SparkSession,
 }

 object ColumnStatsIndexSupport {
+  val INDEX_NAME = "COLUMN_STATS"

   private val expectedAvroSchemaValues = Set("BooleanWrapper", "IntWrapper", "LongWrapper", "FloatWrapper",
     "DoubleWrapper", "BytesWrapper", "StringWrapper", "DateWrapper", "DecimalWrapper", "TimeMicrosWrapper",
     "TimestampMicrosWrapper")

diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/FunctionalIndexSupport.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/FunctionalIndexSupport.scala
index 035eeabb4d5..8614bcb0b5f 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/FunctionalIndexSupport.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/FunctionalIndexSupport.scala
@@ -19,58 +19,59 @@ package org.apache.hudi

-import org.apache.hadoop.fs.FileStatus
 import org.apache.hudi.FunctionalIndexSupport._
 import org.apache.hudi.HoodieConversionUtils.toScalaOption
 import org.apache.hudi.HoodieSparkFunctionalIndex.SPARK_FUNCTION_MAP
 import org.apache.hudi.avro.model.{HoodieMetadataColumnStats, HoodieMetadataRecord}
-import org.apache.hudi.client.common.HoodieSparkEngineContext
 import org.apache.hudi.common.config.HoodieMetadataConfig
 import org.apache.hudi.common.data.HoodieData
-import org.apache.hudi.common.fs.FSUtils
-import org.apache.hudi.common.model.HoodieRecord
+import org.apache.hudi.common.model.{FileSlice, HoodieRecord}
 import org.apache.hudi.common.table.HoodieTableMetaClient
 import org.apache.hudi.common.util.ValidationUtils.checkState
 import org.apache.hudi.common.util.hash.ColumnIndexID
 import org.apache.hudi.data.HoodieJavaRDD
-import org.apache.hudi.metadata.{HoodieMetadataPayload, HoodieTableMetadata, HoodieTableMetadataUtil}
+import org.apache.hudi.metadata.{HoodieMetadataPayload, HoodieTableMetadataUtil}
 import org.apache.hudi.util.JFunction
-import org.apache.spark.api.java.JavaSparkContext
 import org.apache.spark.sql.HoodieUnsafeUtils.{createDataFrameFromInternalRows, createDataFrameFromRDD}
 import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.{And, Expression}
+import org.apache.spark.sql.catalyst.expressions.Expression
 import org.apache.spark.sql.functions.col
-import org.apache.spark.sql.hudi.DataSkippingUtils.translateIntoColumnStatsIndexFilterExpr
 import org.apache.spark.sql.types.StructType
-import org.apache.spark.sql.{Column, DataFrame, SparkSession}
+import org.apache.spark.sql.{DataFrame, SparkSession}

 import scala.collection.JavaConverters._
-import scala.collection.{JavaConverters, mutable}

 class FunctionalIndexSupport(spark: SparkSession,
                              metadataConfig: HoodieMetadataConfig,
-                             metaClient: HoodieTableMetaClient) {
-
-  @transient private lazy val engineCtx = new HoodieSparkEngineContext(new JavaSparkContext(spark.sparkContext))
-  @transient private lazy val metadataTable: HoodieTableMetadata =
-    HoodieTableMetadata.create(engineCtx, metadataConfig, metaClient.getBasePathV2.toString)
+                             metaClient: HoodieTableMetaClient)
+  extends SparkBaseIndexSupport (spark, metadataConfig, metaClient) {
   // NOTE: Since [[metadataConfig]] is transient this has to be eagerly persisted, before this will be passed on to the executor
   private val inMemoryProjectionThreshold = metadataConfig.getColumnStatsIndexInMemoryProjectionThreshold

-  /**
-   * Determines whether it would be more optimal to read Column Stats Index a) in-memory of the invoking process,
-   * or b) executing it on-cluster via Spark [[Dataset]] and [[RDD]] APIs
-   */
-  def shouldReadInMemory(fileIndex: HoodieFileIndex, queryReferencedColumns: Seq[String]): Boolean = {
-    Option(metadataConfig.getColumnStatsIndexProcessingModeOverride) match {
-      case Some(mode) =>
-        mode == HoodieMetadataConfig.COLUMN_STATS_INDEX_PROCESSING_MODE_IN_MEMORY
-      case None =>
-        fileIndex.getFileSlicesCount * queryReferencedColumns.length < inMemoryProjectionThreshold
+  override def getIndexName: String = FunctionalIndexSupport.INDEX_NAME
+
+  override def computeCandidateFileNames(fileIndex: HoodieFileIndex,
+                                         queryFilters: Seq[Expression],
+                                         queryReferencedColumns: Seq[String],
+                                         prunedPartitionsAndFileSlices: Seq[(Option[BaseHoodieTableFileIndex.PartitionPath], Seq[FileSlice])],
+                                         shouldPushDownFilesFilter: Boolean
+                                        ): Option[Set[String]] = {
+    lazy val functionalIndexPartitionOpt = getFunctionalIndexPartition(queryFilters)
+    if (isIndexAvailable && queryFilters.nonEmpty && functionalIndexPartitionOpt.nonEmpty) {
+      val readInMemory = shouldReadInMemory(fileIndex, queryReferencedColumns, inMemoryProjectionThreshold)
+      val indexDf = loadFunctionalIndexDataFrame(functionalIndexPartitionOpt.get, readInMemory)
+      val prunedFileNames = getPrunedFileNames(prunedPartitionsAndFileSlices)
+      Some(getCandidateFiles(indexDf, queryFilters, prunedFileNames))
+    } else {
+      Option.empty
     }
   }

+  override def invalidateCaches(): Unit = {
+    // no caches for this index type, do nothing
+  }
+
   /**
    * Return true if metadata table is enabled and functional index metadata partition is available.
    */
@@ -78,42 +79,6 @@ class FunctionalIndexSupport(spark: SparkSession,
     metadataConfig.enabled && metaClient.getFunctionalIndexMetadata.isPresent && !metaClient.getFunctionalIndexMetadata.get().getIndexDefinitions.isEmpty
   }

-  def getPrunedCandidateFileNames(indexPartition: String,
-                                  shouldReadInMemory: Boolean,
-                                  queryFilters: Seq[Expression]): Set[String] = {
-    val indexDf = loadFunctionalIndexDataFrame(indexPartition, shouldReadInMemory)
-    val indexSchema = indexDf.schema
-    val indexFilter =
-      queryFilters.map(translateIntoColumnStatsIndexFilterExpr(_, indexSchema))
-        .reduce(And)
-
-    val prunedCandidateFileNames =
-      indexDf.where(new Column(indexFilter))
-        .select(HoodieMetadataPayload.COLUMN_STATS_FIELD_FILE_NAME)
-        .collect()
-        .map(_.getString(0))
-        .toSet
-
-    prunedCandidateFileNames
-  }
-
-  def load(indexPartition: String,
-           targetColumns: Seq[String],
-           shouldReadInMemory: Boolean): DataFrame = {
-    val metadataTablePath = HoodieTableMetadata.getMetadataTableBasePath(metaClient.getBasePathV2.toString)
-    // Read Metadata Table's Column Stats Index into Spark's [[DataFrame]]
-    val colStatsDF = spark.read.format("org.apache.hudi")
-      .options(metadataConfig.getProps.asScala)
-      .load(s"$metadataTablePath/$indexPartition")
-
-    val requiredIndexColumns =
-      targetColumnStatsIndexColumns.map(colName =>
-        col(s"${HoodieMetadataPayload.SCHEMA_FIELD_ID_COLUMN_STATS}.${colName}"))
-
-    colStatsDF.where(col(HoodieMetadataPayload.SCHEMA_FIELD_ID_COLUMN_STATS).isNotNull)
-      .select(requiredIndexColumns: _*)
-  }
-
   /**
    * Searches for an index partition based on the specified index function and target column name.
    *
@@ -127,7 +92,7 @@ class FunctionalIndexSupport(spark: SparkSession,
    * @return An `Option` containing the index partition identifier if a matching index definition is found.
    *         Returns `None` if no matching index definition is found.
    */
-  def getFunctionalIndexPartition(queryFilters: Seq[Expression]): Option[String] = {
+  private def getFunctionalIndexPartition(queryFilters: Seq[Expression]): Option[String] = {
     val functionToColumnNames = extractSparkFunctionNames(queryFilters)
     if (functionToColumnNames.nonEmpty) {
       // Currently, only one functional index in the query is supported. HUDI-7620 for supporting multiple functions.
@@ -171,8 +136,8 @@ class FunctionalIndexSupport(spark: SparkSession,
     }.toMap
   }

-  def loadFunctionalIndexDataFrame(indexPartition: String,
-                                   shouldReadInMemory: Boolean): DataFrame = {
+  private def loadFunctionalIndexDataFrame(indexPartition: String,
+                                           shouldReadInMemory: Boolean): DataFrame = {
     val colStatsDF = {
       val indexDefinition = metaClient.getFunctionalIndexMetadata.get().getIndexDefinitions.get(indexPartition)
       val indexType = indexDefinition.getIndexType
@@ -224,36 +189,10 @@ class FunctionalIndexSupport(spark: SparkSession,

     columnStatsRecords
   }
-
-  /**
-   * Returns the list of candidate files which store the provided record keys based on Metadata Table Record Index.
-   *
-   * @param allFiles   - List of all files which needs to be considered for the query
-   * @param recordKeys - List of record keys.
-   * @return Sequence of file names which need to be queried
-   */
-  def getCandidateFiles(allFiles: Seq[FileStatus], recordKeys: List[String]): Set[String] = {
-    val recordKeyLocationsMap = metadataTable.readRecordIndex(seqAsJavaListConverter(recordKeys).asJava)
-    val fileIdToPartitionMap: mutable.Map[String, String] = mutable.Map.empty
-    val candidateFiles: mutable.Set[String] = mutable.Set.empty
-    for (locations <- JavaConverters.collectionAsScalaIterableConverter(recordKeyLocationsMap.values()).asScala) {
-      for (location <- JavaConverters.collectionAsScalaIterableConverter(locations).asScala) {
-        fileIdToPartitionMap.put(location.getFileId, location.getPartitionPath)
-      }
-    }
-    for (file <- allFiles) {
-      val fileId = FSUtils.getFileIdFromFilePath(file.getPath)
-      val partitionOpt = fileIdToPartitionMap.get(fileId)
-      if (partitionOpt.isDefined) {
-        candidateFiles += file.getPath.getName
-      }
-    }
-    candidateFiles.toSet
-  }
 }

 object FunctionalIndexSupport {
-
+  val INDEX_NAME = "FUNCTIONAL"
   /**
    * Target Column Stats Index columns which internally are mapped onto fields of the corresponding
    * Column Stats record payload ([[HoodieMetadataColumnStats]]) persisted w/in Metadata Table

diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala
index 922137ff01c..faabbecc5c1 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala
@@ -26,25 +26,23 @@ import org.apache.hudi.common.table.HoodieTableMetaClient
 import org.apache.hudi.common.util.StringUtils
 import org.apache.hudi.exception.HoodieException
 import org.apache.hudi.keygen.{TimestampBasedAvroKeyGenerator, TimestampBasedKeyGenerator}
-import org.apache.hudi.metadata.HoodieMetadataPayload
 import org.apache.hudi.storage.{StoragePath, StoragePathInfo}
 import org.apache.hudi.util.JFunction
-
 import org.apache.hadoop.fs.{FileStatus, Path}
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.{And, Expression, Literal}
+import org.apache.spark.sql.catalyst.expressions.{Expression, Literal}
 import org.apache.spark.sql.execution.datasources.{FileIndex, FileStatusCache, NoopCache, PartitionDirectory}
-import org.apache.spark.sql.hudi.DataSkippingUtils.translateIntoColumnStatsIndexFilterExpr
 import org.apache.spark.sql.hudi.HoodieSqlCommonUtils
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._
-import org.apache.spark.sql.{Column, DataFrame, SparkSession}
+import org.apache.spark.sql.SparkSession
 import org.apache.spark.unsafe.types.UTF8String
-
 import java.text.SimpleDateFormat
 import java.util.stream.Collectors
+
 import javax.annotation.concurrent.NotThreadSafe
+
 import scala.collection.JavaConverters._
 import scala.util.control.NonFatal
 import scala.util.{Failure, Success, Try}
@@ -96,22 +94,16 @@ case class HoodieFileIndex(spark: SparkSession,
   @transient protected var hasPushedDownPartitionPredicates: Boolean = false

   /**
-   * NOTE: [[ColumnStatsIndexSupport]] is a transient state, since it's only relevant while logical plan
+   * NOTE: [[indicesSupport]] is a transient state, since it's only relevant while logical plan
    * is handled by the Spark's driver
+   * The order of elements is important as in this order indices will be applied
+   * during `lookupCandidateFilesInMetadataTable`
    */
-  @transient private lazy val columnStatsIndex = new ColumnStatsIndexSupport(spark, schema, metadataConfig, metaClient)
-
-  /**
-   * NOTE: [[RecordLevelIndexSupport]] is a transient state, since it's only relevant while logical plan
-   * is handled by the Spark's driver
-   */
-  @transient private lazy val recordLevelIndex = new RecordLevelIndexSupport(spark, metadataConfig, metaClient)
-
-  /**
-   * NOTE: [[FunctionalIndexSupport]] is a transient state, since it's only relevant while logical plan
-   * is handled by the Spark's driver
-   */
-  @transient private lazy val functionalIndex = new FunctionalIndexSupport(spark, metadataConfig, metaClient)
+  @transient private lazy val indicesSupport: List[SparkBaseIndexSupport] = List(
+    new RecordLevelIndexSupport(spark, metadataConfig, metaClient),
+    new FunctionalIndexSupport(spark, metadataConfig, metaClient),
+    new ColumnStatsIndexSupport(spark, schema, metadataConfig, metaClient)
+  )

   private val enableHoodieExtension = spark.sessionState.conf.getConfString("spark.sql.extensions", "")
     .split(",")
@@ -335,6 +327,7 @@ case class HoodieFileIndex(spark: SparkSession,
    * @param queryFilters list of original data filters passed down from querying engine
    * @return list of pruned (data-skipped) candidate base-files and log files' names
    */
+  // scalastyle:off return
   private def lookupCandidateFilesInMetadataTable(queryFilters: Seq[Expression],
                                                   prunedPartitionsAndFileSlices: Seq[(Option[BaseHoodieTableFileIndex.PartitionPath], Seq[FileSlice])],
                                                   shouldPushDownFilesFilter: Boolean): Try[Option[Set[String]]] = Try {
@@ -349,85 +342,24 @@ case class HoodieFileIndex(spark: SparkSession,
     // and candidate files are obtained from these file slices.
     lazy val queryReferencedColumns = collectReferencedColumns(spark, queryFilters, schema)
-    lazy val (_, recordKeys) = recordLevelIndex.filterQueriesWithRecordKey(queryFilters)
-    lazy val functionalIndexPartitionOpt = functionalIndex.getFunctionalIndexPartition(queryFilters)
-    if (!isMetadataTableEnabled || !isDataSkippingEnabled) {
-      validateConfig()
-      Option.empty
-    } else if (recordKeys.nonEmpty) {
-      Option.apply(recordLevelIndex.getCandidateFiles(getAllFiles(), recordKeys))
-    } else if (functionalIndex.isIndexAvailable && queryFilters.nonEmpty && functionalIndexPartitionOpt.nonEmpty) {
-      val shouldReadInMemory = functionalIndex.shouldReadInMemory(this, queryReferencedColumns)
-      val indexDf = functionalIndex.loadFunctionalIndexDataFrame(functionalIndexPartitionOpt.get, shouldReadInMemory)
-      val prunedFileNames = getPrunedFileNames(prunedPartitionsAndFileSlices)
-      Some(getCandidateFiles(indexDf, queryFilters, prunedFileNames))
-    } else if (!columnStatsIndex.isIndexAvailable || queryFilters.isEmpty || queryReferencedColumns.isEmpty) {
-      validateConfig()
-      Option.empty
-    } else {
-      // NOTE: Since executing on-cluster via Spark API has its own non-trivial amount of overhead,
-      //       it's most often preferential to fetch Column Stats Index w/in the same process (usually driver),
-      //       w/o resorting to on-cluster execution.
-      //       For that we use a simple-heuristic to determine whether we should read and process CSI in-memory or
-      //       on-cluster: total number of rows of the expected projected portion of the index has to be below the
-      //       threshold (of 100k records)
-      val shouldReadInMemory = columnStatsIndex.shouldReadInMemory(this, queryReferencedColumns)
-      val prunedFileNames = getPrunedFileNames(prunedPartitionsAndFileSlices)
-      // NOTE: If partition pruning doesn't prune any files, then there's no need to apply file filters
-      //       when loading the Column Statistics Index
-      val prunedFileNamesOpt = if (shouldPushDownFilesFilter) Some(prunedFileNames) else None
-
-      columnStatsIndex.loadTransposed(queryReferencedColumns, shouldReadInMemory, prunedFileNamesOpt) { transposedColStatsDF =>
-        Some(getCandidateFiles(transposedColStatsDF, queryFilters, prunedFileNames))
+    if (isMetadataTableEnabled && isDataSkippingEnabled) {
+      for(indexSupport: SparkBaseIndexSupport <- indicesSupport) {
+        if (indexSupport.isIndexAvailable) {
+          val prunedFileNames = indexSupport.computeCandidateFileNames(this, queryFilters, queryReferencedColumns,
+            prunedPartitionsAndFileSlices, shouldPushDownFilesFilter)
+          if (prunedFileNames.nonEmpty) {
+            return Try(prunedFileNames)
+          }
+        }
       }
     }
-  }
-
-  private def getPrunedFileNames(prunedPartitionsAndFileSlices: Seq[(Option[BaseHoodieTableFileIndex.PartitionPath], Seq[FileSlice])]): Set[String] = {
-    prunedPartitionsAndFileSlices
-      .flatMap {
-        case (_, fileSlices) => fileSlices
-      }
-      .flatMap { fileSlice =>
-        val baseFileOption = Option(fileSlice.getBaseFile.orElse(null))
-        val logFiles = if (includeLogFiles) {
-          fileSlice.getLogFiles.iterator().asScala.map(_.getFileName).toList
-        } else Nil
-        baseFileOption.map(_.getFileName).toList ++ logFiles
-      }
-      .toSet
-  }
-
-  private def getCandidateFiles(indexDf: DataFrame, queryFilters: Seq[Expression], prunedFileNames: Set[String]): Set[String] = {
-    val indexSchema = indexDf.schema
-    val indexFilter = queryFilters.map(translateIntoColumnStatsIndexFilterExpr(_, indexSchema)).reduce(And)
-    val prunedCandidateFileNames =
-      indexDf.where(new Column(indexFilter))
-        .select(HoodieMetadataPayload.COLUMN_STATS_FIELD_FILE_NAME)
-        .collect()
-        .map(_.getString(0))
-        .toSet
-
-    // NOTE: Col-Stats Index isn't guaranteed to have complete set of statistics for every
-    //       base-file or log file: since it's bound to clustering, which could occur asynchronously
-    //       at arbitrary point in time, and is not likely to be touching all of the base files.
-    //
-    //       To close that gap, we manually compute the difference b/w all indexed (by col-stats-index)
-    //       files and all outstanding base-files or log files, and make sure that all base files and
-    //       log file not represented w/in the index are included in the output of this method
-    val allIndexedFileNames =
-      indexDf.select(HoodieMetadataPayload.COLUMN_STATS_FIELD_FILE_NAME)
-        .collect()
-        .map(_.getString(0))
-        .toSet
-    val notIndexedFileNames = prunedFileNames -- allIndexedFileNames
-
-    prunedCandidateFileNames ++ notIndexedFileNames
+    validateConfig()
+    Option.empty
   }

   override def refresh(): Unit = {
     super.refresh()
-    columnStatsIndex.invalidateCaches()
+    indicesSupport.foreach(idx => idx.invalidateCaches())
     hasPushedDownPartitionPredicates = false
   }

@@ -460,17 +392,19 @@ case class HoodieFileIndex(spark: SparkSession,

   private def isColumnStatsIndexEnabled: Boolean = metadataConfig.isColumnStatsIndexEnabled

-  private def isRecordIndexEnabled: Boolean = recordLevelIndex.isIndexAvailable
+  private def isRecordIndexEnabled: Boolean = indicesSupport.exists(idx =>
+    idx.getIndexName == RecordLevelIndexSupport.INDEX_NAME && idx.isIndexAvailable)

-  private def isFunctionalIndexEnabled: Boolean = functionalIndex.isIndexAvailable
+  private def isFunctionalIndexEnabled: Boolean = indicesSupport.exists(idx =>
+    idx.getIndexName == FunctionalIndexSupport.INDEX_NAME && idx.isIndexAvailable)

-  private def isIndexEnabled: Boolean = isColumnStatsIndexEnabled || isRecordIndexEnabled || isFunctionalIndexEnabled
+  private def isIndexEnabled: Boolean = indicesSupport.exists(idx => idx.isIndexAvailable)

   private def validateConfig(): Unit = {
     if (isDataSkippingEnabled && (!isMetadataTableEnabled || !isIndexEnabled)) {
       logWarning("Data skipping requires both Metadata Table and at least one of Column Stats Index, Record Level Index, or Functional Index"
" + s"(isMetadataTableEnabled = $isMetadataTableEnabled, isColumnStatsIndexEnabled = $isColumnStatsIndexEnabled" - + s", isRecordIndexApplicable = $isRecordIndexEnabled)") + + s", isRecordIndexApplicable = $isRecordIndexEnabled, isFunctionalIndexEnabled = $isFunctionalIndexEnabled)") } } diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/RecordLevelIndexSupport.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/RecordLevelIndexSupport.scala index 7d7d88cb2d4..119e3318902 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/RecordLevelIndexSupport.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/RecordLevelIndexSupport.scala @@ -17,28 +17,45 @@ package org.apache.hudi -import org.apache.hudi.client.common.HoodieSparkEngineContext import org.apache.hudi.common.config.HoodieMetadataConfig import org.apache.hudi.common.fs.FSUtils +import org.apache.hudi.common.model.FileSlice import org.apache.hudi.common.model.HoodieRecord.HoodieMetadataField import org.apache.hudi.common.table.HoodieTableMetaClient -import org.apache.hudi.metadata.{HoodieTableMetadata, HoodieTableMetadataUtil} -import org.apache.hudi.storage.StoragePathInfo +import org.apache.hudi.metadata.HoodieTableMetadataUtil +import org.apache.hudi.storage.StoragePath import org.apache.hudi.util.JFunction - -import org.apache.spark.api.java.JavaSparkContext import org.apache.spark.sql.SparkSession import org.apache.spark.sql.catalyst.expressions.{AttributeReference, EqualTo, Expression, In, Literal} -import scala.collection.{mutable, JavaConverters} +import scala.collection.{JavaConverters, mutable} -class RecordLevelIndexSupport(spark: SparkSession, +class RecordLevelIndexSupport (spark: SparkSession, metadataConfig: HoodieMetadataConfig, - metaClient: HoodieTableMetaClient) { + metaClient: HoodieTableMetaClient) + extends SparkBaseIndexSupport (spark, metadataConfig, metaClient) { + + + override def getIndexName: String = RecordLevelIndexSupport.INDEX_NAME + + override def computeCandidateFileNames(fileIndex: HoodieFileIndex, + queryFilters: Seq[Expression], + queryReferencedColumns: Seq[String], + prunedPartitionsAndFileSlices: Seq[(Option[BaseHoodieTableFileIndex.PartitionPath], Seq[FileSlice])], + shouldPushDownFilesFilter: Boolean + ): Option[Set[String]] = { + lazy val (_, recordKeys) = filterQueriesWithRecordKey(queryFilters) + val allFiles = fileIndex.inputFiles.map(strPath => new StoragePath(strPath)).toSeq + if (recordKeys.nonEmpty) { + Option.apply(getCandidateFiles(allFiles, recordKeys)) + } else { + Option.empty + } + } - @transient private lazy val engineCtx = new HoodieSparkEngineContext(new JavaSparkContext(spark.sparkContext)) - @transient private lazy val metadataTable: HoodieTableMetadata = - HoodieTableMetadata.create(engineCtx, metadataConfig, metaClient.getBasePathV2.toString) + override def invalidateCaches(): Unit = { + // no caches for this index type, do nothing + } /** * Returns the list of candidate files which store the provided record keys based on Metadata Table Record Index. @@ -47,7 +64,7 @@ class RecordLevelIndexSupport(spark: SparkSession, * @param recordKeys - List of record keys. 
    * @return Sequence of file names which need to be queried
    */
-  def getCandidateFiles(allFiles: Seq[StoragePathInfo], recordKeys: List[String]): Set[String] = {
+  def getCandidateFiles(allFiles: Seq[StoragePath], recordKeys: List[String]): Set[String] = {
     val recordKeyLocationsMap = metadataTable.readRecordIndex(JavaConverters.seqAsJavaListConverter(recordKeys).asJava)
     val fileIdToPartitionMap: mutable.Map[String, String] = mutable.Map.empty
     val candidateFiles: mutable.Set[String] = mutable.Set.empty
@@ -57,10 +74,10 @@ class RecordLevelIndexSupport(spark: SparkSession,
       }
     }
     for (file <- allFiles) {
-      val fileId = FSUtils.getFileIdFromFilePath(file.getPath)
+      val fileId = FSUtils.getFileIdFromFilePath(file)
       val partitionOpt = fileIdToPartitionMap.get(fileId)
       if (partitionOpt.isDefined) {
-        candidateFiles += file.getPath.getName
+        candidateFiles += file.getName
       }
     }
     candidateFiles.toSet
@@ -87,7 +104,7 @@ class RecordLevelIndexSupport(spark: SparkSession,
    * @param queryFilters The queries that need to be filtered.
    * @return Tuple of List of filtered queries and list of record key literals that need to be matched
    */
-  def filterQueriesWithRecordKey(queryFilters: Seq[Expression]): (List[Expression], List[String]) = {
+  private def filterQueriesWithRecordKey(queryFilters: Seq[Expression]): (List[Expression], List[String]) = {
     if (!isIndexAvailable) {
       (List.empty, List.empty)
     } else {
@@ -115,6 +132,7 @@ class RecordLevelIndexSupport(spark: SparkSession,
 }

 object RecordLevelIndexSupport {
+  val INDEX_NAME = "RECORD_LEVEL"
   /**
    * If the input query is an EqualTo or IN query on simple record key columns, the function returns a tuple of
    * list of the query and list of record key literals present in the query otherwise returns an empty option.

diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SparkBaseIndexSupport.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SparkBaseIndexSupport.scala
new file mode 100644
index 00000000000..d82ed6ca6d5
--- /dev/null
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SparkBaseIndexSupport.scala
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi
+
+import org.apache.hudi.client.common.HoodieSparkEngineContext
+import org.apache.hudi.common.config.HoodieMetadataConfig
+import org.apache.hudi.common.model.FileSlice
+import org.apache.hudi.common.table.HoodieTableMetaClient
+import org.apache.hudi.metadata.{HoodieMetadataPayload, HoodieTableMetadata}
+import org.apache.spark.api.java.JavaSparkContext
+import org.apache.spark.sql.{Column, DataFrame, SparkSession}
+import org.apache.spark.sql.catalyst.expressions.{And, Expression}
+import org.apache.spark.sql.hudi.DataSkippingUtils.translateIntoColumnStatsIndexFilterExpr
+
+import scala.collection.JavaConverters._
+
+abstract class SparkBaseIndexSupport(spark: SparkSession,
+                                     metadataConfig: HoodieMetadataConfig,
+                                     metaClient: HoodieTableMetaClient) {
+  @transient protected lazy val engineCtx = new HoodieSparkEngineContext(new JavaSparkContext(spark.sparkContext))
+  @transient protected lazy val metadataTable: HoodieTableMetadata =
+    HoodieTableMetadata.create(engineCtx, metadataConfig, metaClient.getBasePathV2.toString)
+
+  def getIndexName: String
+
+  def isIndexAvailable: Boolean
+
+  def computeCandidateFileNames(fileIndex: HoodieFileIndex,
+                                queryFilters: Seq[Expression],
+                                queryReferencedColumns: Seq[String],
+                                prunedPartitionsAndFileSlices: Seq[(Option[BaseHoodieTableFileIndex.PartitionPath], Seq[FileSlice])],
+                                shouldPushDownFilesFilter: Boolean): Option[Set[String]]
+
+  def invalidateCaches(): Unit
+
+  protected def getPrunedFileNames(prunedPartitionsAndFileSlices: Seq[(Option[BaseHoodieTableFileIndex.PartitionPath], Seq[FileSlice])],
+                                   includeLogFiles: Boolean = false): Set[String] = {
+    prunedPartitionsAndFileSlices
+      .flatMap {
+        case (_, fileSlices) => fileSlices
+      }
+      .flatMap { fileSlice =>
+        val baseFileOption = Option(fileSlice.getBaseFile.orElse(null))
+        val logFiles = if (includeLogFiles) {
+          fileSlice.getLogFiles.iterator().asScala.map(_.getFileName).toList
+        } else Nil
+        baseFileOption.map(_.getFileName).toList ++ logFiles
+      }
+      .toSet
+  }
+
+  protected def getCandidateFiles(indexDf: DataFrame, queryFilters: Seq[Expression], prunedFileNames: Set[String]): Set[String] = {
+    val indexSchema = indexDf.schema
+    val indexFilter = queryFilters.map(translateIntoColumnStatsIndexFilterExpr(_, indexSchema)).reduce(And)
+    val prunedCandidateFileNames =
+      indexDf.where(new Column(indexFilter))
+        .select(HoodieMetadataPayload.COLUMN_STATS_FIELD_FILE_NAME)
+        .collect()
+        .map(_.getString(0))
+        .toSet
+
+    // NOTE: Col-Stats Index isn't guaranteed to have complete set of statistics for every
+    //       base-file or log file: since it's bound to clustering, which could occur asynchronously
+    //       at arbitrary point in time, and is not likely to be touching all of the base files.
+    //
+    //       To close that gap, we manually compute the difference b/w all indexed (by col-stats-index)
+    //       files and all outstanding base-files or log files, and make sure that all base files and
+    //       log file not represented w/in the index are included in the output of this method
+    val allIndexedFileNames =
+      indexDf.select(HoodieMetadataPayload.COLUMN_STATS_FIELD_FILE_NAME)
+        .collect()
+        .map(_.getString(0))
+        .toSet
+    val notIndexedFileNames = prunedFileNames -- allIndexedFileNames
+
+    prunedCandidateFileNames ++ notIndexedFileNames
+  }
+
+  /**
+   * Determines whether it would be more optimal to read Column Stats Index a) in-memory of the invoking process,
+   * or b) executing it on-cluster via Spark [[Dataset]] and [[RDD]] APIs
+   */
+  protected def shouldReadInMemory(fileIndex: HoodieFileIndex, queryReferencedColumns: Seq[String], inMemoryProjectionThreshold: Integer): Boolean = {
+    Option(metadataConfig.getColumnStatsIndexProcessingModeOverride) match {
+      case Some(mode) =>
+        mode == HoodieMetadataConfig.COLUMN_STATS_INDEX_PROCESSING_MODE_IN_MEMORY
+      case None =>
+        fileIndex.getFileSlicesCount * queryReferencedColumns.length < inMemoryProjectionThreshold
+    }
+  }
+
+}
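
For illustration, below is a minimal sketch of how the abstraction introduced by this commit is meant to be extended: a new index type subclasses `SparkBaseIndexSupport`, implements the four abstract members, and is registered in `HoodieFileIndex.indicesSupport`. The class `BloomRangeIndexSupport`, its `INDEX_NAME`, the `bloom_range` metadata partition, and the `loadIndexDataFrame` stub are hypothetical and not part of this commit; only the signatures and the inherited helpers (`getPrunedFileNames`, `getCandidateFiles`) mirror the code above.

package org.apache.hudi

import org.apache.hudi.common.config.HoodieMetadataConfig
import org.apache.hudi.common.model.FileSlice
import org.apache.hudi.common.table.HoodieTableMetaClient
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.catalyst.expressions.Expression

// Hypothetical descendant; none of the names below exist in this commit.
class BloomRangeIndexSupport(spark: SparkSession,
                             metadataConfig: HoodieMetadataConfig,
                             metaClient: HoodieTableMetaClient)
  extends SparkBaseIndexSupport(spark, metadataConfig, metaClient) {

  override def getIndexName: String = BloomRangeIndexSupport.INDEX_NAME

  // Assumed metadata partition name; real descendants check their own constant,
  // e.g. ColumnStatsIndexSupport checks PARTITION_NAME_COLUMN_STATS
  override def isIndexAvailable: Boolean =
    metaClient.getTableConfig.getMetadataPartitions.contains("bloom_range")

  override def computeCandidateFileNames(fileIndex: HoodieFileIndex,
                                         queryFilters: Seq[Expression],
                                         queryReferencedColumns: Seq[String],
                                         prunedPartitionsAndFileSlices: Seq[(Option[BaseHoodieTableFileIndex.PartitionPath], Seq[FileSlice])],
                                         shouldPushDownFilesFilter: Boolean): Option[Set[String]] = {
    if (isIndexAvailable && queryFilters.nonEmpty) {
      // The inherited helpers do the heavy lifting: start from the
      // partition-pruned file set, then narrow it via the index DataFrame
      val prunedFileNames = getPrunedFileNames(prunedPartitionsAndFileSlices)
      val indexDf: DataFrame = loadIndexDataFrame() // analogous to loadFunctionalIndexDataFrame
      Some(getCandidateFiles(indexDf, queryFilters, prunedFileNames))
    } else {
      // Returning None lets HoodieFileIndex fall through to the next entry in indicesSupport
      Option.empty
    }
  }

  override def invalidateCaches(): Unit = {
    // nothing cached in this sketch
  }

  private def loadIndexDataFrame(): DataFrame = ??? // left unimplemented in this sketch
}

object BloomRangeIndexSupport {
  val INDEX_NAME = "BLOOM_RANGE"
}

Because `indicesSupport` is an ordered List, the position at which such an instance is registered determines its lookup priority: `lookupCandidateFilesInMetadataTable` returns the first non-empty candidate set and only falls through to the next index on `Option.empty`.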