This is an automated email from the ASF dual-hosted git repository.

codope pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new cb6eb6785fd [HUDI-7653] Refactor HoodieFileIndex for more flexibility (#11074)
cb6eb6785fd is described below

commit cb6eb6785fdeb88e66016a2b8c0c6e6fa184b309
Author: Vova Kolmakov <wombatu...@gmail.com>
AuthorDate: Tue Apr 23 23:09:08 2024 +0700

    [HUDI-7653] Refactor HoodieFileIndex for more flexibility (#11074)
    
    Created a new abstract class `SparkBaseIndexSupport` with abstract methods
    `getIndexName`, `isIndexAvailable`, `computeCandidateFileNames` and
    `invalidateCaches` (to be overridden in descendants) and concrete methods
    `getPrunedFileNames`, `getCandidateFiles` and `shouldReadInMemory`
    (moved from HoodieFileIndex or the individual XXXIndexSupport classes so they can be reused by all descendants).
    
    ---------
    
    Co-authored-by: Sagar Sumit <sagarsumi...@gmail.com>
---
 .../org/apache/hudi/ColumnStatsIndexSupport.scala  |  68 ++++++-----
 .../org/apache/hudi/FunctionalIndexSupport.scala   | 121 +++++--------------
 .../scala/org/apache/hudi/HoodieFileIndex.scala    | 128 +++++----------------
 .../org/apache/hudi/RecordLevelIndexSupport.scala  |  48 +++++---
 .../org/apache/hudi/SparkBaseIndexSupport.scala    | 108 +++++++++++++++++
 5 files changed, 243 insertions(+), 230 deletions(-)
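
For orientation, here is a condensed, hand-abbreviated sketch of the contract this change introduces (not the committed code verbatim; the full class, including the concrete helpers getPrunedFileNames, getCandidateFiles and shouldReadInMemory, is in the SparkBaseIndexSupport.scala diff below):

    package org.apache.hudi

    import org.apache.hudi.common.config.HoodieMetadataConfig
    import org.apache.hudi.common.model.FileSlice
    import org.apache.hudi.common.table.HoodieTableMetaClient
    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.catalyst.expressions.Expression

    // Each index type (record level, functional, column stats) extends this class
    // and overrides the four abstract members; HoodieFileIndex only uses this API.
    abstract class SparkBaseIndexSupport(spark: SparkSession,
                                         metadataConfig: HoodieMetadataConfig,
                                         metaClient: HoodieTableMetaClient) {
      def getIndexName: String
      def isIndexAvailable: Boolean
      def computeCandidateFileNames(fileIndex: HoodieFileIndex,
                                    queryFilters: Seq[Expression],
                                    queryReferencedColumns: Seq[String],
                                    prunedPartitionsAndFileSlices: Seq[(Option[BaseHoodieTableFileIndex.PartitionPath], Seq[FileSlice])],
                                    shouldPushDownFilesFilter: Boolean): Option[Set[String]]
      def invalidateCaches(): Unit
    }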

diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/ColumnStatsIndexSupport.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/ColumnStatsIndexSupport.scala
index dc15a3e8c8c..238962b964c 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/ColumnStatsIndexSupport.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/ColumnStatsIndexSupport.scala
@@ -23,11 +23,10 @@ import org.apache.hudi.ColumnStatsIndexSupport._
 import org.apache.hudi.HoodieCatalystUtils.{withPersistedData, withPersistedDataset}
 import org.apache.hudi.HoodieConversionUtils.toScalaOption
 import org.apache.hudi.avro.model._
-import org.apache.hudi.client.common.HoodieSparkEngineContext
 import org.apache.hudi.common.config.HoodieMetadataConfig
 import org.apache.hudi.common.data.HoodieData
 import org.apache.hudi.common.function.SerializableFunction
-import org.apache.hudi.common.model.HoodieRecord
+import org.apache.hudi.common.model.{FileSlice, HoodieRecord}
 import org.apache.hudi.common.table.HoodieTableMetaClient
 import org.apache.hudi.common.util.BinaryUtil.toBytes
 import org.apache.hudi.common.util.ValidationUtils.checkState
@@ -36,7 +35,6 @@ import org.apache.hudi.common.util.hash.ColumnIndexID
 import org.apache.hudi.data.HoodieJavaRDD
 import org.apache.hudi.metadata.{HoodieMetadataPayload, HoodieTableMetadata, HoodieTableMetadataUtil, MetadataPartitionType}
 import org.apache.hudi.util.JFunction
-import org.apache.spark.api.java.JavaSparkContext
 import org.apache.spark.sql.HoodieUnsafeUtils.{createDataFrameFromInternalRows, createDataFrameFromRDD, createDataFrameFromRows}
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.util.DateTimeUtils
@@ -44,8 +42,10 @@ import org.apache.spark.sql.functions.col
 import org.apache.spark.sql.types._
 import org.apache.spark.sql.{DataFrame, Row, SparkSession}
 import org.apache.spark.storage.StorageLevel
-
 import java.nio.ByteBuffer
+
+import org.apache.spark.sql.catalyst.expressions.Expression
+
 import scala.collection.JavaConverters._
 import scala.collection.immutable.TreeSet
 import scala.collection.mutable.ListBuffer
@@ -55,11 +55,8 @@ class ColumnStatsIndexSupport(spark: SparkSession,
                               tableSchema: StructType,
                               @transient metadataConfig: HoodieMetadataConfig,
                               @transient metaClient: HoodieTableMetaClient,
-                              allowCaching: Boolean = false) {
-
-  @transient private lazy val engineCtx = new HoodieSparkEngineContext(new JavaSparkContext(spark.sparkContext))
-  @transient private lazy val metadataTable: HoodieTableMetadata =
-    HoodieTableMetadata.create(engineCtx, metadataConfig, metaClient.getBasePathV2.toString)
+                              allowCaching: Boolean = false)
+  extends SparkBaseIndexSupport(spark, metadataConfig, metaClient) {
 
   @transient private lazy val cachedColumnStatsIndexViews: ParHashMap[Seq[String], DataFrame] = ParHashMap()
 
@@ -79,6 +76,40 @@ class ColumnStatsIndexSupport(spark: SparkSession,
     }
   }
 
+  override def getIndexName: String = ColumnStatsIndexSupport.INDEX_NAME
+
+  override def computeCandidateFileNames(fileIndex: HoodieFileIndex,
+                                         queryFilters: Seq[Expression],
+                                         queryReferencedColumns: Seq[String],
+                                         prunedPartitionsAndFileSlices: Seq[(Option[BaseHoodieTableFileIndex.PartitionPath], Seq[FileSlice])],
+                                         shouldPushDownFilesFilter: Boolean
+                                        ): Option[Set[String]] = {
+    if (isIndexAvailable && queryFilters.nonEmpty && queryReferencedColumns.nonEmpty) {
+      // NOTE: Since executing on-cluster via Spark API has its own non-trivial amount of overhead,
+      //       it's most often preferential to fetch Column Stats Index w/in the same process (usually driver),
+      //       w/o resorting to on-cluster execution.
+      //       For that we use a simple-heuristic to determine whether we should read and process CSI in-memory or
+      //       on-cluster: total number of rows of the expected projected portion of the index has to be below the
+      //       threshold (of 100k records)
+      val readInMemory = shouldReadInMemory(fileIndex, queryReferencedColumns, inMemoryProjectionThreshold)
+      val prunedFileNames = getPrunedFileNames(prunedPartitionsAndFileSlices)
+      // NOTE: If partition pruning doesn't prune any files, then there's no need to apply file filters
+      //       when loading the Column Statistics Index
+      val prunedFileNamesOpt = if (shouldPushDownFilesFilter) Some(prunedFileNames) else None
+
+      loadTransposed(queryReferencedColumns, readInMemory, prunedFileNamesOpt) { transposedColStatsDF =>
+        Some(getCandidateFiles(transposedColStatsDF, queryFilters, prunedFileNames))
+      }
+    } else {
+      Option.empty
+    }
+  }
+
+  override def invalidateCaches(): Unit = {
+    cachedColumnStatsIndexViews.foreach { case (_, df) => df.unpersist() }
+    cachedColumnStatsIndexViews.clear()
+  }
+
   /**
   * Returns true in cases when Column Stats Index is built and available as standalone partition
    * w/in the Metadata Table
@@ -88,19 +119,6 @@ class ColumnStatsIndexSupport(spark: SparkSession,
     metaClient.getTableConfig.getMetadataPartitions.contains(HoodieTableMetadataUtil.PARTITION_NAME_COLUMN_STATS)
   }
 
-  /**
-   * Determines whether it would be more optimal to read Column Stats Index a) in-memory of the invoking process,
-   * or b) executing it on-cluster via Spark [[Dataset]] and [[RDD]] APIs
-   */
-  def shouldReadInMemory(fileIndex: HoodieFileIndex, queryReferencedColumns: Seq[String]): Boolean = {
-    Option(metadataConfig.getColumnStatsIndexProcessingModeOverride) match {
-      case Some(mode) =>
-        mode == HoodieMetadataConfig.COLUMN_STATS_INDEX_PROCESSING_MODE_IN_MEMORY
-      case None =>
-        fileIndex.getFileSlicesCount * queryReferencedColumns.length < inMemoryProjectionThreshold
-    }
-  }
-
   /**
   * Loads view of the Column Stats Index in a transposed format where single row coalesces every columns'
    * statistics for a single file, returning it as [[DataFrame]]
@@ -172,11 +190,6 @@ class ColumnStatsIndexSupport(spark: SparkSession,
     }
   }
 
-  def invalidateCaches(): Unit = {
-    cachedColumnStatsIndexViews.foreach { case (_, df) => df.unpersist() }
-    cachedColumnStatsIndexViews.clear()
-  }
-
   /**
   * Transposes and converts the raw table format of the Column Stats Index representation,
   * where each row/record corresponds to individual (column, file) pair, into the table format
@@ -366,6 +379,7 @@ class ColumnStatsIndexSupport(spark: SparkSession,
 }
 
 object ColumnStatsIndexSupport {
+  val INDEX_NAME = "COLUMN_STATS"
 
   private val expectedAvroSchemaValues = Set("BooleanWrapper", "IntWrapper", "LongWrapper", "FloatWrapper", "DoubleWrapper",
     "BytesWrapper", "StringWrapper", "DateWrapper", "DecimalWrapper", "TimeMicrosWrapper", "TimestampMicrosWrapper")
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/FunctionalIndexSupport.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/FunctionalIndexSupport.scala
index 035eeabb4d5..8614bcb0b5f 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/FunctionalIndexSupport.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/FunctionalIndexSupport.scala
@@ -19,58 +19,59 @@
 
 package org.apache.hudi
 
-import org.apache.hadoop.fs.FileStatus
 import org.apache.hudi.FunctionalIndexSupport._
 import org.apache.hudi.HoodieConversionUtils.toScalaOption
 import org.apache.hudi.HoodieSparkFunctionalIndex.SPARK_FUNCTION_MAP
 import org.apache.hudi.avro.model.{HoodieMetadataColumnStats, HoodieMetadataRecord}
-import org.apache.hudi.client.common.HoodieSparkEngineContext
 import org.apache.hudi.common.config.HoodieMetadataConfig
 import org.apache.hudi.common.data.HoodieData
-import org.apache.hudi.common.fs.FSUtils
-import org.apache.hudi.common.model.HoodieRecord
+import org.apache.hudi.common.model.{FileSlice, HoodieRecord}
 import org.apache.hudi.common.table.HoodieTableMetaClient
 import org.apache.hudi.common.util.ValidationUtils.checkState
 import org.apache.hudi.common.util.hash.ColumnIndexID
 import org.apache.hudi.data.HoodieJavaRDD
-import org.apache.hudi.metadata.{HoodieMetadataPayload, HoodieTableMetadata, HoodieTableMetadataUtil}
+import org.apache.hudi.metadata.{HoodieMetadataPayload, HoodieTableMetadataUtil}
 import org.apache.hudi.util.JFunction
-import org.apache.spark.api.java.JavaSparkContext
 import org.apache.spark.sql.HoodieUnsafeUtils.{createDataFrameFromInternalRows, createDataFrameFromRDD}
 import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.{And, Expression}
+import org.apache.spark.sql.catalyst.expressions.Expression
 import org.apache.spark.sql.functions.col
-import org.apache.spark.sql.hudi.DataSkippingUtils.translateIntoColumnStatsIndexFilterExpr
 import org.apache.spark.sql.types.StructType
-import org.apache.spark.sql.{Column, DataFrame, SparkSession}
+import org.apache.spark.sql.{DataFrame, SparkSession}
 
 import scala.collection.JavaConverters._
-import scala.collection.{JavaConverters, mutable}
 
 class FunctionalIndexSupport(spark: SparkSession,
                              metadataConfig: HoodieMetadataConfig,
-                             metaClient: HoodieTableMetaClient) {
-
-  @transient private lazy val engineCtx = new HoodieSparkEngineContext(new JavaSparkContext(spark.sparkContext))
-  @transient private lazy val metadataTable: HoodieTableMetadata =
-    HoodieTableMetadata.create(engineCtx, metadataConfig, metaClient.getBasePathV2.toString)
+                             metaClient: HoodieTableMetaClient)
+  extends SparkBaseIndexSupport (spark, metadataConfig, metaClient) {
 
   // NOTE: Since [[metadataConfig]] is transient this has to be eagerly persisted, before this will be passed on to the executor
   private val inMemoryProjectionThreshold = metadataConfig.getColumnStatsIndexInMemoryProjectionThreshold
 
-  /**
-   * Determines whether it would be more optimal to read Column Stats Index a) in-memory of the invoking process,
-   * or b) executing it on-cluster via Spark [[Dataset]] and [[RDD]] APIs
-   */
-  def shouldReadInMemory(fileIndex: HoodieFileIndex, queryReferencedColumns: Seq[String]): Boolean = {
-    Option(metadataConfig.getColumnStatsIndexProcessingModeOverride) match {
-      case Some(mode) =>
-        mode == HoodieMetadataConfig.COLUMN_STATS_INDEX_PROCESSING_MODE_IN_MEMORY
-      case None =>
-        fileIndex.getFileSlicesCount * queryReferencedColumns.length < inMemoryProjectionThreshold
+  override def getIndexName: String = FunctionalIndexSupport.INDEX_NAME
+
+  override def computeCandidateFileNames(fileIndex: HoodieFileIndex,
+                                         queryFilters: Seq[Expression],
+                                         queryReferencedColumns: Seq[String],
+                                         prunedPartitionsAndFileSlices: Seq[(Option[BaseHoodieTableFileIndex.PartitionPath], Seq[FileSlice])],
+                                         shouldPushDownFilesFilter: Boolean
+                                        ): Option[Set[String]] = {
+    lazy val functionalIndexPartitionOpt = getFunctionalIndexPartition(queryFilters)
+    if (isIndexAvailable && queryFilters.nonEmpty && functionalIndexPartitionOpt.nonEmpty) {
+      val readInMemory = shouldReadInMemory(fileIndex, queryReferencedColumns, inMemoryProjectionThreshold)
+      val indexDf = loadFunctionalIndexDataFrame(functionalIndexPartitionOpt.get, readInMemory)
+      val prunedFileNames = getPrunedFileNames(prunedPartitionsAndFileSlices)
+      Some(getCandidateFiles(indexDf, queryFilters, prunedFileNames))
+    } else {
+      Option.empty
     }
   }
 
+  override def invalidateCaches(): Unit = {
+    // no caches for this index type, do nothing
+  }
+
   /**
   * Return true if metadata table is enabled and functional index metadata partition is available.
    */
@@ -78,42 +79,6 @@ class FunctionalIndexSupport(spark: SparkSession,
     metadataConfig.enabled && metaClient.getFunctionalIndexMetadata.isPresent && !metaClient.getFunctionalIndexMetadata.get().getIndexDefinitions.isEmpty
   }
 
-  def getPrunedCandidateFileNames(indexPartition: String,
-                                  shouldReadInMemory: Boolean,
-                                  queryFilters: Seq[Expression]): Set[String] = {
-    val indexDf = loadFunctionalIndexDataFrame(indexPartition, shouldReadInMemory)
-    val indexSchema = indexDf.schema
-    val indexFilter =
-      queryFilters.map(translateIntoColumnStatsIndexFilterExpr(_, indexSchema))
-        .reduce(And)
-
-    val prunedCandidateFileNames =
-      indexDf.where(new Column(indexFilter))
-        .select(HoodieMetadataPayload.COLUMN_STATS_FIELD_FILE_NAME)
-        .collect()
-        .map(_.getString(0))
-        .toSet
-
-    prunedCandidateFileNames
-  }
-
-  def load(indexPartition: String,
-           targetColumns: Seq[String],
-           shouldReadInMemory: Boolean): DataFrame = {
-    val metadataTablePath = HoodieTableMetadata.getMetadataTableBasePath(metaClient.getBasePathV2.toString)
-    // Read Metadata Table's Column Stats Index into Spark's [[DataFrame]]
-    val colStatsDF = spark.read.format("org.apache.hudi")
-      .options(metadataConfig.getProps.asScala)
-      .load(s"$metadataTablePath/$indexPartition")
-
-    val requiredIndexColumns =
-      targetColumnStatsIndexColumns.map(colName =>
-        col(s"${HoodieMetadataPayload.SCHEMA_FIELD_ID_COLUMN_STATS}.${colName}"))
-
-    colStatsDF.where(col(HoodieMetadataPayload.SCHEMA_FIELD_ID_COLUMN_STATS).isNotNull)
-      .select(requiredIndexColumns: _*)
-  }
-
   /**
   * Searches for an index partition based on the specified index function and target column name.
    *
@@ -127,7 +92,7 @@ class FunctionalIndexSupport(spark: SparkSession,
   * @return An `Option` containing the index partition identifier if a matching index definition is found.
    *         Returns `None` if no matching index definition is found.
    */
-  def getFunctionalIndexPartition(queryFilters: Seq[Expression]): Option[String] = {
+  private def getFunctionalIndexPartition(queryFilters: Seq[Expression]): Option[String] = {
     val functionToColumnNames = extractSparkFunctionNames(queryFilters)
     if (functionToColumnNames.nonEmpty) {
       // Currently, only one functional index in the query is supported. HUDI-7620 for supporting multiple functions.
@@ -171,8 +136,8 @@ class FunctionalIndexSupport(spark: SparkSession,
     }.toMap
   }
 
-  def loadFunctionalIndexDataFrame(indexPartition: String,
-                                   shouldReadInMemory: Boolean): DataFrame = {
+  private def loadFunctionalIndexDataFrame(indexPartition: String,
+                                           shouldReadInMemory: Boolean): DataFrame = {
     val colStatsDF = {
       val indexDefinition = metaClient.getFunctionalIndexMetadata.get().getIndexDefinitions.get(indexPartition)
       val indexType = indexDefinition.getIndexType
@@ -224,36 +189,10 @@ class FunctionalIndexSupport(spark: SparkSession,
 
     columnStatsRecords
   }
-
-  /**
-   * Returns the list of candidate files which store the provided record keys based on Metadata Table Record Index.
-   *
-   * @param allFiles   - List of all files which needs to be considered for the query
-   * @param recordKeys - List of record keys.
-   * @return Sequence of file names which need to be queried
-   */
-  def getCandidateFiles(allFiles: Seq[FileStatus], recordKeys: List[String]): Set[String] = {
-    val recordKeyLocationsMap = metadataTable.readRecordIndex(seqAsJavaListConverter(recordKeys).asJava)
-    val fileIdToPartitionMap: mutable.Map[String, String] = mutable.Map.empty
-    val candidateFiles: mutable.Set[String] = mutable.Set.empty
-    for (locations <- JavaConverters.collectionAsScalaIterableConverter(recordKeyLocationsMap.values()).asScala) {
-      for (location <- JavaConverters.collectionAsScalaIterableConverter(locations).asScala) {
-        fileIdToPartitionMap.put(location.getFileId, location.getPartitionPath)
-      }
-    }
-    for (file <- allFiles) {
-      val fileId = FSUtils.getFileIdFromFilePath(file.getPath)
-      val partitionOpt = fileIdToPartitionMap.get(fileId)
-      if (partitionOpt.isDefined) {
-        candidateFiles += file.getPath.getName
-      }
-    }
-    candidateFiles.toSet
-  }
 }
 
 object FunctionalIndexSupport {
-
+  val INDEX_NAME = "FUNCTIONAL"
   /**
   * Target Column Stats Index columns which internally are mapped onto fields of the corresponding
   * Column Stats record payload ([[HoodieMetadataColumnStats]]) persisted w/in Metadata Table
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala
index 922137ff01c..faabbecc5c1 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala
@@ -26,25 +26,23 @@ import org.apache.hudi.common.table.HoodieTableMetaClient
 import org.apache.hudi.common.util.StringUtils
 import org.apache.hudi.exception.HoodieException
 import org.apache.hudi.keygen.{TimestampBasedAvroKeyGenerator, TimestampBasedKeyGenerator}
-import org.apache.hudi.metadata.HoodieMetadataPayload
 import org.apache.hudi.storage.{StoragePath, StoragePathInfo}
 import org.apache.hudi.util.JFunction
-
 import org.apache.hadoop.fs.{FileStatus, Path}
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.{And, Expression, Literal}
+import org.apache.spark.sql.catalyst.expressions.{Expression, Literal}
 import org.apache.spark.sql.execution.datasources.{FileIndex, FileStatusCache, NoopCache, PartitionDirectory}
-import org.apache.spark.sql.hudi.DataSkippingUtils.translateIntoColumnStatsIndexFilterExpr
 import org.apache.spark.sql.hudi.HoodieSqlCommonUtils
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._
-import org.apache.spark.sql.{Column, DataFrame, SparkSession}
+import org.apache.spark.sql.SparkSession
 import org.apache.spark.unsafe.types.UTF8String
-
 import java.text.SimpleDateFormat
 import java.util.stream.Collectors
+
 import javax.annotation.concurrent.NotThreadSafe
+
 import scala.collection.JavaConverters._
 import scala.util.control.NonFatal
 import scala.util.{Failure, Success, Try}
@@ -96,22 +94,16 @@ case class HoodieFileIndex(spark: SparkSession,
   @transient protected var hasPushedDownPartitionPredicates: Boolean = false
 
   /**
-   * NOTE: [[ColumnStatsIndexSupport]] is a transient state, since it's only relevant while logical plan
+   * NOTE: [[indicesSupport]] is a transient state, since it's only relevant while logical plan
    *       is handled by the Spark's driver
+   *       The order of elements is important as in this order indices will be applied
+   *       during `lookupCandidateFilesInMetadataTable`
    */
-  @transient private lazy val columnStatsIndex = new ColumnStatsIndexSupport(spark, schema, metadataConfig, metaClient)
-
-  /**
-   * NOTE: [[RecordLevelIndexSupport]] is a transient state, since it's only relevant while logical plan
-   * is handled by the Spark's driver
-   */
-  @transient private lazy val recordLevelIndex = new RecordLevelIndexSupport(spark, metadataConfig, metaClient)
-
-  /**
-   * NOTE: [[FunctionalIndexSupport]] is a transient state, since it's only relevant while logical plan
-   * is handled by the Spark's driver
-   */
-  @transient private lazy val functionalIndex = new FunctionalIndexSupport(spark, metadataConfig, metaClient)
+  @transient private lazy val indicesSupport: List[SparkBaseIndexSupport] = List(
+    new RecordLevelIndexSupport(spark, metadataConfig, metaClient),
+    new FunctionalIndexSupport(spark, metadataConfig, metaClient),
+    new ColumnStatsIndexSupport(spark, schema, metadataConfig, metaClient)
+  )
 
   private val enableHoodieExtension = spark.sessionState.conf.getConfString("spark.sql.extensions", "")
     .split(",")
@@ -335,6 +327,7 @@ case class HoodieFileIndex(spark: SparkSession,
    * @param queryFilters list of original data filters passed down from querying engine
    * @return list of pruned (data-skipped) candidate base-files and log files' names
    */
+  // scalastyle:off return
   private def lookupCandidateFilesInMetadataTable(queryFilters: Seq[Expression],
                                                   prunedPartitionsAndFileSlices: Seq[(Option[BaseHoodieTableFileIndex.PartitionPath], Seq[FileSlice])],
                                                   shouldPushDownFilesFilter: Boolean): Try[Option[Set[String]]] = Try {
@@ -349,85 +342,24 @@ case class HoodieFileIndex(spark: SparkSession,
     //       and candidate files are obtained from these file slices.
 
     lazy val queryReferencedColumns = collectReferencedColumns(spark, queryFilters, schema)
-    lazy val (_, recordKeys) = recordLevelIndex.filterQueriesWithRecordKey(queryFilters)
-    lazy val functionalIndexPartitionOpt = functionalIndex.getFunctionalIndexPartition(queryFilters)
-    if (!isMetadataTableEnabled || !isDataSkippingEnabled) {
-      validateConfig()
-      Option.empty
-    } else if (recordKeys.nonEmpty) {
-      Option.apply(recordLevelIndex.getCandidateFiles(getAllFiles(), recordKeys))
-    } else if (functionalIndex.isIndexAvailable && queryFilters.nonEmpty && functionalIndexPartitionOpt.nonEmpty) {
-      val shouldReadInMemory = functionalIndex.shouldReadInMemory(this, queryReferencedColumns)
-      val indexDf = functionalIndex.loadFunctionalIndexDataFrame(functionalIndexPartitionOpt.get, shouldReadInMemory)
-      val prunedFileNames = getPrunedFileNames(prunedPartitionsAndFileSlices)
-      Some(getCandidateFiles(indexDf, queryFilters, prunedFileNames))
-    } else if (!columnStatsIndex.isIndexAvailable || queryFilters.isEmpty || queryReferencedColumns.isEmpty) {
-      validateConfig()
-      Option.empty
-    } else {
-      // NOTE: Since executing on-cluster via Spark API has its own non-trivial amount of overhead,
-      //       it's most often preferential to fetch Column Stats Index w/in the same process (usually driver),
-      //       w/o resorting to on-cluster execution.
-      //       For that we use a simple-heuristic to determine whether we should read and process CSI in-memory or
-      //       on-cluster: total number of rows of the expected projected portion of the index has to be below the
-      //       threshold (of 100k records)
-      val shouldReadInMemory = columnStatsIndex.shouldReadInMemory(this, queryReferencedColumns)
-      val prunedFileNames = getPrunedFileNames(prunedPartitionsAndFileSlices)
-      // NOTE: If partition pruning doesn't prune any files, then there's no need to apply file filters
-      //       when loading the Column Statistics Index
-      val prunedFileNamesOpt = if (shouldPushDownFilesFilter) Some(prunedFileNames) else None
-
-      columnStatsIndex.loadTransposed(queryReferencedColumns, shouldReadInMemory, prunedFileNamesOpt) { transposedColStatsDF =>
-        Some(getCandidateFiles(transposedColStatsDF, queryFilters, prunedFileNames))
+    if (isMetadataTableEnabled && isDataSkippingEnabled) {
+      for(indexSupport: SparkBaseIndexSupport <- indicesSupport) {
+        if (indexSupport.isIndexAvailable) {
+          val prunedFileNames = indexSupport.computeCandidateFileNames(this, queryFilters, queryReferencedColumns,
+            prunedPartitionsAndFileSlices, shouldPushDownFilesFilter)
+          if (prunedFileNames.nonEmpty) {
+            return Try(prunedFileNames)
+          }
+        }
       }
     }
-  }
-
-  private def getPrunedFileNames(prunedPartitionsAndFileSlices: Seq[(Option[BaseHoodieTableFileIndex.PartitionPath], Seq[FileSlice])]): Set[String] = {
-      prunedPartitionsAndFileSlices
-        .flatMap {
-          case (_, fileSlices) => fileSlices
-        }
-        .flatMap { fileSlice =>
-          val baseFileOption = Option(fileSlice.getBaseFile.orElse(null))
-          val logFiles = if (includeLogFiles) {
-            fileSlice.getLogFiles.iterator().asScala.map(_.getFileName).toList
-          } else Nil
-          baseFileOption.map(_.getFileName).toList ++ logFiles
-        }
-        .toSet
-  }
-
-  private def getCandidateFiles(indexDf: DataFrame, queryFilters: Seq[Expression], prunedFileNames: Set[String]): Set[String] = {
-    val indexSchema = indexDf.schema
-    val indexFilter = queryFilters.map(translateIntoColumnStatsIndexFilterExpr(_, indexSchema)).reduce(And)
-    val prunedCandidateFileNames =
-      indexDf.where(new Column(indexFilter))
-        .select(HoodieMetadataPayload.COLUMN_STATS_FIELD_FILE_NAME)
-        .collect()
-        .map(_.getString(0))
-        .toSet
-
-    // NOTE: Col-Stats Index isn't guaranteed to have complete set of statistics for every
-    //       base-file or log file: since it's bound to clustering, which could occur asynchronously
-    //       at arbitrary point in time, and is not likely to be touching all of the base files.
-    //
-    //       To close that gap, we manually compute the difference b/w all indexed (by col-stats-index)
-    //       files and all outstanding base-files or log files, and make sure that all base files and
-    //       log file not represented w/in the index are included in the output of this method
-    val allIndexedFileNames =
-    indexDf.select(HoodieMetadataPayload.COLUMN_STATS_FIELD_FILE_NAME)
-      .collect()
-      .map(_.getString(0))
-      .toSet
-    val notIndexedFileNames = prunedFileNames -- allIndexedFileNames
-
-    prunedCandidateFileNames ++ notIndexedFileNames
+    validateConfig()
+    Option.empty
   }
 
   override def refresh(): Unit = {
     super.refresh()
-    columnStatsIndex.invalidateCaches()
+    indicesSupport.foreach(idx => idx.invalidateCaches())
     hasPushedDownPartitionPredicates = false
   }
 
@@ -460,17 +392,19 @@ case class HoodieFileIndex(spark: SparkSession,
 
   private def isColumnStatsIndexEnabled: Boolean = metadataConfig.isColumnStatsIndexEnabled
 
-  private def isRecordIndexEnabled: Boolean = recordLevelIndex.isIndexAvailable
+  private def isRecordIndexEnabled: Boolean = indicesSupport.exists(idx =>
+    idx.getIndexName == RecordLevelIndexSupport.INDEX_NAME && idx.isIndexAvailable)
 
-  private def isFunctionalIndexEnabled: Boolean = functionalIndex.isIndexAvailable
+  private def isFunctionalIndexEnabled: Boolean = indicesSupport.exists(idx =>
+    idx.getIndexName == FunctionalIndexSupport.INDEX_NAME && idx.isIndexAvailable)
 
-  private def isIndexEnabled: Boolean = isColumnStatsIndexEnabled || isRecordIndexEnabled || isFunctionalIndexEnabled
+  private def isIndexEnabled: Boolean = indicesSupport.exists(idx => idx.isIndexAvailable)
 
   private def validateConfig(): Unit = {
     if (isDataSkippingEnabled && (!isMetadataTableEnabled || !isIndexEnabled)) {
       logWarning("Data skipping requires both Metadata Table and at least one of Column Stats Index, Record Level Index, or Functional Index" +
         " to be enabled as well! " + s"(isMetadataTableEnabled = $isMetadataTableEnabled, isColumnStatsIndexEnabled = $isColumnStatsIndexEnabled"
-        + s", isRecordIndexApplicable = $isRecordIndexEnabled)")
+        + s", isRecordIndexApplicable = $isRecordIndexEnabled, isFunctionalIndexEnabled = $isFunctionalIndexEnabled)")
     }
   }
 
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/RecordLevelIndexSupport.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/RecordLevelIndexSupport.scala
index 7d7d88cb2d4..119e3318902 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/RecordLevelIndexSupport.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/RecordLevelIndexSupport.scala
@@ -17,28 +17,45 @@
 
 package org.apache.hudi
 
-import org.apache.hudi.client.common.HoodieSparkEngineContext
 import org.apache.hudi.common.config.HoodieMetadataConfig
 import org.apache.hudi.common.fs.FSUtils
+import org.apache.hudi.common.model.FileSlice
 import org.apache.hudi.common.model.HoodieRecord.HoodieMetadataField
 import org.apache.hudi.common.table.HoodieTableMetaClient
-import org.apache.hudi.metadata.{HoodieTableMetadata, HoodieTableMetadataUtil}
-import org.apache.hudi.storage.StoragePathInfo
+import org.apache.hudi.metadata.HoodieTableMetadataUtil
+import org.apache.hudi.storage.StoragePath
 import org.apache.hudi.util.JFunction
-
-import org.apache.spark.api.java.JavaSparkContext
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.expressions.{AttributeReference, EqualTo, Expression, In, Literal}
 
-import scala.collection.{mutable, JavaConverters}
+import scala.collection.{JavaConverters, mutable}
 
-class RecordLevelIndexSupport(spark: SparkSession,
+class RecordLevelIndexSupport (spark: SparkSession,
                               metadataConfig: HoodieMetadataConfig,
-                              metaClient: HoodieTableMetaClient) {
+                              metaClient: HoodieTableMetaClient)
+  extends SparkBaseIndexSupport (spark, metadataConfig, metaClient) {
+
+
+  override def getIndexName: String = RecordLevelIndexSupport.INDEX_NAME
+
+  override def computeCandidateFileNames(fileIndex: HoodieFileIndex,
+                                         queryFilters: Seq[Expression],
+                                         queryReferencedColumns: Seq[String],
+                                         prunedPartitionsAndFileSlices: Seq[(Option[BaseHoodieTableFileIndex.PartitionPath], Seq[FileSlice])],
+                                         shouldPushDownFilesFilter: Boolean
+                                        ): Option[Set[String]] = {
+    lazy val (_, recordKeys) = filterQueriesWithRecordKey(queryFilters)
+    val allFiles = fileIndex.inputFiles.map(strPath => new StoragePath(strPath)).toSeq
+    if (recordKeys.nonEmpty) {
+      Option.apply(getCandidateFiles(allFiles, recordKeys))
+    } else {
+      Option.empty
+    }
+  }
 
-  @transient private lazy val engineCtx = new HoodieSparkEngineContext(new JavaSparkContext(spark.sparkContext))
-  @transient private lazy val metadataTable: HoodieTableMetadata =
-    HoodieTableMetadata.create(engineCtx, metadataConfig, metaClient.getBasePathV2.toString)
+  override def invalidateCaches(): Unit = {
+    // no caches for this index type, do nothing
+  }
 
   /**
   * Returns the list of candidate files which store the provided record keys based on Metadata Table Record Index.
@@ -47,7 +64,7 @@ class RecordLevelIndexSupport(spark: SparkSession,
    * @param recordKeys - List of record keys.
    * @return Sequence of file names which need to be queried
    */
-  def getCandidateFiles(allFiles: Seq[StoragePathInfo], recordKeys: List[String]): Set[String] = {
+  def getCandidateFiles(allFiles: Seq[StoragePath], recordKeys: List[String]): Set[String] = {
     val recordKeyLocationsMap = metadataTable.readRecordIndex(JavaConverters.seqAsJavaListConverter(recordKeys).asJava)
     val fileIdToPartitionMap: mutable.Map[String, String] = mutable.Map.empty
     val candidateFiles: mutable.Set[String] = mutable.Set.empty
@@ -57,10 +74,10 @@ class RecordLevelIndexSupport(spark: SparkSession,
       }
     }
     for (file <- allFiles) {
-      val fileId = FSUtils.getFileIdFromFilePath(file.getPath)
+      val fileId = FSUtils.getFileIdFromFilePath(file)
       val partitionOpt = fileIdToPartitionMap.get(fileId)
       if (partitionOpt.isDefined) {
-        candidateFiles += file.getPath.getName
+        candidateFiles += file.getName
       }
     }
     candidateFiles.toSet
@@ -87,7 +104,7 @@ class RecordLevelIndexSupport(spark: SparkSession,
    * @param queryFilters The queries that need to be filtered.
    * @return Tuple of List of filtered queries and list of record key literals that need to be matched
    */
-  def filterQueriesWithRecordKey(queryFilters: Seq[Expression]): (List[Expression], List[String]) = {
+  private def filterQueriesWithRecordKey(queryFilters: Seq[Expression]): (List[Expression], List[String]) = {
     if (!isIndexAvailable) {
       (List.empty, List.empty)
     } else {
@@ -115,6 +132,7 @@ class RecordLevelIndexSupport(spark: SparkSession,
 }
 
 object RecordLevelIndexSupport {
+  val INDEX_NAME = "RECORD_LEVEL"
   /**
    * If the input query is an EqualTo or IN query on simple record key columns, the function returns a tuple of
    * list of the query and list of record key literals present in the query otherwise returns an empty option.
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SparkBaseIndexSupport.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SparkBaseIndexSupport.scala
new file mode 100644
index 00000000000..d82ed6ca6d5
--- /dev/null
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SparkBaseIndexSupport.scala
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi
+
+import org.apache.hudi.client.common.HoodieSparkEngineContext
+import org.apache.hudi.common.config.HoodieMetadataConfig
+import org.apache.hudi.common.model.FileSlice
+import org.apache.hudi.common.table.HoodieTableMetaClient
+import org.apache.hudi.metadata.{HoodieMetadataPayload, HoodieTableMetadata}
+import org.apache.spark.api.java.JavaSparkContext
+import org.apache.spark.sql.{Column, DataFrame, SparkSession}
+import org.apache.spark.sql.catalyst.expressions.{And, Expression}
+import org.apache.spark.sql.hudi.DataSkippingUtils.translateIntoColumnStatsIndexFilterExpr
+
+import scala.collection.JavaConverters._
+
+abstract class SparkBaseIndexSupport(spark: SparkSession,
+                                     metadataConfig: HoodieMetadataConfig,
+                                     metaClient: HoodieTableMetaClient) {
+  @transient protected lazy val engineCtx = new HoodieSparkEngineContext(new JavaSparkContext(spark.sparkContext))
+  @transient protected lazy val metadataTable: HoodieTableMetadata =
+    HoodieTableMetadata.create(engineCtx, metadataConfig, metaClient.getBasePathV2.toString)
+
+  def getIndexName: String
+
+  def isIndexAvailable: Boolean
+
+  def computeCandidateFileNames(fileIndex: HoodieFileIndex,
+                                queryFilters: Seq[Expression],
+                                queryReferencedColumns: Seq[String],
+                                prunedPartitionsAndFileSlices: Seq[(Option[BaseHoodieTableFileIndex.PartitionPath], Seq[FileSlice])],
+                                shouldPushDownFilesFilter: Boolean): Option[Set[String]]
+
+  def invalidateCaches(): Unit
+
+  protected def getPrunedFileNames(prunedPartitionsAndFileSlices: Seq[(Option[BaseHoodieTableFileIndex.PartitionPath], Seq[FileSlice])],
+                                   includeLogFiles: Boolean = false): Set[String] = {
+    prunedPartitionsAndFileSlices
+      .flatMap {
+        case (_, fileSlices) => fileSlices
+      }
+      .flatMap { fileSlice =>
+        val baseFileOption = Option(fileSlice.getBaseFile.orElse(null))
+        val logFiles = if (includeLogFiles) {
+          fileSlice.getLogFiles.iterator().asScala.map(_.getFileName).toList
+        } else Nil
+        baseFileOption.map(_.getFileName).toList ++ logFiles
+      }
+      .toSet
+  }
+
+  protected def getCandidateFiles(indexDf: DataFrame, queryFilters: Seq[Expression], prunedFileNames: Set[String]): Set[String] = {
+    val indexSchema = indexDf.schema
+    val indexFilter = queryFilters.map(translateIntoColumnStatsIndexFilterExpr(_, indexSchema)).reduce(And)
+    val prunedCandidateFileNames =
+      indexDf.where(new Column(indexFilter))
+        .select(HoodieMetadataPayload.COLUMN_STATS_FIELD_FILE_NAME)
+        .collect()
+        .map(_.getString(0))
+        .toSet
+
+    // NOTE: Col-Stats Index isn't guaranteed to have complete set of statistics for every
+    //       base-file or log file: since it's bound to clustering, which could occur asynchronously
+    //       at arbitrary point in time, and is not likely to be touching all of the base files.
+    //
+    //       To close that gap, we manually compute the difference b/w all indexed (by col-stats-index)
+    //       files and all outstanding base-files or log files, and make sure that all base files and
+    //       log file not represented w/in the index are included in the output of this method
+    val allIndexedFileNames =
+    indexDf.select(HoodieMetadataPayload.COLUMN_STATS_FIELD_FILE_NAME)
+      .collect()
+      .map(_.getString(0))
+      .toSet
+    val notIndexedFileNames = prunedFileNames -- allIndexedFileNames
+
+    prunedCandidateFileNames ++ notIndexedFileNames
+  }
+
+  /**
+   * Determines whether it would be more optimal to read Column Stats Index a) in-memory of the invoking process,
+   * or b) executing it on-cluster via Spark [[Dataset]] and [[RDD]] APIs
+   */
+  protected def shouldReadInMemory(fileIndex: HoodieFileIndex, queryReferencedColumns: Seq[String], inMemoryProjectionThreshold: Integer): Boolean = {
+    Option(metadataConfig.getColumnStatsIndexProcessingModeOverride) match {
+      case Some(mode) =>
+        mode == HoodieMetadataConfig.COLUMN_STATS_INDEX_PROCESSING_MODE_IN_MEMORY
+      case None =>
+        fileIndex.getFileSlicesCount * queryReferencedColumns.length < inMemoryProjectionThreshold
+    }
+  }
+
+}
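
For illustration only (hypothetical code, not part of this commit), the order-sensitive lookup that HoodieFileIndex now performs over `indicesSupport` boils down to a first-match-wins scan over the configured indices, which can be modelled with a self-contained toy like this:

    // Toy, dependency-free model of the new dispatch in lookupCandidateFilesInMetadataTable:
    // walk the indices in their declared order and take the first one that is available
    // and actually produces candidate files.
    abstract class ToyIndexSupport {
      def getIndexName: String
      def isIndexAvailable: Boolean
      def computeCandidateFileNames(queryFilters: Seq[String]): Option[Set[String]]
      def invalidateCaches(): Unit = ()
    }

    class ToyRecordLevelIndex extends ToyIndexSupport {
      override def getIndexName: String = "RECORD_LEVEL"
      override def isIndexAvailable: Boolean = true
      // Pretends the record-level index can only serve record-key equality filters.
      override def computeCandidateFileNames(queryFilters: Seq[String]): Option[Set[String]] =
        if (queryFilters.exists(_.contains("_hoodie_record_key ="))) Some(Set("f1.parquet")) else None
    }

    class ToyColumnStatsIndex extends ToyIndexSupport {
      override def getIndexName: String = "COLUMN_STATS"
      override def isIndexAvailable: Boolean = true
      override def computeCandidateFileNames(queryFilters: Seq[String]): Option[Set[String]] =
        if (queryFilters.nonEmpty) Some(Set("f1.parquet", "f2.parquet")) else None
    }

    object ToyLookup extends App {
      // Order matters, mirroring List(record level, functional, column stats) in HoodieFileIndex.
      val indices: List[ToyIndexSupport] = List(new ToyRecordLevelIndex, new ToyColumnStatsIndex)
      val filters = Seq("_hoodie_record_key = 'k1'")
      // First available index that returns Some(...) wins; otherwise no pruning happens.
      val candidates: Option[Set[String]] =
        indices.iterator
          .filter(_.isIndexAvailable)
          .map(_.computeCandidateFileNames(filters))
          .collectFirst { case Some(files) => files }
      println(candidates) // Some(Set(f1.parquet)) -- the record-level index answered first
    }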

