Repository: spark
Updated Branches:
refs/heads/master 2824f12b8 -> 3fd297af6
[SPARK-24085][SQL] Query returns UnsupportedOperationException when scalar
subquery is present in partitioning expression
## What changes were proposed in this pull request?
In this case, the partition pruning happens before the planning phase of scalar
subquery expressions.
For scalar subquery expressions, planning occurs late in the cycle (after
physical planning) in "PlanSubqueries", just before execution. Currently we
try to evaluate the scalar subquery expression as part of partition pruning and
fail with an UnsupportedOperationException because it implements Unevaluable.
The fix excludes subquery expressions from the partition pruning
computation. Another option would be to somehow plan the subqueries before
partition pruning. Since this may not be a commonly occurring expression, I am
opting for the simpler fix.
Repro
``` SQL
CREATE TABLE test_prc_bug (
id_value string
)
partitioned by (id_type string)
location '/tmp/test_prc_bug'
stored as parquet;
insert into test_prc_bug values ('1','a');
insert into test_prc_bug values ('2','a');
insert into test_prc_bug values ('3','b');
insert into test_prc_bug values ('4','b');
select * from test_prc_bug
where id_type = (select 'b');
```
## How was this patch tested?
Added tests in SubquerySuite and hive/SQLQuerySuite.
Author: Dilip Biswal <[email protected]>
Closes #21174 from dilipbiswal/spark-24085.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/3fd297af
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/3fd297af
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/3fd297af
Branch: refs/heads/master
Commit: 3fd297af6dc568357c97abf86760c570309d6597
Parents: 2824f12
Author: Dilip Biswal <[email protected]>
Authored: Fri Apr 27 11:43:29 2018 -0700
Committer: gatorsmile <[email protected]>
Committed: Fri Apr 27 11:43:29 2018 -0700
----------------------------------------------------------------------
.../datasources/FileSourceStrategy.scala | 5 +++-
.../datasources/PruneFileSourcePartitions.scala | 4 ++-
.../org/apache/spark/sql/SubquerySuite.scala | 15 ++++++++++
.../sql/hive/execution/SQLQuerySuite.scala | 31 ++++++++++++++++++++
4 files changed, 53 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/3fd297af/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
----------------------------------------------------------------------
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
index 16b2271..0a568d6 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
@@ -76,7 +76,10 @@ object FileSourceStrategy extends Strategy with Logging {
fsRelation.partitionSchema,
fsRelation.sparkSession.sessionState.analyzer.resolver)
val partitionSet = AttributeSet(partitionColumns)
val partitionKeyFilters =
-
ExpressionSet(normalizedFilters.filter(_.references.subsetOf(partitionSet)))
+ ExpressionSet(normalizedFilters
+ .filterNot(SubqueryExpression.hasSubquery(_))
+ .filter(_.references.subsetOf(partitionSet)))
+
logInfo(s"Pruning directories with:
${partitionKeyFilters.mkString(",")}")
val dataColumns =
http://git-wip-us.apache.org/repos/asf/spark/blob/3fd297af/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PruneFileSourcePartitions.scala
----------------------------------------------------------------------
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PruneFileSourcePartitions.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PruneFileSourcePartitions.scala
index 3b830ac..16b2367 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PruneFileSourcePartitions.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PruneFileSourcePartitions.scala
@@ -55,7 +55,9 @@ private[sql] object PruneFileSourcePartitions extends
Rule[LogicalPlan] {
partitionSchema, sparkSession.sessionState.analyzer.resolver)
val partitionSet = AttributeSet(partitionColumns)
val partitionKeyFilters =
-
ExpressionSet(normalizedFilters.filter(_.references.subsetOf(partitionSet)))
+ ExpressionSet(normalizedFilters
+ .filterNot(SubqueryExpression.hasSubquery(_))
+ .filter(_.references.subsetOf(partitionSet)))
if (partitionKeyFilters.nonEmpty) {
val prunedFileIndex =
catalogFileIndex.filterPartitions(partitionKeyFilters.toSeq)
http://git-wip-us.apache.org/repos/asf/spark/blob/3fd297af/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala
index 31e8b0e..acef62d 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala
@@ -955,4 +955,19 @@ class SubquerySuite extends QueryTest with
SharedSQLContext {
// before the fix this would throw AnalysisException
spark.range(10).where("(id,id) in (select id, null from range(3))").count
}
+
+ test("SPARK-24085 scalar subquery in partitioning expression") {
+ withTable("parquet_part") {
+ Seq("1" -> "a", "2" -> "a", "3" -> "b", "4" -> "b")
+ .toDF("id_value", "id_type")
+ .write
+ .mode(SaveMode.Overwrite)
+ .partitionBy("id_type")
+ .format("parquet")
+ .saveAsTable("parquet_part")
+ checkAnswer(
+ sql("SELECT * FROM parquet_part WHERE id_type = (SELECT 'b')"),
+ Row("3", "b") :: Row("4", "b") :: Nil)
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/spark/blob/3fd297af/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
----------------------------------------------------------------------
diff --git
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
index 73f83d5..704a410 100644
---
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
+++
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
@@ -2156,4 +2156,35 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils
with TestHiveSingleton {
}
}
}
+
+ test("SPARK-24085 scalar subquery in partitioning expression") {
+ Seq("orc", "parquet").foreach { format =>
+ Seq(true, false).foreach { isConverted =>
+ withSQLConf(
+ HiveUtils.CONVERT_METASTORE_ORC.key -> s"$isConverted",
+ HiveUtils.CONVERT_METASTORE_PARQUET.key -> s"$isConverted",
+ "hive.exec.dynamic.partition.mode" -> "nonstrict") {
+ withTable(format) {
+ withTempPath { tempDir =>
+ sql(
+ s"""
+ |CREATE TABLE ${format} (id_value string)
+ |PARTITIONED BY (id_type string)
+ |LOCATION '${tempDir.toURI}'
+ |STORED AS ${format}
+ """.stripMargin)
+ sql(s"insert into $format values ('1','a')")
+ sql(s"insert into $format values ('2','a')")
+ sql(s"insert into $format values ('3','b')")
+ sql(s"insert into $format values ('4','b')")
+ checkAnswer(
+ sql(s"SELECT * FROM $format WHERE id_type = (SELECT 'b')"),
+ Row("3", "b") :: Row("4", "b") :: Nil)
+ }
+ }
+ }
+ }
+ }
+ }
+
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]