This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.3 by this push:
     new 7c69614f067 [SPARK-39833][SQL] Disable Parquet column index in DSv1 to fix a correctness issue in the case of overlapping partition and data columns
7c69614f067 is described below

commit 7c69614f067c9eb68d997e8881d9b5845cde00fd
Author: Ivan Sadikov <ivan.sadi...@databricks.com>
AuthorDate: Sun Aug 21 18:59:48 2022 +0900

    [SPARK-39833][SQL] Disable Parquet column index in DSv1 to fix a correctness issue in the case of overlapping partition and data columns

    ### What changes were proposed in this pull request?

    This PR fixes a correctness issue in the Parquet DSv1 FileFormat when the projection does not contain columns referenced in pushed filters. This typically happens when partition columns and data columns overlap. It could produce an empty result even though there were records matching the predicate, as can be seen in the provided tests. The problem is especially visible when `count()` and `show()` report different results: for example, `show()` would return one or more records while `count()` would return 0.

    In Parquet, when a predicate is provided and the column index is enabled, we try to filter row ranges to figure out what the count should be. Unfortunately, if the projection is empty or is not in the set of filter columns, any check on those columns fails and 0 rows are returned (`RowRanges.EMPTY`) even though there is data matching the filter.

    Note that this is a mitigation, a quick fix. The actual fix needs to go into Parquet-MR: https://issues.apache.org/jira/browse/PARQUET-2170.

    The fix is not required in DSv2, where the overlapping columns are removed in `FileScanBuilder::readDataSchema()`.

    ### Why are the changes needed?

    Fixes a correctness issue in Parquet DSv1 when projection columns are not referenced by columns in pushed-down filters, or when the schema is empty.
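The row-range collapse described above can be illustrated with a toy model (pure Python, not the actual parquet-mr implementation; `candidate_rows`, `filtered_row_count`, and the dict-based "index" are illustrative names only):

```python
# Toy model of Parquet column-index filtering (see PARQUET-2170).
# Candidate row ranges are looked up per filter column, but indexes are
# only available for *projected* columns. When a filter column is missing
# from the projection, the lookup yields no rows, so the intersection
# collapses to empty -- even though matching rows exist on disk.

def candidate_rows(projected_indexes, filter_column):
    """Return candidate row numbers for one filter column."""
    if filter_column not in projected_indexes:
        return set()  # the bug: a missing index is treated as "no rows"
    return projected_indexes[filter_column]

def filtered_row_count(projection, filter_columns, data):
    total = len(data)
    # In this model, only projected columns have a column index.
    indexes = {c: set(range(total)) for c in projection}
    ranges = set(range(total))
    for c in filter_columns:
        ranges &= candidate_rows(indexes, c)
    return len(ranges)

data = [{"col": 0, "a": 1}]
# Projection includes the filter column: correct count.
assert filtered_row_count(["col", "a"], ["col"], data) == 1
# Projection omits the filter column (e.g. select("a").filter("col = 0")):
# the count silently drops to 0, matching the count()/show() mismatch.
assert filtered_row_count(["a"], ["col"], data) == 0
```

This is why the symptom is query-shape dependent: the same filter returns rows when the filter column happens to be projected, and nothing when it is not.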
    Downsides: Parquet column index filtering will be disabled unless it has been explicitly enabled, which could affect performance.

    ### Does this PR introduce _any_ user-facing change?

    No.

    ### How was this patch tested?

    Added a unit test that reproduces this behaviour. The test fails without the fix and passes with the fix.

    Closes #37419 from sadikovi/SPARK-39833.

    Authored-by: Ivan Sadikov <ivan.sadi...@databricks.com>
    Signed-off-by: Hyukjin Kwon <gurwls...@apache.org>
    (cherry picked from commit cde71aaf173aadd14dd6393b09e9851b5caad903)
    Signed-off-by: Hyukjin Kwon <gurwls...@apache.org>
---
 .../datasources/parquet/ParquetFileFormat.scala    |  5 +++++
 .../datasources/parquet/ParquetQuerySuite.scala    | 22 ++++++++++++++++++++++
 2 files changed, 27 insertions(+)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
index 9765e7c7801..2fa0854c983 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
@@ -230,6 +230,11 @@ class ParquetFileFormat
       SQLConf.PARQUET_INT96_AS_TIMESTAMP.key,
       sparkSession.sessionState.conf.isParquetINT96AsTimestamp)
 
+    // See PARQUET-2170.
+    // Disable column index optimisation when required schema does not have columns that appear in
+    // pushed filters to avoid getting incorrect results.
+    hadoopConf.setBooleanIfUnset(ParquetInputFormat.COLUMN_INDEX_FILTERING_ENABLED, false)
+
     val broadcastedHadoopConf =
       sparkSession.sparkContext.broadcast(new SerializableConfiguration(hadoopConf))

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
index 33656c84c88..d0a9a93b00f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
@@ -1065,6 +1065,28 @@ class ParquetV1QuerySuite extends ParquetQuerySuite {
       }
     }
   }
+
+  test("SPARK-39833: pushed filters with count()") {
+    withTempPath { path =>
+      val p = s"${path.getCanonicalPath}${File.separator}col=0${File.separator}"
+      Seq(0).toDF("COL").coalesce(1).write.save(p)
+      val df = spark.read.parquet(path.getCanonicalPath)
+      checkAnswer(df.filter("col = 0"), Seq(Row(0)))
+      assert(df.filter("col = 0").count() == 1, "col")
+      assert(df.filter("COL = 0").count() == 1, "COL")
+    }
+  }
+
+  test("SPARK-39833: pushed filters with project without filter columns") {
+    withTempPath { path =>
+      val p = s"${path.getCanonicalPath}${File.separator}col=0${File.separator}"
+      Seq((0, 1)).toDF("COL", "a").coalesce(1).write.save(p)
+      val df = spark.read.parquet(path.getCanonicalPath)
+      checkAnswer(df.filter("col = 0"), Seq(Row(0, 1)))
+      assert(df.filter("col = 0").select("a").collect().toSeq == Row(1) :: Nil)
+      assert(df.filter("col = 0 and a = 1").select("a").collect().toSeq == Row(1) :: Nil)
+    }
+  }
 }
 
 class ParquetV2QuerySuite extends ParquetQuerySuite {
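On Spark builds that do not carry this patch, the same mitigation can likely be applied by hand as a configuration fragment, since `ParquetInputFormat.COLUMN_INDEX_FILTERING_ENABLED` resolves to the Hadoop property `parquet.filter.columnindex.enabled` and Spark copies `spark.hadoop.*` settings into the Hadoop `Configuration`. A sketch (`my-app.jar` is a placeholder application):

```shell
# Disable Parquet column index filtering for the whole application.
# Mirrors what the patch's setBooleanIfUnset default does; because the fix
# only applies the default when the key is unset, an explicit user setting
# (true or false) still takes precedence on patched builds.
spark-submit \
  --conf spark.hadoop.parquet.filter.columnindex.enabled=false \
  my-app.jar
```

The same effect should be achievable at runtime via `spark.sparkContext.hadoopConfiguration.set("parquet.filter.columnindex.enabled", "false")`, at the cost of losing the column-index read optimisation for all Parquet scans in the session.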