This is an automated email from the ASF dual-hosted git repository. yamamuro pushed a commit to branch branch-2.3 in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-2.3 by this push: new f98aee4 [SPARK-26709][SQL][BRANCH-2.3] OptimizeMetadataOnlyQuery does not handle empty records correctly f98aee4 is described below commit f98aee4d63ca4a51fb7d98e1d36f8a82d62cf378 Author: Gengliang Wang <gengliang.w...@databricks.com> AuthorDate: Sat Jan 26 09:26:12 2019 +0900 [SPARK-26709][SQL][BRANCH-2.3] OptimizeMetadataOnlyQuery does not handle empty records correctly ## What changes were proposed in this pull request? When reading from empty tables, the optimization `OptimizeMetadataOnlyQuery` may return wrong results: ``` sql("CREATE TABLE t (col1 INT, p1 INT) USING PARQUET PARTITIONED BY (p1)") sql("INSERT INTO TABLE t PARTITION (p1 = 5) SELECT ID FROM range(1, 1)") sql("SELECT MAX(p1) FROM t") ``` The result is supposed to be `null`. However, with the optimization the result is `5`. The rule was originally ported from https://issues.apache.org/jira/browse/HIVE-1003 in #13494. In Hive, the rule is disabled by default in a later release (https://issues.apache.org/jira/browse/HIVE-15397), due to the same problem. It is hard to completely avoid the correctness issue, because data sources like Parquet can be metadata-only: Spark can't tell whether the data is empty or not without actually reading it. This PR disables the optimization by default. ## How was this patch tested? Unit test Closes #23648 from gengliangwang/SPARK-26709. 
Authored-by: Gengliang Wang <gengliang.w...@databricks.com> Signed-off-by: Takeshi Yamamuro <yamam...@apache.org> --- docs/sql-programming-guide.md | 12 ----------- .../org/apache/spark/sql/internal/SQLConf.scala | 6 ++++-- .../sql/execution/OptimizeMetadataOnlyQuery.scala | 5 +++++ .../scala/org/apache/spark/sql/SQLQuerySuite.scala | 25 ++++++++++++++++++++++ .../spark/sql/hive/execution/SQLQuerySuite.scala | 17 +++++++++++++++ 5 files changed, 51 insertions(+), 14 deletions(-) diff --git a/docs/sql-programming-guide.md b/docs/sql-programming-guide.md index e5fa4c6..038c1ec 100644 --- a/docs/sql-programming-guide.md +++ b/docs/sql-programming-guide.md @@ -990,18 +990,6 @@ Configuration of Parquet can be done using the `setConf` method on `SparkSession </p> </td> </tr> -<tr> - <td><code>spark.sql.optimizer.metadataOnly</code></td> - <td>true</td> - <td> - <p> - When true, enable the metadata-only query optimization that use the table's metadata to - produce the partition columns instead of table scans. It applies when all the columns scanned - are partition columns and the query has an aggregate operator that satisfies distinct - semantics. - </p> - </td> -</tr> </table> ## ORC Files diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index 731d4e3..c77c4f2 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -469,12 +469,14 @@ object SQLConf { .createWithDefault(HiveCaseSensitiveInferenceMode.INFER_AND_SAVE.toString) val OPTIMIZER_METADATA_ONLY = buildConf("spark.sql.optimizer.metadataOnly") + .internal() .doc("When true, enable the metadata-only query optimization that use the table's metadata " + "to produce the partition columns instead of table scans. 
It applies when all the columns " + "scanned are partition columns and the query has an aggregate operator that satisfies " + - "distinct semantics.") + "distinct semantics. By default the optimization is disabled, since it may return " + + "incorrect results when the files are empty.") .booleanConf - .createWithDefault(true) + .createWithDefault(false) val COLUMN_NAME_OF_CORRUPT_RECORD = buildConf("spark.sql.columnNameOfCorruptRecord") .doc("The name of internal column for storing raw/un-parsed JSON and CSV records that fail " + diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/OptimizeMetadataOnlyQuery.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/OptimizeMetadataOnlyQuery.scala index dc4aff9..fff32c8 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/OptimizeMetadataOnlyQuery.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/OptimizeMetadataOnlyQuery.scala @@ -67,6 +67,11 @@ case class OptimizeMetadataOnlyQuery(catalog: SessionCatalog) extends Rule[Logic }) } if (isAllDistinctAgg) { + logWarning("Since configuration `spark.sql.optimizer.metadataOnly` is enabled, " + + "Spark will scan partition-level metadata without scanning data files. " + + "This could result in wrong results when the partition metadata exists but the " + + "inclusive data files are empty." 
+ ) a.withNewChildren(Seq(replaceTableScanWithPartitionMetadata(child, relation))) } else { a diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala index 6848b66..7c4703f 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala @@ -2869,6 +2869,31 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext { } } } + + test("SPARK-26709: OptimizeMetadataOnlyQuery does not handle empty records correctly") { + Seq(true, false).foreach { enableOptimizeMetadataOnlyQuery => + withSQLConf(SQLConf.OPTIMIZER_METADATA_ONLY.key -> enableOptimizeMetadataOnlyQuery.toString) { + withTempPath { path => + val tabLocation = path.getCanonicalPath + val partLocation1 = tabLocation + "/p=3" + val partLocation2 = tabLocation + "/p=1" + // SPARK-23271 empty RDD when saved should write a metadata only file + val df = spark.range(10).filter($"id" < 0).toDF("col") + df.write.parquet(partLocation1) + val df2 = spark.range(10).toDF("col") + df2.write.parquet(partLocation2) + val readDF = spark.read.parquet(tabLocation) + if (enableOptimizeMetadataOnlyQuery) { + // The result is wrong if we enable the configuration. 
+ checkAnswer(readDF.selectExpr("max(p)"), Row(3)) + } else { + checkAnswer(readDF.selectExpr("max(p)"), Row(1)) + } + checkAnswer(readDF.selectExpr("max(col)"), Row(9)) + } + } + } + } } case class Foo(bar: Option[String]) diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala index 081d854..d11270e 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala @@ -2185,4 +2185,21 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton { } } + test("SPARK-26709: OptimizeMetadataOnlyQuery does not handle empty records correctly") { + Seq(true, false).foreach { enableOptimizeMetadataOnlyQuery => + withSQLConf(SQLConf.OPTIMIZER_METADATA_ONLY.key -> enableOptimizeMetadataOnlyQuery.toString) { + withTable("t") { + sql("CREATE TABLE t (col1 INT) PARTITIONED BY (p1 INT)") + sql("INSERT INTO TABLE t PARTITION (p1 = 5) SELECT ID FROM range(1, 1)") + if (enableOptimizeMetadataOnlyQuery) { + // The result is wrong if we enable the configuration. + checkAnswer(sql("SELECT MAX(p1) FROM t"), Row(5)) + } else { + checkAnswer(sql("SELECT MAX(p1) FROM t"), Row(null)) + } + checkAnswer(sql("SELECT MAX(col1) FROM t"), Row(null)) + } + } + } + } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org