This is an automated email from the ASF dual-hosted git repository.
codope pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new bf0966825f7 [HUDI-6651] Support IN SQL query with Record Index (#9373)
bf0966825f7 is described below
commit bf0966825f714dd6512b3bf043054ab820b0158d
Author: Lokesh Jain <[email protected]>
AuthorDate: Mon Aug 7 06:43:35 2023 +0530
[HUDI-6651] Support IN SQL query with Record Index (#9373)
---
.../scala/org/apache/hudi/HoodieFileIndex.scala | 47 ++++++---
.../org/apache/hudi/RecordLevelIndexSupport.scala | 78 ++++++++++-----
.../functional/TestRecordLevelIndexWithSQL.scala | 110 ++++++++++++++++++++-
3 files changed, 191 insertions(+), 44 deletions(-)
diff --git
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala
index 964ef970c91..0f33f2ec1fd 100644
---
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala
+++
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala
@@ -96,6 +96,10 @@ case class HoodieFileIndex(spark: SparkSession,
*/
@transient private lazy val columnStatsIndex = new
ColumnStatsIndexSupport(spark, schema, metadataConfig, metaClient)
+ /**
+ * NOTE: [[RecordLevelIndexSupport]] is a transient state, since it's only
relevant while logical plan
+ * is handled by the Spark's driver
+ */
@transient private lazy val recordLevelIndex = new
RecordLevelIndexSupport(spark, metadataConfig, metaClient)
override def rootPaths: Seq[Path] = getQueryPaths.asScala
@@ -209,9 +213,10 @@ case class HoodieFileIndex(spark: SparkSession,
if (prunedPartitionsAndFileSlices.isEmpty || dataFilters.isEmpty) {
prunedPartitionsAndFileSlices
} else {
- // Look up candidate files names in the col-stats index, if all of the
following conditions are true
+ // Look up candidate file names in the col-stats or record level index,
if all of the following conditions are true
// - Data-skipping is enabled
// - Col-Stats Index is present
+ // - Record-level Index is present
// - List of predicates (filters) is present
val candidateFilesNamesOpt: Option[Set[String]] =
lookupCandidateFilesInMetadataTable(dataFilters) match {
@@ -225,14 +230,14 @@ case class HoodieFileIndex(spark: SparkSession,
}
}
- logDebug(s"Overlapping candidate files from Column Stats Index:
${candidateFilesNamesOpt.getOrElse(Set.empty)}")
+ logDebug(s"Overlapping candidate files from Column Stats or Record Level
Index: ${candidateFilesNamesOpt.getOrElse(Set.empty)}")
var totalFileSliceSize = 0
var candidateFileSliceSize = 0
val prunedPartitionsAndFilteredFileSlices =
prunedPartitionsAndFileSlices.map {
case (partitionOpt, fileSlices) =>
- // Filter in candidate files based on the col-stats index lookup
+ // Filter in candidate files based on the col-stats or record level
index lookup
val candidateFileSlices: Seq[FileSlice] = {
fileSlices.filter(fs => {
val fileSliceFiles =
fs.getLogFiles.map[String](JFunction.toJavaFunction[HoodieLogFile, String](lf
=> lf.getPath.getName))
@@ -303,30 +308,35 @@ case class HoodieFileIndex(spark: SparkSession,
/**
* Computes pruned list of candidate base-files' names based on provided
list of {@link dataFilters}
- * conditions, by leveraging Metadata Table's Column Statistics index
(hereon referred as ColStats for brevity)
- * bearing "min", "max", "num_nulls" statistics for all columns.
+ * conditions, by leveraging Metadata Table's Record Level Index and Column
Statistics index (hereon referred as
+ * ColStats for brevity) bearing "min", "max", "num_nulls" statistics for
all columns.
*
* NOTE: This method has to return complete set of candidate files, since
only provided candidates will
* ultimately be scanned as part of query execution. Hence, this method has
to maintain the
- * invariant of conservatively including every base-file's name, that is NOT
referenced in its index.
+ * invariant of conservatively including every base-file and log file's
name, that is NOT referenced in its index.
*
* @param queryFilters list of original data filters passed down from
querying engine
- * @return list of pruned (data-skipped) candidate base-files' names
+ * @return list of pruned (data-skipped) candidate base-files and log files'
names
*/
private def lookupCandidateFilesInMetadataTable(queryFilters:
Seq[Expression]): Try[Option[Set[String]]] = Try {
- // NOTE: Data Skipping is only effective when it references columns that
are indexed w/in
+ // NOTE: For column stats, Data Skipping is only effective when it
references columns that are indexed w/in
// the Column Stats Index (CSI). Following cases could not be
effectively handled by Data Skipping:
// - Expressions on top-level column's fields (ie, for ex filters
like "struct.field > 0", since
// CSI only contains stats for top-level columns, in this case
for "struct")
// - Any expression not directly referencing top-level column
(for ex, sub-queries, since there's
// nothing CSI in particular could be applied for)
+ // For record index, Data Skipping is only effective when one of the
query filters is of type EqualTo
+ // or IN query on simple record keys. In such a case the record
index is used to filter the file slices
+ // and candidate files are obtained from these file slices.
+
lazy val queryReferencedColumns = collectReferencedColumns(spark,
queryFilters, schema)
+ lazy val (_, recordKeys) =
recordLevelIndex.filterQueriesWithRecordKey(queryFilters)
if (!isMetadataTableEnabled || !isDataSkippingEnabled) {
validateConfig()
Option.empty
- } else if (recordLevelIndex.isIndexApplicable(queryFilters)) {
- Option.apply(recordLevelIndex.getCandidateFiles(getAllFiles(),
queryFilters))
+ } else if (recordKeys.nonEmpty) {
+ Option.apply(recordLevelIndex.getCandidateFiles(getAllFiles(),
recordKeys))
} else if (!columnStatsIndex.isIndexAvailable || queryFilters.isEmpty ||
queryReferencedColumns.isEmpty) {
validateConfig()
Option.empty
@@ -359,12 +369,12 @@ case class HoodieFileIndex(spark: SparkSession,
.toSet
// NOTE: Col-Stats Index isn't guaranteed to have complete set of
statistics for every
- // base-file: since it's bound to clustering, which could occur
asynchronously
+ // base-file or log file: since it's bound to clustering, which
could occur asynchronously
// at arbitrary point in time, and is not likely to be touching
all of the base files.
//
// To close that gap, we manually compute the difference b/w all
indexed (by col-stats-index)
- // files and all outstanding base-files, and make sure that all
base files not
- // represented w/in the index are included in the output of this
method
+ // files and all outstanding base-files or log files, and make
sure that all base files and
+ // log files not represented w/in the index are included in the
output of this method
val notIndexedFileNames =
lookupFileNamesMissingFromIndex(allIndexedFileNames)
Some(prunedCandidateFileNames ++ notIndexedFileNames)
@@ -397,10 +407,15 @@ case class HoodieFileIndex(spark: SparkSession,
private def isColumnStatsIndexEnabled: Boolean =
metadataConfig.isColumnStatsIndexEnabled
+ private def isRecordIndexEnabled: Boolean = recordLevelIndex.isIndexAvailable
+
+ private def isIndexEnabled: Boolean = isColumnStatsIndexEnabled ||
isRecordIndexEnabled
+
private def validateConfig(): Unit = {
- if (isDataSkippingEnabled && (!isMetadataTableEnabled ||
!isColumnStatsIndexEnabled)) {
- logWarning("Data skipping requires both Metadata Table and Column Stats
Index to be enabled as well! " +
- s"(isMetadataTableEnabled = $isMetadataTableEnabled,
isColumnStatsIndexEnabled = $isColumnStatsIndexEnabled")
+ if (isDataSkippingEnabled && (!isMetadataTableEnabled || !isIndexEnabled))
{
+ logWarning("Data skipping requires both Metadata Table and at least one
of Column Stats Index or Record Level Index" +
+ " to be enabled as well! " + s"(isMetadataTableEnabled =
$isMetadataTableEnabled, isColumnStatsIndexEnabled = $isColumnStatsIndexEnabled"
+ + s", isRecordIndexApplicable = $isRecordIndexEnabled)")
}
}
diff --git
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/RecordLevelIndexSupport.scala
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/RecordLevelIndexSupport.scala
index 48e6ef837f8..743ce0cc6c1 100644
---
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/RecordLevelIndexSupport.scala
+++
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/RecordLevelIndexSupport.scala
@@ -27,7 +27,7 @@ import org.apache.hudi.metadata.{HoodieTableMetadata,
HoodieTableMetadataUtil}
import org.apache.hudi.util.JFunction
import org.apache.spark.api.java.JavaSparkContext
import org.apache.spark.sql.SparkSession
-import org.apache.spark.sql.catalyst.expressions.{AttributeReference, EqualTo,
Expression, Literal}
+import org.apache.spark.sql.catalyst.expressions.{AttributeReference, EqualTo,
Expression, In, Literal}
import scala.collection.{JavaConverters, mutable}
@@ -40,13 +40,12 @@ class RecordLevelIndexSupport(spark: SparkSession,
HoodieTableMetadata.create(engineCtx, metadataConfig,
metaClient.getBasePathV2.toString)
/**
- * Returns the list of candidate files which should be queried after pruning
based on query filters.
+ * Returns the list of candidate files which store the provided record keys
based on Metadata Table Record Index.
* @param allFiles - List of all files which needs to be considered for the
query
- * @param queryFilters - Input query filters. List of candidate files are
pruned based on these query filters.
+ * @param recordKeys - List of record keys.
* @return Sequence of file names which need to be queried
*/
- def getCandidateFiles(allFiles: Seq[FileStatus], queryFilters:
Seq[Expression]): Set[String] = {
- val (_, recordKeys) = filterQueryFiltersWithRecordKey(queryFilters)
+ def getCandidateFiles(allFiles: Seq[FileStatus], recordKeys: List[String]):
Set[String] = {
val recordKeyLocationsMap =
metadataTable.readRecordIndex(JavaConverters.seqAsJavaListConverter(recordKeys).asJava)
val fileIdToPartitionMap: mutable.Map[String, String] = mutable.Map.empty
val candidateFiles: mutable.Set[String] = mutable.Set.empty
@@ -118,39 +117,70 @@ class RecordLevelIndexSupport(spark: SparkSession,
}
/**
- * Given query filters, it filters the EqualTo queries on simple record key
columns and returns a tuple of list of such
- * queries and list of record key literals present in the query.
+ * Given query filters, it filters the EqualTo and IN queries on simple
record key columns and returns a tuple of
+ * list of such queries and list of record key literals present in the query.
+ * If record index is not available, it returns an empty list for record
filters and record keys
* @param queryFilters The queries that need to be filtered.
* @return Tuple of List of filtered queries and list of record key literals
that need to be matched
*/
- private def filterQueryFiltersWithRecordKey(queryFilters: Seq[Expression]):
(List[Expression], List[String]) = {
- var recordKeyQueries: List[Expression] = List.empty
- var recordKeys: List[String] = List.empty
- for (query <- queryFilters) {
- query match {
- case equalToQuery: EqualTo =>
- val (attribute, literal) =
getAttributeLiteralTuple(equalToQuery.left, equalToQuery.right).orNull
- if (attribute != null && attribute.name != null &&
attributeMatchesRecordKey(attribute.name)) {
- recordKeys = recordKeys :+ literal.value.toString
- recordKeyQueries = recordKeyQueries :+ equalToQuery
- }
- case _ =>
+ def filterQueriesWithRecordKey(queryFilters: Seq[Expression]):
(List[Expression], List[String]) = {
+ if (!isIndexAvailable) {
+ (List.empty, List.empty)
+ } else {
+ var recordKeyQueries: List[Expression] = List.empty
+ var recordKeys: List[String] = List.empty
+ for (query <- queryFilters) {
+ filterQueryWithRecordKey(query).foreach({
+ case (exp: Expression, recKeys: List[String]) =>
+ recordKeys = recordKeys ++ recKeys
+ recordKeyQueries = recordKeyQueries :+ exp
+ })
}
+
+ Tuple2.apply(recordKeyQueries, recordKeys)
}
- Tuple2.apply(recordKeyQueries, recordKeys)
}
/**
- * Returns true in cases when metadata table is enabled and Record Level
Index is built.
+ * If the input query is an EqualTo or IN query on simple record key
columns, the function returns a tuple of
+ * list of the query and list of record key literals present in the query
otherwise returns an empty option.
+ *
+ * @param queryFilter The query that needs to be filtered.
+ * @return Tuple of filtered query and list of record key literals that need
to be matched
*/
- def isIndexApplicable(queryFilters: Seq[Expression]): Boolean = {
- isIndexAvailable &&
filterQueryFiltersWithRecordKey(queryFilters)._1.nonEmpty
+ private def filterQueryWithRecordKey(queryFilter: Expression):
Option[(Expression, List[String])] = {
+ queryFilter match {
+ case equalToQuery: EqualTo =>
+ val (attribute, literal) = getAttributeLiteralTuple(equalToQuery.left,
equalToQuery.right).orNull
+ if (attribute != null && attribute.name != null &&
attributeMatchesRecordKey(attribute.name)) {
+ Option.apply(equalToQuery, List.apply(literal.value.toString))
+ } else {
+ Option.empty
+ }
+ case inQuery: In =>
+ var validINQuery = true
+ inQuery.value match {
+ case _: AttributeReference =>
+ case _ => validINQuery = false
+ }
+ var literals: List[String] = List.empty
+ inQuery.list.foreach {
+ case literal: Literal => literals = literals :+
literal.value.toString
+ case _ => validINQuery = false
+ }
+ if (validINQuery) {
+ Option.apply(inQuery, literals)
+ } else {
+ Option.empty
+ }
+ case _ => Option.empty
+ }
}
/**
* Return true if metadata table is enabled and record index metadata
partition is available.
*/
- private def isIndexAvailable: Boolean = {
+ def isIndexAvailable: Boolean = {
metadataConfig.enabled &&
metaClient.getTableConfig.getMetadataPartitions.contains(HoodieTableMetadataUtil.PARTITION_NAME_RECORD_INDEX)
}
}
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestRecordLevelIndexWithSQL.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestRecordLevelIndexWithSQL.scala
index e3048a0d914..8e235960fba 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestRecordLevelIndexWithSQL.scala
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestRecordLevelIndexWithSQL.scala
@@ -17,8 +17,15 @@
package org.apache.hudi.functional
-import org.apache.hudi.{DataSourceReadOptions, DataSourceWriteOptions}
+import org.apache.hudi.common.model.{FileSlice, HoodieTableType}
+import org.apache.hudi.common.table.HoodieTableMetaClient
+import org.apache.hudi.metadata.HoodieMetadataFileSystemView
+import org.apache.hudi.util.JFunction
+import org.apache.hudi.{DataSourceReadOptions, DataSourceWriteOptions,
HoodieFileIndex}
import org.apache.spark.sql.SaveMode
+import org.apache.spark.sql.catalyst.expressions.{AttributeReference, EqualTo,
Expression, GreaterThan, GreaterThanOrEqual, In, Literal, Or}
+import org.apache.spark.sql.types.StringType
+import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue}
import org.junit.jupiter.api.Tag
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.ValueSource
@@ -28,7 +35,7 @@ class TestRecordLevelIndexWithSQL extends
RecordLevelIndexTestBase {
val sqlTempTable = "tbl"
@ParameterizedTest
- @ValueSource(strings = Array("COPY_ON_WRITE"))
+ @ValueSource(strings = Array("COPY_ON_WRITE", "MERGE_ON_READ"))
def testRLIWithSQL(tableType: String): Unit = {
var hudiOpts = commonOpts
hudiOpts = hudiOpts + (
@@ -45,8 +52,103 @@ class TestRecordLevelIndexWithSQL extends
RecordLevelIndexTestBase {
validate = false)
createTempTable(hudiOpts)
- val reckey =
mergedDfList.last.limit(1).collect()(0).getAs("_row_key").toString
- spark.sql("select * from " + sqlTempTable + " where '" + reckey + "' =
_row_key").show(false)
+ verifyInQuery(hudiOpts)
+ verifyEqualToQuery(hudiOpts)
+ verifyNegativeTestCases(hudiOpts)
+ }
+
+ private def verifyNegativeTestCases(hudiOpts: Map[String, String]): Unit = {
+ val commonOpts = hudiOpts + ("path" -> basePath)
+ metaClient = HoodieTableMetaClient.reload(metaClient)
+ val fileIndex = HoodieFileIndex(spark, metaClient, None, commonOpts,
includeLogFiles = true)
+
+ // when no data filter is applied
+ assertEquals(getLatestDataFilesCount(commonOpts),
fileIndex.listFiles(Seq.empty, Seq.empty).flatMap(s => s.files).size)
+ assertEquals(6, spark.sql("select * from " + sqlTempTable).count())
+
+ // non existing entries in EqualTo query
+ var dataFilter: Expression = EqualTo(attribute("_row_key"), Literal("xyz"))
+ assertEquals(0, spark.sql("select * from " + sqlTempTable + " where " +
dataFilter.sql).count())
+ assertEquals(0, fileIndex.listFiles(Seq.empty, Seq(dataFilter)).flatMap(s
=> s.files).size)
+
+ // non existing entries in IN query
+ dataFilter = In(attribute("_row_key"), List.apply(Literal("xyz"),
Literal("abc")))
+ assertEquals(0, spark.sql("select * from " + sqlTempTable + " where " +
dataFilter.sql).count())
+ assertEquals(0, fileIndex.listFiles(Seq.empty, Seq(dataFilter)).flatMap(s
=> s.files).size)
+
+ // not supported GreaterThan query
+ val reckey = mergedDfList.last.limit(2).collect().map(row =>
row.getAs("_row_key").toString)
+ dataFilter = GreaterThan(attribute("_row_key"), Literal(reckey(0)))
+ assertTrue(fileIndex.listFiles(Seq.empty, Seq(dataFilter)).flatMap(s =>
s.files).size >= 3)
+
+ // not supported OR query
+ dataFilter = Or(EqualTo(attribute("_row_key"), Literal(reckey(0))),
GreaterThanOrEqual(attribute("timestamp"), Literal(0)))
+ assertEquals(6, spark.sql("select * from " + sqlTempTable + " where " +
dataFilter.sql).count())
+ assertTrue(fileIndex.listFiles(Seq.empty, Seq(dataFilter)).flatMap(s =>
s.files).size >= 3)
+ }
+
+ def verifyEqualToQuery(hudiOpts: Map[String, String]): Unit = {
+ val reckey = mergedDfList.last.limit(1).collect().map(row =>
row.getAs("_row_key").toString)
+ val dataFilter = EqualTo(attribute("_row_key"), Literal(reckey(0)))
+ assertEquals(1, spark.sql("select * from " + sqlTempTable + " where " +
dataFilter.sql).count())
+ verifyPruningFileCount(hudiOpts, dataFilter, 1)
+ }
+
+ def verifyInQuery(hudiOpts: Map[String, String]): Unit = {
+ var reckey = mergedDfList.last.limit(1).collect().map(row =>
row.getAs("_row_key").toString)
+ var dataFilter = In(attribute("_row_key"), reckey.map(l =>
literal(l)).toList)
+ assertEquals(1, spark.sql("select * from " + sqlTempTable + " where " +
dataFilter.sql).count())
+ var numFiles = if (isTableMOR()) 2 else 1
+ verifyPruningFileCount(hudiOpts, dataFilter, numFiles)
+
+ reckey = mergedDfList.last.limit(2).collect().map(row =>
row.getAs("_row_key").toString)
+ dataFilter = In(attribute("_row_key"), reckey.map(l => literal(l)).toList)
+ assertEquals(2, spark.sql("select * from " + sqlTempTable + " where " +
dataFilter.sql).count())
+ numFiles = if (isTableMOR()) 2 else 2
+ verifyPruningFileCount(hudiOpts, dataFilter, numFiles)
+ }
+
+ private def attribute(partition: String): AttributeReference = {
+ AttributeReference(partition, StringType, true)()
+ }
+
+ private def literal(value: String): Literal = {
+ Literal.create(value)
+ }
+
+ private def verifyPruningFileCount(opts: Map[String, String], dataFilter:
Expression, numFiles: Int): Unit = {
+ // with data skipping
+ val commonOpts = opts + ("path" -> basePath)
+ metaClient = HoodieTableMetaClient.reload(metaClient)
+ var fileIndex = HoodieFileIndex(spark, metaClient, None, commonOpts,
includeLogFiles = true)
+ val filteredPartitionDirectories = fileIndex.listFiles(Seq(),
Seq(dataFilter))
+ val filteredFilesCount = filteredPartitionDirectories.flatMap(s =>
s.files).size
+ assertTrue(filteredFilesCount < getLatestDataFilesCount(opts))
+ assertEquals(filteredFilesCount, numFiles)
+
+ // with no data skipping
+ fileIndex = HoodieFileIndex(spark, metaClient, None, commonOpts +
(DataSourceReadOptions.ENABLE_DATA_SKIPPING.key -> "false"), includeLogFiles =
true)
+ val filesCountWithNoSkipping = fileIndex.listFiles(Seq(),
Seq(dataFilter)).flatMap(s => s.files).size
+ assertTrue(filteredFilesCount < filesCountWithNoSkipping)
+ }
+
+ private def isTableMOR(): Boolean = {
+ metaClient.getTableType == HoodieTableType.MERGE_ON_READ
+ }
+
+ private def getLatestDataFilesCount(opts: Map[String, String],
includeLogFiles: Boolean = true) = {
+ var totalLatestDataFiles = 0L
+
getTableFileSystenView(opts).getAllLatestFileSlicesBeforeOrOn(metaClient.getActiveTimeline.lastInstant().get().getTimestamp)
+ .values()
+ .forEach(JFunction.toJavaConsumer[java.util.stream.Stream[FileSlice]]
+ (slices => slices.forEach(JFunction.toJavaConsumer[FileSlice](
+ slice => totalLatestDataFiles += (if (includeLogFiles)
slice.getLogFiles.count() else 0)
+ + (if (slice.getBaseFile.isPresent) 1 else 0)))))
+ totalLatestDataFiles
+ }
+
+ private def getTableFileSystenView(opts: Map[String, String]):
HoodieMetadataFileSystemView = {
+ new HoodieMetadataFileSystemView(metaClient, metaClient.getActiveTimeline,
metadataWriter(getWriteConfig(opts)).getTableMetadata)
}
private def createTempTable(hudiOpts: Map[String, String]): Unit = {