This is an automated email from the ASF dual-hosted git repository.

codope pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new bf0966825f7 [HUDI-6651] Support IN SQL query with Record Index (#9373)
bf0966825f7 is described below

commit bf0966825f714dd6512b3bf043054ab820b0158d
Author: Lokesh Jain <[email protected]>
AuthorDate: Mon Aug 7 06:43:35 2023 +0530

    [HUDI-6651] Support IN SQL query with Record Index (#9373)
---
 .../scala/org/apache/hudi/HoodieFileIndex.scala    |  47 ++++++---
 .../org/apache/hudi/RecordLevelIndexSupport.scala  |  78 ++++++++++-----
 .../functional/TestRecordLevelIndexWithSQL.scala   | 110 ++++++++++++++++++++-
 3 files changed, 191 insertions(+), 44 deletions(-)

diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala
index 964ef970c91..0f33f2ec1fd 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala
@@ -96,6 +96,10 @@ case class HoodieFileIndex(spark: SparkSession,
    */
   @transient private lazy val columnStatsIndex = new 
ColumnStatsIndexSupport(spark, schema, metadataConfig, metaClient)
 
+  /**
+   * NOTE: [[RecordLevelIndexSupport]] is a transient state, since it's only 
relevant while logical plan
+   * is handled by the Spark's driver
+   */
   @transient private lazy val recordLevelIndex = new 
RecordLevelIndexSupport(spark, metadataConfig, metaClient)
 
   override def rootPaths: Seq[Path] = getQueryPaths.asScala
@@ -209,9 +213,10 @@ case class HoodieFileIndex(spark: SparkSession,
     if (prunedPartitionsAndFileSlices.isEmpty || dataFilters.isEmpty) {
       prunedPartitionsAndFileSlices
     } else {
-      // Look up candidate files names in the col-stats index, if all of the 
following conditions are true
+      // Look up candidate file names in the col-stats or record level index,
if all of the following conditions are true
       //    - Data-skipping is enabled
       //    - Col-Stats Index is present
+      //    - Record-level Index is present
       //    - List of predicates (filters) is present
       val candidateFilesNamesOpt: Option[Set[String]] =
       lookupCandidateFilesInMetadataTable(dataFilters) match {
@@ -225,14 +230,14 @@ case class HoodieFileIndex(spark: SparkSession,
           }
       }
 
-      logDebug(s"Overlapping candidate files from Column Stats Index: 
${candidateFilesNamesOpt.getOrElse(Set.empty)}")
+      logDebug(s"Overlapping candidate files from Column Stats or Record Level 
Index: ${candidateFilesNamesOpt.getOrElse(Set.empty)}")
 
       var totalFileSliceSize = 0
       var candidateFileSliceSize = 0
 
       val prunedPartitionsAndFilteredFileSlices = 
prunedPartitionsAndFileSlices.map {
         case (partitionOpt, fileSlices) =>
-          // Filter in candidate files based on the col-stats index lookup
+          // Filter in candidate files based on the col-stats or record level 
index lookup
           val candidateFileSlices: Seq[FileSlice] = {
             fileSlices.filter(fs => {
               val fileSliceFiles = 
fs.getLogFiles.map[String](JFunction.toJavaFunction[HoodieLogFile, String](lf 
=> lf.getPath.getName))
@@ -303,30 +308,35 @@ case class HoodieFileIndex(spark: SparkSession,
 
   /**
    * Computes pruned list of candidate base-files' names based on provided 
list of {@link dataFilters}
-   * conditions, by leveraging Metadata Table's Column Statistics index 
(hereon referred as ColStats for brevity)
-   * bearing "min", "max", "num_nulls" statistics for all columns.
+   * conditions, by leveraging Metadata Table's Record Level Index and Column 
Statistics index (hereon referred as
+   * ColStats for brevity) bearing "min", "max", "num_nulls" statistics for 
all columns.
    *
    * NOTE: This method has to return complete set of candidate files, since 
only provided candidates will
    * ultimately be scanned as part of query execution. Hence, this method has 
to maintain the
-   * invariant of conservatively including every base-file's name, that is NOT 
referenced in its index.
+   * invariant of conservatively including every base file's and log file's
name that is NOT referenced in its index.
    *
    * @param queryFilters list of original data filters passed down from 
querying engine
-   * @return list of pruned (data-skipped) candidate base-files' names
+   * @return list of pruned (data-skipped) candidate base-files and log files' 
names
    */
   private def lookupCandidateFilesInMetadataTable(queryFilters: 
Seq[Expression]): Try[Option[Set[String]]] = Try {
-    // NOTE: Data Skipping is only effective when it references columns that 
are indexed w/in
+    // NOTE: For column stats, Data Skipping is only effective when it 
references columns that are indexed w/in
     //       the Column Stats Index (CSI). Following cases could not be 
effectively handled by Data Skipping:
     //          - Expressions on top-level column's fields (ie, for ex filters 
like "struct.field > 0", since
     //          CSI only contains stats for top-level columns, in this case 
for "struct")
     //          - Any expression not directly referencing top-level column 
(for ex, sub-queries, since there's
     //          nothing CSI in particular could be applied for)
+    //       For record index, Data Skipping is only effective when one of the
query filters is of type EqualTo
+    //       or IN query on simple record keys. In such a case the record 
index is used to filter the file slices
+    //       and candidate files are obtained from these file slices.
+
     lazy val queryReferencedColumns = collectReferencedColumns(spark, 
queryFilters, schema)
 
+    lazy val (_, recordKeys) = 
recordLevelIndex.filterQueriesWithRecordKey(queryFilters)
     if (!isMetadataTableEnabled || !isDataSkippingEnabled) {
       validateConfig()
       Option.empty
-    } else if (recordLevelIndex.isIndexApplicable(queryFilters)) {
-      Option.apply(recordLevelIndex.getCandidateFiles(getAllFiles(), 
queryFilters))
+    } else if (recordKeys.nonEmpty) {
+      Option.apply(recordLevelIndex.getCandidateFiles(getAllFiles(), 
recordKeys))
     } else if (!columnStatsIndex.isIndexAvailable || queryFilters.isEmpty || 
queryReferencedColumns.isEmpty) {
       validateConfig()
       Option.empty
@@ -359,12 +369,12 @@ case class HoodieFileIndex(spark: SparkSession,
             .toSet
 
         // NOTE: Col-Stats Index isn't guaranteed to have complete set of 
statistics for every
-        //       base-file: since it's bound to clustering, which could occur 
asynchronously
+        //       base-file or log file: since it's bound to clustering, which 
could occur asynchronously
         //       at arbitrary point in time, and is not likely to be touching 
all of the base files.
         //
         //       To close that gap, we manually compute the difference b/w all 
indexed (by col-stats-index)
-        //       files and all outstanding base-files, and make sure that all 
base files not
-        //       represented w/in the index are included in the output of this 
method
+        //       files and all outstanding base-files or log files, and make 
sure that all base files and
+        //       log files not represented w/in the index are included in the
output of this method
         val notIndexedFileNames = 
lookupFileNamesMissingFromIndex(allIndexedFileNames)
 
         Some(prunedCandidateFileNames ++ notIndexedFileNames)
@@ -397,10 +407,15 @@ case class HoodieFileIndex(spark: SparkSession,
 
   private def isColumnStatsIndexEnabled: Boolean = 
metadataConfig.isColumnStatsIndexEnabled
 
+  private def isRecordIndexEnabled: Boolean = recordLevelIndex.isIndexAvailable
+
+  private def isIndexEnabled: Boolean = isColumnStatsIndexEnabled || 
isRecordIndexEnabled
+
   private def validateConfig(): Unit = {
-    if (isDataSkippingEnabled && (!isMetadataTableEnabled || 
!isColumnStatsIndexEnabled)) {
-      logWarning("Data skipping requires both Metadata Table and Column Stats 
Index to be enabled as well! " +
-        s"(isMetadataTableEnabled = $isMetadataTableEnabled, 
isColumnStatsIndexEnabled = $isColumnStatsIndexEnabled")
+    if (isDataSkippingEnabled && (!isMetadataTableEnabled || !isIndexEnabled)) 
{
+      logWarning("Data skipping requires both Metadata Table and at least one 
of Column Stats Index or Record Level Index" +
+        " to be enabled as well! " + s"(isMetadataTableEnabled = 
$isMetadataTableEnabled, isColumnStatsIndexEnabled = $isColumnStatsIndexEnabled"
+        + s", isRecordIndexApplicable = $isRecordIndexEnabled)")
     }
   }
 
diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/RecordLevelIndexSupport.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/RecordLevelIndexSupport.scala
index 48e6ef837f8..743ce0cc6c1 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/RecordLevelIndexSupport.scala
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/RecordLevelIndexSupport.scala
@@ -27,7 +27,7 @@ import org.apache.hudi.metadata.{HoodieTableMetadata, 
HoodieTableMetadataUtil}
 import org.apache.hudi.util.JFunction
 import org.apache.spark.api.java.JavaSparkContext
 import org.apache.spark.sql.SparkSession
-import org.apache.spark.sql.catalyst.expressions.{AttributeReference, EqualTo, 
Expression, Literal}
+import org.apache.spark.sql.catalyst.expressions.{AttributeReference, EqualTo, 
Expression, In, Literal}
 
 import scala.collection.{JavaConverters, mutable}
 
@@ -40,13 +40,12 @@ class RecordLevelIndexSupport(spark: SparkSession,
     HoodieTableMetadata.create(engineCtx, metadataConfig, 
metaClient.getBasePathV2.toString)
 
   /**
-   * Returns the list of candidate files which should be queried after pruning 
based on query filters.
+   * Returns the list of candidate files which store the provided record keys 
based on Metadata Table Record Index.
    * @param allFiles - List of all files which needs to be considered for the 
query
-   * @param queryFilters - Input query filters. List of candidate files are 
pruned based on these query filters.
+   * @param recordKeys - List of record keys.
    * @return Sequence of file names which need to be queried
    */
-  def getCandidateFiles(allFiles: Seq[FileStatus], queryFilters: 
Seq[Expression]): Set[String] = {
-    val (_, recordKeys) = filterQueryFiltersWithRecordKey(queryFilters)
+  def getCandidateFiles(allFiles: Seq[FileStatus], recordKeys: List[String]): 
Set[String] = {
     val recordKeyLocationsMap = 
metadataTable.readRecordIndex(JavaConverters.seqAsJavaListConverter(recordKeys).asJava)
     val fileIdToPartitionMap: mutable.Map[String, String] = mutable.Map.empty
     val candidateFiles: mutable.Set[String] = mutable.Set.empty
@@ -118,39 +117,70 @@ class RecordLevelIndexSupport(spark: SparkSession,
   }
 
   /**
-   * Given query filters, it filters the EqualTo queries on simple record key 
columns and returns a tuple of list of such
-   * queries and list of record key literals present in the query.
+   * Given query filters, it filters the EqualTo and IN queries on simple 
record key columns and returns a tuple of
+   * list of such queries and list of record key literals present in the query.
+   * If record index is not available, it returns empty lists for record
filters and record keys
    * @param queryFilters The queries that need to be filtered.
    * @return Tuple of List of filtered queries and list of record key literals 
that need to be matched
    */
-  private def filterQueryFiltersWithRecordKey(queryFilters: Seq[Expression]): 
(List[Expression], List[String]) = {
-    var recordKeyQueries: List[Expression] = List.empty
-    var recordKeys: List[String] = List.empty
-    for (query <- queryFilters) {
-      query match {
-        case equalToQuery: EqualTo =>
-          val (attribute, literal) = 
getAttributeLiteralTuple(equalToQuery.left, equalToQuery.right).orNull
-          if (attribute != null && attribute.name != null && 
attributeMatchesRecordKey(attribute.name)) {
-            recordKeys = recordKeys :+ literal.value.toString
-            recordKeyQueries = recordKeyQueries :+ equalToQuery
-          }
-        case _ =>
+  def filterQueriesWithRecordKey(queryFilters: Seq[Expression]): 
(List[Expression], List[String]) = {
+    if (!isIndexAvailable) {
+      (List.empty, List.empty)
+    } else {
+      var recordKeyQueries: List[Expression] = List.empty
+      var recordKeys: List[String] = List.empty
+      for (query <- queryFilters) {
+        filterQueryWithRecordKey(query).foreach({
+          case (exp: Expression, recKeys: List[String]) =>
+            recordKeys = recordKeys ++ recKeys
+            recordKeyQueries = recordKeyQueries :+ exp
+        })
       }
+
+      Tuple2.apply(recordKeyQueries, recordKeys)
     }
-    Tuple2.apply(recordKeyQueries, recordKeys)
   }
 
   /**
-   * Returns true in cases when metadata table is enabled and Record Level 
Index is built.
+   * If the input query is an EqualTo or IN query on simple record key 
columns, the function returns a tuple of
+   * list of the query and list of record key literals present in the query 
otherwise returns an empty option.
+   *
+   * @param queryFilter The query that needs to be filtered.
+   * @return Tuple of filtered query and list of record key literals that need
to be matched
    */
-  def isIndexApplicable(queryFilters: Seq[Expression]): Boolean = {
-    isIndexAvailable && 
filterQueryFiltersWithRecordKey(queryFilters)._1.nonEmpty
+  private def filterQueryWithRecordKey(queryFilter: Expression): 
Option[(Expression, List[String])] = {
+    queryFilter match {
+      case equalToQuery: EqualTo =>
+        val (attribute, literal) = getAttributeLiteralTuple(equalToQuery.left, 
equalToQuery.right).orNull
+        if (attribute != null && attribute.name != null && 
attributeMatchesRecordKey(attribute.name)) {
+          Option.apply(equalToQuery, List.apply(literal.value.toString))
+        } else {
+          Option.empty
+        }
+      case inQuery: In =>
+        var validINQuery = true
+        inQuery.value match {
+          case _: AttributeReference =>
+          case _ => validINQuery = false
+        }
+        var literals: List[String] = List.empty
+        inQuery.list.foreach {
+          case literal: Literal => literals = literals :+ 
literal.value.toString
+          case _ => validINQuery = false
+        }
+        if (validINQuery) {
+          Option.apply(inQuery, literals)
+        } else {
+          Option.empty
+        }
+      case _ => Option.empty
+    }
   }
 
   /**
    * Return true if metadata table is enabled and record index metadata 
partition is available.
    */
-  private def isIndexAvailable: Boolean = {
+  def isIndexAvailable: Boolean = {
     metadataConfig.enabled && 
metaClient.getTableConfig.getMetadataPartitions.contains(HoodieTableMetadataUtil.PARTITION_NAME_RECORD_INDEX)
   }
 }
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestRecordLevelIndexWithSQL.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestRecordLevelIndexWithSQL.scala
index e3048a0d914..8e235960fba 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestRecordLevelIndexWithSQL.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestRecordLevelIndexWithSQL.scala
@@ -17,8 +17,15 @@
 
 package org.apache.hudi.functional
 
-import org.apache.hudi.{DataSourceReadOptions, DataSourceWriteOptions}
+import org.apache.hudi.common.model.{FileSlice, HoodieTableType}
+import org.apache.hudi.common.table.HoodieTableMetaClient
+import org.apache.hudi.metadata.HoodieMetadataFileSystemView
+import org.apache.hudi.util.JFunction
+import org.apache.hudi.{DataSourceReadOptions, DataSourceWriteOptions, 
HoodieFileIndex}
 import org.apache.spark.sql.SaveMode
+import org.apache.spark.sql.catalyst.expressions.{AttributeReference, EqualTo, 
Expression, GreaterThan, GreaterThanOrEqual, In, Literal, Or}
+import org.apache.spark.sql.types.StringType
+import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue}
 import org.junit.jupiter.api.Tag
 import org.junit.jupiter.params.ParameterizedTest
 import org.junit.jupiter.params.provider.ValueSource
@@ -28,7 +35,7 @@ class TestRecordLevelIndexWithSQL extends 
RecordLevelIndexTestBase {
   val sqlTempTable = "tbl"
 
   @ParameterizedTest
-  @ValueSource(strings = Array("COPY_ON_WRITE"))
+  @ValueSource(strings = Array("COPY_ON_WRITE", "MERGE_ON_READ"))
   def testRLIWithSQL(tableType: String): Unit = {
     var hudiOpts = commonOpts
     hudiOpts = hudiOpts + (
@@ -45,8 +52,103 @@ class TestRecordLevelIndexWithSQL extends 
RecordLevelIndexTestBase {
       validate = false)
 
     createTempTable(hudiOpts)
-    val reckey = 
mergedDfList.last.limit(1).collect()(0).getAs("_row_key").toString
-    spark.sql("select * from " + sqlTempTable + " where '" + reckey + "' = 
_row_key").show(false)
+    verifyInQuery(hudiOpts)
+    verifyEqualToQuery(hudiOpts)
+    verifyNegativeTestCases(hudiOpts)
+  }
+
+  private def verifyNegativeTestCases(hudiOpts: Map[String, String]): Unit = {
+    val commonOpts = hudiOpts + ("path" -> basePath)
+    metaClient = HoodieTableMetaClient.reload(metaClient)
+    val fileIndex = HoodieFileIndex(spark, metaClient, None, commonOpts, 
includeLogFiles = true)
+
+    // when no data filter is applied
+    assertEquals(getLatestDataFilesCount(commonOpts), 
fileIndex.listFiles(Seq.empty, Seq.empty).flatMap(s => s.files).size)
+    assertEquals(6, spark.sql("select * from " + sqlTempTable).count())
+
+    // non existing entries in EqualTo query
+    var dataFilter: Expression = EqualTo(attribute("_row_key"), Literal("xyz"))
+    assertEquals(0, spark.sql("select * from " + sqlTempTable + " where " + 
dataFilter.sql).count())
+    assertEquals(0, fileIndex.listFiles(Seq.empty, Seq(dataFilter)).flatMap(s 
=> s.files).size)
+
+    // non existing entries in IN query
+    dataFilter = In(attribute("_row_key"), List.apply(Literal("xyz"), 
Literal("abc")))
+    assertEquals(0, spark.sql("select * from " + sqlTempTable + " where " + 
dataFilter.sql).count())
+    assertEquals(0, fileIndex.listFiles(Seq.empty, Seq(dataFilter)).flatMap(s 
=> s.files).size)
+
+    // not supported GreaterThan query
+    val reckey = mergedDfList.last.limit(2).collect().map(row => 
row.getAs("_row_key").toString)
+    dataFilter = GreaterThan(attribute("_row_key"), Literal(reckey(0)))
+    assertTrue(fileIndex.listFiles(Seq.empty, Seq(dataFilter)).flatMap(s => 
s.files).size >= 3)
+
+    // not supported OR query
+    dataFilter = Or(EqualTo(attribute("_row_key"), Literal(reckey(0))), 
GreaterThanOrEqual(attribute("timestamp"), Literal(0)))
+    assertEquals(6, spark.sql("select * from " + sqlTempTable + " where " + 
dataFilter.sql).count())
+    assertTrue(fileIndex.listFiles(Seq.empty, Seq(dataFilter)).flatMap(s => 
s.files).size >= 3)
+  }
+
+  def verifyEqualToQuery(hudiOpts: Map[String, String]): Unit = {
+    val reckey = mergedDfList.last.limit(1).collect().map(row => 
row.getAs("_row_key").toString)
+    val dataFilter = EqualTo(attribute("_row_key"), Literal(reckey(0)))
+    assertEquals(1, spark.sql("select * from " + sqlTempTable + " where " + 
dataFilter.sql).count())
+    verifyPruningFileCount(hudiOpts, dataFilter, 1)
+  }
+
+  def verifyInQuery(hudiOpts: Map[String, String]): Unit = {
+    var reckey = mergedDfList.last.limit(1).collect().map(row => 
row.getAs("_row_key").toString)
+    var dataFilter = In(attribute("_row_key"), reckey.map(l => 
literal(l)).toList)
+    assertEquals(1, spark.sql("select * from " + sqlTempTable + " where " + 
dataFilter.sql).count())
+    var numFiles = if (isTableMOR()) 2 else 1
+    verifyPruningFileCount(hudiOpts, dataFilter, numFiles)
+
+    reckey = mergedDfList.last.limit(2).collect().map(row => 
row.getAs("_row_key").toString)
+    dataFilter = In(attribute("_row_key"), reckey.map(l => literal(l)).toList)
+    assertEquals(2, spark.sql("select * from " + sqlTempTable + " where " + 
dataFilter.sql).count())
+    numFiles = if (isTableMOR()) 2 else 2
+    verifyPruningFileCount(hudiOpts, dataFilter, numFiles)
+  }
+
+  private def attribute(partition: String): AttributeReference = {
+    AttributeReference(partition, StringType, true)()
+  }
+
+  private def literal(value: String): Literal = {
+    Literal.create(value)
+  }
+
+  private def verifyPruningFileCount(opts: Map[String, String], dataFilter: 
Expression, numFiles: Int): Unit = {
+    // with data skipping
+    val commonOpts = opts + ("path" -> basePath)
+    metaClient = HoodieTableMetaClient.reload(metaClient)
+    var fileIndex = HoodieFileIndex(spark, metaClient, None, commonOpts, 
includeLogFiles = true)
+    val filteredPartitionDirectories = fileIndex.listFiles(Seq(), 
Seq(dataFilter))
+    val filteredFilesCount = filteredPartitionDirectories.flatMap(s => 
s.files).size
+    assertTrue(filteredFilesCount < getLatestDataFilesCount(opts))
+    assertEquals(filteredFilesCount, numFiles)
+
+    // with no data skipping
+    fileIndex = HoodieFileIndex(spark, metaClient, None, commonOpts + 
(DataSourceReadOptions.ENABLE_DATA_SKIPPING.key -> "false"), includeLogFiles = 
true)
+    val filesCountWithNoSkipping = fileIndex.listFiles(Seq(), 
Seq(dataFilter)).flatMap(s => s.files).size
+    assertTrue(filteredFilesCount < filesCountWithNoSkipping)
+  }
+
+  private def isTableMOR(): Boolean = {
+    metaClient.getTableType == HoodieTableType.MERGE_ON_READ
+  }
+
+  private def getLatestDataFilesCount(opts: Map[String, String], 
includeLogFiles: Boolean = true) = {
+    var totalLatestDataFiles = 0L
+    
getTableFileSystenView(opts).getAllLatestFileSlicesBeforeOrOn(metaClient.getActiveTimeline.lastInstant().get().getTimestamp)
+      .values()
+      .forEach(JFunction.toJavaConsumer[java.util.stream.Stream[FileSlice]]
+        (slices => slices.forEach(JFunction.toJavaConsumer[FileSlice](
+          slice => totalLatestDataFiles += (if (includeLogFiles) 
slice.getLogFiles.count() else 0)
+            + (if (slice.getBaseFile.isPresent) 1 else 0)))))
+    totalLatestDataFiles
+  }
+
+  private def getTableFileSystenView(opts: Map[String, String]): 
HoodieMetadataFileSystemView = {
+    new HoodieMetadataFileSystemView(metaClient, metaClient.getActiveTimeline, 
metadataWriter(getWriteConfig(opts)).getTableMetadata)
   }
 
   private def createTempTable(hudiOpts: Map[String, String]): Unit = {

Reply via email to