This is an automated email from the ASF dual-hosted git repository.
codope pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
     new cb6eb6785fd [HUDI-7653] Refactor HoodieFileIndex for more flexibility (#11074)
cb6eb6785fd is described below
commit cb6eb6785fdeb88e66016a2b8c0c6e6fa184b309
Author: Vova Kolmakov <[email protected]>
AuthorDate: Tue Apr 23 23:09:08 2024 +0700
[HUDI-7653] Refactor HoodieFileIndex for more flexibility (#11074)
    Created a new abstract class `SparkBaseIndexSupport` with abstract methods
    `getIndexName`, `isIndexAvailable`, `computeCandidateFileNames`, and
    `invalidateCaches` (to be overridden in descendants) and concrete methods
    `getPrunedFileNames`, `getCandidateFiles`, and `shouldReadInMemory`
    (moved from `HoodieFileIndex` or the `XXXIndexSupport` classes for reuse in descendants).
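
    In outline, each index-support class now shares one parent and one entry
    point, and `HoodieFileIndex` walks the indices in priority order, taking
    the first non-empty answer. A minimal, self-contained Scala sketch of that
    shape (the names `IndexSupport` and `lookupCandidates` and the String-typed
    filters are simplified stand-ins; the real base class is
    `SparkBaseIndexSupport`, whose signatures carry `HoodieFileIndex`, Catalyst
    expressions, and pruned file slices, as in the diff below):

        // Each concrete index (record-level, functional, column-stats) answers:
        // "am I available?" and "which files are candidates for these filters?"
        abstract class IndexSupport {
          def getIndexName: String
          def isIndexAvailable: Boolean
          def computeCandidateFileNames(queryFilters: Seq[String]): Option[Set[String]]
          def invalidateCaches(): Unit
        }

        // The file index tries the indices in list order and keeps the first
        // non-empty result (record-level, then functional, then column-stats).
        def lookupCandidates(indices: List[IndexSupport],
                             queryFilters: Seq[String]): Option[Set[String]] =
          indices.view
            .filter(_.isIndexAvailable)
            .flatMap(_.computeCandidateFileNames(queryFilters))
            .headOption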
---------
Co-authored-by: Sagar Sumit <[email protected]>
---
.../org/apache/hudi/ColumnStatsIndexSupport.scala | 68 ++++++-----
.../org/apache/hudi/FunctionalIndexSupport.scala | 121 +++++--------------
.../scala/org/apache/hudi/HoodieFileIndex.scala | 128 +++++----------------
.../org/apache/hudi/RecordLevelIndexSupport.scala | 48 +++++---
.../org/apache/hudi/SparkBaseIndexSupport.scala | 108 +++++++++++++++++
5 files changed, 243 insertions(+), 230 deletions(-)
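
The shared `shouldReadInMemory` helper that moves into the base class keeps the
existing heuristic: read the column-stats index inside the driver process when
the projected slice of the index is small, otherwise execute on-cluster via
Spark. A hedged sketch of just that decision (the default threshold of 100k is
taken from the NOTE in the diff; the real code first consults
`HoodieMetadataConfig.getColumnStatsIndexProcessingModeOverride`):

    // Projected index size is roughly (file slices) x (referenced columns);
    // below the threshold an in-process read beats cluster-job overhead.
    def shouldReadInMemory(fileSlicesCount: Long,
                           queryReferencedColumns: Seq[String],
                           inMemoryProjectionThreshold: Int = 100000): Boolean =
      fileSlicesCount * queryReferencedColumns.length < inMemoryProjectionThreshold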
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/ColumnStatsIndexSupport.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/ColumnStatsIndexSupport.scala
index dc15a3e8c8c..238962b964c 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/ColumnStatsIndexSupport.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/ColumnStatsIndexSupport.scala
@@ -23,11 +23,10 @@ import org.apache.hudi.ColumnStatsIndexSupport._
import org.apache.hudi.HoodieCatalystUtils.{withPersistedData, withPersistedDataset}
import org.apache.hudi.HoodieConversionUtils.toScalaOption
import org.apache.hudi.avro.model._
-import org.apache.hudi.client.common.HoodieSparkEngineContext
import org.apache.hudi.common.config.HoodieMetadataConfig
import org.apache.hudi.common.data.HoodieData
import org.apache.hudi.common.function.SerializableFunction
-import org.apache.hudi.common.model.HoodieRecord
+import org.apache.hudi.common.model.{FileSlice, HoodieRecord}
import org.apache.hudi.common.table.HoodieTableMetaClient
import org.apache.hudi.common.util.BinaryUtil.toBytes
import org.apache.hudi.common.util.ValidationUtils.checkState
@@ -36,7 +35,6 @@ import org.apache.hudi.common.util.hash.ColumnIndexID
import org.apache.hudi.data.HoodieJavaRDD
import org.apache.hudi.metadata.{HoodieMetadataPayload, HoodieTableMetadata, HoodieTableMetadataUtil, MetadataPartitionType}
import org.apache.hudi.util.JFunction
-import org.apache.spark.api.java.JavaSparkContext
import org.apache.spark.sql.HoodieUnsafeUtils.{createDataFrameFromInternalRows, createDataFrameFromRDD, createDataFrameFromRows}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.util.DateTimeUtils
@@ -44,8 +42,10 @@ import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types._
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
import org.apache.spark.storage.StorageLevel
-
import java.nio.ByteBuffer
+
+import org.apache.spark.sql.catalyst.expressions.Expression
+
import scala.collection.JavaConverters._
import scala.collection.immutable.TreeSet
import scala.collection.mutable.ListBuffer
@@ -55,11 +55,8 @@ class ColumnStatsIndexSupport(spark: SparkSession,
tableSchema: StructType,
@transient metadataConfig: HoodieMetadataConfig,
@transient metaClient: HoodieTableMetaClient,
- allowCaching: Boolean = false) {
-
-  @transient private lazy val engineCtx = new HoodieSparkEngineContext(new JavaSparkContext(spark.sparkContext))
-  @transient private lazy val metadataTable: HoodieTableMetadata =
-    HoodieTableMetadata.create(engineCtx, metadataConfig, metaClient.getBasePathV2.toString)
+                              allowCaching: Boolean = false)
+  extends SparkBaseIndexSupport(spark, metadataConfig, metaClient) {
@transient private lazy val cachedColumnStatsIndexViews: ParHashMap[Seq[String], DataFrame] = ParHashMap()
@@ -79,6 +76,40 @@ class ColumnStatsIndexSupport(spark: SparkSession,
}
}
+ override def getIndexName: String = ColumnStatsIndexSupport.INDEX_NAME
+
+  override def computeCandidateFileNames(fileIndex: HoodieFileIndex,
+                                         queryFilters: Seq[Expression],
+                                         queryReferencedColumns: Seq[String],
+                                         prunedPartitionsAndFileSlices: Seq[(Option[BaseHoodieTableFileIndex.PartitionPath], Seq[FileSlice])],
+                                         shouldPushDownFilesFilter: Boolean
+                                        ): Option[Set[String]] = {
+    if (isIndexAvailable && queryFilters.nonEmpty && queryReferencedColumns.nonEmpty) {
+      // NOTE: Since executing on-cluster via Spark API has its own non-trivial amount of overhead,
+      //       it's most often preferential to fetch Column Stats Index w/in the same process (usually driver),
+      //       w/o resorting to on-cluster execution.
+      //       For that we use a simple-heuristic to determine whether we should read and process CSI in-memory or
+      //       on-cluster: total number of rows of the expected projected portion of the index has to be below the
+      //       threshold (of 100k records)
+      val readInMemory = shouldReadInMemory(fileIndex, queryReferencedColumns, inMemoryProjectionThreshold)
+      val prunedFileNames = getPrunedFileNames(prunedPartitionsAndFileSlices)
+      // NOTE: If partition pruning doesn't prune any files, then there's no need to apply file filters
+      //       when loading the Column Statistics Index
+      val prunedFileNamesOpt = if (shouldPushDownFilesFilter) Some(prunedFileNames) else None
+
+      loadTransposed(queryReferencedColumns, readInMemory, prunedFileNamesOpt) { transposedColStatsDF =>
+        Some(getCandidateFiles(transposedColStatsDF, queryFilters, prunedFileNames))
+ }
+ } else {
+ Option.empty
+ }
+ }
+
+ override def invalidateCaches(): Unit = {
+ cachedColumnStatsIndexViews.foreach { case (_, df) => df.unpersist() }
+ cachedColumnStatsIndexViews.clear()
+ }
+
/**
* Returns true in cases when Column Stats Index is built and available as standalone partition
* w/in the Metadata Table
@@ -88,19 +119,6 @@ class ColumnStatsIndexSupport(spark: SparkSession,
metaClient.getTableConfig.getMetadataPartitions.contains(HoodieTableMetadataUtil.PARTITION_NAME_COLUMN_STATS)
}
-  /**
-   * Determines whether it would be more optimal to read Column Stats Index a) in-memory of the invoking process,
-   * or b) executing it on-cluster via Spark [[Dataset]] and [[RDD]] APIs
-   */
-  def shouldReadInMemory(fileIndex: HoodieFileIndex, queryReferencedColumns: Seq[String]): Boolean = {
-    Option(metadataConfig.getColumnStatsIndexProcessingModeOverride) match {
-      case Some(mode) =>
-        mode == HoodieMetadataConfig.COLUMN_STATS_INDEX_PROCESSING_MODE_IN_MEMORY
-      case None =>
-        fileIndex.getFileSlicesCount * queryReferencedColumns.length < inMemoryProjectionThreshold
-    }
-  }
-
/**
* Loads view of the Column Stats Index in a transposed format where single row coalesces every columns'
* statistics for a single file, returning it as [[DataFrame]]
@@ -172,11 +190,6 @@ class ColumnStatsIndexSupport(spark: SparkSession,
}
}
- def invalidateCaches(): Unit = {
- cachedColumnStatsIndexViews.foreach { case (_, df) => df.unpersist() }
- cachedColumnStatsIndexViews.clear()
- }
-
/**
* Transposes and converts the raw table format of the Column Stats Index representation,
* where each row/record corresponds to individual (column, file) pair, into the table format
@@ -366,6 +379,7 @@ class ColumnStatsIndexSupport(spark: SparkSession,
}
object ColumnStatsIndexSupport {
+ val INDEX_NAME = "COLUMN_STATS"
private val expectedAvroSchemaValues = Set("BooleanWrapper", "IntWrapper", "LongWrapper", "FloatWrapper", "DoubleWrapper",
"BytesWrapper", "StringWrapper", "DateWrapper", "DecimalWrapper", "TimeMicrosWrapper", "TimestampMicrosWrapper")
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/FunctionalIndexSupport.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/FunctionalIndexSupport.scala
index 035eeabb4d5..8614bcb0b5f 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/FunctionalIndexSupport.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/FunctionalIndexSupport.scala
@@ -19,58 +19,59 @@
package org.apache.hudi
-import org.apache.hadoop.fs.FileStatus
import org.apache.hudi.FunctionalIndexSupport._
import org.apache.hudi.HoodieConversionUtils.toScalaOption
import org.apache.hudi.HoodieSparkFunctionalIndex.SPARK_FUNCTION_MAP
import org.apache.hudi.avro.model.{HoodieMetadataColumnStats, HoodieMetadataRecord}
-import org.apache.hudi.client.common.HoodieSparkEngineContext
import org.apache.hudi.common.config.HoodieMetadataConfig
import org.apache.hudi.common.data.HoodieData
-import org.apache.hudi.common.fs.FSUtils
-import org.apache.hudi.common.model.HoodieRecord
+import org.apache.hudi.common.model.{FileSlice, HoodieRecord}
import org.apache.hudi.common.table.HoodieTableMetaClient
import org.apache.hudi.common.util.ValidationUtils.checkState
import org.apache.hudi.common.util.hash.ColumnIndexID
import org.apache.hudi.data.HoodieJavaRDD
-import org.apache.hudi.metadata.{HoodieMetadataPayload, HoodieTableMetadata, HoodieTableMetadataUtil}
+import org.apache.hudi.metadata.{HoodieMetadataPayload, HoodieTableMetadataUtil}
import org.apache.hudi.util.JFunction
-import org.apache.spark.api.java.JavaSparkContext
import org.apache.spark.sql.HoodieUnsafeUtils.{createDataFrameFromInternalRows, createDataFrameFromRDD}
import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.{And, Expression}
+import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.functions.col
-import org.apache.spark.sql.hudi.DataSkippingUtils.translateIntoColumnStatsIndexFilterExpr
import org.apache.spark.sql.types.StructType
-import org.apache.spark.sql.{Column, DataFrame, SparkSession}
+import org.apache.spark.sql.{DataFrame, SparkSession}
import scala.collection.JavaConverters._
-import scala.collection.{JavaConverters, mutable}
class FunctionalIndexSupport(spark: SparkSession,
metadataConfig: HoodieMetadataConfig,
- metaClient: HoodieTableMetaClient) {
-
-  @transient private lazy val engineCtx = new HoodieSparkEngineContext(new JavaSparkContext(spark.sparkContext))
-  @transient private lazy val metadataTable: HoodieTableMetadata =
-    HoodieTableMetadata.create(engineCtx, metadataConfig, metaClient.getBasePathV2.toString)
+ metaClient: HoodieTableMetaClient)
+ extends SparkBaseIndexSupport (spark, metadataConfig, metaClient) {
// NOTE: Since [[metadataConfig]] is transient this has to be eagerly persisted, before this will be passed on to the executor
private val inMemoryProjectionThreshold = metadataConfig.getColumnStatsIndexInMemoryProjectionThreshold
-  /**
-   * Determines whether it would be more optimal to read Column Stats Index a) in-memory of the invoking process,
-   * or b) executing it on-cluster via Spark [[Dataset]] and [[RDD]] APIs
-   */
-  def shouldReadInMemory(fileIndex: HoodieFileIndex, queryReferencedColumns: Seq[String]): Boolean = {
-    Option(metadataConfig.getColumnStatsIndexProcessingModeOverride) match {
-      case Some(mode) =>
-        mode == HoodieMetadataConfig.COLUMN_STATS_INDEX_PROCESSING_MODE_IN_MEMORY
-      case None =>
-        fileIndex.getFileSlicesCount * queryReferencedColumns.length < inMemoryProjectionThreshold
+  override def getIndexName: String = FunctionalIndexSupport.INDEX_NAME
+
+  override def computeCandidateFileNames(fileIndex: HoodieFileIndex,
+                                         queryFilters: Seq[Expression],
+                                         queryReferencedColumns: Seq[String],
+                                         prunedPartitionsAndFileSlices: Seq[(Option[BaseHoodieTableFileIndex.PartitionPath], Seq[FileSlice])],
+                                         shouldPushDownFilesFilter: Boolean
+                                        ): Option[Set[String]] = {
+    lazy val functionalIndexPartitionOpt = getFunctionalIndexPartition(queryFilters)
+    if (isIndexAvailable && queryFilters.nonEmpty && functionalIndexPartitionOpt.nonEmpty) {
+      val readInMemory = shouldReadInMemory(fileIndex, queryReferencedColumns, inMemoryProjectionThreshold)
+      val indexDf = loadFunctionalIndexDataFrame(functionalIndexPartitionOpt.get, readInMemory)
+      val prunedFileNames = getPrunedFileNames(prunedPartitionsAndFileSlices)
+      Some(getCandidateFiles(indexDf, queryFilters, prunedFileNames))
+    } else {
+      Option.empty
}
}
+ override def invalidateCaches(): Unit = {
+ // no caches for this index type, do nothing
+ }
+
/**
* Return true if metadata table is enabled and functional index metadata partition is available.
*/
@@ -78,42 +79,6 @@ class FunctionalIndexSupport(spark: SparkSession,
metadataConfig.enabled && metaClient.getFunctionalIndexMetadata.isPresent && !metaClient.getFunctionalIndexMetadata.get().getIndexDefinitions.isEmpty
}
-  def getPrunedCandidateFileNames(indexPartition: String,
-                                  shouldReadInMemory: Boolean,
-                                  queryFilters: Seq[Expression]): Set[String] = {
-    val indexDf = loadFunctionalIndexDataFrame(indexPartition, shouldReadInMemory)
-    val indexSchema = indexDf.schema
-    val indexFilter =
-      queryFilters.map(translateIntoColumnStatsIndexFilterExpr(_, indexSchema)).reduce(And)
-
-    val prunedCandidateFileNames =
-      indexDf.where(new Column(indexFilter))
-        .select(HoodieMetadataPayload.COLUMN_STATS_FIELD_FILE_NAME)
-        .collect()
-        .map(_.getString(0))
-        .toSet
-
-    prunedCandidateFileNames
-  }
-
-  def load(indexPartition: String,
-           targetColumns: Seq[String],
-           shouldReadInMemory: Boolean): DataFrame = {
-    val metadataTablePath = HoodieTableMetadata.getMetadataTableBasePath(metaClient.getBasePathV2.toString)
-    // Read Metadata Table's Column Stats Index into Spark's [[DataFrame]]
-    val colStatsDF = spark.read.format("org.apache.hudi")
-      .options(metadataConfig.getProps.asScala)
-      .load(s"$metadataTablePath/$indexPartition")
-
-    val requiredIndexColumns =
-      targetColumnStatsIndexColumns.map(colName =>
-        col(s"${HoodieMetadataPayload.SCHEMA_FIELD_ID_COLUMN_STATS}.${colName}"))
-
-    colStatsDF.where(col(HoodieMetadataPayload.SCHEMA_FIELD_ID_COLUMN_STATS).isNotNull)
-      .select(requiredIndexColumns: _*)
-  }
-
/**
* Searches for an index partition based on the specified index function and target column name.
*
@@ -127,7 +92,7 @@ class FunctionalIndexSupport(spark: SparkSession,
* @return An `Option` containing the index partition identifier if a matching index definition is found.
* Returns `None` if no matching index definition is found.
*/
-  def getFunctionalIndexPartition(queryFilters: Seq[Expression]): Option[String] = {
+  private def getFunctionalIndexPartition(queryFilters: Seq[Expression]): Option[String] = {
val functionToColumnNames = extractSparkFunctionNames(queryFilters)
if (functionToColumnNames.nonEmpty) {
// Currently, only one functional index in the query is supported. HUDI-7620 for supporting multiple functions.
@@ -171,8 +136,8 @@ class FunctionalIndexSupport(spark: SparkSession,
}.toMap
}
-  def loadFunctionalIndexDataFrame(indexPartition: String,
-                                   shouldReadInMemory: Boolean): DataFrame = {
+  private def loadFunctionalIndexDataFrame(indexPartition: String,
+                                           shouldReadInMemory: Boolean): DataFrame = {
val colStatsDF = {
val indexDefinition = metaClient.getFunctionalIndexMetadata.get().getIndexDefinitions.get(indexPartition)
val indexType = indexDefinition.getIndexType
@@ -224,36 +189,10 @@ class FunctionalIndexSupport(spark: SparkSession,
columnStatsRecords
}
-
-  /**
-   * Returns the list of candidate files which store the provided record keys based on Metadata Table Record Index.
-   *
-   * @param allFiles   - List of all files which needs to be considered for the query
-   * @param recordKeys - List of record keys.
-   * @return Sequence of file names which need to be queried
-   */
-  def getCandidateFiles(allFiles: Seq[FileStatus], recordKeys: List[String]): Set[String] = {
-    val recordKeyLocationsMap = metadataTable.readRecordIndex(seqAsJavaListConverter(recordKeys).asJava)
-    val fileIdToPartitionMap: mutable.Map[String, String] = mutable.Map.empty
-    val candidateFiles: mutable.Set[String] = mutable.Set.empty
-    for (locations <- JavaConverters.collectionAsScalaIterableConverter(recordKeyLocationsMap.values()).asScala) {
-      for (location <- JavaConverters.collectionAsScalaIterableConverter(locations).asScala) {
-        fileIdToPartitionMap.put(location.getFileId, location.getPartitionPath)
-      }
-    }
-    for (file <- allFiles) {
-      val fileId = FSUtils.getFileIdFromFilePath(file.getPath)
-      val partitionOpt = fileIdToPartitionMap.get(fileId)
-      if (partitionOpt.isDefined) {
-        candidateFiles += file.getPath.getName
-      }
-    }
-    candidateFiles.toSet
-  }
}
object FunctionalIndexSupport {
-
+ val INDEX_NAME = "FUNCTIONAL"
/**
* Target Column Stats Index columns which internally are mapped onto fields of the corresponding
* Column Stats record payload ([[HoodieMetadataColumnStats]]) persisted w/in Metadata Table
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala
index 922137ff01c..faabbecc5c1 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala
@@ -26,25 +26,23 @@ import org.apache.hudi.common.table.HoodieTableMetaClient
import org.apache.hudi.common.util.StringUtils
import org.apache.hudi.exception.HoodieException
import org.apache.hudi.keygen.{TimestampBasedAvroKeyGenerator, TimestampBasedKeyGenerator}
-import org.apache.hudi.metadata.HoodieMetadataPayload
import org.apache.hudi.storage.{StoragePath, StoragePathInfo}
import org.apache.hudi.util.JFunction
-
import org.apache.hadoop.fs.{FileStatus, Path}
import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.{And, Expression, Literal}
+import org.apache.spark.sql.catalyst.expressions.{Expression, Literal}
import org.apache.spark.sql.execution.datasources.{FileIndex, FileStatusCache, NoopCache, PartitionDirectory}
-import org.apache.spark.sql.hudi.DataSkippingUtils.translateIntoColumnStatsIndexFilterExpr
import org.apache.spark.sql.hudi.HoodieSqlCommonUtils
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types._
-import org.apache.spark.sql.{Column, DataFrame, SparkSession}
+import org.apache.spark.sql.SparkSession
import org.apache.spark.unsafe.types.UTF8String
-
import java.text.SimpleDateFormat
import java.util.stream.Collectors
+
import javax.annotation.concurrent.NotThreadSafe
+
import scala.collection.JavaConverters._
import scala.util.control.NonFatal
import scala.util.{Failure, Success, Try}
@@ -96,22 +94,16 @@ case class HoodieFileIndex(spark: SparkSession,
@transient protected var hasPushedDownPartitionPredicates: Boolean = false
/**
-   * NOTE: [[ColumnStatsIndexSupport]] is a transient state, since it's only relevant while logical plan
+   * NOTE: [[indicesSupport]] is a transient state, since it's only relevant while logical plan
* is handled by the Spark's driver
+   * The order of elements is important as in this order indices will be applied during `lookupCandidateFilesInMetadataTable`
*/
-  @transient private lazy val columnStatsIndex = new ColumnStatsIndexSupport(spark, schema, metadataConfig, metaClient)
-
-  /**
-   * NOTE: [[RecordLevelIndexSupport]] is a transient state, since it's only relevant while logical plan
-   * is handled by the Spark's driver
-   */
-  @transient private lazy val recordLevelIndex = new RecordLevelIndexSupport(spark, metadataConfig, metaClient)
-
-  /**
-   * NOTE: [[FunctionalIndexSupport]] is a transient state, since it's only relevant while logical plan
-   * is handled by the Spark's driver
-   */
-  @transient private lazy val functionalIndex = new FunctionalIndexSupport(spark, metadataConfig, metaClient)
+  @transient private lazy val indicesSupport: List[SparkBaseIndexSupport] = List(
+ new RecordLevelIndexSupport(spark, metadataConfig, metaClient),
+ new FunctionalIndexSupport(spark, metadataConfig, metaClient),
+ new ColumnStatsIndexSupport(spark, schema, metadataConfig, metaClient)
+ )
private val enableHoodieExtension = spark.sessionState.conf.getConfString("spark.sql.extensions", "")
.split(",")
@@ -335,6 +327,7 @@ case class HoodieFileIndex(spark: SparkSession,
* @param queryFilters list of original data filters passed down from querying engine
* @return list of pruned (data-skipped) candidate base-files and log files' names
*/
+ // scalastyle:off return
private def lookupCandidateFilesInMetadataTable(queryFilters: Seq[Expression],
                                                prunedPartitionsAndFileSlices: Seq[(Option[BaseHoodieTableFileIndex.PartitionPath], Seq[FileSlice])],
                                                shouldPushDownFilesFilter: Boolean): Try[Option[Set[String]]] = Try {
@@ -349,85 +342,24 @@ case class HoodieFileIndex(spark: SparkSession,
// and candidate files are obtained from these file slices.
lazy val queryReferencedColumns = collectReferencedColumns(spark, queryFilters, schema)
-    lazy val (_, recordKeys) = recordLevelIndex.filterQueriesWithRecordKey(queryFilters)
-    lazy val functionalIndexPartitionOpt = functionalIndex.getFunctionalIndexPartition(queryFilters)
-    if (!isMetadataTableEnabled || !isDataSkippingEnabled) {
-      validateConfig()
-      Option.empty
-    } else if (recordKeys.nonEmpty) {
-      Option.apply(recordLevelIndex.getCandidateFiles(getAllFiles(), recordKeys))
-    } else if (functionalIndex.isIndexAvailable && queryFilters.nonEmpty && functionalIndexPartitionOpt.nonEmpty) {
-      val shouldReadInMemory = functionalIndex.shouldReadInMemory(this, queryReferencedColumns)
-      val indexDf = functionalIndex.loadFunctionalIndexDataFrame(functionalIndexPartitionOpt.get, shouldReadInMemory)
-      val prunedFileNames = getPrunedFileNames(prunedPartitionsAndFileSlices)
-      Some(getCandidateFiles(indexDf, queryFilters, prunedFileNames))
-    } else if (!columnStatsIndex.isIndexAvailable || queryFilters.isEmpty || queryReferencedColumns.isEmpty) {
-      validateConfig()
-      Option.empty
-    } else {
-      // NOTE: Since executing on-cluster via Spark API has its own non-trivial amount of overhead,
-      //       it's most often preferential to fetch Column Stats Index w/in the same process (usually driver),
-      //       w/o resorting to on-cluster execution.
-      //       For that we use a simple-heuristic to determine whether we should read and process CSI in-memory or
-      //       on-cluster: total number of rows of the expected projected portion of the index has to be below the
-      //       threshold (of 100k records)
-      val shouldReadInMemory = columnStatsIndex.shouldReadInMemory(this, queryReferencedColumns)
-      val prunedFileNames = getPrunedFileNames(prunedPartitionsAndFileSlices)
-      // NOTE: If partition pruning doesn't prune any files, then there's no need to apply file filters
-      //       when loading the Column Statistics Index
-      val prunedFileNamesOpt = if (shouldPushDownFilesFilter) Some(prunedFileNames) else None
-
-      columnStatsIndex.loadTransposed(queryReferencedColumns, shouldReadInMemory, prunedFileNamesOpt) { transposedColStatsDF =>
-        Some(getCandidateFiles(transposedColStatsDF, queryFilters, prunedFileNames))
+    if (isMetadataTableEnabled && isDataSkippingEnabled) {
+      for(indexSupport: SparkBaseIndexSupport <- indicesSupport) {
+        if (indexSupport.isIndexAvailable) {
+          val prunedFileNames = indexSupport.computeCandidateFileNames(this, queryFilters, queryReferencedColumns,
+ prunedPartitionsAndFileSlices, shouldPushDownFilesFilter)
+ if (prunedFileNames.nonEmpty) {
+ return Try(prunedFileNames)
+ }
+ }
}
}
- }
-
-  private def getPrunedFileNames(prunedPartitionsAndFileSlices: Seq[(Option[BaseHoodieTableFileIndex.PartitionPath], Seq[FileSlice])]): Set[String] = {
- prunedPartitionsAndFileSlices
- .flatMap {
- case (_, fileSlices) => fileSlices
- }
- .flatMap { fileSlice =>
- val baseFileOption = Option(fileSlice.getBaseFile.orElse(null))
- val logFiles = if (includeLogFiles) {
- fileSlice.getLogFiles.iterator().asScala.map(_.getFileName).toList
- } else Nil
- baseFileOption.map(_.getFileName).toList ++ logFiles
- }
- .toSet
- }
-
-  private def getCandidateFiles(indexDf: DataFrame, queryFilters: Seq[Expression], prunedFileNames: Set[String]): Set[String] = {
-    val indexSchema = indexDf.schema
-    val indexFilter = queryFilters.map(translateIntoColumnStatsIndexFilterExpr(_, indexSchema)).reduce(And)
-    val prunedCandidateFileNames =
-      indexDf.where(new Column(indexFilter))
-        .select(HoodieMetadataPayload.COLUMN_STATS_FIELD_FILE_NAME)
-        .collect()
-        .map(_.getString(0))
-        .toSet
-
-    // NOTE: Col-Stats Index isn't guaranteed to have complete set of statistics for every
-    //       base-file or log file: since it's bound to clustering, which could occur asynchronously
-    //       at arbitrary point in time, and is not likely to be touching all of the base files.
-    //
-    //       To close that gap, we manually compute the difference b/w all indexed (by col-stats-index)
-    //       files and all outstanding base-files or log files, and make sure that all base files and
-    //       log file not represented w/in the index are included in the output of this method
- val allIndexedFileNames =
- indexDf.select(HoodieMetadataPayload.COLUMN_STATS_FIELD_FILE_NAME)
- .collect()
- .map(_.getString(0))
- .toSet
- val notIndexedFileNames = prunedFileNames -- allIndexedFileNames
-
- prunedCandidateFileNames ++ notIndexedFileNames
+ validateConfig()
+ Option.empty
}
override def refresh(): Unit = {
super.refresh()
- columnStatsIndex.invalidateCaches()
+ indicesSupport.foreach(idx => idx.invalidateCaches())
hasPushedDownPartitionPredicates = false
}
@@ -460,17 +392,19 @@ case class HoodieFileIndex(spark: SparkSession,
private def isColumnStatsIndexEnabled: Boolean = metadataConfig.isColumnStatsIndexEnabled
- private def isRecordIndexEnabled: Boolean = recordLevelIndex.isIndexAvailable
+  private def isRecordIndexEnabled: Boolean = indicesSupport.exists(idx =>
+    idx.getIndexName == RecordLevelIndexSupport.INDEX_NAME && idx.isIndexAvailable)
-  private def isFunctionalIndexEnabled: Boolean = functionalIndex.isIndexAvailable
+  private def isFunctionalIndexEnabled: Boolean = indicesSupport.exists(idx =>
+    idx.getIndexName == FunctionalIndexSupport.INDEX_NAME && idx.isIndexAvailable)
-  private def isIndexEnabled: Boolean = isColumnStatsIndexEnabled || isRecordIndexEnabled || isFunctionalIndexEnabled
+  private def isIndexEnabled: Boolean = indicesSupport.exists(idx => idx.isIndexAvailable)
private def validateConfig(): Unit = {
if (isDataSkippingEnabled && (!isMetadataTableEnabled || !isIndexEnabled)) {
logWarning("Data skipping requires both Metadata Table and at least one of Column Stats Index, Record Level Index, or Functional Index" +
" to be enabled as well! " + s"(isMetadataTableEnabled = $isMetadataTableEnabled, isColumnStatsIndexEnabled = $isColumnStatsIndexEnabled"
-        + s", isRecordIndexApplicable = $isRecordIndexEnabled)")
+        + s", isRecordIndexApplicable = $isRecordIndexEnabled, isFunctionalIndexEnabled = $isFunctionalIndexEnabled)")
}
}
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/RecordLevelIndexSupport.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/RecordLevelIndexSupport.scala
index 7d7d88cb2d4..119e3318902 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/RecordLevelIndexSupport.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/RecordLevelIndexSupport.scala
@@ -17,28 +17,45 @@
package org.apache.hudi
-import org.apache.hudi.client.common.HoodieSparkEngineContext
import org.apache.hudi.common.config.HoodieMetadataConfig
import org.apache.hudi.common.fs.FSUtils
+import org.apache.hudi.common.model.FileSlice
import org.apache.hudi.common.model.HoodieRecord.HoodieMetadataField
import org.apache.hudi.common.table.HoodieTableMetaClient
-import org.apache.hudi.metadata.{HoodieTableMetadata, HoodieTableMetadataUtil}
-import org.apache.hudi.storage.StoragePathInfo
+import org.apache.hudi.metadata.HoodieTableMetadataUtil
+import org.apache.hudi.storage.StoragePath
import org.apache.hudi.util.JFunction
-
-import org.apache.spark.api.java.JavaSparkContext
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, EqualTo, Expression, In, Literal}
-import scala.collection.{mutable, JavaConverters}
+import scala.collection.{JavaConverters, mutable}
-class RecordLevelIndexSupport(spark: SparkSession,
+class RecordLevelIndexSupport (spark: SparkSession,
metadataConfig: HoodieMetadataConfig,
- metaClient: HoodieTableMetaClient) {
+ metaClient: HoodieTableMetaClient)
+ extends SparkBaseIndexSupport (spark, metadataConfig, metaClient) {
+
+
+ override def getIndexName: String = RecordLevelIndexSupport.INDEX_NAME
+
+ override def computeCandidateFileNames(fileIndex: HoodieFileIndex,
+ queryFilters: Seq[Expression],
+ queryReferencedColumns: Seq[String],
+                                         prunedPartitionsAndFileSlices: Seq[(Option[BaseHoodieTableFileIndex.PartitionPath], Seq[FileSlice])],
+ shouldPushDownFilesFilter: Boolean
+ ): Option[Set[String]] = {
+ lazy val (_, recordKeys) = filterQueriesWithRecordKey(queryFilters)
+    val allFiles = fileIndex.inputFiles.map(strPath => new StoragePath(strPath)).toSeq
+ if (recordKeys.nonEmpty) {
+ Option.apply(getCandidateFiles(allFiles, recordKeys))
+ } else {
+ Option.empty
+ }
+ }
-  @transient private lazy val engineCtx = new HoodieSparkEngineContext(new JavaSparkContext(spark.sparkContext))
-  @transient private lazy val metadataTable: HoodieTableMetadata =
-    HoodieTableMetadata.create(engineCtx, metadataConfig, metaClient.getBasePathV2.toString)
+ override def invalidateCaches(): Unit = {
+ // no caches for this index type, do nothing
+ }
/**
* Returns the list of candidate files which store the provided record keys based on Metadata Table Record Index.
@@ -47,7 +64,7 @@ class RecordLevelIndexSupport(spark: SparkSession,
* @param recordKeys - List of record keys.
* @return Sequence of file names which need to be queried
*/
-  def getCandidateFiles(allFiles: Seq[StoragePathInfo], recordKeys: List[String]): Set[String] = {
+  def getCandidateFiles(allFiles: Seq[StoragePath], recordKeys: List[String]): Set[String] = {
val recordKeyLocationsMap = metadataTable.readRecordIndex(JavaConverters.seqAsJavaListConverter(recordKeys).asJava)
val fileIdToPartitionMap: mutable.Map[String, String] = mutable.Map.empty
val candidateFiles: mutable.Set[String] = mutable.Set.empty
@@ -57,10 +74,10 @@ class RecordLevelIndexSupport(spark: SparkSession,
}
}
for (file <- allFiles) {
- val fileId = FSUtils.getFileIdFromFilePath(file.getPath)
+ val fileId = FSUtils.getFileIdFromFilePath(file)
val partitionOpt = fileIdToPartitionMap.get(fileId)
if (partitionOpt.isDefined) {
- candidateFiles += file.getPath.getName
+ candidateFiles += file.getName
}
}
candidateFiles.toSet
@@ -87,7 +104,7 @@ class RecordLevelIndexSupport(spark: SparkSession,
* @param queryFilters The queries that need to be filtered.
* @return Tuple of List of filtered queries and list of record key literals that need to be matched
*/
-  def filterQueriesWithRecordKey(queryFilters: Seq[Expression]): (List[Expression], List[String]) = {
+  private def filterQueriesWithRecordKey(queryFilters: Seq[Expression]): (List[Expression], List[String]) = {
if (!isIndexAvailable) {
(List.empty, List.empty)
} else {
@@ -115,6 +132,7 @@ class RecordLevelIndexSupport(spark: SparkSession,
}
object RecordLevelIndexSupport {
+ val INDEX_NAME = "RECORD_LEVEL"
/**
* If the input query is an EqualTo or IN query on simple record key columns, the function returns a tuple of
* list of the query and list of record key literals present in the query otherwise returns an empty option.
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SparkBaseIndexSupport.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SparkBaseIndexSupport.scala
new file mode 100644
index 00000000000..d82ed6ca6d5
--- /dev/null
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SparkBaseIndexSupport.scala
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi
+
+import org.apache.hudi.client.common.HoodieSparkEngineContext
+import org.apache.hudi.common.config.HoodieMetadataConfig
+import org.apache.hudi.common.model.FileSlice
+import org.apache.hudi.common.table.HoodieTableMetaClient
+import org.apache.hudi.metadata.{HoodieMetadataPayload, HoodieTableMetadata}
+import org.apache.spark.api.java.JavaSparkContext
+import org.apache.spark.sql.{Column, DataFrame, SparkSession}
+import org.apache.spark.sql.catalyst.expressions.{And, Expression}
+import org.apache.spark.sql.hudi.DataSkippingUtils.translateIntoColumnStatsIndexFilterExpr
+
+import scala.collection.JavaConverters._
+
+abstract class SparkBaseIndexSupport(spark: SparkSession,
+ metadataConfig: HoodieMetadataConfig,
+ metaClient: HoodieTableMetaClient) {
+  @transient protected lazy val engineCtx = new HoodieSparkEngineContext(new JavaSparkContext(spark.sparkContext))
+  @transient protected lazy val metadataTable: HoodieTableMetadata =
+    HoodieTableMetadata.create(engineCtx, metadataConfig, metaClient.getBasePathV2.toString)
+
+ def getIndexName: String
+
+ def isIndexAvailable: Boolean
+
+  def computeCandidateFileNames(fileIndex: HoodieFileIndex,
+                                queryFilters: Seq[Expression],
+                                queryReferencedColumns: Seq[String],
+                                prunedPartitionsAndFileSlices: Seq[(Option[BaseHoodieTableFileIndex.PartitionPath], Seq[FileSlice])],
+                                shouldPushDownFilesFilter: Boolean): Option[Set[String]]
+
+ def invalidateCaches(): Unit
+
+  protected def getPrunedFileNames(prunedPartitionsAndFileSlices: Seq[(Option[BaseHoodieTableFileIndex.PartitionPath], Seq[FileSlice])],
+                                   includeLogFiles: Boolean = false): Set[String] = {
+ prunedPartitionsAndFileSlices
+ .flatMap {
+ case (_, fileSlices) => fileSlices
+ }
+ .flatMap { fileSlice =>
+ val baseFileOption = Option(fileSlice.getBaseFile.orElse(null))
+ val logFiles = if (includeLogFiles) {
+ fileSlice.getLogFiles.iterator().asScala.map(_.getFileName).toList
+ } else Nil
+ baseFileOption.map(_.getFileName).toList ++ logFiles
+ }
+ .toSet
+ }
+
+  protected def getCandidateFiles(indexDf: DataFrame, queryFilters: Seq[Expression], prunedFileNames: Set[String]): Set[String] = {
+    val indexSchema = indexDf.schema
+    val indexFilter = queryFilters.map(translateIntoColumnStatsIndexFilterExpr(_, indexSchema)).reduce(And)
+ val prunedCandidateFileNames =
+ indexDf.where(new Column(indexFilter))
+ .select(HoodieMetadataPayload.COLUMN_STATS_FIELD_FILE_NAME)
+ .collect()
+ .map(_.getString(0))
+ .toSet
+
+    // NOTE: Col-Stats Index isn't guaranteed to have complete set of statistics for every
+    //       base-file or log file: since it's bound to clustering, which could occur asynchronously
+    //       at arbitrary point in time, and is not likely to be touching all of the base files.
+    //
+    //       To close that gap, we manually compute the difference b/w all indexed (by col-stats-index)
+    //       files and all outstanding base-files or log files, and make sure that all base files and
+    //       log file not represented w/in the index are included in the output of this method
+ val allIndexedFileNames =
+ indexDf.select(HoodieMetadataPayload.COLUMN_STATS_FIELD_FILE_NAME)
+ .collect()
+ .map(_.getString(0))
+ .toSet
+ val notIndexedFileNames = prunedFileNames -- allIndexedFileNames
+
+ prunedCandidateFileNames ++ notIndexedFileNames
+ }
+
+ /**
+   * Determines whether it would be more optimal to read Column Stats Index a) in-memory of the invoking process,
+   * or b) executing it on-cluster via Spark [[Dataset]] and [[RDD]] APIs
+   */
+  protected def shouldReadInMemory(fileIndex: HoodieFileIndex, queryReferencedColumns: Seq[String], inMemoryProjectionThreshold: Integer): Boolean = {
+    Option(metadataConfig.getColumnStatsIndexProcessingModeOverride) match {
+      case Some(mode) =>
+        mode == HoodieMetadataConfig.COLUMN_STATS_INDEX_PROCESSING_MODE_IN_MEMORY
+      case None =>
+        fileIndex.getFileSlicesCount * queryReferencedColumns.length < inMemoryProjectionThreshold
+ }
+ }
+
+}