Repository: spark
Updated Branches:
refs/heads/branch-2.3 07ec75ca0 -> 4a10df0f6
[SPARK-24085][SQL] Query returns UnsupportedOperationException when scalar subquery is present in partitioning expression
## What changes were proposed in this pull request?
In this case, partition pruning happens before the scalar subquery expressions
have been planned. Scalar subquery expressions are planned late in the cycle
(after physical planning), in "PlanSubqueries", just before execution.
Currently we try to evaluate the scalar subquery expression as part of
partition pruning and fail because it implements Unevaluable, as sketched
below.
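For reference, this is roughly what Spark's Unevaluable trait looks like (a
paraphrase, not the exact source): any expression that mixes it in throws the
moment something calls eval() on it, which is exactly what pruning does.
```scala
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.Expression

// Paraphrase of Spark's Unevaluable trait: scalar subqueries mix this in,
// so evaluating one before PlanSubqueries has rewritten it throws the
// UnsupportedOperationException reported in SPARK-24085.
trait Unevaluable extends Expression {
  final override def eval(input: InternalRow = null): Any =
    throw new UnsupportedOperationException(s"Cannot evaluate expression: $this")
}
```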
The fix ignores subquery expressions in the partition pruning computation.
Another option would be to somehow plan the subqueries before partition
pruning; since this may not be a commonly occurring expression, I am opting
for the simpler fix. A minimal sketch of the idea follows.
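The sketch below is lifted from the two pruning call sites in the diff
further down; the helper name prunablePartitionFilters is mine, not Spark's.
```scala
import org.apache.spark.sql.catalyst.expressions.{
  AttributeSet, Expression, ExpressionSet, SubqueryExpression}

// Predicates containing a subquery are dropped from the pruning set entirely;
// they still run later as ordinary filters once PlanSubqueries has planned
// them. Everything else is pruned on as before.
def prunablePartitionFilters(
    normalizedFilters: Seq[Expression],
    partitionSet: AttributeSet): ExpressionSet = {
  ExpressionSet(normalizedFilters
    .filterNot(SubqueryExpression.hasSubquery(_))
    .filter(_.references.subsetOf(partitionSet)))
}
```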
Repro
```sql
CREATE TABLE test_prc_bug (
  id_value string
)
partitioned by (id_type string)
location '/tmp/test_prc_bug'
stored as parquet;

insert into test_prc_bug values ('1','a');
insert into test_prc_bug values ('2','a');
insert into test_prc_bug values ('3','b');
insert into test_prc_bug values ('4','b');

select * from test_prc_bug
where id_type = (select 'b');
```
## How was this patch tested?
Added tests in SubquerySuite and hive/SQLQuerySuite.
Author: Dilip Biswal <[email protected]>
Closes #21174 from dilipbiswal/spark-24085.
(cherry picked from commit 3fd297af6dc568357c97abf86760c570309d6597)
Signed-off-by: gatorsmile <[email protected]>
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/4a10df0f
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/4a10df0f
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/4a10df0f
Branch: refs/heads/branch-2.3
Commit: 4a10df0f66f74ec2c995f9832d1ab74112bdeb16
Parents: 07ec75c
Author: Dilip Biswal <[email protected]>
Authored: Fri Apr 27 11:43:29 2018 -0700
Committer: gatorsmile <[email protected]>
Committed: Fri Apr 27 11:43:39 2018 -0700
----------------------------------------------------------------------
.../datasources/FileSourceStrategy.scala | 5 +++-
.../datasources/PruneFileSourcePartitions.scala | 4 ++-
.../org/apache/spark/sql/SubquerySuite.scala | 15 ++++++++++
.../sql/hive/execution/SQLQuerySuite.scala | 31 ++++++++++++++++++++
4 files changed, 53 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/4a10df0f/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
index 16b2271..0a568d6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
@@ -76,7 +76,10 @@ object FileSourceStrategy extends Strategy with Logging {
           fsRelation.partitionSchema, fsRelation.sparkSession.sessionState.analyzer.resolver)
       val partitionSet = AttributeSet(partitionColumns)
       val partitionKeyFilters =
-        ExpressionSet(normalizedFilters.filter(_.references.subsetOf(partitionSet)))
+        ExpressionSet(normalizedFilters
+          .filterNot(SubqueryExpression.hasSubquery(_))
+          .filter(_.references.subsetOf(partitionSet)))
+
       logInfo(s"Pruning directories with: ${partitionKeyFilters.mkString(",")}")

       val dataColumns =
http://git-wip-us.apache.org/repos/asf/spark/blob/4a10df0f/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PruneFileSourcePartitions.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PruneFileSourcePartitions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PruneFileSourcePartitions.scala
index 3b830ac..16b2367 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PruneFileSourcePartitions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PruneFileSourcePartitions.scala
@@ -55,7 +55,9 @@ private[sql] object PruneFileSourcePartitions extends Rule[LogicalPlan] {
           partitionSchema, sparkSession.sessionState.analyzer.resolver)
       val partitionSet = AttributeSet(partitionColumns)
       val partitionKeyFilters =
-        ExpressionSet(normalizedFilters.filter(_.references.subsetOf(partitionSet)))
+        ExpressionSet(normalizedFilters
+          .filterNot(SubqueryExpression.hasSubquery(_))
+          .filter(_.references.subsetOf(partitionSet)))

       if (partitionKeyFilters.nonEmpty) {
         val prunedFileIndex = catalogFileIndex.filterPartitions(partitionKeyFilters.toSeq)
http://git-wip-us.apache.org/repos/asf/spark/blob/4a10df0f/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala
index 31e8b0e..acef62d 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala
@@ -955,4 +955,19 @@ class SubquerySuite extends QueryTest with SharedSQLContext {
     // before the fix this would throw AnalysisException
     spark.range(10).where("(id,id) in (select id, null from range(3))").count
   }
+
+  test("SPARK-24085 scalar subquery in partitioning expression") {
+    withTable("parquet_part") {
+      Seq("1" -> "a", "2" -> "a", "3" -> "b", "4" -> "b")
+        .toDF("id_value", "id_type")
+        .write
+        .mode(SaveMode.Overwrite)
+        .partitionBy("id_type")
+        .format("parquet")
+        .saveAsTable("parquet_part")
+      checkAnswer(
+        sql("SELECT * FROM parquet_part WHERE id_type = (SELECT 'b')"),
+        Row("3", "b") :: Row("4", "b") :: Nil)
+    }
+  }
}
http://git-wip-us.apache.org/repos/asf/spark/blob/4a10df0f/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
index 5e14f53..081d854 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
@@ -2154,4 +2154,35 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
       }
     }
   }
+
+  test("SPARK-24085 scalar subquery in partitioning expression") {
+    Seq("orc", "parquet").foreach { format =>
+      Seq(true, false).foreach { isConverted =>
+        withSQLConf(
+          HiveUtils.CONVERT_METASTORE_ORC.key -> s"$isConverted",
+          HiveUtils.CONVERT_METASTORE_PARQUET.key -> s"$isConverted",
+          "hive.exec.dynamic.partition.mode" -> "nonstrict") {
+          withTable(format) {
+            withTempPath { tempDir =>
+              sql(
+                s"""
+                  |CREATE TABLE ${format} (id_value string)
+                  |PARTITIONED BY (id_type string)
+                  |LOCATION '${tempDir.toURI}'
+                  |STORED AS ${format}
+                """.stripMargin)
+              sql(s"insert into $format values ('1','a')")
+              sql(s"insert into $format values ('2','a')")
+              sql(s"insert into $format values ('3','b')")
+              sql(s"insert into $format values ('4','b')")
+              checkAnswer(
+                sql(s"SELECT * FROM $format WHERE id_type = (SELECT 'b')"),
+                Row("3", "b") :: Row("4", "b") :: Nil)
+            }
+          }
+        }
+      }
+    }
+  }
+
}