This is an automated email from the ASF dual-hosted git repository.

stream2000 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new b2aef89a873 [HUDI-7246] Fix Data Skipping Issue: No Results When Query Conditions Involve Both Columns with and without Column Stats (#10389)
b2aef89a873 is described below

commit b2aef89a8737cf3c8eef379883535ba45f194714
Author: majian <[email protected]>
AuthorDate: Thu Jan 18 20:16:32 2024 +0800

    [HUDI-7246] Fix Data Skipping Issue: No Results When Query Conditions Involve Both Columns with and without Column Stats (#10389)
---
 .../org/apache/hudi/ColumnStatsIndexSupport.scala  |  16 +--
 .../apache/spark/sql/hudi/DataSkippingUtils.scala  |  12 ++-
 .../org/apache/hudi/TestDataSkippingUtils.scala    |  41 +++++++-
 .../spark/sql/hudi/TestDataSkippingQuery.scala     | 114 +++++++++++++++++++++
 4 files changed, 170 insertions(+), 13 deletions(-)

diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/ColumnStatsIndexSupport.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/ColumnStatsIndexSupport.scala
index 7a75c6c35ca..0a6cf437a6f 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/ColumnStatsIndexSupport.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/ColumnStatsIndexSupport.scala
@@ -280,13 +280,17 @@ class ColumnStatsIndexSupport(spark: SparkSession,
                   acc ++= Seq(colStatRecord.getMinValue, colStatRecord.getMaxValue, colStatRecord.getNullCount)
                 case None =>
                   // NOTE: This could occur in either of the following cases:
-                  //    1. Particular file does not have this particular column (which is indexed by Column Stats Index):
-                  //       in this case we're assuming missing column to essentially contain exclusively
-                  //       null values, we set min/max values as null and null-count to be equal to value-count (this
-                  //       behavior is consistent with reading non-existent columns from Parquet)
+                  //    1. When certain columns exist in the schema but are absent in some data files due to
+                  //       schema evolution or other reasons, these columns will not be present in the column stats.
+                  //       In this case, we fill in default values by setting the min, max and null-count to null
+                  //       (this behavior is consistent with reading non-existent columns from Parquet).
+                  //    2. When certain columns are present both in the schema and the data files,
+                  //       but the column stats are absent for these columns due to their types not supporting indexing,
+                  //       we also set these columns to default values.
                   //
-                  // This is a way to determine current column's index without explicit iteration (we're adding 3 stats / column)
-                  acc ++= Seq(null, null, valueCount)
+                  // This approach prevents errors during data skipping and, because the filter includes an isNull check,
+                  // these conditions will not affect the accurate return of files from data skipping.
+                  acc ++= Seq(null, null, null)
               }
           }
 
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/DataSkippingUtils.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/DataSkippingUtils.scala
index 7cb4a3c5428..cfd8d1351d8 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/DataSkippingUtils.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/DataSkippingUtils.scala
@@ -26,7 +26,7 @@ import org.apache.spark.sql.catalyst.expressions.Literal.TrueLiteral
 import org.apache.spark.sql.catalyst.expressions.{Alias, And, Attribute, AttributeReference, EqualNullSafe, EqualTo, Expression, ExtractValue, GetStructField, GreaterThan, GreaterThanOrEqual, In, InSet, IsNotNull, IsNull, LessThan, LessThanOrEqual, Literal, Not, Or, StartsWith, SubqueryExpression}
 import org.apache.spark.sql.functions.col
 import org.apache.spark.sql.hudi.ColumnStatsExpressionUtils._
-import org.apache.spark.sql.types.{StringType, StructType}
+import org.apache.spark.sql.types.StructType
 import org.apache.spark.sql.{AnalysisException, HoodieCatalystExpressionUtils}
 import org.apache.spark.unsafe.types.UTF8String
 
@@ -211,10 +211,16 @@ object DataSkippingUtils extends Logging {
           .map(colName => GreaterThan(genColNumNullsExpr(colName), Literal(0)))
 
       // Filter "colA is not null"
-      // Translates to "colA_nullCount < colA_valueCount" for index lookup
+      // Translates to "colA_nullCount = null or colA_valueCount = null or colA_nullCount < colA_valueCount" for index lookup
+      // "colA_nullCount = null or colA_valueCount = null" means we are not certain whether the column is null or not,
+      // hence we return True to ensure this does not affect the query.
       case IsNotNull(attribute: AttributeReference) =>
         getTargetIndexedColumnName(attribute, indexSchema)
-          .map(colName => LessThan(genColNumNullsExpr(colName), genColValueCountExpr))
+          .map {colName =>
+            val numNullExpr = genColNumNullsExpr(colName)
+            val valueCountExpr = genColValueCountExpr
+            Or(Or(IsNull(numNullExpr), IsNull(valueCountExpr)), LessThan(numNullExpr, valueCountExpr))
+          }
 
       // Filter "expr(colA) in (B1, B2, ...)"
       // Translates to "(colA_minValue <= B1 AND colA_maxValue >= B1) OR (colA_minValue <= B2 AND colA_maxValue >= B2) ... "
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestDataSkippingUtils.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestDataSkippingUtils.scala
index f60b95d8f5a..cd1846285ff 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestDataSkippingUtils.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestDataSkippingUtils.scala
@@ -48,17 +48,17 @@ case class IndexRow(fileName: String,
                     // Corresponding A column is LongType
                     A_minValue: Long = -1,
                     A_maxValue: Long = -1,
-                    A_nullCount: Long = -1,
+                    A_nullCount: java.lang.Long = null,
 
                     // Corresponding B column is StringType
                     B_minValue: String = null,
                     B_maxValue: String = null,
-                    B_nullCount: Long = -1,
+                    B_nullCount: java.lang.Long = null,
 
                     // Corresponding B column is TimestampType
                     C_minValue: Timestamp = null,
                     C_maxValue: Timestamp = null,
-                    C_nullCount: Long = -1) {
+                    C_nullCount: java.lang.Long = null) {
   def toRow: Row = Row(productIterator.toSeq: _*)
 }
 
@@ -89,7 +89,8 @@ class TestDataSkippingUtils extends HoodieSparkClientTestBase with SparkAdapterS
   @MethodSource(Array(
     "testBasicLookupFilterExpressionsSource",
     "testAdvancedLookupFilterExpressionsSource",
-    "testCompositeFilterExpressionsSource"
+    "testCompositeFilterExpressionsSource",
+    "testSupportedAndUnsupportedDataSkippingColumnsSource"
   ))
   def testLookupFilterExpressions(sourceFilterExprStr: String, input: Seq[IndexRow], expectedOutput: Seq[String]): Unit = {
     // We have to fix the timezone to make sure all date-bound utilities output
@@ -197,6 +198,38 @@ object TestDataSkippingUtils {
     )
   }
 
+  def testSupportedAndUnsupportedDataSkippingColumnsSource(): java.util.stream.Stream[Arguments] = {
+    java.util.stream.Stream.of(
+      arguments(
+        "A = 1 and B is not null",
+        Seq(
+          IndexRow("file_1", valueCount = 2, A_minValue = 0, A_maxValue = 1, A_nullCount = 0, B_minValue = null, B_maxValue = null, B_nullCount = null),
+          IndexRow("file_2", valueCount = 2, A_minValue = 1, A_maxValue = 2, A_nullCount = 0, B_minValue = null, B_maxValue = null, B_nullCount = null),
+          IndexRow("file_3", valueCount = 2, A_minValue = 2, A_maxValue = 3, A_nullCount = 0, B_minValue = null, B_maxValue = null, B_nullCount = null)
+        ),
+        Seq("file_1", "file_2")
+      ),
+      arguments(
+        "B = 1 and B is not null",
+        Seq(
+          IndexRow("file_1", valueCount = 2, A_minValue = 0, A_maxValue = 1, A_nullCount = 0, B_minValue = null, B_maxValue = null, B_nullCount = null),
+          IndexRow("file_2", valueCount = 2, A_minValue = 1, A_maxValue = 2, A_nullCount = 0, B_minValue = null, B_maxValue = null, B_nullCount = null),
+          IndexRow("file_3", valueCount = 2, A_minValue = 2, A_maxValue = 3, A_nullCount = 0, B_minValue = null, B_maxValue = null, B_nullCount = null)
+        ),
+        Seq("file_1", "file_2", "file_3")
+      ),
+      arguments(
+        "A = 1 and A is not null and B is not null and B > 2",
+        Seq(
+          IndexRow("file_1", valueCount = 2, A_minValue = 0, A_maxValue = 1, A_nullCount = 0, B_minValue = null, B_maxValue = null, B_nullCount = null),
+          IndexRow("file_2", valueCount = 2, A_minValue = 1, A_maxValue = 2, A_nullCount = 0, B_minValue = null, B_maxValue = null, B_nullCount = null),
+          IndexRow("file_3", valueCount = 2, A_minValue = 2, A_maxValue = 3, A_nullCount = 0, B_minValue = null, B_maxValue = null, B_nullCount = null)
+        ),
+        Seq("file_1", "file_2")
+      )
+    )
+  }
+
   def testMiscLookupFilterExpressionsSource(): java.util.stream.Stream[Arguments] = {
     // NOTE: Have to use [[Arrays.stream]], as Scala can't resolve properly 2 overloads for [[Stream.of]]
     //       (for single element)
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestDataSkippingQuery.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestDataSkippingQuery.scala
new file mode 100644
index 00000000000..1ac7185f642
--- /dev/null
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestDataSkippingQuery.scala
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark.sql.hudi
+
+class TestDataSkippingQuery extends HoodieSparkSqlTestBase {
+
+  test("Test the data skipping query involves conditions " +
+    "that cover both columns supported by column stats and those that are not supported.") {
+    withTempDir { tmp =>
+      val tableName = generateTableName
+      spark.sql("set hoodie.metadata.enable = true")
+      spark.sql("set hoodie.metadata.index.column.stats.enable = true")
+      spark.sql("set hoodie.enable.data.skipping = true")
+      spark.sql(
+        s"""
+           |create table $tableName (
+           |  id int,
+           |  name string,
+           |  attributes map<string, string>,
+           |  price double,
+           |  ts long,
+           |  dt string
+           |) using hudi
+           | tblproperties (primaryKey = 'id')
+           | partitioned by (dt)
+           | location '${tmp.getCanonicalPath}'
+                  """.stripMargin)
+      spark.sql(
+        s"""
+           | insert into $tableName values
+           | (1, 'a1', map('color', 'red', 'size', 'M'), 10, 1000, '2021-01-05'),
+           | (2, 'a2', map('color', 'blue', 'size', 'L'), 20, 2000, '2021-01-06'),
+           | (3, 'a3', map('color', 'green', 'size', 'S'), 30, 3000, '2021-01-07')
+                  """.stripMargin)
+      // Check the case where the WHERE condition only includes columns not supported by column stats
+      checkAnswer(s"select id, name, price, ts, dt from $tableName where attributes.color = 'red'")(
+        Seq(1, "a1", 10.0, 1000, "2021-01-05")
+      )
+      // Check the case where the WHERE condition only includes columns supported by column stats
+      checkAnswer(s"select id, name, price, ts, dt from $tableName where name='a1'")(
+        Seq(1, "a1", 10.0, 1000, "2021-01-05")
+      )
+      // Check the case where the WHERE condition includes both columns supported by column stats and those that are not
+      checkAnswer(s"select id, name, price, ts, dt from $tableName where attributes.color = 'red' and name='a1'")(
+        Seq(1, "a1", 10.0, 1000, "2021-01-05")
+      )
+    }
+  }
+
+  test("Test data skipping when specifying columns with column stats support.") {
+    withTempDir { tmp =>
+      val tableName = generateTableName
+      spark.sql("set hoodie.metadata.enable = true")
+      spark.sql("set hoodie.metadata.index.column.stats.enable = true")
+      spark.sql("set hoodie.enable.data.skipping = true")
+      spark.sql("set hoodie.metadata.index.column.stats.column.list = name")
+      spark.sql(
+        s"""
+           |create table $tableName (
+           |  id int,
+           |  name string,
+           |  attributes map<string, string>,
+           |  price double,
+           |  ts long,
+           |  dt string
+           |) using hudi
+           | tblproperties (primaryKey = 'id')
+           | partitioned by (dt)
+           | location '${tmp.getCanonicalPath}'
+                  """.stripMargin)
+      spark.sql(
+        s"""
+           | insert into $tableName values
+           | (1, 'a1', map('color', 'red', 'size', 'M'), 10, 1000, '2021-01-05'),
+           | (2, 'a2', map('color', 'blue', 'size', 'L'), 20, 2000, '2021-01-06'),
+           | (3, 'a3', map('color', 'green', 'size', 'S'), 30, 3000, '2021-01-07')
+                  """.stripMargin)
+      // Check the case where the WHERE condition only includes columns not supported by column stats
+      checkAnswer(s"select id, name, price, ts, dt from $tableName where attributes.color = 'red'")(
+        Seq(1, "a1", 10.0, 1000, "2021-01-05")
+      )
+      // Check the case where the WHERE condition only includes columns supported by column stats
+      checkAnswer(s"select id, name, price, ts, dt from $tableName where name='a1'")(
+        Seq(1, "a1", 10.0, 1000, "2021-01-05")
+      )
+      // Check the case where the WHERE condition includes both columns supported by column stats and those that are not
+      checkAnswer(s"select id, name, price, ts, dt from $tableName where attributes.color = 'red' and name='a1'")(
+        Seq(1, "a1", 10.0, 1000, "2021-01-05")
+      )
+      // Check WHERE condition that includes both columns with existing column stats and columns of types
+      // that support column stats but for which column stats do not exist
+      checkAnswer(s"select id, name, price, ts, dt from $tableName where ts=1000 and name='a1'")(
+        Seq(1, "a1", 10.0, 1000, "2021-01-05")
+      )
+    }
+  }
+}

Reply via email to