codope commented on code in PR #12575:
URL: https://github.com/apache/hudi/pull/12575#discussion_r1904048424
##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/DataSkippingUtils.scala:
##########
@@ -109,137 +127,187 @@ object DataSkippingUtils extends Logging {
// corresponding column in the Column Stats Index
val targetExprBuilder: Expression => Expression =
swapAttributeRefInExpr(sourceExpr, attrRef, _)
genColumnValuesEqualToExpression(colName, valueExpr,
targetExprBuilder)
- }
+ }.orElse({
+ hasNonIndexedCols.set(true)
Review Comment:
should we do `compareAndSet` instead of simply `set`?
##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/PartitionStatsIndexSupport.scala:
##########
@@ -96,7 +97,11 @@ class PartitionStatsIndexSupport(spark: SparkSession,
// column in a filter does not have the stats available, by
making sure such a
// filter does not prune any partition.
val indexSchema = transposedPartitionStatsDF.schema
- val indexFilter =
queryFilters.map(translateIntoColumnStatsIndexFilterExpr(_,
indexSchema)).reduce(And)
+ val indexedCols : Seq[String] =
metaClient.getIndexMetadata.get().getIndexDefinitions.get(PARTITION_NAME_COLUMN_STATS).getSourceFields.asScala.toSeq
+ // to be fixed. Siva.
Review Comment:
let's create a JIRA if you plan to fix it in a separate PR
##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/DataSkippingUtils.scala:
##########
@@ -42,28 +45,43 @@ object DataSkippingUtils extends Logging {
* @param isExpressionIndex whether the index is an expression index
* @return filter for column-stats index's table
*/
- def translateIntoColumnStatsIndexFilterExpr(dataTableFilterExpr: Expression,
indexSchema: StructType, isExpressionIndex: Boolean = false): Expression = {
+ def translateIntoColumnStatsIndexFilterExpr(dataTableFilterExpr: Expression,
isExpressionIndex: Boolean = false,
+ indexedCols : Seq[String] =
Seq.empty,
+ hasNonIndexedCols :
AtomicBoolean = new AtomicBoolean(false)): Expression = {
try {
- createColumnStatsIndexFilterExprInternal(dataTableFilterExpr,
indexSchema, isExpressionIndex)
+ createColumnStatsIndexFilterExprInternal(dataTableFilterExpr,
isExpressionIndex, indexedCols,
+ hasNonIndexedCols)
} catch {
case e: AnalysisException =>
logDebug(s"Failed to translated provided data table filter expr into
column stats one ($dataTableFilterExpr)", e)
throw e
}
}
- private def createColumnStatsIndexFilterExprInternal(dataTableFilterExpr:
Expression, indexSchema: StructType, isExpressionIndex: Boolean = false):
Expression = {
+ private def createColumnStatsIndexFilterExprInternal(dataTableFilterExpr:
Expression, isExpressionIndex: Boolean = false,
+ indexedCols :
Seq[String],
+ hasNonIndexedCols :
AtomicBoolean = new AtomicBoolean(false)): Expression = {
// Try to transform original Source Table's filter expression into
// Column-Stats Index filter expression
- tryComposeIndexFilterExpr(dataTableFilterExpr, indexSchema,
isExpressionIndex) match {
+ tryComposeIndexFilterExpr(dataTableFilterExpr, isExpressionIndex,
indexedCols, hasNonIndexedCols) match {
case Some(e) => e
// NOTE: In case we can't transform source filter expression, we fallback
// to {@code TrueLiteral}, to essentially avoid pruning any indexed
files from scanning
case None => TrueLiteral
}
}
- private def tryComposeIndexFilterExpr(sourceFilterExpr: Expression,
indexSchema: StructType, isExpressionIndex: Boolean = false):
Option[Expression] = {
+ /**
+ * Composes index filter expression to be looked up with col stats index in
MDT.
+ * For eg, a filter from source as "colA = 'abc'" will get transformed to
"colA_minValue <= 'abc' and colA_maxValue >= 'abc'"
+ * @param sourceFilterExpr source filter expression of interest.
+ * @param isExpressionIndex true if this refers to an expression index.
+ * @param indexedCols list of columns indexed with col stats index in MDT.
+ * @param hasNonIndexedCols atomic boolean tracking if there are any non
indexed columns.
+ * @return optionally transformed Expression. Returns None if column of
interest it not indexed nor translatable.
+ */
+ private def tryComposeIndexFilterExpr(sourceFilterExpr: Expression,
isExpressionIndex: Boolean = false,
+ indexedCols : Seq[String],
hasNonIndexedCols : AtomicBoolean = new AtomicBoolean(false)):
Option[Expression] = {
Review Comment:
why change `indexSchema` to `indexedCols`? We don't use schema downstream at
all, is it? Just calling out, i hope this works for nested fields as well.
##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/DataSkippingUtils.scala:
##########
@@ -270,35 +344,63 @@ object DataSkippingUtils extends Logging {
// lexicographically, we essentially need to check that provided
literal falls w/in min/max bounds of the
// given column
case StartsWith(sourceExpr @ AllowedTransformationExpression(attrRef), v
@ Literal(_: UTF8String, _)) =>
- getTargetIndexedColumnName(attrRef, indexSchema)
+ getTargetIndexedColumnName(attrRef, indexedCols)
.map { colName =>
val targetExprBuilder: Expression => Expression =
swapAttributeRefInExpr(sourceExpr, attrRef, _)
genColumnValuesEqualToExpression(colName, v, targetExprBuilder)
- }
+ }.orElse({
+ hasNonIndexedCols.set(true)
+ Option.empty
+ })
// Filter "expr(colA) not like 'xxx%'"
// Translates to "NOT(expr(colA_minValue) like 'xxx%' AND
expr(colA_maxValue) like 'xxx%')" for index lookup
// NOTE: This is NOT an inversion of "colA like xxx"
case Not(StartsWith(sourceExpr @
AllowedTransformationExpression(attrRef), value @ Literal(_: UTF8String, _))) =>
- getTargetIndexedColumnName(attrRef, indexSchema)
+ getTargetIndexedColumnName(attrRef, indexedCols)
.map { colName =>
val targetExprBuilder: Expression => Expression =
swapAttributeRefInExpr(sourceExpr, attrRef, _)
val minValueExpr =
targetExprBuilder.apply(genColMinValueExpr(colName))
val maxValueExpr =
targetExprBuilder.apply(genColMaxValueExpr(colName))
Not(And(StartsWith(minValueExpr, value), StartsWith(maxValueExpr,
value)))
- }
+ }.orElse({
+ hasNonIndexedCols.set(true)
+ Option.empty
+ })
case or: Or =>
- val resLeft = createColumnStatsIndexFilterExprInternal(or.left,
indexSchema)
- val resRight = createColumnStatsIndexFilterExprInternal(or.right,
indexSchema)
-
- Option(Or(resLeft, resRight))
+ val leftHasNonIndexedCols = new AtomicBoolean(false)
+ val resLeft = createColumnStatsIndexFilterExprInternal(or.left,
isExpressionIndex = isExpressionIndex,
+ indexedCols = indexedCols, hasNonIndexedCols = leftHasNonIndexedCols)
+ val rightHasNonIndexedCols = new AtomicBoolean(false)
+ val resRight = createColumnStatsIndexFilterExprInternal(or.right,
isExpressionIndex = isExpressionIndex,
+ indexedCols = indexedCols, hasNonIndexedCols =
rightHasNonIndexedCols)
+ if (leftHasNonIndexedCols.get() || rightHasNonIndexedCols.get()) {
+ hasNonIndexedCols.set(true)
+ None
+ } else {
+ Option(Or(resLeft, resRight))
+ }
case and: And =>
- val resLeft = createColumnStatsIndexFilterExprInternal(and.left,
indexSchema)
- val resRight = createColumnStatsIndexFilterExprInternal(and.right,
indexSchema)
-
- Option(And(resLeft, resRight))
+ val leftHasNonIndexedCols = new AtomicBoolean(false)
+ val resLeft = createColumnStatsIndexFilterExprInternal(and.left,
isExpressionIndex = isExpressionIndex,
+ indexedCols = indexedCols, hasNonIndexedCols = leftHasNonIndexedCols)
+ val rightHasNonIndexedCols = new AtomicBoolean(false)
+ val resRight = createColumnStatsIndexFilterExprInternal(and.right,
isExpressionIndex = isExpressionIndex,
+ indexedCols = indexedCols, hasNonIndexedCols =
rightHasNonIndexedCols)
+ // only if both left and right has non indexed cols, we can set
hasNonIndexedCols to true.
+ // If not, we can still afford to prune files based on col stats
lookup.
+ if (leftHasNonIndexedCols.get() && !rightHasNonIndexedCols.get()) {
+ Option(resRight) // if only left has non indexed cols, ignore from
the expression to be looked up in col stats df
+ } else if (!leftHasNonIndexedCols.get() &&
rightHasNonIndexedCols.get()) {
+ Option(resLeft) // if only right has non indexed cols, ignore from
the expression to be looked up in col stats df
+ } else if (leftHasNonIndexedCols.get() &&
rightHasNonIndexedCols.get()) {
+ hasNonIndexedCols.set(true)
+ None
+ } else {
+ Option(And(resLeft, resRight))
+ }
Review Comment:
Can be slightly simplified (less atomic boolean checks):
```
if (leftHasNonIndexedCols.get() && rightHasNonIndexedCols.get()) {
hasNonIndexedCols.set(true)
Option.empty
} else if (leftHasNonIndexedCols.get()) {
Option(resRight) // Ignore the non-indexed left expression
} else if (rightHasNonIndexedCols.get()) {
Option(resLeft) // Ignore the non-indexed right expression
} else {
Option(And(resLeft, resRight))
}
```
##########
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestColumnStatsIndexWithSQL.scala:
##########
@@ -94,6 +97,162 @@ class TestColumnStatsIndexWithSQL extends
ColumnStatIndexTestBase {
setupTable(testCase, metadataOpts, commonOpts, shouldValidate = true,
useShortSchema = true,
validationSortColumns = Seq("c1_maxValue", "c1_minValue", "c2_maxValue",
"c2_minValue", "c3_maxValue", "c3_minValue"))
+
+ // predicate with c2. should prune based on col stats lookup
+ var dataFilter: Expression = EqualTo(attribute("c2"), literal("619sdc"))
+ verifyPruningFileCount(commonOpts, dataFilter)
+ // predicate w/ c5. should not lookup in col stats since the column is not
indexed.
+ var dataFilter1: Expression = GreaterThan(attribute("c5"), literal("70"))
+ verifyPruningFileCount(commonOpts, dataFilter1, false)
+
+ // a mix of two cols, where c2 is indexed and c5 is not indexed. but since
its 'AND', pruning should kick in.
+ var dataFilter2 = And(dataFilter1, EqualTo(attribute("c2"),
literal("619sdc")))
+ verifyPruningFileCount(commonOpts, dataFilter2, true)
+ // adding an AND clause
+ dataFilter2 = And(dataFilter2, EqualTo(attribute("c5"), literal("100")))
+ verifyPruningFileCount(commonOpts, dataFilter2, true)
+ // adding an OR clause where the col is indexed. expected to prune
+ var dataFilter2_1 = Or(dataFilter2, EqualTo(attribute("c2"),
literal("619sda")))
+ verifyPruningFileCount(commonOpts, dataFilter2_1, true)
+ // adding another Or clause, but this time the col is not indexed. So, no
pruning expected.
+ dataFilter2_1 = Or(dataFilter2_1, EqualTo(attribute("c5"), literal("120")))
+ verifyPruningFileCount(commonOpts, dataFilter2_1, false)
+
+ // a mix of two cols, where c2 is indexed and c5 is not indexed. but since
its 'OR', pruning should be by passed.
+ var dataFilter3 = Or(dataFilter1, EqualTo(attribute("c2"),
literal("619sdc")))
+ verifyPruningFileCount(commonOpts, dataFilter3, false)
+ // adding an OR clause
+ dataFilter3 = Or(dataFilter3, EqualTo(attribute("c5"), literal("100")))
+ verifyPruningFileCount(commonOpts, dataFilter3, false)
+ // adding AND clause where the col is indexed. Expected to prune.
+ var dataFilter3_1 = And(dataFilter3, EqualTo(attribute("c2"),
literal("619sda")))
+ verifyPruningFileCount(commonOpts, dataFilter3_1, true)
+ // adding another AND clause where the col is not indexed. Still expected
to prune since c2 = 619sda could still be pruned.
+ dataFilter3_1 = And(dataFilter3_1, EqualTo(attribute("c5"),
literal("200")))
+ verifyPruningFileCount(commonOpts, dataFilter3_1, true)
+ // adding an Or clause where the col is indexed. expected to prune.
+ var dataFilter3_2 = Or(dataFilter3_1, EqualTo(attribute("c2"),
literal("619sda")))
+ verifyPruningFileCount(commonOpts, dataFilter3_2, true)
+ // adding an Or clause where the col is not indexed. not expected to prune
+ dataFilter3_2 = Or(dataFilter3_2, EqualTo(attribute("c5"), literal("250")))
+ verifyPruningFileCount(commonOpts, dataFilter3_2, false)
+ }
+
+ @Test
+ def testTranslateIntoColumnStatsIndexFilterExpr(): Unit = {
Review Comment:
Pretty good coverage! I have some more test cases:
1. Concurrent translation to exercise AtomicBoolean
2. Nested And/Or example
```
val dataFilter = And(
Or(
EqualTo(attribute("c1"), literal("619sdc")),
And(EqualTo(attribute("c2"), literal("100")), EqualTo(attribute("c3"),
literal("200")))
),
EqualTo(attribute("c4"), literal("300"))
)
translateIntoColStatsExprAndValidate(
dataFilter,
Seq("c1", "c2", "c3", "c4"),
expectedTransformedExpression, // Define the correct transformed expression
false
)
```
3. Index updates
```
val dataFilter = EqualTo(attribute("c1"), literal("619sdc"))
translateIntoColStatsExprAndValidate(dataFilter, Seq("c1"), expectedExpr,
false)
// Simulate removing c1 from indexed columns
translateIntoColStatsExprAndValidate(dataFilter, Seq.empty, TrueLiteral,
true)
```
4. Unsupported filter types
```
val unsupportedFilter = SubqueryExpression(...) // Simulate a subquery-based
filter
translateIntoColStatsExprAndValidate(unsupportedFilter, Seq("c1"),
TrueLiteral, true)
```
5. Too many filters
```
val largeFilter = (1 to 100).map(i => EqualTo(attribute(s"c$i"),
literal("value"))).reduce(And)
val indexedColumns = (1 to 50).map(i => s"c$i")
translateIntoColStatsExprAndValidate(largeFilter, indexedColumns,
expectedExpr, false)
```
6. Nested schema
```
val schema = StructType(Seq(
StructField("c1", StringType, true),
StructField("nested", StructType(Seq(
StructField("inner_c2", StringType, true),
StructField("inner_c3", IntegerType, true)
)), true)
))
val dataFilter = And(
EqualTo(attribute("c1"), literal("val1")),
EqualTo(attribute("nested.inner_c2"), literal("val2"))
)
translateIntoColStatsExprAndValidate(
dataFilter,
Seq("c1", "nested.inner_c2"),
expectedExpr,
false
)
```
Some of the above might not be handled even from before (e.g. I think case 3
was never handled). Feel free to track separately if it takes longer to fix. I
just thought these would be good addition to the suite for future.
##########
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestColumnStatsIndexWithSQL.scala:
##########
@@ -406,7 +565,7 @@ class TestColumnStatsIndexWithSQL extends
ColumnStatIndexTestBase {
verifySQLQueries(numRecordsForFirstQuery, numRecordsForSecondQuery,
DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL, commonOpts,
isTableDataSameAsAfterSecondInstant)
commonOpts = commonOpts +
(DataSourceReadOptions.INCREMENTAL_FALLBACK_TO_FULL_TABLE_SCAN.key -> "true")
// TODO: https://issues.apache.org/jira/browse/HUDI-6657 - Investigate why
below assertions fail with full table scan enabled.
- //verifySQLQueries(numRecordsForFirstQuery,
DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL, commonOpts,
isTableDataSameAsAfterSecondInstant)
+ // verifySQLQueries(numRecordsForFirstQuery,
DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL, commonOpts,
isTableDataSameAsAfterSecondInstant)
Review Comment:
why is this commented?
##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/DataSkippingUtils.scala:
##########
@@ -42,28 +45,43 @@ object DataSkippingUtils extends Logging {
* @param isExpressionIndex whether the index is an expression index
* @return filter for column-stats index's table
*/
- def translateIntoColumnStatsIndexFilterExpr(dataTableFilterExpr: Expression,
indexSchema: StructType, isExpressionIndex: Boolean = false): Expression = {
+ def translateIntoColumnStatsIndexFilterExpr(dataTableFilterExpr: Expression,
isExpressionIndex: Boolean = false,
+ indexedCols : Seq[String] =
Seq.empty,
+ hasNonIndexedCols :
AtomicBoolean = new AtomicBoolean(false)): Expression = {
try {
- createColumnStatsIndexFilterExprInternal(dataTableFilterExpr,
indexSchema, isExpressionIndex)
+ createColumnStatsIndexFilterExprInternal(dataTableFilterExpr,
isExpressionIndex, indexedCols,
+ hasNonIndexedCols)
Review Comment:
codestyle nit: please keep all arguments in one line unless linebreak limit
is reached.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]