Repository: spark
Updated Branches:
  refs/heads/branch-2.3 07ec75ca0 -> 4a10df0f6


[SPARK-24085][SQL] Query returns UnsupportedOperationException when scalar 
subquery is present in partitioning expression

## What changes were proposed in this pull request?
In this case, the partition pruning happens before the planning phase of scalar 
subquery expressions.
For scalar subquery expressions, the planning occurs late in the cycle (after 
the physical planning) in "PlanSubqueries", just before execution. Currently we 
try to execute the scalar subquery expression as part of partition pruning and 
fail because it implements Unevaluable.

The fix excludes subquery expressions from the partition pruning 
computation. Another option would be to somehow plan the subqueries before 
partition pruning. Since this may not be a commonly occurring expression, I am 
opting for the simpler fix.

Repro
``` SQL
CREATE TABLE test_prc_bug (
id_value string
)
partitioned by (id_type string)
location '/tmp/test_prc_bug'
stored as parquet;

insert into test_prc_bug values ('1','a');
insert into test_prc_bug values ('2','a');
insert into test_prc_bug values ('3','b');
insert into test_prc_bug values ('4','b');

select * from test_prc_bug
where id_type = (select 'b');
```
## How was this patch tested?
Added test in SubquerySuite and hive/SQLQuerySuite

Author: Dilip Biswal <[email protected]>

Closes #21174 from dilipbiswal/spark-24085.

(cherry picked from commit 3fd297af6dc568357c97abf86760c570309d6597)
Signed-off-by: gatorsmile <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/4a10df0f
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/4a10df0f
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/4a10df0f

Branch: refs/heads/branch-2.3
Commit: 4a10df0f66f74ec2c995f9832d1ab74112bdeb16
Parents: 07ec75c
Author: Dilip Biswal <[email protected]>
Authored: Fri Apr 27 11:43:29 2018 -0700
Committer: gatorsmile <[email protected]>
Committed: Fri Apr 27 11:43:39 2018 -0700

----------------------------------------------------------------------
 .../datasources/FileSourceStrategy.scala        |  5 +++-
 .../datasources/PruneFileSourcePartitions.scala |  4 ++-
 .../org/apache/spark/sql/SubquerySuite.scala    | 15 ++++++++++
 .../sql/hive/execution/SQLQuerySuite.scala      | 31 ++++++++++++++++++++
 4 files changed, 53 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/4a10df0f/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
index 16b2271..0a568d6 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
@@ -76,7 +76,10 @@ object FileSourceStrategy extends Strategy with Logging {
           fsRelation.partitionSchema, 
fsRelation.sparkSession.sessionState.analyzer.resolver)
       val partitionSet = AttributeSet(partitionColumns)
       val partitionKeyFilters =
-        
ExpressionSet(normalizedFilters.filter(_.references.subsetOf(partitionSet)))
+        ExpressionSet(normalizedFilters
+          .filterNot(SubqueryExpression.hasSubquery(_))
+          .filter(_.references.subsetOf(partitionSet)))
+
       logInfo(s"Pruning directories with: 
${partitionKeyFilters.mkString(",")}")
 
       val dataColumns =

http://git-wip-us.apache.org/repos/asf/spark/blob/4a10df0f/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PruneFileSourcePartitions.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PruneFileSourcePartitions.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PruneFileSourcePartitions.scala
index 3b830ac..16b2367 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PruneFileSourcePartitions.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PruneFileSourcePartitions.scala
@@ -55,7 +55,9 @@ private[sql] object PruneFileSourcePartitions extends 
Rule[LogicalPlan] {
           partitionSchema, sparkSession.sessionState.analyzer.resolver)
       val partitionSet = AttributeSet(partitionColumns)
       val partitionKeyFilters =
-        
ExpressionSet(normalizedFilters.filter(_.references.subsetOf(partitionSet)))
+        ExpressionSet(normalizedFilters
+          .filterNot(SubqueryExpression.hasSubquery(_))
+          .filter(_.references.subsetOf(partitionSet)))
 
       if (partitionKeyFilters.nonEmpty) {
         val prunedFileIndex = 
catalogFileIndex.filterPartitions(partitionKeyFilters.toSeq)

http://git-wip-us.apache.org/repos/asf/spark/blob/4a10df0f/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala
index 31e8b0e..acef62d 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala
@@ -955,4 +955,19 @@ class SubquerySuite extends QueryTest with 
SharedSQLContext {
     // before the fix this would throw AnalysisException
     spark.range(10).where("(id,id) in (select id, null from range(3))").count
   }
+
+  test("SPARK-24085 scalar subquery in partitioning expression") {
+    withTable("parquet_part") {
+      Seq("1" -> "a", "2" -> "a", "3" -> "b", "4" -> "b")
+        .toDF("id_value", "id_type")
+        .write
+        .mode(SaveMode.Overwrite)
+        .partitionBy("id_type")
+        .format("parquet")
+        .saveAsTable("parquet_part")
+      checkAnswer(
+        sql("SELECT * FROM parquet_part WHERE id_type = (SELECT 'b')"),
+        Row("3", "b") :: Row("4", "b") :: Nil)
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/4a10df0f/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
index 5e14f53..081d854 100644
--- 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
+++ 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
@@ -2154,4 +2154,35 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils 
with TestHiveSingleton {
       }
     }
   }
+
+  test("SPARK-24085 scalar subquery in partitioning expression") {
+    Seq("orc", "parquet").foreach { format =>
+      Seq(true, false).foreach { isConverted =>
+        withSQLConf(
+          HiveUtils.CONVERT_METASTORE_ORC.key -> s"$isConverted",
+          HiveUtils.CONVERT_METASTORE_PARQUET.key -> s"$isConverted",
+          "hive.exec.dynamic.partition.mode" -> "nonstrict") {
+          withTable(format) {
+            withTempPath { tempDir =>
+              sql(
+                s"""
+                  |CREATE TABLE ${format} (id_value string)
+                  |PARTITIONED BY (id_type string)
+                  |LOCATION '${tempDir.toURI}'
+                  |STORED AS ${format}
+                """.stripMargin)
+              sql(s"insert into $format values ('1','a')")
+              sql(s"insert into $format values ('2','a')")
+              sql(s"insert into $format values ('3','b')")
+              sql(s"insert into $format values ('4','b')")
+              checkAnswer(
+                sql(s"SELECT * FROM $format WHERE id_type = (SELECT 'b')"),
+                Row("3", "b") :: Row("4", "b") :: Nil)
+            }
+          }
+        }
+      }
+    }
+  }
+
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to