This is an automated email from the ASF dual-hosted git repository.
stream2000 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new b2aef89a873 [HUDI-7246] Fix Data Skipping Issue: No Results When Query
Conditions Involve Both Columns with and without Column Stats (#10389)
b2aef89a873 is described below
commit b2aef89a8737cf3c8eef379883535ba45f194714
Author: majian <[email protected]>
AuthorDate: Thu Jan 18 20:16:32 2024 +0800
[HUDI-7246] Fix Data Skipping Issue: No Results When Query Conditions
Involve Both Columns with and without Column Stats (#10389)
---
.../org/apache/hudi/ColumnStatsIndexSupport.scala | 16 +--
.../apache/spark/sql/hudi/DataSkippingUtils.scala | 12 ++-
.../org/apache/hudi/TestDataSkippingUtils.scala | 41 +++++++-
.../spark/sql/hudi/TestDataSkippingQuery.scala | 114 +++++++++++++++++++++
4 files changed, 170 insertions(+), 13 deletions(-)
diff --git
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/ColumnStatsIndexSupport.scala
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/ColumnStatsIndexSupport.scala
index 7a75c6c35ca..0a6cf437a6f 100644
---
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/ColumnStatsIndexSupport.scala
+++
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/ColumnStatsIndexSupport.scala
@@ -280,13 +280,17 @@ class ColumnStatsIndexSupport(spark: SparkSession,
acc ++= Seq(colStatRecord.getMinValue,
colStatRecord.getMaxValue, colStatRecord.getNullCount)
case None =>
// NOTE: This could occur in either of the following cases:
- // 1. Particular file does not have this particular
column (which is indexed by Column Stats Index):
- // in this case we're assuming missing column to
essentially contain exclusively
- // null values, we set min/max values as null and
null-count to be equal to value-count (this
- // behavior is consistent with reading non-existent
columns from Parquet)
+ // 1. When certain columns exist in the schema but are
absent in some data files due to
+ // schema evolution or other reasons, these columns
will not be present in the column stats.
+ // In this case, we fill in default values by setting
the min, max and null-count to null
+ // (this behavior is consistent with reading
non-existent columns from Parquet).
+ // 2. When certain columns are present both in the schema
and the data files,
+ // but the column stats are absent for these columns
due to their types not supporting indexing,
+ // we also set these columns to default values.
//
- // This is a way to determine current column's index without
explicit iteration (we're adding 3 stats / column)
- acc ++= Seq(null, null, valueCount)
+ // Using null defaults prevents errors during data skipping and,
because the filter includes an isNull check,
+ // files whose stats are missing are still returned by
data skipping rather than being incorrectly pruned.
+ acc ++= Seq(null, null, null)
}
}
diff --git
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/DataSkippingUtils.scala
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/DataSkippingUtils.scala
index 7cb4a3c5428..cfd8d1351d8 100644
---
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/DataSkippingUtils.scala
+++
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/DataSkippingUtils.scala
@@ -26,7 +26,7 @@ import
org.apache.spark.sql.catalyst.expressions.Literal.TrueLiteral
import org.apache.spark.sql.catalyst.expressions.{Alias, And, Attribute,
AttributeReference, EqualNullSafe, EqualTo, Expression, ExtractValue,
GetStructField, GreaterThan, GreaterThanOrEqual, In, InSet, IsNotNull, IsNull,
LessThan, LessThanOrEqual, Literal, Not, Or, StartsWith, SubqueryExpression}
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.hudi.ColumnStatsExpressionUtils._
-import org.apache.spark.sql.types.{StringType, StructType}
+import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.{AnalysisException, HoodieCatalystExpressionUtils}
import org.apache.spark.unsafe.types.UTF8String
@@ -211,10 +211,16 @@ object DataSkippingUtils extends Logging {
.map(colName => GreaterThan(genColNumNullsExpr(colName), Literal(0)))
// Filter "colA is not null"
- // Translates to "colA_nullCount < colA_valueCount" for index lookup
+ // Translates to "colA_nullCount IS NULL OR colA_valueCount IS NULL OR
colA_nullCount < colA_valueCount" for index lookup
+ // "colA_nullCount IS NULL OR colA_valueCount IS NULL" means the stats are missing, so we are not
certain whether the column is null or not,
+ // hence we return True to ensure the file is not skipped by the query.
case IsNotNull(attribute: AttributeReference) =>
getTargetIndexedColumnName(attribute, indexSchema)
- .map(colName => LessThan(genColNumNullsExpr(colName),
genColValueCountExpr))
+ .map {colName =>
+ val numNullExpr = genColNumNullsExpr(colName)
+ val valueCountExpr = genColValueCountExpr
+ Or(Or(IsNull(numNullExpr), IsNull(valueCountExpr)),
LessThan(numNullExpr, valueCountExpr))
+ }
// Filter "expr(colA) in (B1, B2, ...)"
// Translates to "(colA_minValue <= B1 AND colA_maxValue >= B1) OR
(colA_minValue <= B2 AND colA_maxValue >= B2) ... "
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestDataSkippingUtils.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestDataSkippingUtils.scala
index f60b95d8f5a..cd1846285ff 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestDataSkippingUtils.scala
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestDataSkippingUtils.scala
@@ -48,17 +48,17 @@ case class IndexRow(fileName: String,
// Corresponding A column is LongType
A_minValue: Long = -1,
A_maxValue: Long = -1,
- A_nullCount: Long = -1,
+ A_nullCount: java.lang.Long = null,
// Corresponding B column is StringType
B_minValue: String = null,
B_maxValue: String = null,
- B_nullCount: Long = -1,
+ B_nullCount: java.lang.Long = null,
// Corresponding C column is TimestampType
C_minValue: Timestamp = null,
C_maxValue: Timestamp = null,
- C_nullCount: Long = -1) {
+ C_nullCount: java.lang.Long = null) {
def toRow: Row = Row(productIterator.toSeq: _*)
}
@@ -89,7 +89,8 @@ class TestDataSkippingUtils extends HoodieSparkClientTestBase
with SparkAdapterS
@MethodSource(Array(
"testBasicLookupFilterExpressionsSource",
"testAdvancedLookupFilterExpressionsSource",
- "testCompositeFilterExpressionsSource"
+ "testCompositeFilterExpressionsSource",
+ "testSupportedAndUnsupportedDataSkippingColumnsSource"
))
def testLookupFilterExpressions(sourceFilterExprStr: String, input:
Seq[IndexRow], expectedOutput: Seq[String]): Unit = {
// We have to fix the timezone to make sure all date-bound utilities output
@@ -197,6 +198,38 @@ object TestDataSkippingUtils {
)
}
+ def testSupportedAndUnsupportedDataSkippingColumnsSource():
java.util.stream.Stream[Arguments] = {
+ java.util.stream.Stream.of(
+ arguments(
+ "A = 1 and B is not null",
+ Seq(
+ IndexRow("file_1", valueCount = 2, A_minValue = 0, A_maxValue = 1,
A_nullCount = 0, B_minValue = null, B_maxValue = null, B_nullCount = null),
+ IndexRow("file_2", valueCount = 2, A_minValue = 1, A_maxValue = 2,
A_nullCount = 0, B_minValue = null, B_maxValue = null, B_nullCount = null),
+ IndexRow("file_3", valueCount = 2, A_minValue = 2, A_maxValue = 3,
A_nullCount = 0, B_minValue = null, B_maxValue = null, B_nullCount = null)
+ ),
+ Seq("file_1", "file_2")
+ ),
+ arguments(
+ "B = 1 and B is not null",
+ Seq(
+ IndexRow("file_1", valueCount = 2, A_minValue = 0, A_maxValue = 1,
A_nullCount = 0, B_minValue = null, B_maxValue = null, B_nullCount = null),
+ IndexRow("file_2", valueCount = 2, A_minValue = 1, A_maxValue = 2,
A_nullCount = 0, B_minValue = null, B_maxValue = null, B_nullCount = null),
+ IndexRow("file_3", valueCount = 2, A_minValue = 2, A_maxValue = 3,
A_nullCount = 0, B_minValue = null, B_maxValue = null, B_nullCount = null)
+ ),
+ Seq("file_1", "file_2", "file_3")
+ ),
+ arguments(
+ "A = 1 and A is not null and B is not null and B > 2",
+ Seq(
+ IndexRow("file_1", valueCount = 2, A_minValue = 0, A_maxValue = 1,
A_nullCount = 0, B_minValue = null, B_maxValue = null, B_nullCount = null),
+ IndexRow("file_2", valueCount = 2, A_minValue = 1, A_maxValue = 2,
A_nullCount = 0, B_minValue = null, B_maxValue = null, B_nullCount = null),
+ IndexRow("file_3", valueCount = 2, A_minValue = 2, A_maxValue = 3,
A_nullCount = 0, B_minValue = null, B_maxValue = null, B_nullCount = null)
+ ),
+ Seq("file_1", "file_2")
+ )
+ )
+ }
+
def testMiscLookupFilterExpressionsSource():
java.util.stream.Stream[Arguments] = {
// NOTE: Have to use [[Arrays.stream]], as Scala can't resolve properly 2
overloads for [[Stream.of]]
// (for single element)
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestDataSkippingQuery.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestDataSkippingQuery.scala
new file mode 100644
index 00000000000..1ac7185f642
--- /dev/null
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestDataSkippingQuery.scala
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark.sql.hudi
+
+class TestDataSkippingQuery extends HoodieSparkSqlTestBase {
+
+ test("Test the data skipping query involves conditions " +
+ "that cover both columns supported by column stats and those that are not
supported.") {
+ withTempDir { tmp =>
+ val tableName = generateTableName
+ spark.sql("set hoodie.metadata.enable = true")
+ spark.sql("set hoodie.metadata.index.column.stats.enable = true")
+ spark.sql("set hoodie.enable.data.skipping = true")
+ spark.sql(
+ s"""
+ |create table $tableName (
+ | id int,
+ | name string,
+ | attributes map<string, string>,
+ | price double,
+ | ts long,
+ | dt string
+ |) using hudi
+ | tblproperties (primaryKey = 'id')
+ | partitioned by (dt)
+ | location '${tmp.getCanonicalPath}'
+ """.stripMargin)
+ spark.sql(
+ s"""
+ | insert into $tableName values
+ | (1, 'a1', map('color', 'red', 'size', 'M'), 10, 1000,
'2021-01-05'),
+ | (2, 'a2', map('color', 'blue', 'size', 'L'), 20, 2000,
'2021-01-06'),
+ | (3, 'a3', map('color', 'green', 'size', 'S'), 30, 3000,
'2021-01-07')
+ """.stripMargin)
+ // Check the case where the WHERE condition only includes columns not
supported by column stats
+ checkAnswer(s"select id, name, price, ts, dt from $tableName where
attributes.color = 'red'")(
+ Seq(1, "a1", 10.0, 1000, "2021-01-05")
+ )
+ // Check the case where the WHERE condition only includes columns
supported by column stats
+ checkAnswer(s"select id, name, price, ts, dt from $tableName where
name='a1'")(
+ Seq(1, "a1", 10.0, 1000, "2021-01-05")
+ )
+ // Check the case where the WHERE condition includes both columns
supported by column stats and those that are not
+ checkAnswer(s"select id, name, price, ts, dt from $tableName where
attributes.color = 'red' and name='a1'")(
+ Seq(1, "a1", 10.0, 1000, "2021-01-05")
+ )
+ }
+ }
+
+ test("Test data skipping when specifying columns with column stats
support.") {
+ withTempDir { tmp =>
+ val tableName = generateTableName
+ spark.sql("set hoodie.metadata.enable = true")
+ spark.sql("set hoodie.metadata.index.column.stats.enable = true")
+ spark.sql("set hoodie.enable.data.skipping = true")
+ spark.sql("set hoodie.metadata.index.column.stats.column.list = name")
+ spark.sql(
+ s"""
+ |create table $tableName (
+ | id int,
+ | name string,
+ | attributes map<string, string>,
+ | price double,
+ | ts long,
+ | dt string
+ |) using hudi
+ | tblproperties (primaryKey = 'id')
+ | partitioned by (dt)
+ | location '${tmp.getCanonicalPath}'
+ """.stripMargin)
+ spark.sql(
+ s"""
+ | insert into $tableName values
+ | (1, 'a1', map('color', 'red', 'size', 'M'), 10, 1000,
'2021-01-05'),
+ | (2, 'a2', map('color', 'blue', 'size', 'L'), 20, 2000,
'2021-01-06'),
+ | (3, 'a3', map('color', 'green', 'size', 'S'), 30, 3000,
'2021-01-07')
+ """.stripMargin)
+ // Check the case where the WHERE condition only includes columns not
supported by column stats
+ checkAnswer(s"select id, name, price, ts, dt from $tableName where
attributes.color = 'red'")(
+ Seq(1, "a1", 10.0, 1000, "2021-01-05")
+ )
+ // Check the case where the WHERE condition only includes columns
supported by column stats
+ checkAnswer(s"select id, name, price, ts, dt from $tableName where
name='a1'")(
+ Seq(1, "a1", 10.0, 1000, "2021-01-05")
+ )
+ // Check the case where the WHERE condition includes both columns
supported by column stats and those that are not
+ checkAnswer(s"select id, name, price, ts, dt from $tableName where
attributes.color = 'red' and name='a1'")(
+ Seq(1, "a1", 10.0, 1000, "2021-01-05")
+ )
+ // Check WHERE condition that includes both columns with existing column
stats and columns of types
+ // that support column stats but for which column stats do not exist
+ checkAnswer(s"select id, name, price, ts, dt from $tableName where
ts=1000 and name='a1'")(
+ Seq(1, "a1", 10.0, 1000, "2021-01-05")
+ )
+ }
+ }
+}