This is an automated email from the ASF dual-hosted git repository.

sunchao pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.3 by this push:
     new ba6d17288c3 [SPARK-40169][SQL] Don't pushdown Parquet filters with no 
reference to data schema
ba6d17288c3 is described below

commit ba6d17288c3287e8dc1f7cb95db0233a45732dc0
Author: Chao Sun <sunc...@apple.com>
AuthorDate: Fri Sep 16 10:46:36 2022 -0700

    [SPARK-40169][SQL] Don't pushdown Parquet filters with no reference to data 
schema
    
    ### What changes were proposed in this pull request?
    
    Currently in Parquet V1 read path, Spark will pushdown data filters even if 
they have no reference in the Parquet read schema. This can cause correctness 
issues as described in 
[SPARK-39833](https://issues.apache.org/jira/browse/SPARK-39833).
    
    The root cause, it seems, is that in the V1 path, we first use 
`AttributeReference` equality to filter out data columns without partition 
columns, and then use `AttributeSet` equality to filter out filters with only 
references to data columns.
    There's an inconsistency between the two steps when case sensitivity is disabled.
    
    Take the following scenario as example:
    - data column: `[COL, a]`
    - partition column: `[col]`
    - filter: `col > 10`
    
    With `AttributeReference` equality, `COL` is not considered equal to `col` 
(because their names are different), and thus the data column set that remains 
after filtering is still `[COL, a]`. However, when calculating the filters that 
only reference data columns, `COL` is **considered equal** to `col`. Consequently, 
the filter `col > 10`, when checked against `[COL, a]`, is considered to reference 
data columns, and thus will be pushed down to Parquet as a data filter.
    
    On the Parquet side, since `col` doesn't exist in the file schema (it only 
has `COL`), when column index is enabled, it will return the wrong number 
of rows. See [PARQUET-2170](https://issues.apache.org/jira/browse/PARQUET-2170) 
for more detail.
    
    In general, when data columns overlap with partition columns and case 
sensitivity is disabled, partition filters will not be filtered out before we 
calculate the filters that only reference data columns, which is incorrect.
    
    ### Why are the changes needed?
    
    This fixes the correctness bug described in 
[SPARK-39833](https://issues.apache.org/jira/browse/SPARK-39833).
    
    ### Does this PR introduce _any_ user-facing change?
    
    No
    
    ### How was this patch tested?
    
    There are existing test cases for this issue from 
[SPARK-39833](https://issues.apache.org/jira/browse/SPARK-39833). This PR also 
modifies them to test the scenarios where case sensitivity is on and off.
    
    Closes #37881 from sunchao/SPARK-40169.
    
    Authored-by: Chao Sun <sunc...@apple.com>
    Signed-off-by: Chao Sun <sunc...@apple.com>
---
 .../execution/datasources/FileSourceStrategy.scala |  2 +-
 .../datasources/parquet/ParquetFileFormat.scala    |  5 ---
 .../datasources/parquet/ParquetQuerySuite.scala    | 38 ++++++++++++++--------
 3 files changed, 25 insertions(+), 20 deletions(-)

diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
index 9356e46a691..37a04477774 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
@@ -184,7 +184,7 @@ object FileSourceStrategy extends Strategy with 
PredicateHelper with Logging {
 
       // Partition keys are not available in the statistics of the files.
       // `dataColumns` might have partition columns, we need to filter them 
out.
-      val dataColumnsWithoutPartitionCols = 
dataColumns.filterNot(partitionColumns.contains)
+      val dataColumnsWithoutPartitionCols = 
dataColumns.filterNot(partitionSet.contains)
       val dataFilters = normalizedFiltersWithoutSubqueries.flatMap { f =>
         if (f.references.intersect(partitionSet).nonEmpty) {
           extractPredicatesWithinOutputSet(f, 
AttributeSet(dataColumnsWithoutPartitionCols))
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
index 2fa0854c983..9765e7c7801 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
@@ -230,11 +230,6 @@ class ParquetFileFormat
       SQLConf.PARQUET_INT96_AS_TIMESTAMP.key,
       sparkSession.sessionState.conf.isParquetINT96AsTimestamp)
 
-    // See PARQUET-2170.
-    // Disable column index optimisation when required schema does not have 
columns that appear in
-    // pushed filters to avoid getting incorrect results.
-    
hadoopConf.setBooleanIfUnset(ParquetInputFormat.COLUMN_INDEX_FILTERING_ENABLED, 
false)
-
     val broadcastedHadoopConf =
       sparkSession.sparkContext.broadcast(new 
SerializableConfiguration(hadoopConf))
 
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
index d0a9a93b00f..4e236ad7865 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
@@ -1067,24 +1067,34 @@ class ParquetV1QuerySuite extends ParquetQuerySuite {
   }
 
   test("SPARK-39833: pushed filters with count()") {
-    withTempPath { path =>
-      val p = 
s"${path.getCanonicalPath}${File.separator}col=0${File.separator}"
-      Seq(0).toDF("COL").coalesce(1).write.save(p)
-      val df = spark.read.parquet(path.getCanonicalPath)
-      checkAnswer(df.filter("col = 0"), Seq(Row(0)))
-      assert(df.filter("col = 0").count() == 1, "col")
-      assert(df.filter("COL = 0").count() == 1, "COL")
+    Seq(true, false).foreach { caseSensitive =>
+      withSQLConf(SQLConf.CASE_SENSITIVE.key -> caseSensitive.toString) {
+        withTempPath { path =>
+          val p = 
s"${path.getCanonicalPath}${File.separator}col=0${File.separator}"
+          Seq(0).toDF("COL").coalesce(1).write.save(p)
+          val df = spark.read.parquet(path.getCanonicalPath)
+          val expected = if (caseSensitive) Seq(Row(0, 0)) else Seq(Row(0))
+          checkAnswer(df.filter("col = 0"), expected)
+          assert(df.filter("col = 0").count() == 1, "col")
+          assert(df.filter("COL = 0").count() == 1, "COL")
+        }
+      }
     }
   }
 
   test("SPARK-39833: pushed filters with project without filter columns") {
-    withTempPath { path =>
-      val p = 
s"${path.getCanonicalPath}${File.separator}col=0${File.separator}"
-      Seq((0, 1)).toDF("COL", "a").coalesce(1).write.save(p)
-      val df = spark.read.parquet(path.getCanonicalPath)
-      checkAnswer(df.filter("col = 0"), Seq(Row(0, 1)))
-      assert(df.filter("col = 0").select("a").collect().toSeq == Row(1) :: Nil)
-      assert(df.filter("col = 0 and a = 1").select("a").collect().toSeq == 
Row(1) :: Nil)
+    Seq(true, false).foreach { caseSensitive =>
+      withSQLConf(SQLConf.CASE_SENSITIVE.key -> caseSensitive.toString) {
+        withTempPath { path =>
+          val p = 
s"${path.getCanonicalPath}${File.separator}col=0${File.separator}"
+          Seq((0, 1)).toDF("COL", "a").coalesce(1).write.save(p)
+          val df = spark.read.parquet(path.getCanonicalPath)
+          val expected = if (caseSensitive) Seq(Row(0, 1, 0)) else Seq(Row(0, 
1))
+          checkAnswer(df.filter("col = 0"), expected)
+          assert(df.filter("col = 0").select("a").collect().toSeq == Row(1) :: 
Nil)
+          assert(df.filter("col = 0 and a = 1").select("a").collect().toSeq == 
Row(1) :: Nil)
+        }
+      }
     }
   }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to